Skip to content
This repository has been archived by the owner on Sep 9, 2024. It is now read-only.

Add ability to have sentinel values for Clustering Columns #71

Merged
merged 4 commits into from
Sep 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (s *scanner) iterSlice(iter Scannable) (int, error) {
if err != nil {
return rowsScanned, err
}
removeSentinelValues(ptrs)
fillInZeroedPtrs(ptrs)

sliceElem.Set(reflect.Append(sliceElem, wrapPtrValue(outVal, sliceElemType)))
Expand Down Expand Up @@ -125,6 +126,7 @@ func (s *scanner) iterSingle(iter Scannable) (int, error) {
if err != nil {
return 0, err
}
removeSentinelValues(ptrs)
fillInZeroedPtrs(ptrs)

s.rowsScanned++
Expand Down Expand Up @@ -205,7 +207,21 @@ func fillInZeroedPtrs(ptrs []interface{}) {
elem.Set(reflect.MakeSlice(elem.Type(), 0, 0))
}
}
}
}

// removeSentinelValues removes any clustering sentinel values from being
// exposed as data is scanned
func removeSentinelValues(ptrs []interface{}) {
for _, ptr := range ptrs {
if _, ok := ptr.(*IgnoreFieldType); ok {
continue
}

elem := reflect.ValueOf(ptr).Elem()
if isSentinel, nonSentinelValue := isClusteringSentinelValue(elem.Interface()); isSentinel {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to what degree (if any) should we be thinking about potential performance regressions by adding an extra step of reflection/value comparison for every field pointer now, do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can benchmark this using the existing benchmarks

Before:

goos: darwin
goarch: amd64
pkg: github.com/monzo/gocassa
cpu: Intel(R) Core(TM) i7-8559U CPU @ 2.70GHz
BenchmarkDecodeBlogSliceNoBody
BenchmarkDecodeBlogSliceNoBody-8       	   29456	     34640 ns/op
BenchmarkDecodeBlogStruct
BenchmarkDecodeBlogStruct-8            	  578539	      1841 ns/op
BenchmarkDecodeBlogStructEmptyBody
BenchmarkDecodeBlogStructEmptyBody-8   	  649267	      1840 ns/op
BenchmarkDecodeAlphaSlice
BenchmarkDecodeAlphaSlice-8            	   10000	    101197 ns/op
BenchmarkDecodeAlphaStruct
BenchmarkDecodeAlphaStruct-8           	  236494	      4842 ns/op
BenchmarkStatementMapTable
BenchmarkStatementMapTable-8           	  412954	      2773 ns/op	    3664 B/op	      33 allocs/op
PASS
ok  	github.com/monzo/gocassa	7.574s

After:

goos: darwin
goarch: amd64
pkg: github.com/monzo/gocassa
cpu: Intel(R) Core(TM) i7-8559U CPU @ 2.70GHz
BenchmarkDecodeBlogSliceNoBody
BenchmarkDecodeBlogSliceNoBody-8       	   31040	     39350 ns/op
BenchmarkDecodeBlogStruct
BenchmarkDecodeBlogStruct-8            	  538323	      2429 ns/op
BenchmarkDecodeBlogStructEmptyBody
BenchmarkDecodeBlogStructEmptyBody-8   	  524773	      2916 ns/op
BenchmarkDecodeAlphaSlice
BenchmarkDecodeAlphaSlice-8            	    8732	    133401 ns/op
BenchmarkDecodeAlphaStruct
BenchmarkDecodeAlphaStruct-8           	  181543	      6072 ns/op
BenchmarkStatementMapTable
BenchmarkStatementMapTable-8           	  432660	      2886 ns/op	    3728 B/op	      33 allocs/op
PASS
ok  	github.com/monzo/gocassa	9.648s

I think we can probably optimise this codepath a bit more (the before is quite optimised since #68)

elem.Set(reflect.ValueOf(nonSentinelValue))
}
}
}

Expand Down
69 changes: 69 additions & 0 deletions scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type Account struct {
Expand Down Expand Up @@ -254,6 +255,56 @@ func TestScanIterEmbedded(t *testing.T) {
iter.Reset()
}

func TestScanWithSentinelValues(t *testing.T) {
type accountStruct struct {
ID string
Name string
Metadata []byte
}

t.Run("SliceValues", func(t *testing.T) {
results := []map[string]interface{}{
{"id": "acc_abcd1", "name": ClusteringSentinel, "metadata": []byte{}},
{"id": "acc_abcd2", "name": "Jane", "metadata": []byte(ClusteringSentinel)},
}

fieldNames := []string{"id", "name", "metadata"}
stmt := SelectStatement{keyspace: "test", table: "bench", fields: fieldNames}
iter := newMockIterator(results, stmt.fields)

rows := []*accountStruct{}
rowsRead, err := NewScanner(stmt, &rows).ScanIter(iter)
require.NoError(t, err)
require.Equal(t, 2, rowsRead)

assert.Equal(t, "acc_abcd1", rows[0].ID)
assert.Equal(t, "", rows[0].Name)
assert.Equal(t, []byte{}, rows[0].Metadata)
assert.Equal(t, "acc_abcd2", rows[1].ID)
assert.Equal(t, "Jane", rows[1].Name)
assert.Equal(t, []byte{}, rows[1].Metadata)
})

t.Run("StructValues", func(t *testing.T) {
results := []map[string]interface{}{
{"id": "acc_abcd1", "name": ClusteringSentinel, "metadata": []byte{}},
}

fieldNames := []string{"id", "name", "metadata"}
stmt := SelectStatement{keyspace: "test", table: "bench", fields: fieldNames}
iter := newMockIterator(results, stmt.fields)

row := &accountStruct{}
rowsRead, err := NewScanner(stmt, row).ScanIter(iter)
require.NoError(t, err)
require.Equal(t, 1, rowsRead)

assert.Equal(t, "acc_abcd1", row.ID)
assert.Equal(t, "", row.Name)
assert.Equal(t, []byte{}, row.Metadata)
})
}

func TestFillInZeroedPtrs(t *testing.T) {
str := ""
strSlice := []string{}
Expand All @@ -275,6 +326,24 @@ func TestFillInZeroedPtrs(t *testing.T) {
assert.Equal(t, map[string]string{}, strMapNil)
}

func TestRemoveSentinelValues(t *testing.T) {
str := ""
byteSlice := []byte{}
intVal := 0

removeSentinelValues([]interface{}{&str, &byteSlice, &intVal})
assert.Equal(t, "", str)
assert.Equal(t, []byte{}, byteSlice)
assert.Equal(t, 0, intVal)

str = ClusteringSentinel
byteSlice = []byte(ClusteringSentinel)
removeSentinelValues([]interface{}{&str, &byteSlice, &intVal})
assert.Equal(t, "", str)
assert.Equal(t, []byte{}, byteSlice)
assert.Equal(t, 0, intVal)
}

func TestAllocateNilReference(t *testing.T) {
// Test non pointer, should do nothing
var a string
Expand Down
158 changes: 128 additions & 30 deletions statement.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
package gocassa

import (
"bytes"
"fmt"
"reflect"
"strings"
"time"
)

var (
// ClusteringSentinel represents a placeholder value to be used for cases
// where a value needs to be present (ie: a stand-in representing a
// clustering key that is empty)
ClusteringSentinel = "<gocassa.ClusteringSentinel>"
)

// SelectStatement represents a read (SELECT) query for some data in C*
// It satisfies the Statement interface
type SelectStatement struct {
keyspace string // name of the keyspace
table string // name of the table
fields []string // list of fields we want to select
where []Relation // where filter clauses
order []ClusteringOrderColumn // order by clauses
limit int // limit count, 0 means no limit
allowFiltering bool // whether we should allow filtering
keys Keys // partition / clustering keys for table
keyspace string // name of the keyspace
table string // name of the table
fields []string // list of fields we want to select
where []Relation // where filter clauses
order []ClusteringOrderColumn // order by clauses
limit int // limit count, 0 means no limit
allowFiltering bool // whether we should allow filtering
keys Keys // partition / clustering keys for table
clusteringSentinelsEnabled bool // whether we should enable our clustering sentinel
}

// NewSelectStatement adds the ability to craft a new SelectStatement
Expand Down Expand Up @@ -64,7 +74,7 @@ func (s SelectStatement) QueryAndValues() (string, []interface{}) {
fmt.Sprintf("FROM %s.%s", s.Keyspace(), s.Table()),
}

whereCQL, whereValues := generateWhereCQL(s.Relations())
whereCQL, whereValues := generateWhereCQL(s.Relations(), s.Keys(), s.clusteringSentinelsEnabled)
if whereCQL != "" {
query = append(query, "WHERE", whereCQL)
values = append(values, whereValues...)
Expand Down Expand Up @@ -161,14 +171,22 @@ func (s SelectStatement) Keys() Keys {
return s.keys
}

// WithClusteringSentinel allows you to specify whether the use of the
// clustering sentinel value is enabled
func (s SelectStatement) WithClusteringSentinel(enabled bool) SelectStatement {
s.clusteringSentinelsEnabled = enabled
return s
}

// InsertStatement represents an INSERT query to write some data in C*
// It satisfies the Statement interface
type InsertStatement struct {
keyspace string // name of the keyspace
table string // name of the table
fieldMap map[string]interface{} // fields to be inserted
ttl time.Duration // ttl of the row
keys Keys // partition / clustering keys for table
keyspace string // name of the keyspace
table string // name of the table
fieldMap map[string]interface{} // fields to be inserted
ttl time.Duration // ttl of the row
keys Keys // partition / clustering keys for table
allowClusterSentinel bool // whether we should enable our clustering sentinel
}

// NewInsertStatement adds the ability to craft a new InsertStatement
Expand Down Expand Up @@ -217,7 +235,11 @@ func (s InsertStatement) QueryAndValues() (string, []interface{}) {
for _, field := range sortedKeys(fieldMap) {
fieldNames = append(fieldNames, strings.ToLower(field))
placeholders = append(placeholders, "?")
values = append(values, fieldMap[field])
if isClusteringKeyField(field, s.keys) && s.allowClusterSentinel {
values = append(values, clusteringFieldOrSentinel(fieldMap[field]))
} else {
values = append(values, fieldMap[field])
}
}

query = append(query, "("+strings.Join(fieldNames, ", ")+")")
Expand Down Expand Up @@ -272,15 +294,23 @@ func (s InsertStatement) Keys() Keys {
return s.keys
}

// WithClusteringSentinel allows you to specify whether the use of the
// clustering sentinel value is enabled
func (s InsertStatement) WithClusteringSentinel(enabled bool) InsertStatement {
s.allowClusterSentinel = enabled
return s
}

// UpdateStatement represents an UPDATE query to update some data in C*
// It satisfies the Statement interface
type UpdateStatement struct {
keyspace string // name of the keyspace
table string // name of the table
fieldMap map[string]interface{} // fields to be updated
where []Relation // where filter clauses
ttl time.Duration // ttl of the row
keys Keys // partition / clustering keys for table
keyspace string // name of the keyspace
table string // name of the table
fieldMap map[string]interface{} // fields to be updated
where []Relation // where filter clauses
ttl time.Duration // ttl of the row
keys Keys // partition / clustering keys for table
allowClusterSentinel bool // whether we should enable our clustering sentinel
}

// NewUpdateStatement adds the ability to craft a new UpdateStatement
Expand Down Expand Up @@ -338,7 +368,7 @@ func (s UpdateStatement) QueryAndValues() (string, []interface{}) {
query = append(query, "SET", setCQL)
values = append(values, setValues...)

whereCQL, whereValues := generateWhereCQL(s.Relations())
whereCQL, whereValues := generateWhereCQL(s.Relations(), s.Keys(), s.allowClusterSentinel)
if whereCQL != "" {
query = append(query, "WHERE", whereCQL)
values = append(values, whereValues...)
Expand Down Expand Up @@ -392,13 +422,21 @@ func (s UpdateStatement) Keys() Keys {
return s.keys
}

// WithClusteringSentinel allows you to specify whether the use of the
// clustering sentinel value is enabled
func (s UpdateStatement) WithClusteringSentinel(enabled bool) UpdateStatement {
s.allowClusterSentinel = enabled
return s
}

// DeleteStatement represents a DELETE query to delete some data in C*
// It satisfies the Statement interface
type DeleteStatement struct {
keyspace string // name of the keyspace
table string // name of the table
where []Relation // where filter clauses
keys Keys // partition / clustering keys for table
keyspace string // name of the keyspace
table string // name of the table
where []Relation // where filter clauses
keys Keys // partition / clustering keys for table
allowClusterSentinel bool // whether we should enable our clustering sentinel
}

// NewDeleteStatement adds the ability to craft a new DeleteStatement
Expand Down Expand Up @@ -439,7 +477,7 @@ func (s DeleteStatement) Values() []interface{} {
// QueryAndValues returns the CQL query and any bind values
func (s DeleteStatement) QueryAndValues() (string, []interface{}) {
query := fmt.Sprintf("DELETE FROM %s.%s", s.Keyspace(), s.Table())
whereCQL, whereValues := generateWhereCQL(s.Relations())
whereCQL, whereValues := generateWhereCQL(s.Relations(), s.Keys(), s.allowClusterSentinel)
if whereCQL != "" {
query += " WHERE " + whereCQL
}
Expand Down Expand Up @@ -467,6 +505,13 @@ func (s DeleteStatement) Keys() Keys {
return s.keys
}

// WithClusteringSentinel allows you to specify whether the use of the
// clustering sentinel value is enabled
suhailpatel marked this conversation as resolved.
Show resolved Hide resolved
func (s DeleteStatement) WithClusteringSentinel(enabled bool) DeleteStatement {
s.allowClusterSentinel = enabled
return s
}

// cqlStatement represents a statement that executes raw CQL
type cqlStatement struct {
query string
Expand Down Expand Up @@ -509,20 +554,23 @@ func generateUpdateSetCQL(fm map[string]interface{}) (string, []interface{}) {
// a WHERE clause. An expected output may be something like:
// - "foo = ?", {1}
// - "foo = ? AND bar IN ?", {1, {"a", "b", "c"}}
func generateWhereCQL(rs []Relation) (string, []interface{}) {
func generateWhereCQL(rs []Relation, keys Keys, clusteringSentinelsEnabled bool) (string, []interface{}) {
clauses, values := make([]string, 0, len(rs)), make([]interface{}, 0, len(rs))
for _, relation := range rs {
clause, bindValue := generateRelationCQL(relation)
clause, bindValue := generateRelationCQL(relation, keys, clusteringSentinelsEnabled)
clauses = append(clauses, clause)
values = append(values, bindValue)
}
return strings.Join(clauses, " AND "), values
}

func generateRelationCQL(rel Relation) (string, interface{}) {
func generateRelationCQL(rel Relation, keys Keys, clusteringSentinelsEnabled bool) (string, interface{}) {
field := strings.ToLower(rel.Field())
switch rel.Comparator() {
case CmpEquality:
if isClusteringKeyField(rel.Field(), keys) && clusteringSentinelsEnabled {
return field + " = ?", clusteringFieldOrSentinel(rel.Terms()[0])
}
return field + " = ?", rel.Terms()[0]
case CmpIn:
return field + " IN ?", rel.Terms()
Expand Down Expand Up @@ -552,3 +600,53 @@ func generateOrderByCQL(order []ClusteringOrderColumn) string {
}
return strings.Join(out, ", ")
}

// isClusteringKeyField determines whether the relation makes up the
// clustering key of the statement
func isClusteringKeyField(field string, keys Keys) bool {
for _, key := range keys.ClusteringColumns {
if strings.ToLower(key) == strings.ToLower(field) {
return true
}
}
return false
}

// clusteringFieldOrSentinel will check if we should substitute in our
// sentinel value for empty clustering fields
func clusteringFieldOrSentinel(term interface{}) interface{} {
switch v := term.(type) {
case string:
if len(v) == 0 {
return ClusteringSentinel
}
return v
case []byte:
if len(v) == 0 {
return []byte(ClusteringSentinel)
}
return v
default:
return term
}
}

// isClusteringSentinelValue returns a boolean on whether the value passed in
// is the clustering sentinel value and what the non-sentinel value is
func isClusteringSentinelValue(term interface{}) (bool, interface{}) {
val := reflect.ValueOf(term)
switch {
case val.Kind() == reflect.String:
if val.String() == ClusteringSentinel {
return true, reflect.New(val.Type()).Elem().Interface()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a scenario where reflect.New(val.Type()).Elem().Interface() will not just be "" here? could we just return that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially had that but we need to consider if you have a type alias for which that doesn't work

}
return false, term
case val.Kind() == reflect.Slice && val.Type().Elem().Kind() == reflect.Uint8:
if bytes.Equal(val.Bytes(), []byte(ClusteringSentinel)) {
return true, reflect.MakeSlice(val.Type(), 0, 0).Interface()
}
return false, term
default:
return false, term
}
}
Loading