Skip to content
This repository has been archived by the owner on Aug 15, 2024. It is now read-only.

Commit

Permalink
Simplified the code. 'session' mode is not implemented anymore.
Browse files Browse the repository at this point in the history
  • Loading branch information
pbellec committed Sep 4, 2015
1 parent 6a78e26 commit 4e21c2a
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 122 deletions.
2 changes: 1 addition & 1 deletion psom_deamon.m
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@
opt_logs_worker(num_w).exit = sprintf('%spsom%i%sworker.exit',path_worker,num_w,filesep);
opt_worker(num_w) = opt_script;
opt_worker(num_w).name_job = name_worker{num_w};
cmd_worker{num_w} = sprintf('flag.heartbeat = true; flag.spawn = true; psom_worker(''%s'',flag,''%s'',%i);',path_worker_w{num_w},path_logs,num_w);
cmd_worker{num_w} = sprintf('psom_worker(''%s'',''%s'',%i);',path_worker_w{num_w},path_logs,num_w);
if ispc % this is windows
script_worker{num_w} = [path_tmp filesep opt_worker(num_w).name_job '.bat'];
else
Expand Down
163 changes: 42 additions & 121 deletions psom_worker.m
Original file line number Diff line number Diff line change
@@ -1,17 +1,9 @@
function status_pipe = psom_worker(path_worker,flag,path_logs,num_worker)
function status_pipe = psom_worker(path_worker,path_logs,num_worker)
% Execute jobs.
%
% status = psom_worker( path_worker , [flag_heartbeat] )
% status = psom_worker( path_worker , path_logs , num_worker )
%
% PATH_WORKER (string) The name of a path where all logs will be saved.
% FLAG.HEARTBEAT (boolean, default false) if the flag is true, then a new
% subprocess will be started , using matlab or octave, that will generate
% a heartbeat.mat file updated every 5 seconds. This subprocess will
% also detect the presence of a kill file and, if detected, this file will
% kill the worker.
% FLAG.SPAWN (boolean, default false) if FLAG_RESPAWN is true, the pipeline process
% will not stop until PIPE.lock is removed. It will constantly screen for
% new jobs in the form of a .mat file where each variable is a job.
% PATH_LOGS (string)
% NUM_WORKER (integer, default 1)
%
Expand Down Expand Up @@ -57,42 +49,24 @@
path_worker = [path_worker filesep];
end

if nargin < 3
path_logs = path_worker;
end

if nargin < 4
num_worker = 1;
end

if ~strcmp(path_logs(end),filesep)
path_logs = [path_logs filesep];
end

%% Options
if nargin < 2
flag = struct;
if nargin < 3
error('Please specify NUM_WORKER')
end
flag = psom_struct_defaults( flag , ...
{ 'heartbeat' , 'spawn' } , ...
{ false , false });

%% Create folder for worker
if ~psom_exist(path_worker)
psom_mkdir(path_worker);
end

%% Generating file names
file_heartbeat = [path_worker filesep 'heartbeat.mat'];
file_kill = [path_worker filesep 'worker.kill'];
file_news_feed = [path_worker filesep 'news_feed.csv'];
file_pipe = [path_worker filesep 'PIPE_jobs.mat'];
file_lock = [path_logs filesep 'PIPE.lock'];
file_status = [path_logs 'PIPE_status.mat'];
file_status_backup = [path_logs 'PIPE_status_backup.mat'];
file_logs = [path_logs 'PIPE_logs.mat'];
file_logs_backup = [path_logs 'PIPE_logs_backup.mat'];
file_profile = [path_logs 'PIPE_profile.mat'];
file_profile_backup = [path_logs 'PIPE_profile_backup.mat'];

%% Open the news feed file
if strcmp(gb_psom_language,'matlab');
Expand Down Expand Up @@ -121,33 +95,14 @@
end

%% Start a heartbeat
if flag.heartbeat
main_pid = getpid;
cmd = sprintf('psom_heartbeat(''%s'',''%s'',%i)',file_heartbeat,file_kill,main_pid);
if strcmp(gb_psom_language,'octave')
instr_heartbeat = sprintf('"%s" %s "addpath(''%s''), %s,exit"',gb_psom_command_octave,gb_psom_opt_matlab,gb_psom_path_psom,cmd);
else
instr_heartbeat = sprintf('"%s" %s "addpath(''%s''), %s,exit"',gb_psom_command_matlab,gb_psom_opt_matlab,gb_psom_path_psom,cmd);
end
system([instr_heartbeat '&']);
end

%% Create a lock file, if not in spawn mode
if ~flag.spawn
start_time = clock;
save(file_lock,'start_time');
end

