-
Notifications
You must be signed in to change notification settings - Fork 1
/
worker.go
130 lines (122 loc) · 4.87 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package services
import (
"context"
"log"
"math"
"os"
"strconv"
"time"
"github.com/ceramicnetwork/go-cas/models"
)
// defaultAnchorBatchMonitorTick is how often the worker polls batch-queue utilization
// when ANCHOR_BATCH_MONITOR_TICK is unset or unparseable.
const defaultAnchorBatchMonitorTick = 30 * time.Second

// defaultMaxAnchorWorkers caps concurrent anchor jobs when MAX_ANCHOR_WORKERS is unset.
// A configured value of -1 means "unlimited" (see createJobs).
const defaultMaxAnchorWorkers = 1

// defaultAmortizationFactor scales how many allowed jobs are actually created per tick
// when ANCHOR_WORKER_AMORTIZATION is unset (1.0 = create all allowed jobs at once).
const defaultAmortizationFactor = 1.0
// WorkerService periodically inspects the anchor batch queue and creates anchor
// jobs to service the observed load, up to a configured worker cap.
type WorkerService struct {
	batchMonitor       models.QueueMonitor   // reports unprocessed/inflight batch counts
	jobDb              models.JobRepository  // creates and queries anchor jobs
	metricService      models.MetricService  // emits job-creation metrics
	monitorTick        time.Duration         // polling interval for the Run loop
	maxAnchorWorkers   int                   // max concurrent jobs; -1 means unlimited
	amortizationFactor float64               // fraction of allowed jobs created per tick
	anchorJobs         map[string]*models.JobState // jobId -> last known state (nil until first query)
}
// NewWorkerService builds a WorkerService, reading optional overrides from the
// environment:
//
//	ANCHOR_BATCH_MONITOR_TICK  - polling interval (time.ParseDuration format)
//	MAX_ANCHOR_WORKERS         - max concurrent jobs (-1 = unlimited)
//	ANCHOR_WORKER_AMORTIZATION - fraction of allowed jobs created per tick
//
// Unparseable values are logged and the compiled-in default is kept, so a bad
// environment variable can never prevent startup.
func NewWorkerService(batchMonitor models.QueueMonitor, jobDb models.JobRepository, metricService models.MetricService) *WorkerService {
	return &WorkerService{
		batchMonitor:       batchMonitor,
		jobDb:              jobDb,
		metricService:      metricService,
		monitorTick:        envDuration("ANCHOR_BATCH_MONITOR_TICK", defaultAnchorBatchMonitorTick),
		maxAnchorWorkers:   envInt("MAX_ANCHOR_WORKERS", defaultMaxAnchorWorkers),
		amortizationFactor: envFloat("ANCHOR_WORKER_AMORTIZATION", defaultAmortizationFactor),
		anchorJobs:         make(map[string]*models.JobState),
	}
}

// envDuration returns the duration parsed from the named environment variable,
// or fallback if the variable is unset or invalid (invalid values are logged).
func envDuration(name string, fallback time.Duration) time.Duration {
	raw, found := os.LookupEnv(name)
	if !found {
		return fallback
	}
	parsed, err := time.ParseDuration(raw)
	if err != nil {
		log.Printf("worker: ignoring invalid %s=%q: %v", name, raw, err)
		return fallback
	}
	return parsed
}

// envInt returns the integer parsed from the named environment variable,
// or fallback if the variable is unset or invalid (invalid values are logged).
func envInt(name string, fallback int) int {
	raw, found := os.LookupEnv(name)
	if !found {
		return fallback
	}
	parsed, err := strconv.Atoi(raw)
	if err != nil {
		log.Printf("worker: ignoring invalid %s=%q: %v", name, raw, err)
		return fallback
	}
	return parsed
}

