Skip to content
This repository has been archived by the owner on Aug 15, 2024. It is now read-only.

Commit

Permalink
* the garbage collector can now be resurected by the deamon, and resu…
Browse files Browse the repository at this point in the history
…m its activity seamlessly.

* the garbage collector is also invoked once at initialization. So if the garbage collector was killed at the end of the pipeline and the deamon did not have time to resurect it, simply re-running the pipeline will fix the logs.
Fixes #53
  • Loading branch information
pbellec committed Sep 15, 2015
1 parent 6e798aa commit 5e70394
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 22 deletions.
9 changes: 7 additions & 2 deletions psom_deamon.m
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@
opt_logs_garb.exit = [path_garbage 'garbage.exit'];
opt_garb = opt_script;
opt_garb.name_job = 'psom_garbage';
cmd_garb = sprintf('opt.time_pipeline = ''%s''; opt.max_queued = %i; opt.time_between_checks = %1.2f; opt.nb_checks_per_point = %i; psom_garbage(''%s'',opt);',time_pipeline,opt.max_queued,opt.time_between_checks,opt.nb_checks_per_point,path_logs);
cmd_garb = sprintf('opt.time_pipeline = ''%s''; opt.time_between_checks = %1.2f; opt.nb_checks_per_point = %i; psom_garbage(''%s'',opt);',time_pipeline,opt.time_between_checks,opt.nb_checks_per_point,path_logs);
if ispc % this is windows
script_garb = [path_tmp filesep 'psom_garbage.bat'];
else
Expand Down Expand Up @@ -346,7 +346,12 @@
if opt.flag_verbose >= 3
fprintf('No heartbeat in %1.2fs for process %s\n',elapsed_time,name_worker{num_w})
end
if (elapsed_time > time_death)&&~psom_exist(file_worker_end{num_w})
if num_w<=opt.max_queued
flag_worker_end = psom_exist(file_worker_end{num_w});
else
flag_worker_end = false;
end
if (elapsed_time > time_death)&&~flag_worker_end
if opt.flag_verbose
fprintf('No heartbeat for process %s, counted as dead.\n',name_worker{num_w});
end
Expand Down
22 changes: 14 additions & 8 deletions psom_garbage.m
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
% OPT.MAX_QUEUED
% OPT.TIME_PIPELINE
% OPT.FLAG_VERBOSE (integer 0, 1 or 2, default 1) No verbose (0), verbose (1).
%
% OPT.FLAG_INIT
% See licensing information in the code.

% Copyright (c) Pierre Bellec, Montreal Neurological Institute, 2008-2010.
Expand Down Expand Up @@ -53,8 +53,8 @@
opt = struct;
end
opt = psom_struct_defaults( opt , ...
{ 'flag_verbose' , 'time_pipeline' , 'time_between_checks' , 'nb_checks_per_point' , 'max_queued' }, ...
{ 1 , NaN , NaN , NaN , NaN });
{ 'flag_init' , 'flag_verbose' , 'time_pipeline' , 'time_between_checks' , 'nb_checks_per_point' }, ...
{ false , 1 , '' , 0.1 , 100 });

%% Logs folder
if ~strcmp(path_logs(end),filesep)
Expand All @@ -73,15 +73,18 @@
file_heartbeat = [path_garbage 'heartbeat.mat'];
file_kill = [path_garbage 'garbage.kill'];
file_time = [path_logs 'PIPE_time.mat'];
file_conf = [path_logs 'PIPE_config.mat'];
path_worker = [path_logs 'worker' filesep];
file_news = [path_logs 'news_feed.csv'];

if opt.max_queued == 0
%% Load the pipeline configuration
cfg = load(file_conf);
if cfg.max_queued == 0
% This pipeline is executed in session mode
% log files are located in the main pipeline directory
path_search{1} = path_logs;
else
for num_w = 1:opt.max_queued
for num_w = 1:cfg.max_queued
path_search{num_w} = sprintf('%spsom%i%s',path_worker,num_w,filesep);
end
end
Expand All @@ -90,7 +93,7 @@
%% This check is done to ensure a new pipeline has not been started
%% since the manager was started
logs_time = load(file_time);
if ~strcmp(opt.time_pipeline,logs_time.time_pipeline)
if ~isempty(opt.time_pipeline)&&~strcmp(opt.time_pipeline,logs_time.time_pipeline)
fprintf('The time of the pipeline does not match the logs. I am quitting.')
exit
end
Expand Down Expand Up @@ -123,6 +126,7 @@
nb_char_news = 0;
nb_checks = 0;
news = [];

