diff --git a/core.py b/core.py
index 31ab65f8..b2016f90 100644
--- a/core.py
+++ b/core.py
@@ -21,7 +21,7 @@
 from data_io import read_lab_fea,open_or_fd,write_mat
 from utils import shift
 
-def run_nn_refac01(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,cfg_file,processed_first,next_config_file,dry_run=False):
+def run_nn_refac01(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,cfg_file,processed_first,next_config_file):
     def _read_chunk_specific_config(cfg_file):
         if not(os.path.exists(cfg_file)):
             sys.stderr.write('ERROR: The config file %s does not exist!\n'%(cfg_file))
@@ -230,7 +230,9 @@ def _get_dim_from_data_set(data_set_inp, data_set_ref):
     data_end_index_fea = data_end_index['fea']
     data_end_index_lab = data_end_index['lab']
     shared_list = list()
-    data_loading_process = _read_next_chunk_into_shared_list_with_subprocess(read_lab_fea, shared_list, next_config_file, is_production, output_folder, wait_for_process=False)
+    data_loading_process = None
+    if not next_config_file is None:
+        data_loading_process = _read_next_chunk_into_shared_list_with_subprocess(read_lab_fea, shared_list, next_config_file, is_production, output_folder, wait_for_process=False)
     nns, costs, optimizers, inp_out_dict = _load_model_and_optimizer(fea_dict,model,config,arch_dict,use_cuda,multi_gpu,to_do)
     if to_do=='forward':
         post_file = _open_forward_output_files_and_get_file_handles(forward_outs, require_decodings, info_file, output_folder)
@@ -253,25 +255,22 @@ def _get_dim_from_data_set(data_set_inp, data_set_ref):
     start_time = time.time()
     for i in range(N_batches):
         inp, ref, max_len_fea, max_len_lab, snt_len_fea, snt_len_lab, beg_snt_fea, beg_snt_lab, snt_index = _prepare_input(snt_index, batch_size, data_set_inp_dim, data_set_ref_dim, beg_snt_fea, beg_snt_lab, data_end_index_fea, data_end_index_lab, beg_batch, end_batch, seq_model, arr_snt_len_fea, arr_snt_len_lab, data_set_inp, data_set_ref, use_cuda)
-        if dry_run:
-            outs_dict = dict()
+        if to_do=='train':
+            outs_dict = forward_model(fea_dict, lab_dict, arch_dict, model, nns, costs, inp, ref, inp_out_dict, max_len_fea, max_len_lab, batch_size, to_do, forward_outs)
+            _optimization_step(optimizers, outs_dict, config, arch_dict)
         else:
-            if to_do=='train':
+            with torch.no_grad():
                 outs_dict = forward_model(fea_dict, lab_dict, arch_dict, model, nns, costs, inp, ref, inp_out_dict, max_len_fea, max_len_lab, batch_size, to_do, forward_outs)
-                _optimization_step(optimizers, outs_dict, config, arch_dict)
-            else:
-                with torch.no_grad():
-                    outs_dict = forward_model(fea_dict, lab_dict, arch_dict, model, nns, costs, inp, ref, inp_out_dict, max_len_fea, max_len_lab, batch_size, to_do, forward_outs)
-                    if to_do == 'forward':
-                        for out_id in range(len(forward_outs)):
-                            out_save = outs_dict[forward_outs[out_id]].data.cpu().numpy()
-                            if forward_normalize_post[out_id]:
-                                counts = load_counts(forward_count_files[out_id])
-                                out_save=out_save-np.log(counts/np.sum(counts))
-                            write_mat(output_folder,post_file[forward_outs[out_id]], out_save, data_name[i])
-                    else:
-                        loss_sum=loss_sum+outs_dict['loss_final'].detach()
-                        err_sum=err_sum+outs_dict['err_final'].detach()
+        if to_do == 'forward':
+            for out_id in range(len(forward_outs)):
+                out_save = outs_dict[forward_outs[out_id]].data.cpu().numpy()
+                if forward_normalize_post[out_id]:
+                    counts = load_counts(forward_count_files[out_id])
+                    out_save=out_save-np.log(counts/np.sum(counts))
+                write_mat(output_folder,post_file[forward_outs[out_id]], out_save, data_name[i])
+        else:
+            loss_sum=loss_sum+outs_dict['loss_final'].detach()
+            err_sum=err_sum+outs_dict['err_final'].detach()
         beg_batch=end_batch
         end_batch=beg_batch+batch_size
         _update_progress_bar(to_do, i, N_batches, loss_sum)
@@ -286,13 +285,16 @@ def _get_dim_from_data_set(data_set_inp, data_set_ref):
     _write_info_file(info_file, to_do, loss_tot, err_tot, elapsed_time_chunk)
     if not data_loading_process is None:
         data_loading_process.join()
-    data_name, data_end_index_fea, data_end_index_lab, fea_dict, lab_dict, arch_dict, data_set_dict = _extract_data_from_shared_list(shared_list)
-    data_set_inp, data_set_ref = _convert_numpy_to_torch(data_set_dict, save_gpumem, use_cuda)
-    data_set = {'input': data_set_inp, 'ref': data_set_ref}
-    data_end_index = {'fea': data_end_index_fea,'lab': data_end_index_lab}
-    return [data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict]
+        data_name, data_end_index_fea, data_end_index_lab, fea_dict, lab_dict, arch_dict, data_set_dict = _extract_data_from_shared_list(shared_list)
+        data_set_inp, data_set_ref = _convert_numpy_to_torch(data_set_dict, save_gpumem, use_cuda)
+        data_set = {'input': data_set_inp, 'ref': data_set_ref}
+        data_end_index = {'fea': data_end_index_fea,'lab': data_end_index_lab}
+        return [data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict]
+    else:
+        return [None,None,None,None,None,None]
+
 
 
-def run_nn(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,cfg_file,processed_first,next_config_file,dry_run=False):
+def run_nn(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,cfg_file,processed_first,next_config_file):
 
     # This function processes the current chunk using the information in cfg_file. In parallel, the next chunk is load into the CPU memory
@@ -479,17 +481,13 @@ def run_nn(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,cfg_fil
 
         if to_do=='train':
             # Forward input, with autograd graph active
-            if not dry_run:
-                outs_dict=forward_model(fea_dict,lab_dict,arch_dict,model,nns,costs,inp,inp_out_dict,max_len,batch_size,to_do,forward_outs)
-            else:
-                outs_dict = dict()
+            outs_dict=forward_model(fea_dict,lab_dict,arch_dict,model,nns,costs,inp,inp_out_dict,max_len,batch_size,to_do,forward_outs)
 
             for opt in optimizers.keys():
                 optimizers[opt].zero_grad()
 
 
-            if not dry_run:
-                outs_dict['loss_final'].backward()
+            outs_dict['loss_final'].backward()
 
             # Gradient Clipping (th 0.1)
             #for net in nns.keys():
@@ -501,28 +499,24 @@ def run_nn(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,cfg_fil
                 optimizers[opt].step()
         else:
             with torch.no_grad(): # Forward input without autograd graph (save memory)
-                if not dry_run:
-                    outs_dict=forward_model(fea_dict,lab_dict,arch_dict,model,nns,costs,inp,inp_out_dict,max_len,batch_size,to_do,forward_outs)
-                else:
-                    outs_dict = dict()
+                outs_dict=forward_model(fea_dict,lab_dict,arch_dict,model,nns,costs,inp,inp_out_dict,max_len,batch_size,to_do,forward_outs)
 
 
-        if not dry_run:
-            if to_do=='forward':
-                for out_id in range(len(forward_outs)):
-
-                    out_save=outs_dict[forward_outs[out_id]].data.cpu().numpy()
+        if to_do=='forward':
+            for out_id in range(len(forward_outs)):
+
+                out_save=outs_dict[forward_outs[out_id]].data.cpu().numpy()
+
+                if forward_normalize_post[out_id]:
+                    # read the config file
+                    counts = load_counts(forward_count_files[out_id])
+                    out_save=out_save-np.log(counts/np.sum(counts))
 
-                    if forward_normalize_post[out_id]:
-                        # read the config file
-                        counts = load_counts(forward_count_files[out_id])
-                        out_save=out_save-np.log(counts/np.sum(counts))
-
-                    # save the output
-                    write_mat(output_folder,post_file[forward_outs[out_id]], out_save, data_name[i])
-            else:
-                loss_sum=loss_sum+outs_dict['loss_final'].detach()
-                err_sum=err_sum+outs_dict['err_final'].detach()
+                # save the output
+                write_mat(output_folder,post_file[forward_outs[out_id]], out_save, data_name[i])
+        else:
+            loss_sum=loss_sum+outs_dict['loss_final'].detach()
+            err_sum=err_sum+outs_dict['err_final'].detach()
 
         # update it to the next batch
         beg_batch=end_batch
diff --git a/run_exp.py b/run_exp.py
index 252a0fbe..0bdd4613 100644
--- a/run_exp.py
+++ b/run_exp.py
@@ -24,6 +24,13 @@
 import math
 import multiprocessing
 
+def _run_forwarding_in_subprocesses(config):
+    use_cuda=strtobool(config['exp']['use_cuda'])
+    if use_cuda:
+        return False
+    else:
+        return True
+
 # Reading global cfg file (first argument-mandatory file)
 cfg_file=sys.argv[1]
 if not(os.path.exists(cfg_file)):
@@ -309,7 +316,8 @@
 
     N_ck_forward=compute_n_chunks(out_folder,forward_data,ep,N_ep_str_format,'forward')
     N_ck_str_format='0'+str(max(math.ceil(np.log10(N_ck_forward)),1))+'d'
-    kwargs_list = list()
+    processes = list()
+    info_files = list()
     for ck in range(N_ck_forward):
 
         if not is_production:
@@ -331,36 +339,29 @@
         next_config_file=cfg_file_list[op_counter]
 
         # run chunk processing
-        #[data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict]=run_nn(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,config_chunk_file,processed_first,next_config_file)
-        kwargs = dict()
-        for e in ['data_name','data_set','data_end_index','fea_dict','lab_dict','arch_dict','config_chunk_file','processed_first','next_config_file']:
-            if e == "config_chunk_file":
-                kwargs['cfg_file'] = eval(e)
-            else:
-                kwargs[e] = eval(e)
-        kwargs_list.append(kwargs)
-        [data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict]=run_nn(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,config_chunk_file,processed_first,next_config_file,dry_run=True)
-
+        if _run_forwarding_in_subprocesses(config):
+            p = multiprocessing.Process(target=run_nn, kwargs={'data_name': None, 'data_set': None, 'data_end_index': None, 'fea_dict': None, 'lab_dict': None, 'arch_dict': None, 'cfg_file': config_chunk_file, 'processed_first': True, 'next_config_file': None})
+            processes.append(p)
+            p.start()
+        else:
+            [data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict]=run_nn(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,config_chunk_file,processed_first,next_config_file)
+            processed_first=False
+            if not(os.path.exists(info_file)):
+                sys.stderr.write("ERROR: forward chunk %i of dataset %s not done! File %s does not exist.\nSee %s \n" % (ck,forward_data,info_file,log_file))
+                sys.exit(0)
-        # update the first_processed variable
-        processed_first=False
-
-        if not(os.path.exists(info_file)):
-            sys.stderr.write("ERROR: forward chunk %i of dataset %s not done! File %s does not exist.\nSee %s \n" % (ck,forward_data,info_file,log_file))
-            sys.exit(0)
-
+        info_files.append(info_file)
 
         # update the operation counter
         op_counter+=1
 
-    processes = list()
-    for kwargs in kwargs_list:
-        p = multiprocessing.Process(target=run_nn, kwargs=kwargs)
-        processes.append(p)
-        p.start()
-    for process in processes:
-        process.join()
-
-
+    if _run_forwarding_in_subprocesses(config):
+        for process in processes:
+            process.join()
+        for info_file in info_files:
+            if not(os.path.exists(info_file)):
+                sys.stderr.write("ERROR: File %s does not exist. Forwarding did not succeed.\nSee %s \n" % (info_file,log_file))
+                sys.exit(0)
+
 
 # --------DECODING--------#
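
For context, below is a minimal standalone sketch (not part of the patch) of the fan-out/join pattern that the run_exp.py changes adopt for CPU-only forwarding: one multiprocessing.Process per chunk, started immediately, joined at the end, followed by a check that every chunk left its info file behind. Here process_chunk and the file names are hypothetical stand-ins for run_nn and the per-chunk config/info files.

import os
import sys
import multiprocessing


def process_chunk(cfg_file):
    # Hypothetical stand-in for run_nn: process one chunk and leave an .info file behind.
    info_file = cfg_file.replace('.cfg', '.info')
    with open(info_file, 'w') as f:
        f.write('done\n')


if __name__ == '__main__':
    chunk_configs = ['forward_ck_00.cfg', 'forward_ck_01.cfg']  # hypothetical chunk config names

    processes = list()
    info_files = list()
    for cfg_file in chunk_configs:
        # One subprocess per chunk, mirroring Process(target=run_nn, kwargs=...) above.
        p = multiprocessing.Process(target=process_chunk, kwargs={'cfg_file': cfg_file})
        processes.append(p)
        p.start()
        info_files.append(cfg_file.replace('.cfg', '.info'))

    # Join all chunks, then verify that each one produced its info file,
    # as the patch does after the forwarding subprocesses finish.
    for process in processes:
        process.join()
    for info_file in info_files:
        if not os.path.exists(info_file):
            sys.stderr.write('ERROR: File %s does not exist.\n' % info_file)
            sys.exit(1)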