// envFloat returns the float parsed from the named environment variable,
// or fallback if the variable is unset or invalid (invalid values are logged).
func envFloat(name string, fallback float64) float64 {
	raw, found := os.LookupEnv(name)
	if !found {
		return fallback
	}
	parsed, err := strconv.ParseFloat(raw, 64)
	if err != nil {
		log.Printf("worker: ignoring invalid %s=%q: %v", name, raw, err)
		return fallback
	}
	return parsed
}
// Run drives the worker loop: every monitorTick it attempts to create anchor
// jobs to match the batch-queue load, until ctx is cancelled. It blocks, so
// callers typically invoke it in its own goroutine.
func (w WorkerService) Run(ctx context.Context) {
	log.Printf("worker: started")
	tick := time.NewTicker(w.monitorTick)
	// Fix: stop the ticker on exit so its resources are released; the original
	// leaked it after ctx cancellation.
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			log.Printf("worker: stopped")
			return
		case <-tick.C:
			// Errors are logged, not fatal: the next tick retries from scratch.
			numJobsCreated, err := w.createJobs(ctx)
			log.Printf("worker: created %d jobs, error = %v", numJobsCreated, err)
		}
	}
}
// createJobs computes how many anchor jobs are needed to service the current
// batch backlog, creates up to that many (bounded by maxAnchorWorkers and the
// amortization factor), and returns the number actually created.
//
// Fix: the original shadowed the CreateJob error inside the loop, so a failed
// CreateJob broke the loop but the function still returned a nil error. The
// error is now captured and returned alongside the partial count.
func (w WorkerService) createJobs(ctx context.Context) (int, error) {
	numJobsRequired, numExistingJobs, err := w.calculateLoad(ctx)
	if err != nil {
		return 0, err
	}
	numJobsAllowed := 0
	if w.maxAnchorWorkers == -1 {
		// We can create as many workers as needed to service unprocessed batches
		numJobsAllowed = numJobsRequired
	} else if numExistingJobs < w.maxAnchorWorkers {
		// We can create workers upto the maximum allowed minus the number of jobs already created
		numJobsAllowed = w.maxAnchorWorkers - numExistingJobs
	}
	// Amortize job creation across ticks so a large backlog ramps up gradually.
	amortizedNumJobsAllowed := math.Ceil(float64(numJobsAllowed) * w.amortizationFactor)
	numJobsToCreate := int(math.Min(amortizedNumJobsAllowed, float64(numJobsRequired)))
	numJobsCreated := 0
	var createErr error
	for ; numJobsCreated < numJobsToCreate; numJobsCreated++ {
		jobId, jobErr := w.jobDb.CreateJob(ctx)
		if jobErr != nil {
			createErr = jobErr
			break
		}
		// Track the new job; its state stays nil until calculateLoad queries it.
		w.anchorJobs[jobId] = nil
	}
	log.Printf("worker: numJobsRequired=%d, numExistingJobs=%v, numJobsAllowed=%v, amortizedNumJobsAllowed=%f, numJobsToCreate=%d, numJobsCreated=%d, anchorJobs=%v", numJobsRequired, numExistingJobs, numJobsAllowed, amortizedNumJobsAllowed, numJobsToCreate, numJobsCreated, w.anchorJobs)
	w.metricService.Count(ctx, models.MetricName_WorkerJobCreated, numJobsCreated)
	return numJobsCreated, createErr
}
// calculateLoad refreshes the tracked job map (dropping terminal jobs) and
// returns (numJobsRequired, numExistingJobs, error): how many additional jobs
// the current batch backlog needs, and how many jobs are already tracked.
func (w WorkerService) calculateLoad(ctx context.Context) (int, int, error) {
	// Clean up finished jobs. Deleting from a map during range is safe in Go.
	for jobId := range w.anchorJobs {
		jobState, err := w.jobDb.QueryJob(ctx, jobId)
		if err != nil {
			return 0, 0, err
		}
		if (jobState.Stage == models.JobStage_Completed) || (jobState.Stage == models.JobStage_Failed) {
			// Clean out finished jobs - "completed" and "failed" are the only possible terminal stages for anchor jobs.
			delete(w.anchorJobs, jobId)
		} else {
			w.anchorJobs[jobId] = jobState
		}
	}
	// This includes jobs that have been created but not yet started (perhaps due to a deployment in progress). If there
	// is a problem starting jobs on the CD manager or anchor worker side, we won't keep creating new jobs unless we
	// have newer batches to process.
	//
	// Alerting on the size of the batch queue backlog will inform us if jobs haven't been making progress while batches
	// have continued to be created.
	numExistingJobs := len(w.anchorJobs)
	numBatchesUnprocessed, numBatchesInflight, err := w.batchMonitor.GetUtilization(ctx)
	if err != nil {
		return 0, 0, err
	}
	// The total number of unprocessed and inflight batches can be used to observe the current load on the system
	// and thus to determine how many jobs need to be created to handle this load.
	//
	// Each anchor worker will process a single batch at a time, even if it continues to poll the queue for more
	// batches as it gets done with earlier ones.
	return int(math.Max(float64(numBatchesUnprocessed+numBatchesInflight-numExistingJobs), 0)), numExistingJobs, nil
}