%% Load jobs
if psom_exist( file_pipe )
pipeline = load(file_pipe);
list_jobs = fieldnames(pipeline);
flag_session = true;
else
pipeline = struct;
list_jobs = {};
flag_session = false;
end
main_pid = getpid;
cmd = sprintf('psom_heartbeat(''%s'',''%s'',%i)',file_heartbeat,file_kill,main_pid);
if strcmp(gb_psom_language,'octave')
instr_heartbeat = sprintf('"%s" %s "addpath(''%s''), %s,exit"',gb_psom_command_octave,gb_psom_opt_matlab,gb_psom_path_psom,cmd);
else
instr_heartbeat = sprintf('"%s" %s "addpath(''%s''), %s,exit"',gb_psom_command_matlab,gb_psom_opt_matlab,gb_psom_path_psom,cmd);
end
system([instr_heartbeat '&']);

% a try/catch block is used to crash gracefully if the user is
% interrupting the pipeline of if an error occurs
Expand All @@ -157,31 +112,30 @@
num_job = 0;
flag_any_fail = false;
time_scheduled = struct();

list_jobs = {};
pipeline = struct;
while test_loop

%% Check for new spawns
if flag.spawn
list_ready = dir([path_worker '*.ready']);
list_ready = { list_ready.name };
if ~isempty(list_ready)
for num_r = 1:length(list_ready)
[tmp,base_spawn] = fileparts(list_ready{num_r});
file_spawn = [path_worker base_spawn '.mat'];
if ~psom_exist(file_spawn)
error('I could not find %s for spawning',file_spawn)
end
spawn = load(file_spawn);
list_new_jobs = fieldnames(spawn);
%% Add to the news feed
for nn = 1:length(list_new_jobs)
sub_add_line_log(hf_news,sprintf('%s , registered\n',list_new_jobs{nn}));
time_scheduled.(list_new_jobs{nn}) = clock;
end
list_jobs = [ list_jobs ; list_new_jobs ];
pipeline = psom_merge_pipeline(pipeline,spawn);
psom_clean({file_spawn,[path_worker list_ready{num_r}]});
list_ready = dir([path_worker '*.ready']);
list_ready = { list_ready.name };
if ~isempty(list_ready)
for num_r = 1:length(list_ready)
[tmp,base_spawn] = fileparts(list_ready{num_r});
file_spawn = [path_worker base_spawn '.mat'];
if ~psom_exist(file_spawn)
error('I could not find %s for spawning',file_spawn)
end
spawn = load(file_spawn);
list_new_jobs = fieldnames(spawn);
%% Add to the news feed
for nn = 1:length(list_new_jobs)
sub_add_line_log(hf_news,sprintf('%s , registered\n',list_new_jobs{nn}));
time_scheduled.(list_new_jobs{nn}) = clock;
end
list_jobs = [ list_jobs ; list_new_jobs ];
pipeline = psom_merge_pipeline(pipeline,spawn);
psom_clean({file_spawn,[path_worker list_ready{num_r}]});
end
end

Expand Down Expand Up @@ -209,46 +163,18 @@
%% Update profile info
file_prof_job = [path_worker name_job '_profile.mat'];
new_prof = struct();

%% Update logs & profile
if ~flag.spawn
%% Update profile info
tmp = load(file_prof_job);
new_prof.(name_job) = tmp;
new_prof.(name_job).time_scheduled = time_scheduled.(name_job);
new_prof.(name_job).worker = num_worker;

%% Update logs
file_log_job = [path_worker name_job '.log'];
new_logs = struct(name_job,sub_read_txt(file_log_job));
psom_clean(file_log_job);
psom_clean(file_prof_job);

%% Save the status/logs/profile
save(file_logs,'-struct','-append','new_logs');
save(file_logs_backup,'-struct','-append','new_logs');
save(file_status,'-struct','-append','new_status');
save(file_status_backup,'-struct','-append','new_status');
save(file_profile,'-struct','-append','new_prof');
save(file_profile_backup,'-struct','-append','new_prof');
else
new_prof.time_scheduled = time_scheduled.(name_job);
new_prof.worker = num_worker;
save(file_prof_job,'-struct','-append','new_prof');
end
new_prof.time_scheduled = time_scheduled.(name_job);
new_prof.worker = num_worker;
save(file_prof_job,'-struct','-append','new_prof');
end

if flag.spawn
test_loop = psom_exist(file_lock);
if (num_job == length(list_jobs))&&test_loop
if exist('OCTAVE_VERSION','builtin')
[res,msg] = system('sleep 0.1');
else
sleep(0.1);
end
test_loop = psom_exist(file_lock);
if (num_job == length(list_jobs))&&test_loop
if exist('OCTAVE_VERSION','builtin')
[res,msg] = system('sleep 0.1');
else
sleep(0.1);
end
else
test_loop = (num_job<length(list_jobs));
end
end % While there are jobs to do

Expand All @@ -258,11 +184,6 @@
fclose(hf_news);
end

%% Remove the lock
if ~flag.spawn
psom_clean(file_lock)
end

%% Return a 1 status if any job has failed
status_pipe = double(flag_any_fail);

Expand Down

0 comments on commit 4e21c2a

Please sign in to comment.