Skip to content

Commit

Permalink
Inmem DB locks (#60)
Browse files Browse the repository at this point in the history
  • Loading branch information
dgruber committed Apr 2, 2023
1 parent c228743 commit 0cbf24d
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
7 changes: 5 additions & 2 deletions jobsession_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,16 @@ var _ = Describe("JobSession", func() {
})

It("should be possible to terminate a job array (bulk job)", func() {

amount := 10

jt := drmaa2interface.JobTemplate{
RemoteCommand: "/bin/sh",
Args: []string{"-c", "sleep 100"},
JobCategory: "busybox:latest",
}

arrayjob, err := js.RunBulkJobs(jt, 1, 10, 1, 2)
arrayjob, err := js.RunBulkJobs(jt, 1, amount, 1, 2)
Ω(err).Should(BeNil())

jobid := arrayjob.GetID()
Expand All @@ -299,7 +302,7 @@ var _ = Describe("JobSession", func() {
Ω(err).Should(BeNil())

tasks := arrayjob.GetJobs()
Expect(len(tasks)).Should(Equal(10))
Expect(len(tasks)).Should(Equal(amount))

for _, j := range tasks {
err = j.WaitTerminated(time.Second * 12)
Expand Down
28 changes: 28 additions & 0 deletions pkg/jobtracker/simpletracker/jobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"

"github.com/dgruber/drmaa2interface"
)
Expand All @@ -13,6 +14,7 @@ import (
// processed by the job tracker. Jobs are stored until Reap().
// Locking must be done externally.
type JobStore struct {
sync.Mutex
// jobids contains all known jobs in the system until they are reaped (Reap())
// these are jobs, not array jobs and can be in format "1.1" or "1"
jobids []string
Expand All @@ -38,6 +40,8 @@ func NewJobStore() *JobStore {
// SaveJob stores a job, the job submission template, and the process PID of
// the job in an internal job store.
func (js *JobStore) SaveJob(jobid string, t drmaa2interface.JobTemplate, pid int) {
js.Lock()
defer js.Unlock()
js.isArrayJob[jobid] = false
js.templates[jobid] = t
js.jobids = append(js.jobids, jobid)
Expand All @@ -48,6 +52,8 @@ func (js *JobStore) SaveJob(jobid string, t drmaa2interface.JobTemplate, pid int

// HasJob returns true if the job is saved in the job store.
func (js *JobStore) HasJob(jobid string) bool {
js.Lock()
defer js.Unlock()
_, exists := js.templates[jobid]
if !exists {
for i := range js.jobids {
Expand All @@ -60,6 +66,8 @@ func (js *JobStore) HasJob(jobid string) bool {
}

func (js *JobStore) IsArrayJob(jobid string) bool {
js.Lock()
defer js.Unlock()
if isArray, ok := js.isArrayJob[jobid]; ok && isArray {
return true
}
Expand All @@ -70,6 +78,8 @@ func (js *JobStore) IsArrayJob(jobid string) bool {
// The jobid can be the identifier of a job or a job array. In case
// of a job array it removes all tasks which belong to the array job.
func (js *JobStore) RemoveJob(jobid string) {
js.Lock()
defer js.Unlock()
isAJ, exits := js.isArrayJob[jobid]
if isAJ && exits {
jobids := make([]string, 0, len(js.jobids))
Expand Down Expand Up @@ -97,6 +107,8 @@ func (js *JobStore) RemoveJob(jobid string) {
// SaveArrayJob stores all process IDs of the tasks of an array job.
func (js *JobStore) SaveArrayJob(arrayjobid string, pids []int,
t drmaa2interface.JobTemplate, begin, end, step int) {
js.Lock()
defer js.Unlock()
pid := 0
js.templates[arrayjobid] = t
js.isArrayJob[arrayjobid] = true
Expand All @@ -118,6 +130,8 @@ func (js *JobStore) SaveArrayJob(arrayjobid string, pids []int,
// SaveArrayJobPID stores the current PID of main process of the
// job array task.
func (js *JobStore) SaveArrayJobPID(arrayjobid string, taskid, pid int) error {
js.Lock()
defer js.Unlock()
job, exists := js.jobs[arrayjobid]
if !exists {
return errors.New("array job does not exist")
Expand All @@ -135,6 +149,8 @@ func (js *JobStore) SaveArrayJobPID(arrayjobid string, taskid, pid int) error {
// GetPID returns the PID of a job or an array job task.
// It returns -1 and an error if the job is not known.
func (js *JobStore) GetPID(jobid string) (int, error) {
js.Lock()
defer js.Unlock()
jobelements := strings.Split(jobid, ".")
job, exists := js.jobs[jobelements[0]]
if !exists {
Expand Down Expand Up @@ -164,13 +180,17 @@ func (js *JobStore) GetPID(jobid string) (int, error) {

// GetJobIDs returns the IDs of all jobs.
func (js *JobStore) GetJobIDs() []string {
js.Lock()
defer js.Unlock()
tmp := make([]string, len(js.jobids))
copy(tmp, js.jobids)
return tmp
}

// GetArrayJobTaskIDs returns the IDs of all tasks of a job array.
func (js *JobStore) GetArrayJobTaskIDs(arrayjobID string) []string {
js.Lock()
defer js.Unlock()
jobids := make([]string, 0, len(js.jobs[arrayjobID]))
for _, job := range js.jobs[arrayjobID] {
jobids = append(jobids, fmt.Sprintf("%s.%d", arrayjobID, job.TaskID))
Expand All @@ -179,10 +199,14 @@ func (js *JobStore) GetArrayJobTaskIDs(arrayjobID string) []string {
}

func (js *JobStore) NewJobID() string {
js.Lock()
defer js.Unlock()
return GetNextJobID()
}

func (js *JobStore) GetJobTemplate(jobID string) (drmaa2interface.JobTemplate, error) {
js.Lock()
defer js.Unlock()
jt, found := js.templates[jobID]
if found == false {
return jt, fmt.Errorf("job template for job %s not found", jobID)
Expand All @@ -191,11 +215,15 @@ func (js *JobStore) GetJobTemplate(jobID string) (drmaa2interface.JobTemplate, e
}

func (js *JobStore) SaveJobInfo(jobid string, jobInfo drmaa2interface.JobInfo) error {
js.Lock()
defer js.Unlock()
js.jobinfo[jobid] = jobInfo
return nil
}

func (js *JobStore) GetJobInfo(jobid string) (drmaa2interface.JobInfo, error) {
js.Lock()
defer js.Unlock()
jobinfo, exists := js.jobinfo[jobid]
if !exists {
return drmaa2interface.JobInfo{},
Expand Down

0 comments on commit 0cbf24d

Please sign in to comment.