Skip to content

Commit

Permalink
feat(server): fixing double summary emails per week (#1054)
Browse files Browse the repository at this point in the history
* feat(server task scheduler): sketch out core task scheduler implementation

* feat(server weekly activity digests): add function lock duration to the weekly digest execution

* feat(server scheduled tasks): add scheduled tasks type definition, db schema and migration

* feat(server scheduled tasks): add scheduled tasks repository

* feat(server task scheduler): add task scheduler service implementation

* chore(server deps): add mocha type definitions

* refactor(server scheduled tasks): refactor scheduled tasks migration

* refactor(server scheduled tasks): refactor scheduled task db schema and type definitions

* feat(server scheduled tasks): implement db side lock acquire

* refactor(server scheduled tasks): refactor task scheduler with lock on query mechanism

* test(server scheduled tasks): add tests for scheduled tasks implementation

* refactor(server weekly activity digests): refactor to new task scheduler implementation

* feat(server weekly activity digest): switch to a 1000 seconds trigger period for testing purposes

* fix(server task scheduler): fix not catching lock acquire function errors

Co-authored-by: Gergő Jedlicska <[email protected]>
  • Loading branch information
fabis94 and gjedlicska authored Oct 4, 2022
1 parent 97ac4a3 commit 1351b6b
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 16 deletions.
26 changes: 16 additions & 10 deletions packages/server/modules/activitystream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { sendActivityNotifications } from '@/modules/activitystream/services/sum
import { initializeEventListener } from '@/modules/activitystream/services/eventListener'
import { modulesDebug } from '@/modules/shared/utils/logger'
import { publishNotification } from '@/modules/notifications/services/publication'
import { scheduleExecution } from '@/modules/core/services/taskScheduler'

const activitiesDebug = modulesDebug.extend('activities')

Expand All @@ -12,21 +13,26 @@ let scheduledTask: cron.ScheduledTask | null = null
const scheduleWeeklyActivityNotifications = () => {
// just to test stuff
// every 1000 seconds
// const cronExpression = '*/1000 * * * * *'
const cronExpression = '*/1000 * * * * *'
// at 00 minutest, 10 (am) hours, every month, every year,
// every 1st day of the week (monday)
// cheat sheet https://crontab.guru
const cronExpression = '00 13 * * 5'
// const cronExpression = '00 13 * * 5'
// configure the number of days, the activities are scraped for
const numberOfDays = 7
cron.validate(cronExpression)
return cron.schedule(cronExpression, async () => {
activitiesDebug('Sending weekly activity digests notifications.')
const end = new Date()
const start = new Date(end.getTime())
start.setDate(start.getDate() - numberOfDays)
await sendActivityNotifications(start, end, publishNotification)
})
return scheduleExecution(
cronExpression,
'weeklyActivityNotification',
//task should be locked for 10 minutes
async (now: Date) => {
activitiesDebug('Sending weekly activity digests notifications.')
const end = now
const start = new Date(end.getTime())
start.setDate(start.getDate() - numberOfDays)
await sendActivityNotifications(start, end, publishNotification)
},
10 * 60 * 1000
)
}

const activityModule: SpeckleModule = {
Expand Down
5 changes: 5 additions & 0 deletions packages/server/modules/core/dbSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,4 +273,9 @@ export const Branches = buildTableHelper('branches', [
'updatedAt'
])

export const ScheduledTasks = buildTableHelper('scheduled_tasks', [
'taskName',
'lockExpiresAt'
])

export { knex }
5 changes: 5 additions & 0 deletions packages/server/modules/core/helpers/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,8 @@ export type BranchRecord = {
createdAt: Date
updatedAt: Date
}

export type ScheduledTaskRecord = {
taskName: string
lockExpiresAt: Date
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Knex } from 'knex'

const TABLE_NAME = 'scheduled_tasks'

export async function up(knex: Knex): Promise<void> {
await knex.schema.createTable(TABLE_NAME, (table) => {
table.string('taskName').primary()
table.timestamp('lockExpiresAt', { precision: 3, useTz: true }).notNullable()
})
}

export async function down(knex: Knex): Promise<void> {
await knex.schema.dropTableIfExists(TABLE_NAME)
}
15 changes: 15 additions & 0 deletions packages/server/modules/core/repositories/scheduledTasks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { ScheduledTasks } from '@/modules/core/dbSchema'
import { ScheduledTaskRecord } from '@/modules/core/helpers/types'

export async function acquireTaskLock(
scheduledTask: ScheduledTaskRecord
): Promise<ScheduledTaskRecord | null> {
const now = new Date()
const [lock] = await ScheduledTasks.knex<ScheduledTaskRecord>()
.insert(scheduledTask)
.onConflict(ScheduledTasks.withoutTablePrefix.col.taskName)
.merge()
.where(ScheduledTasks.col.lockExpiresAt, '<', now)
.returning('*')
return (lock as ScheduledTaskRecord) ?? null
}
71 changes: 71 additions & 0 deletions packages/server/modules/core/services/taskScheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import cron from 'node-cron'
import { InvalidArgumentError } from '@/modules/shared/errors'
import { modulesDebug, errorDebug } from '@/modules/shared/utils/logger'
import { ensureError } from '@/modules/shared/helpers/errorHelper'
import { acquireTaskLock } from '@/modules/core/repositories/scheduledTasks'
import { ScheduledTaskRecord } from '@/modules/core/helpers/types'

const activitiesDebug = modulesDebug.extend('activities')

export const scheduledCallbackWrapper = async (
scheduledTime: Date,
taskName: string,
lockTimeout: number,
callback: (scheduledTime: Date) => Promise<void>,
acquireLock: (
scheduledTask: ScheduledTaskRecord
) => Promise<ScheduledTaskRecord | null>
) => {
// try to acquire the task lock with the function name and a new expiration date
const lockExpiresAt = new Date(scheduledTime.getTime() + lockTimeout)
try {
const lock = await acquireLock({ taskName, lockExpiresAt })

// if couldn't acquire it, stop execution
if (!lock) {
activitiesDebug(
`Could not acquire task lock for ${taskName}, stopping execution.`
)
return null
}

// else continue executing the callback...
activitiesDebug(`Executing scheduled function ${taskName} at ${scheduledTime}`)
await callback(scheduledTime)
// update lock as succeeded
const finishDate = new Date()
activitiesDebug(
`Finished scheduled function ${taskName} execution in ${
(finishDate.getTime() - scheduledTime.getTime()) / 1000
} seconds`
)
} catch (error) {
errorDebug(
`The triggered task execution ${taskName} failed at ${scheduledTime}, with error ${
ensureError(error, 'unknown reason').message
}`
)
}
}

export const scheduleExecution = (
cronExpression: string,
taskName: string,
callback: (scheduledTime: Date) => Promise<void>,
lockTimeout = 60 * 1000
): cron.ScheduledTask => {
const expressionValid = cron.validate(cronExpression)
if (!expressionValid)
throw new InvalidArgumentError(
`The given cron expression ${cronExpression} is not valid`
)
return cron.schedule(cronExpression, async (scheduledTime: Date) => {
await scheduledCallbackWrapper(
scheduledTime,
taskName,
lockTimeout,
callback,
acquireTaskLock
)
})
}
122 changes: 122 additions & 0 deletions packages/server/modules/core/tests/scheduledTasks.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import { describe } from 'mocha'
import { ScheduledTasks } from '@/modules/core/dbSchema'
import { truncateTables } from '@/test/hooks'
import { acquireTaskLock } from '@/modules/core/repositories/scheduledTasks'
import { ensureError } from '@/modules/shared/helpers/errorHelper'
import {
scheduledCallbackWrapper,
scheduleExecution
} from '@/modules/core/services/taskScheduler'
import { expect } from 'chai'
import { sleep } from '@/test/helpers'
import cryptoRandomString from 'crypto-random-string'

describe('Scheduled tasks @core', () => {
describe('Task lock repository', () => {
before(async () => {
await truncateTables([ScheduledTasks.name])
})
it('can acquire task lock for a new function name', async () => {
const taskName = cryptoRandomString({ length: 10 })
const scheduledTask = { taskName, lockExpiresAt: new Date() }
const lock = await acquireTaskLock(scheduledTask)
expect(lock).to.be.deep.equal(scheduledTask)
})
it('can acquire task lock if previous lock has expired', async () => {
const taskName = cryptoRandomString({ length: 10 })
const oldTask = { taskName, lockExpiresAt: new Date() }
await acquireTaskLock(oldTask)

await sleep(100)
const newTask = { taskName, lockExpiresAt: new Date() }
const lock = await acquireTaskLock(newTask)
expect(lock).to.be.deep.equal(newTask)
})
it('returns an invalid lock (null), if there is another lock in place', async () => {
const taskName = cryptoRandomString({ length: 10 })
const oldTask = {
taskName,
lockExpiresAt: new Date('2366-12-28 00:30:57.000+00')
}
await acquireTaskLock(oldTask)
const newTask = { taskName, lockExpiresAt: new Date() }
const lock = await acquireTaskLock(newTask)
expect(lock).to.be.null
})
})
describe('Task scheduler', () => {
describe('scheduled callback wrapper function', () => {
let callbackExecuted = false
async function fakeCallback() {
callbackExecuted = true
}
beforeEach(() => {
callbackExecuted = false
})
it("doesn't invoke the callback if it aquires an invalid lock", async () => {
expect(callbackExecuted).to.be.false
const taskName = cryptoRandomString({ length: 10 })
await scheduledCallbackWrapper(
new Date(),
taskName,
100,
fakeCallback,
// fake lock aquire, always returning an invalid lock
async () => null
)
expect(callbackExecuted).to.be.false
})
it('invokes the callback if a task lock is acquired', async () => {
expect(callbackExecuted).to.be.false
const taskName = cryptoRandomString({ length: 10 })
await scheduledCallbackWrapper(
new Date(),
taskName,
100,
fakeCallback,
// fake lock aquire, always returning an invalid lock
async () => ({ taskName, lockExpiresAt: new Date() })
)
expect(callbackExecuted).to.be.true
})
it('handles all callback errors gracefully', async () => {
expect(callbackExecuted).to.be.false
const taskName = cryptoRandomString({ length: 10 })
await scheduledCallbackWrapper(
new Date(),
taskName,
100,
async () => {
callbackExecuted = true
throw 'catch this'
},
// fake lock aquire, always returning an invalid lock
async () => ({ taskName, lockExpiresAt: new Date() })
)
expect(callbackExecuted).to.be.true
})
})
describe('schedule execution', () => {
it('throws an InvalidArgimentError if the cron expression is not valid', async () => {
const cronExpression = 'this is a borked cron expression'
try {
scheduleExecution(cronExpression, 'tick tick boom', async () => {
return
})
throw new Error('this should have ')
} catch (err) {
expect(ensureError(err).message).to.equal(
`The given cron expression ${cronExpression} is not valid`
)
}
})
it('returns a cron scheduled task instance if the config is valid', async () => {
const cronExpression = '*/1000 * * * *'
const task = scheduleExecution(cronExpression, 'tick tick boom', async () => {
return
})
expect(task).to.not.be.null
})
})
})
})
1 change: 1 addition & 0 deletions packages/server/modules/shared/utils/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ const debug = dbg('speckle')
export const modulesDebug = debug.extend('modules')
export const notificationsDebug = debug.extend('notifications')
export const cliDebug = debug.extend('cli')
export const errorDebug = debug.extend('error')
2 changes: 1 addition & 1 deletion packages/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
"@types/express": "^4.17.13",
"@types/lodash": "^4.14.180",
"@types/mjml": "^4.7.0",
"@types/mocha": "^7.0.2",
"@types/mocha": "^10.0.0",
"@types/mock-require": "^2.0.1",
"@types/module-alias": "^2.0.1",
"@types/node-cron": "^3.0.2",
Expand Down
10 changes: 5 additions & 5 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5574,7 +5574,7 @@ __metadata:
"@types/express": ^4.17.13
"@types/lodash": ^4.14.180
"@types/mjml": ^4.7.0
"@types/mocha": ^7.0.2
"@types/mocha": ^10.0.0
"@types/mock-require": ^2.0.1
"@types/module-alias": ^2.0.1
"@types/node-cron": ^3.0.2
Expand Down Expand Up @@ -6621,10 +6621,10 @@ __metadata:
languageName: node
linkType: hard

"@types/mocha@npm:^7.0.2":
version: 7.0.2
resolution: "@types/mocha@npm:7.0.2"
checksum: 9be72c563976a809caa3594e3ee1fd71748887aaf105b4cfa99c69e882659d3ddbb331d4c1e3b3af9a67c78f3b7b4ab30fe1baa13cb50561dcc386512a75142a
"@types/mocha@npm:^10.0.0":
version: 10.0.0
resolution: "@types/mocha@npm:10.0.0"
checksum: 69e3896a63ec93374e22afd03fdea4c2f31d609d6ea111c8403508ede90da9dc0140c7bb2edff8404114d9d980308e104e4236324bf6afac1410b2cfe35f98c6
languageName: node
linkType: hard

Expand Down

0 comments on commit 1351b6b

Please sign in to comment.