Skip to content
This repository has been archived by the owner on Aug 15, 2024. It is now read-only.

Commit

Permalink
opt.mode_pipeline_manager now supports 'session', as long as opt.mode…
Browse files Browse the repository at this point in the history
… is not session.
  • Loading branch information
pbellec committed Jul 7, 2015
1 parent 16b7a99 commit e853f88
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 44 deletions.
23 changes: 17 additions & 6 deletions psom_deamon.m
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
% 'bsub' : remote execution using bsub (IBM)
% 'condor' : remote execution using condor
%
% MODE_PIPELINE_MANAGER
% (string) same as OPT.MODE, but applies to the pipeline manager itself.
%
% MAX_QUEUED
% (integer) The maximum number of jobs that can be processed
% simultaneously. Some qsub systems actually put restrictions
Expand Down Expand Up @@ -121,8 +124,8 @@
opt = struct;
end
opt = psom_struct_defaults( opt , ...
{ 'nb_resub' , 'flag_verbose' , 'init_matlab' , 'shell_options' , 'command_matlab' , 'mode' , 'max_queued' , 'max_buffer' , 'qsub_options' , 'time_between_checks' , 'nb_checks_per_point' }, ...
{ NaN , 1 , NaN , NaN , NaN , NaN , NaN , NaN , NaN , NaN , NaN });
{ 'mode_pipeline_manager' , 'nb_resub' , 'flag_verbose' , 'init_matlab' , 'shell_options' , 'command_matlab' , 'mode' , 'max_queued' , 'max_buffer' , 'qsub_options' , 'time_between_checks' , 'nb_checks_per_point' }, ...
{ NaN , NaN , 1 , NaN , NaN , NaN , NaN , NaN , NaN , NaN , NaN , NaN });

if ~strcmp(path_logs(end),filesep)
path_logs = [path_logs filesep];
Expand Down Expand Up @@ -172,6 +175,7 @@
nb_resub = 0; % Number of resubmission
nb_checks = 0; % Number of checks to print a points
nb_points = 0; % Number of printed points
nb_chars_logs = 0; % Number of characters printed from the pipeline history
flag_pipe_running = false; % Is the pipeline started?
flag_pipe_finished = false; % Is the pipeline finished?
flag_started = false([opt.max_queued+2 1]); % Has the worker ever started? two last entries are for the PM and the GC
Expand Down Expand Up @@ -236,12 +240,12 @@
opt_logs_worker(num_w).failed = sprintf('%spsom%i%sworker.failed',path_worker,num_w,filesep);
opt_logs_worker(num_w).exit = sprintf('%spsom%i%sworker.exit',path_worker,num_w,filesep);
opt_worker(num_w) = opt_script;
opt_worker(num_w).name_job = sprintf('psom%i',num_w);
opt_worker(num_w).name_job = name_worker{num_w};
cmd_worker{num_w} = sprintf('flag.heartbeat = true; flag.spawn = true; psom_worker(''%s'',flag);',path_worker_w{num_w});
if ispc % this is windows
script_worker{num_w} = [path_tmp filesep opt_worker.name_job '.bat'];
script_worker{num_w} = [path_tmp filesep opt_worker(num_w).name_job '.bat'];
else
script_worker{num_w} = [path_tmp filesep opt_worker.name_job '.sh'];
script_worker{num_w} = [path_tmp filesep opt_worker(num_w).name_job '.sh'];
end
end

Expand Down Expand Up @@ -363,7 +367,14 @@
end
end
end
sub_sleep(opt.time_between_checks*10)
if ~strcmp(opt.mode_pipeline_manager,'session')
sub_sleep(opt.time_between_checks*10)
else
if opt.flag_verbose
nb_chars_logs = psom_pipeline_visu(path_logs,'monitor',nb_chars_logs);
end
sub_sleep(opt.time_between_checks)
end
flag_pipe_finished = ~psom_exist(file_pipe_running);
end

Expand Down
2 changes: 1 addition & 1 deletion psom_pipeline_init.m
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,7 @@ function sub_save_struct_fields(file_name,var_struct,flag_append)
pipeline_str = load(file_name);
catch
[path_f,name_f,ext_f] = fileparts(file_name);
file_backup = [path_f name_f '_backup' ext_f];
file_backup = [path_f filesep name_f '_backup' ext_f];
warning('There was something wrong when loading the file %s, I''ll try loading the backup instead',file_name)
pipeline_str = load(file_backup);
copyfile(file_backup,file_name,'f');
Expand Down
36 changes: 22 additions & 14 deletions psom_pipeline_visu.m
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@
%
% Copyright (c) Pierre Bellec,
% Montreal Neurological Institute, 2008-2010
% Département d'informatique et de recherche opérationnelle
% Centre de recherche de l'institut de Gériatrie de Montréal
% Université de Montréal, 2011
% Dpartement d'informatique et de recherche oprationnelle
% Centre de recherche de l'institut de Griatrie de Montral
% Universit de Montral, 2011
% Maintainer : [email protected]
% See licensing information in the code.
% Keywords : pipeline
Expand Down Expand Up @@ -236,24 +236,22 @@
file_monitor = [path_logs filesep name_pipeline '_history.txt'];
file_pipe_running = [path_logs filesep name_pipeline '.lock'];