while ~flag_exit

%% Read the news
Expand Down Expand Up @@ -178,7 +182,9 @@
end
if ~flag_found&&~mask_warning(list_todo(num_t))
mask_warning(list_todo(num_t)) = true;
warning('Could not find logs for job %s',name_job)
if opt.flag_verbose
fprintf('\nCould not find logs for job %s',name_job)
end
end
end

Expand Down Expand Up @@ -213,7 +219,7 @@
end

%% Test if it is time to quit
flag_exit = flag_started&&flag_last_run;
flag_exit = (flag_started&&flag_last_run)||opt.flag_init;
if flag_run
flag_last_run = ~psom_exist(file_pipe_running);
flag_started = true;
Expand Down
17 changes: 7 additions & 10 deletions psom_pipeline_init.m
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,13 @@
%% Stage 2: Load previous pipeline description, logs and status %%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% update the logs
try
opt_g.flag_init = true;
opt_g.flag_verbose = opt.flag_verbose >= 2;
psom_garbage(path_logs,opt_g);
end

%% Test for the existence of an old pipeline
flag_old_pipeline = psom_exist(file_jobs);

Expand Down Expand Up @@ -432,16 +439,6 @@
list_num_finished = list_num_inq(mask_finished);
list_num_finished = list_num_finished(:)';
for num_j = list_num_finished
name_job = list_jobs{num_j};
text_log = sub_read_txt([path_logs filesep name_job '.log']);
text_qsub_o = sub_read_txt([path_logs filesep name_job '.oqsub']);
text_qsub_e = sub_read_txt([path_logs filesep name_job '.eqsub']);

if ~isempty(text_qsub_o)&&isempty(text_qsub_e)
text_log = [text_log hat_qsub_o text_qsub_o hat_qsub_e text_qsub_e];
end

all_logs.(name_job) = text_log;
job_status{num_j} = 'finished';
end
job_status_old = job_status;
Expand Down
9 changes: 7 additions & 2 deletions psom_run_pipeline.m
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
% on that. Contact your local system administrator for more info.
%
% MAX_BUFFER
% (integer, default 1)
% (integer, default 2)
%
% NB_RESUB
% (integer, default 0 in 'session', opt.max_queued otherwise)
Expand Down Expand Up @@ -304,7 +304,7 @@

opt = psom_struct_defaults( opt , ...
{'max_buffer' , 'flag_spawn' , 'flag_fail' , 'flag_short_job_names' , 'nb_resub' , 'type_restart' , 'flag_pause' , 'init_matlab' , 'flag_update' , 'path_search' , 'restart' , 'shell_options' , 'path_logs' , 'command_matlab' , 'flag_verbose' , 'mode' , 'mode_pipeline_manager' , 'max_queued' , 'qsub_options' , 'time_between_checks' , 'nb_checks_per_point' , 'time_cool_down' }, ...
{1 , false , false , true , gb_psom_nb_resub , 'substring' , false , gb_psom_init_matlab , true , gb_psom_path_search , {} , gb_psom_shell_options , NaN , '' , 1 , gb_psom_mode , gb_psom_mode_pm , gb_psom_max_queued , gb_psom_qsub_options , [] , [] , [] });
{2 , false , false , true , gb_psom_nb_resub , 'substring' , false , gb_psom_init_matlab , true , gb_psom_path_search , {} , gb_psom_shell_options , NaN , '' , 1 , gb_psom_mode , gb_psom_mode_pm , gb_psom_max_queued , gb_psom_qsub_options , [] , [] , [] });

opt.flag_debug = opt.flag_verbose>1;

Expand Down Expand Up @@ -424,9 +424,14 @@
if ~flag_start
return
end

%% Save the configuration
file_config = [opt.path_logs 'PIPE_config.mat'];
save(file_config,'-struct','opt')

%% Run the pipeline manager
file_pipeline = cat(2,opt.path_logs,filesep,name_pipeline,'.mat');

%% Create a folder for the PSOM deamon
path_deamon = [opt.path_logs 'deamon' filesep];
if psom_exist(path_deamon)
Expand Down

0 comments on commit 5e70394

Please sign in to comment.