Skip to content

Commit

Permalink
fix: use new job schema
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed Oct 23, 2023
1 parent 06b45ed commit de57db4
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 35 deletions.
4 changes: 2 additions & 2 deletions ci/cmd/deploy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func createJob(ctx context.Context) (string, error) {
// what the CD Manager expects.
if attributeValues, err := attributevalue.MarshalMapWithOptions(newJob, func(options *attributevalue.EncoderOptions) {
options.EncodeTime = func(time time.Time) (types.AttributeValue, error) {
return &types.AttributeValueMemberN{Value: strconv.FormatInt(time.UnixMilli(), 10)}, nil
return &types.AttributeValueMemberN{Value: strconv.FormatInt(time.UnixNano(), 10)}, nil
}
}); err != nil {
return "", err
Expand All @@ -61,6 +61,6 @@ func createJob(ctx context.Context) (string, error) {
return "", err
}
}
return newJob.Id, nil
return newJob.Job, nil
}
}
43 changes: 20 additions & 23 deletions common/aws/ddb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/ceramicnetwork/go-cas/models"
)

const idTsIndex = "id-ts-index"
const jobTsIndex = "job-ts-index"

type JobDatabase struct {
ddbClient *dynamodb.Client
Expand All @@ -36,41 +36,42 @@ func NewJobDb(ctx context.Context, logger models.Logger, ddbClient *dynamodb.Cli

func (jdb *JobDatabase) createJobTable(ctx context.Context) error {
createJobTableInput := dynamodb.CreateTableInput{
BillingMode: types.BillingModePayPerRequest,
AttributeDefinitions: []types.AttributeDefinition{
{
AttributeName: aws.String("stage"),
AttributeName: aws.String("id"),
AttributeType: "S",
},
{
AttributeName: aws.String("ts"),
AttributeType: "N",
AttributeName: aws.String("job"),
AttributeType: "S",
},
{
AttributeName: aws.String("id"),
AttributeName: aws.String("stage"),
AttributeType: "S",
},
},
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("stage"),
KeyType: "HASH",
AttributeName: aws.String("type"),
AttributeType: "S",
},
{
AttributeName: aws.String("ts"),
KeyType: "RANGE",
AttributeType: "N",
},
},
TableName: aws.String(jdb.jobTable),
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(1),
WriteCapacityUnits: aws.Int64(1),
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("id"),
KeyType: "HASH",
},
},
TableName: aws.String(jdb.jobTable),
GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{
{
IndexName: aws.String(idTsIndex),
IndexName: aws.String(jobTsIndex),
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("id"),
AttributeName: aws.String("job"),
KeyType: "HASH",
},
{
Expand All @@ -81,10 +82,6 @@ func (jdb *JobDatabase) createJobTable(ctx context.Context) error {
Projection: &types.Projection{
ProjectionType: types.ProjectionTypeAll,
},
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(1),
WriteCapacityUnits: aws.Int64(1),
},
},
},
}
Expand All @@ -106,7 +103,7 @@ func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) {
newJob := models.NewJob(models.JobType_Anchor, jobParams)
attributeValues, err := attributevalue.MarshalMapWithOptions(newJob, func(options *attributevalue.EncoderOptions) {
options.EncodeTime = func(time time.Time) (types.AttributeValue, error) {
return &types.AttributeValueMemberN{Value: strconv.FormatInt(time.UnixMilli(), 10)}, nil
return &types.AttributeValueMemberN{Value: strconv.FormatInt(time.UnixNano(), 10)}, nil
}
})
if err != nil {
Expand All @@ -122,15 +119,15 @@ func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) {
if err != nil {
return "", err
} else {
return newJob.Id, nil
return newJob.Job, nil
}
}
}

