read data sequentially for multiprocessing forwarding
menne committed Jul 20, 2019
1 parent 99aa002 commit a70f1dc
Showing 2 changed files with 44 additions and 33 deletions.
64 changes: 32 additions & 32 deletions core.py
@@ -21,6 +21,32 @@
from data_io import read_lab_fea,open_or_fd,write_mat
from utils import shift

+ def read_next_chunk_into_shared_list_with_subprocess(read_lab_fea, shared_list, cfg_file, is_production, output_folder, wait_for_process):
+ p=threading.Thread(target=read_lab_fea, args=(cfg_file,is_production,shared_list,output_folder,))
+ p.start()
+ if wait_for_process:
+ p.join()
+ return None
+ else:
+ return p
+ def extract_data_from_shared_list(shared_list):
+ data_name = shared_list[0]
+ data_end_index_fea = shared_list[1]
+ data_end_index_lab = shared_list[2]
+ fea_dict = shared_list[3]
+ lab_dict = shared_list[4]
+ arch_dict = shared_list[5]
+ data_set = shared_list[6]
+ return data_name, data_end_index_fea, data_end_index_lab, fea_dict, lab_dict, arch_dict, data_set
+ def convert_numpy_to_torch(data_set_dict, save_gpumem, use_cuda):
+ if not(save_gpumem) and use_cuda:
+ data_set_inp=torch.from_numpy(data_set_dict['input']).float().cuda()
+ data_set_ref=torch.from_numpy(data_set_dict['ref']).float().cuda()
+ else:
+ data_set_inp=torch.from_numpy(data_set_dict['input']).float()
+ data_set_ref=torch.from_numpy(data_set_dict['ref']).float()
+ data_set_ref = data_set_ref.view((data_set_ref.shape[0], 1))
+ return data_set_inp, data_set_ref
def run_nn_refac01(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,cfg_file,processed_first,next_config_file):
def _read_chunk_specific_config(cfg_file):
if not(os.path.exists(cfg_file)):
@@ -30,23 +30,6 @@ def _read_chunk_specific_config(cfg_file):
config = configparser.ConfigParser()
config.read(cfg_file)
return config
- def _read_next_chunk_into_shared_list_with_subprocess(read_lab_fea, shared_list, cfg_file, is_production, output_folder, wait_for_process):
- p=threading.Thread(target=read_lab_fea, args=(cfg_file,is_production,shared_list,output_folder,))
- p.start()
- if wait_for_process:
- p.join()
- return None
- else:
- return p
- def _extract_data_from_shared_list(shared_list):
- data_name = shared_list[0]
- data_end_index_fea = shared_list[1]
- data_end_index_lab = shared_list[2]
- fea_dict = shared_list[3]
- lab_dict = shared_list[4]
- arch_dict = shared_list[5]
- data_set = shared_list[6]
- return data_name, data_end_index_fea, data_end_index_lab, fea_dict, lab_dict, arch_dict, data_set
def _get_batch_size_from_config(config, to_do):
if to_do=='train':
batch_size=int(config['batches']['batch_size_train'])
@@ -60,15 +69,6 @@ def _initialize_random_seed(config):
torch.manual_seed(seed)
random.seed(seed)
np.random.seed(seed)
- def _convert_numpy_to_torch(data_set_dict, save_gpumem, use_cuda):
- if not(save_gpumem) and use_cuda:
- data_set_inp=torch.from_numpy(data_set_dict['input']).float().cuda()
- data_set_ref=torch.from_numpy(data_set_dict['ref']).float().cuda()
- else:
- data_set_inp=torch.from_numpy(data_set_dict['input']).float()
- data_set_ref=torch.from_numpy(data_set_dict['ref']).float()
- data_set_ref = data_set_ref.view((data_set_ref.shape[0], 1))
- return data_set_inp, data_set_ref
def _load_model_and_optimizer(fea_dict,model,config,arch_dict,use_cuda,multi_gpu,to_do):
inp_out_dict = fea_dict
nns, costs = model_init(inp_out_dict,model,config,arch_dict,use_cuda,multi_gpu,to_do)
@@ -221,9 +221,9 @@ def _get_dim_from_data_set(data_set_inp, data_set_ref):

if processed_first:
shared_list = list()
- p = _read_next_chunk_into_shared_list_with_subprocess(read_lab_fea, shared_list, cfg_file, is_production, output_folder, wait_for_process=True)
- data_name, data_end_index_fea, data_end_index_lab, fea_dict, lab_dict, arch_dict, data_set_dict = _extract_data_from_shared_list(shared_list)
- data_set_inp, data_set_ref = _convert_numpy_to_torch(data_set_dict, save_gpumem, use_cuda)
+ p = read_next_chunk_into_shared_list_with_subprocess(read_lab_fea, shared_list, cfg_file, is_production, output_folder, wait_for_process=True)
+ data_name, data_end_index_fea, data_end_index_lab, fea_dict, lab_dict, arch_dict, data_set_dict = extract_data_from_shared_list(shared_list)
+ data_set_inp, data_set_ref = convert_numpy_to_torch(data_set_dict, save_gpumem, use_cuda)
else:
data_set_inp = data_set['input']
data_set_ref = data_set['ref']
@@ -232,7 +232,7 @@ def _get_dim_from_data_set(data_set_inp, data_set_ref):
shared_list = list()
data_loading_process = None
if not next_config_file is None:
- data_loading_process = _read_next_chunk_into_shared_list_with_subprocess(read_lab_fea, shared_list, next_config_file, is_production, output_folder, wait_for_process=False)
+ data_loading_process = read_next_chunk_into_shared_list_with_subprocess(read_lab_fea, shared_list, next_config_file, is_production, output_folder, wait_for_process=False)
nns, costs, optimizers, inp_out_dict = _load_model_and_optimizer(fea_dict,model,config,arch_dict,use_cuda,multi_gpu,to_do)
if to_do=='forward':
post_file = _open_forward_output_files_and_get_file_handles(forward_outs, require_decodings, info_file, output_folder)
@@ -285,8 +285,8 @@ def _get_dim_from_data_set(data_set_inp, data_set_ref):
_write_info_file(info_file, to_do, loss_tot, err_tot, elapsed_time_chunk)
if not data_loading_process is None:
data_loading_process.join()
- data_name, data_end_index_fea, data_end_index_lab, fea_dict, lab_dict, arch_dict, data_set_dict = _extract_data_from_shared_list(shared_list)
- data_set_inp, data_set_ref = _convert_numpy_to_torch(data_set_dict, save_gpumem, use_cuda)
+ data_name, data_end_index_fea, data_end_index_lab, fea_dict, lab_dict, arch_dict, data_set_dict = extract_data_from_shared_list(shared_list)
+ data_set_inp, data_set_ref = convert_numpy_to_torch(data_set_dict, save_gpumem, use_cuda)
data_set = {'input': data_set_inp, 'ref': data_set_ref}
data_end_index = {'fea': data_end_index_fea,'lab': data_end_index_lab}
return [data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict]
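The three helpers promoted to module level at the top of this diff (read_next_chunk_into_shared_list_with_subprocess, extract_data_from_shared_list, convert_numpy_to_torch) are always chained in the same order. A minimal sketch of that flow, mirroring the calls added above; it assumes read_lab_fea, cfg_file, is_production, output_folder, save_gpumem and use_cuda are already in scope, as they are inside run_nn_refac01:

shared_list = list()
# Blocking read of the current chunk: wait_for_process=True joins the reader thread before returning
read_next_chunk_into_shared_list_with_subprocess(read_lab_fea, shared_list, cfg_file, is_production, output_folder, wait_for_process=True)
# Unpack the seven items the reader appended to shared_list
data_name, data_end_index_fea, data_end_index_lab, fea_dict, lab_dict, arch_dict, data_set_dict = extract_data_from_shared_list(shared_list)
# Convert the numpy arrays to torch tensors (optionally moving them to the GPU)
data_set_inp, data_set_ref = convert_numpy_to_torch(data_set_dict, save_gpumem, use_cuda)
data_set = {'input': data_set_inp, 'ref': data_set_ref}
data_end_index = {'fea': data_end_index_fea, 'lab': data_end_index_lab}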
13 changes: 12 additions & 1 deletion run_exp.py
@@ -17,7 +17,9 @@
from utils import check_cfg,create_lists,create_configs, compute_avg_performance, \
read_args_command_line, run_shell,compute_n_chunks, get_all_archs,cfg_item2sec, \
dump_epoch_results, create_curves,change_lr_cfg,expand_str_ep
+ from data_io import read_lab_fea_refac01 as read_lab_fea
from shutil import copyfile
+ from core import read_next_chunk_into_shared_list_with_subprocess, extract_data_from_shared_list, convert_numpy_to_torch
import re
from distutils.util import strtobool
import importlib
@@ -340,7 +342,16 @@ def _run_forwarding_in_subprocesses(config):

# run chunk processing
if _run_forwarding_in_subprocesses(config):
- p = multiprocessing.Process(target=run_nn, kwargs={'data_name': None, 'data_set': None, 'data_end_index': None, 'fea_dict': None, 'lab_dict': None, 'arch_dict': None, 'cfg_file': config_chunk_file, 'processed_first': True, 'next_config_file': None})
+ shared_list = list()
+ output_folder = config['exp']['out_folder']
+ save_gpumem = strtobool(config['exp']['save_gpumem'])
+ use_cuda=strtobool(config['exp']['use_cuda'])
+ p = read_next_chunk_into_shared_list_with_subprocess(read_lab_fea, shared_list, config_chunk_file, is_production, output_folder, wait_for_process=True)
+ data_name, data_end_index_fea, data_end_index_lab, fea_dict, lab_dict, arch_dict, data_set_dict = extract_data_from_shared_list(shared_list)
+ data_set_inp, data_set_ref = convert_numpy_to_torch(data_set_dict, save_gpumem, use_cuda)
+ data_set = {'input': data_set_inp, 'ref': data_set_ref}
+ data_end_index = {'fea': data_end_index_fea,'lab': data_end_index_lab}
+ p = multiprocessing.Process(target=run_nn, kwargs={'data_name': data_name, 'data_set': data_set, 'data_end_index': data_end_index, 'fea_dict': fea_dict, 'lab_dict': lab_dict, 'arch_dict': arch_dict, 'cfg_file': config_chunk_file, 'processed_first': False, 'next_config_file': None})
processes.append(p)
p.start()
else:
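Taken together, the run_exp.py change above replaces a forwarding subprocess that loaded its own chunk (processed_first: True with all data arguments set to None) with a parent that reads each chunk sequentially and hands the preloaded tensors to the subprocess (processed_first: False), which is what keeps disk I/O sequential during multiprocessing forwarding. A self-contained toy sketch of that pattern follows; load_chunk and forward_chunk are hypothetical stand-ins for the repository's read_lab_fea/run_nn and are not code from the commit:

import multiprocessing

def load_chunk(chunk_id):
    # Stand-in for the sequential read in the parent (read chunk, extract fields, convert to tensors)
    return {'input': [float(chunk_id)] * 4, 'ref': [0] * 4}

def forward_chunk(data_set, chunk_id):
    # Stand-in for run_nn(..., processed_first=False): the data arrives already loaded
    print('forwarding chunk', chunk_id, 'with', len(data_set['input']), 'frames')

if __name__ == '__main__':
    processes = []
    for ck in range(3):
        data_set = load_chunk(ck)  # disk reads happen one at a time in the parent
        p = multiprocessing.Process(target=forward_chunk, args=(data_set, ck))
        processes.append(p)
        p.start()  # the forwarding itself still runs in parallel subprocesses
    for p in processes:
        p.join()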