improved multiprocess forwarding
menne committed Jul 20, 2019
1 parent 5dd7d9c commit 5205dab
Showing 2 changed files with 72 additions and 77 deletions.
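Taken together, the two files change how forward chunks are dispatched: instead of the earlier dry-run pass that only collected per-chunk kwargs and started every process afterwards, a new helper _run_forwarding_in_subprocesses(config) decides once whether forwarding may run in subprocesses (only when use_cuda is disabled). Chunk processes are then started as the chunks are scheduled, joined at the end, and each chunk's info file is verified. Below is a condensed sketch of that flow using stand-in names (run_chunk, forward_all_chunks), not the actual run_exp.py code:

```python
import multiprocessing
import os
import sys

def run_chunk(cfg_file):
    # Stand-in for core.run_nn: process one forward chunk and write its info file.
    open(cfg_file.replace(".cfg", ".info"), "w").close()

def forward_all_chunks(chunk_cfgs, use_cuda):
    # Mirrors the new run_exp.py logic: parallel subprocesses only without CUDA.
    processes, info_files = [], []
    for cfg in chunk_cfgs:
        info_files.append(cfg.replace(".cfg", ".info"))
        if not use_cuda:  # i.e. _run_forwarding_in_subprocesses(config) is True
            p = multiprocessing.Process(target=run_chunk, args=(cfg,))
            processes.append(p)
            p.start()
        else:
            run_chunk(cfg)
    for p in processes:
        p.join()  # wait for all forwarding subprocesses
    for info in info_files:
        if not os.path.exists(info):  # verify that every chunk finished
            sys.stderr.write("ERROR: %s does not exist, forwarding failed\n" % info)
            sys.exit(1)

if __name__ == "__main__":
    # tiny usage example with two dummy chunk configs
    forward_all_chunks(["ck0.cfg", "ck1.cfg"], use_cuda=False)
```

Note that the use_cuda gate means GPU forwarding still runs sequentially; only CPU forwarding is parallelized across chunks.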
94 changes: 44 additions & 50 deletions core.py
@@ -21,7 +21,7 @@
from data_io import read_lab_fea,open_or_fd,write_mat
from utils import shift

def run_nn_refac01(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,cfg_file,processed_first,next_config_file,dry_run=False):
def run_nn_refac01(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,cfg_file,processed_first,next_config_file):
def _read_chunk_specific_config(cfg_file):
if not(os.path.exists(cfg_file)):
sys.stderr.write('ERROR: The config file %s does not exist!\n'%(cfg_file))
@@ -230,7 +230,9 @@ def _get_dim_from_data_set(data_set_inp, data_set_ref):
data_end_index_fea = data_end_index['fea']
data_end_index_lab = data_end_index['lab']
shared_list = list()
data_loading_process = _read_next_chunk_into_shared_list_with_subprocess(read_lab_fea, shared_list, next_config_file, is_production, output_folder, wait_for_process=False)
data_loading_process = None
if not next_config_file is None:
data_loading_process = _read_next_chunk_into_shared_list_with_subprocess(read_lab_fea, shared_list, next_config_file, is_production, output_folder, wait_for_process=False)
nns, costs, optimizers, inp_out_dict = _load_model_and_optimizer(fea_dict,model,config,arch_dict,use_cuda,multi_gpu,to_do)
if to_do=='forward':
post_file = _open_forward_output_files_and_get_file_handles(forward_outs, require_decodings, info_file, output_folder)
@@ -253,25 +255,22 @@ def _get_dim_from_data_set(data_set_inp, data_set_ref):
start_time = time.time()
for i in range(N_batches):
inp, ref, max_len_fea, max_len_lab, snt_len_fea, snt_len_lab, beg_snt_fea, beg_snt_lab, snt_index = _prepare_input(snt_index, batch_size, data_set_inp_dim, data_set_ref_dim, beg_snt_fea, beg_snt_lab, data_end_index_fea, data_end_index_lab, beg_batch, end_batch, seq_model, arr_snt_len_fea, arr_snt_len_lab, data_set_inp, data_set_ref, use_cuda)
if dry_run:
outs_dict = dict()
if to_do=='train':
outs_dict = forward_model(fea_dict, lab_dict, arch_dict, model, nns, costs, inp, ref, inp_out_dict, max_len_fea, max_len_lab, batch_size, to_do, forward_outs)
_optimization_step(optimizers, outs_dict, config, arch_dict)
else:
if to_do=='train':
with torch.no_grad():
outs_dict = forward_model(fea_dict, lab_dict, arch_dict, model, nns, costs, inp, ref, inp_out_dict, max_len_fea, max_len_lab, batch_size, to_do, forward_outs)
_optimization_step(optimizers, outs_dict, config, arch_dict)
else:
with torch.no_grad():
outs_dict = forward_model(fea_dict, lab_dict, arch_dict, model, nns, costs, inp, ref, inp_out_dict, max_len_fea, max_len_lab, batch_size, to_do, forward_outs)
if to_do == 'forward':
for out_id in range(len(forward_outs)):
out_save = outs_dict[forward_outs[out_id]].data.cpu().numpy()
if forward_normalize_post[out_id]:
counts = load_counts(forward_count_files[out_id])
out_save=out_save-np.log(counts/np.sum(counts))
write_mat(output_folder,post_file[forward_outs[out_id]], out_save, data_name[i])
else:
loss_sum=loss_sum+outs_dict['loss_final'].detach()
err_sum=err_sum+outs_dict['err_final'].detach()
if to_do == 'forward':
for out_id in range(len(forward_outs)):
out_save = outs_dict[forward_outs[out_id]].data.cpu().numpy()
if forward_normalize_post[out_id]:
counts = load_counts(forward_count_files[out_id])
out_save=out_save-np.log(counts/np.sum(counts))
write_mat(output_folder,post_file[forward_outs[out_id]], out_save, data_name[i])
else:
loss_sum=loss_sum+outs_dict['loss_final'].detach()
err_sum=err_sum+outs_dict['err_final'].detach()
beg_batch=end_batch
end_batch=beg_batch+batch_size
_update_progress_bar(to_do, i, N_batches, loss_sum)
@@ -286,13 +285,16 @@ def _get_dim_from_data_set(data_set_inp, data_set_ref):
_write_info_file(info_file, to_do, loss_tot, err_tot, elapsed_time_chunk)
if not data_loading_process is None:
data_loading_process.join()
data_name, data_end_index_fea, data_end_index_lab, fea_dict, lab_dict, arch_dict, data_set_dict = _extract_data_from_shared_list(shared_list)
data_set_inp, data_set_ref = _convert_numpy_to_torch(data_set_dict, save_gpumem, use_cuda)
data_set = {'input': data_set_inp, 'ref': data_set_ref}
data_end_index = {'fea': data_end_index_fea,'lab': data_end_index_lab}
return [data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict]
data_name, data_end_index_fea, data_end_index_lab, fea_dict, lab_dict, arch_dict, data_set_dict = _extract_data_from_shared_list(shared_list)
data_set_inp, data_set_ref = _convert_numpy_to_torch(data_set_dict, save_gpumem, use_cuda)
data_set = {'input': data_set_inp, 'ref': data_set_ref}
data_end_index = {'fea': data_end_index_fea,'lab': data_end_index_lab}
return [data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict]
else:
return [None,None,None,None,None,None]


def run_nn(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,cfg_file,processed_first,next_config_file,dry_run=False):
def run_nn(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,cfg_file,processed_first,next_config_file):

# This function processes the current chunk using the information in cfg_file. In parallel, the next chunk is loaded into the CPU memory

@@ -479,17 +481,13 @@ def run_nn(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,cfg_fil

if to_do=='train':
# Forward input, with autograd graph active
if not dry_run:
outs_dict=forward_model(fea_dict,lab_dict,arch_dict,model,nns,costs,inp,inp_out_dict,max_len,batch_size,to_do,forward_outs)
else:
outs_dict = dict()
outs_dict=forward_model(fea_dict,lab_dict,arch_dict,model,nns,costs,inp,inp_out_dict,max_len,batch_size,to_do,forward_outs)

for opt in optimizers.keys():
optimizers[opt].zero_grad()


if not dry_run:
outs_dict['loss_final'].backward()
outs_dict['loss_final'].backward()

# Gradient Clipping (th 0.1)
#for net in nns.keys():
@@ -501,28 +499,24 @@ def run_nn(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,cfg_fil
optimizers[opt].step()
else:
with torch.no_grad(): # Forward input without autograd graph (save memory)
if not dry_run:
outs_dict=forward_model(fea_dict,lab_dict,arch_dict,model,nns,costs,inp,inp_out_dict,max_len,batch_size,to_do,forward_outs)
else:
outs_dict = dict()
outs_dict=forward_model(fea_dict,lab_dict,arch_dict,model,nns,costs,inp,inp_out_dict,max_len,batch_size,to_do,forward_outs)


if not dry_run:
if to_do=='forward':
for out_id in range(len(forward_outs)):

out_save=outs_dict[forward_outs[out_id]].data.cpu().numpy()
if to_do=='forward':
for out_id in range(len(forward_outs)):

out_save=outs_dict[forward_outs[out_id]].data.cpu().numpy()

if forward_normalize_post[out_id]:
# read the config file
counts = load_counts(forward_count_files[out_id])
out_save=out_save-np.log(counts/np.sum(counts))

if forward_normalize_post[out_id]:
# read the config file
counts = load_counts(forward_count_files[out_id])
out_save=out_save-np.log(counts/np.sum(counts))

# save the output
write_mat(output_folder,post_file[forward_outs[out_id]], out_save, data_name[i])
else:
loss_sum=loss_sum+outs_dict['loss_final'].detach()
err_sum=err_sum+outs_dict['err_final'].detach()
# save the output
write_mat(output_folder,post_file[forward_outs[out_id]], out_save, data_name[i])
else:
loss_sum=loss_sum+outs_dict['loss_final'].detach()
err_sum=err_sum+outs_dict['err_final'].detach()

# update it to the next batch
beg_batch=end_batch
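On the core.py side, run_nn_refac01 and run_nn lose the dry_run parameter, and the background loading of the next chunk is now started only when a next_config_file is actually given; the forwarding subprocesses created in run_exp.py (below) pass next_config_file=None, in which case the function returns a list of Nones instead of the next chunk's data. A minimal, self-contained sketch of that guard pattern, with stand-in names (load_chunk, process_chunk) rather than the pytorch-kaldi functions:

```python
import multiprocessing

def load_chunk(cfg_file, shared_list):
    # Stand-in for read_lab_fea: pretend to load the features/labels for cfg_file.
    shared_list.append("data for %s" % cfg_file)

def process_chunk(this_cfg, next_cfg):
    # Prefetch the next chunk only when there is one; subprocess forwarding
    # calls this with next_cfg=None, so no loader process is started there.
    manager = multiprocessing.Manager()
    shared_list = manager.list()
    loader = None
    if next_cfg is not None:
        loader = multiprocessing.Process(target=load_chunk, args=(next_cfg, shared_list))
        loader.start()

    result = "processed %s" % this_cfg  # stand-in for the actual forward/training work

    if loader is not None:
        loader.join()
        return result, list(shared_list)  # data for the next chunk
    return result, None                   # mirrors the [None, None, ...] return

if __name__ == "__main__":
    print(process_chunk("chunk1.cfg", "chunk2.cfg"))
    print(process_chunk("chunk2.cfg", None))
```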
55 changes: 28 additions & 27 deletions run_exp.py
@@ -24,6 +24,13 @@
import math
import multiprocessing

def _run_forwarding_in_subprocesses(config):
use_cuda=strtobool(config['exp']['use_cuda'])
if use_cuda:
return False
else:
return True

# Reading global cfg file (first argument-mandatory file)
cfg_file=sys.argv[1]
if not(os.path.exists(cfg_file)):
@@ -309,7 +316,8 @@
N_ck_forward=compute_n_chunks(out_folder,forward_data,ep,N_ep_str_format,'forward')
N_ck_str_format='0'+str(max(math.ceil(np.log10(N_ck_forward)),1))+'d'

kwargs_list = list()
processes = list()
info_files = list()
for ck in range(N_ck_forward):

if not is_production:
@@ -331,36 +339,29 @@
next_config_file=cfg_file_list[op_counter]

# run chunk processing
#[data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict]=run_nn(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,config_chunk_file,processed_first,next_config_file)
kwargs = dict()
for e in ['data_name','data_set','data_end_index','fea_dict','lab_dict','arch_dict','config_chunk_file','processed_first','next_config_file']:
if e == "config_chunk_file":
kwargs['cfg_file'] = eval(e)
else:
kwargs[e] = eval(e)
kwargs_list.append(kwargs)
[data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict]=run_nn(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,config_chunk_file,processed_first,next_config_file,dry_run=True)

if _run_forwarding_in_subprocesses(config):
p = multiprocessing.Process(target=run_nn, kwargs={'data_name': None, 'data_set': None, 'data_end_index': None, 'fea_dict': None, 'lab_dict': None, 'arch_dict': None, 'cfg_file': config_chunk_file, 'processed_first': True, 'next_config_file': None})
processes.append(p)
p.start()
else:
[data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict]=run_nn(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,config_chunk_file,processed_first,next_config_file)
processed_first=False
if not(os.path.exists(info_file)):
sys.stderr.write("ERROR: forward chunk %i of dataset %s not done! File %s does not exist.\nSee %s \n" % (ck,forward_data,info_file,log_file))
sys.exit(0)

# update the first_processed variable
processed_first=False

if not(os.path.exists(info_file)):
sys.stderr.write("ERROR: forward chunk %i of dataset %s not done! File %s does not exist.\nSee %s \n" % (ck,forward_data,info_file,log_file))
sys.exit(0)

info_files.append(info_file)

# update the operation counter
op_counter+=1
processes = list()
for kwargs in kwargs_list:
p = multiprocessing.Process(target=run_nn, kwargs=kwargs)
processes.append(p)
p.start()
for process in processes:
process.join()


if _run_forwarding_in_subprocesses(config):
for process in processes:
process.join()
for info_file in info_files:
if not(os.path.exists(info_file)):
sys.stderr.write("ERROR: File %s does not exist. Forwarding did not suceed.\nSee %s \n" % (info_file,log_file))
sys.exit(0)



# --------DECODING--------#

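One step that both run_nn variants share in the forward branch is posterior normalization: when forward_normalize_post is set, log priors derived from load_counts() are subtracted from the network outputs before write_mat() saves them. A small standalone example of that arithmetic, with made-up numbers and under the assumption (standard for Kaldi-style hybrid setups, not stated in this diff) that the outputs are log posteriors and the counts are per-class occupancy counts:

```python
import numpy as np

# Hypothetical values, not taken from the commit.
log_posteriors = np.log(np.array([[0.7, 0.2, 0.1],
                                  [0.1, 0.6, 0.3]]))   # one row per frame
counts = np.array([300.0, 500.0, 200.0])               # per-class occupancy counts

log_priors = np.log(counts / np.sum(counts))

# Same operation as out_save - np.log(counts/np.sum(counts)) in core.py:
# log p(s|x) - log p(s) gives log-likelihoods up to a constant, ready for decoding.
scaled_loglik = log_posteriors - log_priors
print(scaled_loglik)
```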