func (jdb *JobDatabase) QueryJob(ctx context.Context, id string) (*models.JobState, error) {
queryInput := dynamodb.QueryInput{
TableName: aws.String(jdb.jobTable),
IndexName: aws.String(idTsIndex),
IndexName: aws.String(jobTsIndex),
KeyConditionExpression: aws.String("#id = :id"),
ExpressionAttributeNames: map[string]string{"#id": "id"},
ExpressionAttributeValues: map[string]types.AttributeValue{":id": &types.AttributeValueMemberS{Value: id}},
Expand Down
4 changes: 2 additions & 2 deletions common/aws/ddb/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (sdb *StateDatabase) StoreCid(ctx context.Context, streamCid *models.Stream
func (sdb *StateDatabase) UpdateTip(ctx context.Context, newTip *models.StreamTip) (bool, *models.StreamTip, error) {
if attributeValues, err := attributevalue.MarshalMapWithOptions(newTip, func(options *attributevalue.EncoderOptions) {
options.EncodeTime = func(time time.Time) (types.AttributeValue, error) {
return &types.AttributeValueMemberN{Value: strconv.FormatInt(time.UnixMilli(), 10)}, nil
return &types.AttributeValueMemberN{Value: strconv.FormatInt(time.UnixNano(), 10)}, nil
}
}); err != nil {
return false, nil, err
Expand All @@ -238,7 +238,7 @@ func (sdb *StateDatabase) UpdateTip(ctx context.Context, newTip *models.StreamTi
ConditionExpression: aws.String("attribute_not_exists(#id) OR (#ts <= :ts)"),
ExpressionAttributeNames: map[string]string{"#id": "id", "#ts": "ts"},
ExpressionAttributeValues: map[string]types.AttributeValue{
":ts": &types.AttributeValueMemberN{Value: strconv.FormatInt(newTip.Timestamp.UnixMilli(), 10)},
":ts": &types.AttributeValueMemberN{Value: strconv.FormatInt(newTip.Timestamp.UnixNano(), 10)},
},
Item: attributeValues,
ReturnValues: types.ReturnValueAllOld,
Expand Down
4 changes: 2 additions & 2 deletions common/aws/ddb/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ func tableExists(ctx context.Context, logger models.Logger, client *dynamodb.Cli
}

func tsDecode(ts string) (time.Time, error) {
msec, err := strconv.ParseInt(ts, 10, 64)
nsec, err := strconv.ParseInt(ts, 10, 64)
if err != nil {
return time.Time{}, err
}
return time.UnixMilli(msec), nil
return time.Unix(0, nsec), nil
}
13 changes: 9 additions & 4 deletions models/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

const DeployComponent = "casv5"
const WorkerVersion = "5"
const DefaultJobTtl = 2 * 7 * 24 * time.Hour

type JobType string

Expand Down Expand Up @@ -42,19 +43,23 @@ const (
)

type JobState struct {
Job string `dynamodbav:"job"`
Stage JobStage `dynamodbav:"stage"`
Ts time.Time `dynamodbav:"ts"`
Id string `dynamodbav:"id"`
Type JobType `dynamodbav:"type"`
Ts time.Time `dynamodbav:"ts"`
Params map[string]interface{} `dynamodbav:"params"`
Id string `dynamodbav:"id" json:"-"`
Ttl time.Time `dynamodbav:"ttl,unixtime" json:"-"`
}

func NewJob(jobType JobType, params map[string]interface{}) JobState {
return JobState{
Job: uuid.New().String(),
Stage: JobStage_Queued,
Ts: time.Now(),
Id: uuid.New().String(),
Type: jobType,
Ts: time.Now(),
Params: params,
Id: uuid.New().String(),
Ttl: time.Now().Add(DefaultJobTtl),
}
}
4 changes: 2 additions & 2 deletions services/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ func (m *MockJobRepository) CreateJob(_ context.Context) (string, error) {
return "", fmt.Errorf("failed to create job")
}
newJob := models.NewJob(models.JobType_Anchor, nil)
m.jobStore[newJob.Id] = &newJob
return newJob.Id, nil
m.jobStore[newJob.Job] = &newJob
return newJob.Job, nil
}

func (m *MockJobRepository) QueryJob(_ context.Context, id string) (*models.JobState, error) {
Expand Down

0 comments on commit de57db4

Please sign in to comment.