if exist(file_pipe_running,'file')
msg = 'The pipeline is currently running';
else
msg = 'The pipeline is NOT currently running';
if ~psom_exist(file_pipe_running)
fprintf('The pipeline is NOT currently running\n');
end

stars = repmat('*',size(msg));
fprintf('\n\n%s\n%s\n%s\n\n',stars,msg,stars);


while ~psom_exist(file_monitor) && psom_exist(file_pipe_running) % the pipeline started but the log file has not yet been created

fprintf('I could not find any log file. This pipeline has not been started (yet?). Press CTRL-C to cancel.\n');
pause(1)

end

sub_tail(file_monitor,file_pipe_running,opt_action);
res = [];
if nargin<3
res = sub_tail(file_monitor,file_pipe_running,opt_action);
else
res = sub_read_update(file_monitor,opt_action);
end

case 'time'

Expand Down Expand Up @@ -440,7 +438,17 @@
%% sub-functions %%
%%%%%%%%%%%%%%%%%%%

function [] = sub_tail(file_read,file_running,nb_chars)
function nb_chars = sub_read_update(file_read,nb_chars)

% prints out update on FILE_READ
hf = fopen(file_read,'r');
fseek(hf,nb_chars,'bof');
str_read = fread(hf, Inf , 'uint8=>char')';
nb_chars = ftell(hf);
fclose(hf);
fprintf('%s',str_read);

function nb_chars = sub_tail(file_read,file_running)

% prints out the content of the text file FILE_READ with constant updates
% as long as the file FILE_RUNNING exists.
Expand Down
9 changes: 6 additions & 3 deletions psom_run_pipeline.m
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,7 @@

%% Run the pipeline manager
file_pipeline = cat(2,opt.path_logs,filesep,name_pipeline,'.mat');
if strcmp(opt.mode_pipeline_manager,'session')
else
if 0==0

%% Create a folder for the PSOM deamon
path_deamon = [opt.path_logs 'deamon' filesep];
Expand All @@ -406,6 +405,7 @@

%% The options for the deamon
opt_d.mode = opt.mode;
opt_d.mode_pipeline_manager = opt.mode_pipeline_manager;
opt_d.max_queued = opt.max_queued;
opt_d.max_buffer = opt.max_buffer;
opt_d.nb_resub = opt.nb_resub;
Expand Down Expand Up @@ -446,5 +446,8 @@

%% If not in session mode, monitor the output of the pipeline
if opt.flag_verbose&&~strcmp(opt.mode_pipeline_manager,'session')
psom_pipeline_visu(opt.path_logs,'monitor',0);
nb_chars = 0;
while (nb_chars==0)||psom_exist(file_pipe_running)
nb_chars = psom_pipeline_visu(opt.path_logs,'monitor',nb_chars);
end
end
35 changes: 15 additions & 20 deletions psom_run_script.m
Original file line number Diff line number Diff line change
Expand Up @@ -301,26 +301,16 @@

case 'session'

try
if ~isempty(logs)
diary(logs.txt);
sub_eval(cmd);
diary off;
else
sub_eval(cmd);
end
flag_failed = false;
msg = '';
catch
flag_failed = true;
errmsg = lasterror;
msg = errmsg.message;
if isfield(errmsg,'stack')
for num_e = 1:length(errmsg.stack)
msg = sprintf('%s\nFile %s at line %i\n',msg,errmsg.stack(num_e).file,errmsg.stack(num_e).line);
end
end
if ~isempty(logs)
diary(logs.txt);
sub_eval(cmd);
diary off;
else
sub_eval(cmd);
end
flag_failed = false;
msg = '';

if ~isempty(logs.exit)
save(logs.exit,'flag_failed')
end
Expand Down Expand Up @@ -418,7 +408,12 @@
end

if (flag_failed~=0)&&exist('errmsg','var')
fprintf('\n The execution of the job %s failed.\n The feedback was : %s\n',opt.name_job,errmsg);
fprintf('\n The execution of the job %s failed.\n The feedback was:\n',opt.name_job);
if isfield(errmsg,'stack')
for num_e = 1:length(errmsg.stack)
fprintf('File %s at line %i\n',errmsg.stack(num_e).file,errmsg.stack(num_e).line);
end
end
elseif (flag_failed==0)&&exist('errmsg','var')&&opt.flag_debug
fprintf('\n The feedback from the execution of job %s was : %s\n',opt.name_job,errmsg);
end
Expand Down

0 comments on commit e853f88

Please sign in to comment.