From de57db4f1cdd998bc33444fd49b275905c285631 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Mon, 23 Oct 2023 16:12:08 -0400 Subject: [PATCH] fix: use new job schema --- ci/cmd/deploy/main.go | 4 ++-- common/aws/ddb/job.go | 43 +++++++++++++++++++--------------------- common/aws/ddb/state.go | 4 ++-- common/aws/ddb/utils.go | 4 ++-- models/job.go | 13 ++++++++---- services/test_helpers.go | 4 ++-- 6 files changed, 37 insertions(+), 35 deletions(-) diff --git a/ci/cmd/deploy/main.go b/ci/cmd/deploy/main.go index 976b78d..c3988eb 100644 --- a/ci/cmd/deploy/main.go +++ b/ci/cmd/deploy/main.go @@ -38,7 +38,7 @@ func createJob(ctx context.Context) (string, error) { // what the CD Manager expects. if attributeValues, err := attributevalue.MarshalMapWithOptions(newJob, func(options *attributevalue.EncoderOptions) { options.EncodeTime = func(time time.Time) (types.AttributeValue, error) { - return &types.AttributeValueMemberN{Value: strconv.FormatInt(time.UnixMilli(), 10)}, nil + return &types.AttributeValueMemberN{Value: strconv.FormatInt(time.UnixNano(), 10)}, nil } }); err != nil { return "", err @@ -61,6 +61,6 @@ func createJob(ctx context.Context) (string, error) { return "", err } } - return newJob.Id, nil + return newJob.Job, nil } } diff --git a/common/aws/ddb/job.go b/common/aws/ddb/job.go index 4191d8e..60631ef 100644 --- a/common/aws/ddb/job.go +++ b/common/aws/ddb/job.go @@ -17,7 +17,7 @@ import ( "github.com/ceramicnetwork/go-cas/models" ) -const idTsIndex = "id-ts-index" +const jobTsIndex = "job-ts-index" type JobDatabase struct { ddbClient *dynamodb.Client @@ -36,41 +36,42 @@ func NewJobDb(ctx context.Context, logger models.Logger, ddbClient *dynamodb.Cli func (jdb *JobDatabase) createJobTable(ctx context.Context) error { createJobTableInput := dynamodb.CreateTableInput{ + BillingMode: types.BillingModePayPerRequest, AttributeDefinitions: []types.AttributeDefinition{ { - AttributeName: aws.String("stage"), + AttributeName: aws.String("id"), AttributeType: "S", }, { - AttributeName: aws.String("ts"), - AttributeType: "N", + AttributeName: aws.String("job"), + AttributeType: "S", }, { - AttributeName: aws.String("id"), + AttributeName: aws.String("stage"), AttributeType: "S", }, - }, - KeySchema: []types.KeySchemaElement{ { - AttributeName: aws.String("stage"), - KeyType: "HASH", + AttributeName: aws.String("type"), + AttributeType: "S", }, { AttributeName: aws.String("ts"), - KeyType: "RANGE", + AttributeType: "N", }, }, - TableName: aws.String(jdb.jobTable), - ProvisionedThroughput: &types.ProvisionedThroughput{ - ReadCapacityUnits: aws.Int64(1), - WriteCapacityUnits: aws.Int64(1), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("id"), + KeyType: "HASH", + }, }, + TableName: aws.String(jdb.jobTable), GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ { - IndexName: aws.String(idTsIndex), + IndexName: aws.String(jobTsIndex), KeySchema: []types.KeySchemaElement{ { - AttributeName: aws.String("id"), + AttributeName: aws.String("job"), KeyType: "HASH", }, { @@ -81,10 +82,6 @@ func (jdb *JobDatabase) createJobTable(ctx context.Context) error { Projection: &types.Projection{ ProjectionType: types.ProjectionTypeAll, }, - ProvisionedThroughput: &types.ProvisionedThroughput{ - ReadCapacityUnits: aws.Int64(1), - WriteCapacityUnits: aws.Int64(1), - }, }, }, } @@ -106,7 +103,7 @@ func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) { newJob := models.NewJob(models.JobType_Anchor, jobParams) attributeValues, err := attributevalue.MarshalMapWithOptions(newJob, func(options *attributevalue.EncoderOptions) { options.EncodeTime = func(time time.Time) (types.AttributeValue, error) { - return &types.AttributeValueMemberN{Value: strconv.FormatInt(time.UnixMilli(), 10)}, nil + return &types.AttributeValueMemberN{Value: strconv.FormatInt(time.UnixNano(), 10)}, nil } }) if err != nil { @@ -122,7 +119,7 @@ func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) { if err != nil { return "", err } else { - return newJob.Id, nil + return newJob.Job, nil } } } @@ -130,7 +127,7 @@ func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) { func (jdb *JobDatabase) QueryJob(ctx context.Context, id string) (*models.JobState, error) { queryInput := dynamodb.QueryInput{ TableName: aws.String(jdb.jobTable), - IndexName: aws.String(idTsIndex), + IndexName: aws.String(jobTsIndex), KeyConditionExpression: aws.String("#id = :id"), ExpressionAttributeNames: map[string]string{"#id": "id"}, ExpressionAttributeValues: map[string]types.AttributeValue{":id": &types.AttributeValueMemberS{Value: id}}, diff --git a/common/aws/ddb/state.go b/common/aws/ddb/state.go index cb34d80..b1d2205 100644 --- a/common/aws/ddb/state.go +++ b/common/aws/ddb/state.go @@ -227,7 +227,7 @@ func (sdb *StateDatabase) StoreCid(ctx context.Context, streamCid *models.Stream func (sdb *StateDatabase) UpdateTip(ctx context.Context, newTip *models.StreamTip) (bool, *models.StreamTip, error) { if attributeValues, err := attributevalue.MarshalMapWithOptions(newTip, func(options *attributevalue.EncoderOptions) { options.EncodeTime = func(time time.Time) (types.AttributeValue, error) { - return &types.AttributeValueMemberN{Value: strconv.FormatInt(time.UnixMilli(), 10)}, nil + return &types.AttributeValueMemberN{Value: strconv.FormatInt(time.UnixNano(), 10)}, nil } }); err != nil { return false, nil, err @@ -238,7 +238,7 @@ func (sdb *StateDatabase) UpdateTip(ctx context.Context, newTip *models.StreamTi ConditionExpression: aws.String("attribute_not_exists(#id) OR (#ts <= :ts)"), ExpressionAttributeNames: map[string]string{"#id": "id", "#ts": "ts"}, ExpressionAttributeValues: map[string]types.AttributeValue{ - ":ts": &types.AttributeValueMemberN{Value: strconv.FormatInt(newTip.Timestamp.UnixMilli(), 10)}, + ":ts": &types.AttributeValueMemberN{Value: strconv.FormatInt(newTip.Timestamp.UnixNano(), 10)}, }, Item: attributeValues, ReturnValues: types.ReturnValueAllOld, diff --git a/common/aws/ddb/utils.go b/common/aws/ddb/utils.go index dacda9f..62deb55 100644 --- a/common/aws/ddb/utils.go +++ b/common/aws/ddb/utils.go @@ -48,9 +48,9 @@ func tableExists(ctx context.Context, logger models.Logger, client *dynamodb.Cli } func tsDecode(ts string) (time.Time, error) { - msec, err := strconv.ParseInt(ts, 10, 64) + nsec, err := strconv.ParseInt(ts, 10, 64) if err != nil { return time.Time{}, err } - return time.UnixMilli(msec), nil + return time.Unix(0, nsec), nil } diff --git a/models/job.go b/models/job.go index eefa3de..4c9ba37 100644 --- a/models/job.go +++ b/models/job.go @@ -8,6 +8,7 @@ import ( const DeployComponent = "casv5" const WorkerVersion = "5" +const DefaultJobTtl = 2 * 7 * 24 * time.Hour type JobType string @@ -42,19 +43,23 @@ const ( ) type JobState struct { + Job string `dynamodbav:"job"` Stage JobStage `dynamodbav:"stage"` - Ts time.Time `dynamodbav:"ts"` - Id string `dynamodbav:"id"` Type JobType `dynamodbav:"type"` + Ts time.Time `dynamodbav:"ts"` Params map[string]interface{} `dynamodbav:"params"` + Id string `dynamodbav:"id" json:"-"` + Ttl time.Time `dynamodbav:"ttl,unixtime" json:"-"` } func NewJob(jobType JobType, params map[string]interface{}) JobState { return JobState{ + Job: uuid.New().String(), Stage: JobStage_Queued, - Ts: time.Now(), - Id: uuid.New().String(), Type: jobType, + Ts: time.Now(), Params: params, + Id: uuid.New().String(), + Ttl: time.Now().Add(DefaultJobTtl), } } diff --git a/services/test_helpers.go b/services/test_helpers.go index 734f2c9..bb678c2 100644 --- a/services/test_helpers.go +++ b/services/test_helpers.go @@ -111,8 +111,8 @@ func (m *MockJobRepository) CreateJob(_ context.Context) (string, error) { return "", fmt.Errorf("failed to create job") } newJob := models.NewJob(models.JobType_Anchor, nil) - m.jobStore[newJob.Id] = &newJob - return newJob.Id, nil + m.jobStore[newJob.Job] = &newJob + return newJob.Job, nil } func (m *MockJobRepository) QueryJob(_ context.Context, id string) (*models.JobState, error) {