Skip to content

Commit

Permalink
cluster: implement stopcursor
Browse files Browse the repository at this point in the history
  • Loading branch information
peterbourgon committed Aug 7, 2014
1 parent 8cc3075 commit b8da21f
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 16 deletions.
35 changes: 28 additions & 7 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Inserter interface {
// Selecter defines the methods to retrieve elements from a sorted set.
type Selecter interface {
SelectOffset(keys []string, offset, limit int) <-chan Element
SelectCursor(keys []string, cursor common.Cursor, limit int) <-chan Element
SelectCursor(keys []string, cursor, stopcursor common.Cursor, limit int) <-chan Element
}

// Deleter defines the method to delete elements from a sorted set. A key-
Expand Down Expand Up @@ -190,9 +190,9 @@ func (c *cluster) SelectOffset(keys []string, offset, limit int) <-chan Element

// SelectCursor uses ZREVRANGEBYSCORE to do a cursor-based select, similar to
// SelectOffset.
func (c *cluster) SelectCursor(keys []string, cursor common.Cursor, limit int) <-chan Element {
func (c *cluster) SelectCursor(keys []string, cursor, stopcursor common.Cursor, limit int) <-chan Element {
return c.selectCommon(keys, func(conn redis.Conn, myKeys []string) (map[string][]common.KeyScoreMember, error) {
return pipelineRangeByScore(conn, myKeys, cursor, limit)
return pipelineRangeByScore(conn, myKeys, cursor, stopcursor, limit)
})
}

Expand Down Expand Up @@ -500,13 +500,16 @@ func pipelineRange(conn redis.Conn, keys []string, offset, limit int) (map[strin
return m, nil
}

func pipelineRangeByScore(conn redis.Conn, keys []string, cursor common.Cursor, limit int) (map[string][]common.KeyScoreMember, error) {
func pipelineRangeByScore(conn redis.Conn, keys []string, cursor, stopcursor common.Cursor, limit int) (map[string][]common.KeyScoreMember, error) {
if limit < 0 {
// TODO maybe change that
return map[string][]common.KeyScoreMember{}, fmt.Errorf("negative limit is invalid for cursor-based select")
}

pastCursor := func(score float64, member []byte) bool {
// pastCursor returns true when the score+member are "past" the cursor
// (smaller score, larger lexicographically) and can therefore be included
// in the resultset.
pastCursor := func(score float64, member string) bool {
if score < cursor.Score {
return true
}
Expand All @@ -516,6 +519,19 @@ func pipelineRangeByScore(conn redis.Conn, keys []string, cursor common.Cursor,
return false
}

// beforeStop returns true as long as the score+member are "before" the
// stopcursor (larger score, smaller lexicographically) and can therefore
// be included in the resultset.
beforeStop := func(score float64, member string) bool {
if score > stopcursor.Score {
return true
}
if score == stopcursor.Score && bytes.Compare([]byte(member), []byte(stopcursor.Member)) > 0 {
return true
}
return false
}

// An unlimited number of members may exist at cursor.Score. Luckily,
// they're in lexicographically stable order. Walk the elements we get
// back. For as long as element.Score == cursor.Score, and a
Expand Down Expand Up @@ -560,6 +576,7 @@ func pipelineRangeByScore(conn redis.Conn, keys []string, cursor common.Cursor,
var (
collected = 0
validated = make([]common.KeyScoreMember, 0, len(values))
hitStop = false
)

for len(values) > 0 {
Expand All @@ -571,9 +588,13 @@ func pipelineRangeByScore(conn redis.Conn, keys []string, cursor common.Cursor,

collected++

if !pastCursor(score, []byte(member)) {
if !pastCursor(score, member) {
continue // this element is still behind or at our cursor
}
if !beforeStop(score, member) {
hitStop = true
continue // this element is beyond our stop point
}

validated = append(validated, common.KeyScoreMember{Key: key, Score: score, Member: member})
}
Expand All @@ -584,7 +605,7 @@ func pipelineRangeByScore(conn redis.Conn, keys []string, cursor common.Cursor,
haveEnoughElements = len(validated) >= limit
exhaustedElements = collected < selectLimit
)
if haveEnoughElements || exhaustedElements {
if haveEnoughElements || exhaustedElements || hitStop {
if len(validated) > limit {
validated = validated[:limit]
}
Expand Down
53 changes: 44 additions & 9 deletions cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func TestSelectCursor(t *testing.T) {
}

// Middle of the list, a real element cursor.
ch := c.SelectCursor([]string{"foo"}, common.Cursor{Score: 45.4, Member: "gamma"}, 100)
ch := c.SelectCursor([]string{"foo"}, common.Cursor{Score: 45.4, Member: "gamma"}, common.Cursor{}, 100)
expected := []common.KeyScoreMember{
{"foo", 35.9, "nu"},
{"foo", 34.8, "omicron"},
Expand All @@ -393,7 +393,7 @@ func TestSelectCursor(t *testing.T) {
}

// Top of the list.
ch = c.SelectCursor([]string{"foo"}, common.Cursor{Score: math.MaxFloat64, Member: ""}, 100)
ch = c.SelectCursor([]string{"foo"}, common.Cursor{Score: math.MaxFloat64}, common.Cursor{}, 100)
expected = []common.KeyScoreMember{
{"foo", 99.2, "beta"},
{"foo", 76.6, "iota"},
Expand All @@ -417,7 +417,7 @@ func TestSelectCursor(t *testing.T) {
}

// Restricted limit.
ch = c.SelectCursor([]string{"foo"}, common.Cursor{Score: 50.1, Member: "alpha"}, 3)
ch = c.SelectCursor([]string{"foo"}, common.Cursor{Score: 50.1, Member: "alpha"}, common.Cursor{}, 3)
expected = []common.KeyScoreMember{
{"foo", 45.4, "gamma"},
{"foo", 35.9, "nu"},
Expand All @@ -435,7 +435,7 @@ func TestSelectCursor(t *testing.T) {
}

// Multiple keys, top of the list, all elements.
ch = c.SelectCursor([]string{"bar", "foo"}, common.Cursor{Score: math.MaxFloat64, Member: ""}, 100)
ch = c.SelectCursor([]string{"bar", "foo"}, common.Cursor{Score: math.MaxFloat64, Member: ""}, common.Cursor{}, 100)
m := map[string][]common.KeyScoreMember{}
for e := range ch {
if e.Error != nil {
Expand Down Expand Up @@ -468,7 +468,7 @@ func TestSelectCursor(t *testing.T) {
}

// Multiple keys, middle of the list, all elements.
ch = c.SelectCursor([]string{"bar", "foo"}, common.Cursor{Score: 66.6, Member: "rho"}, 100)
ch = c.SelectCursor([]string{"bar", "foo"}, common.Cursor{Score: 66.6, Member: "rho"}, common.Cursor{}, 100)
m = map[string][]common.KeyScoreMember{}
for e := range ch {
if e.Error != nil {
Expand Down Expand Up @@ -497,7 +497,7 @@ func TestSelectCursor(t *testing.T) {
}

// Multiple keys, middle of the list, limited elements.
ch = c.SelectCursor([]string{"bar", "foo"}, common.Cursor{Score: 66.6, Member: "rho"}, 1)
ch = c.SelectCursor([]string{"bar", "foo"}, common.Cursor{Score: 66.6, Member: "rho"}, common.Cursor{}, 1)
m = map[string][]common.KeyScoreMember{}
for e := range ch {
if e.Error != nil {
Expand All @@ -518,6 +518,41 @@ func TestSelectCursor(t *testing.T) {
continue
}
}

// Top of the list, using the stopcursor.
ch = c.SelectCursor([]string{"foo"}, common.Cursor{Score: math.MaxFloat64}, common.Cursor{Score: 45.4, Member: "gamma"}, 100)
expected = []common.KeyScoreMember{
{"foo", 99.2, "beta"},
{"foo", 76.6, "iota"},
{"foo", 50.1, "alpha"},
}
e = <-ch
if e.Error != nil {
t.Fatalf("key %q: %s", e.Key, e.Error)
}
if got := e.KeyScoreMembers; !reflect.DeepEqual(expected, got) {
t.Fatalf("key %q: expected \n\t%+v, got \n\t%+v", e.Key, expected, got)
}
if _, ok := <-ch; ok {
t.Fatalf("key %q: expected 1 element on the channel, got multiple")
}

// Middle of the list, using the stopcursor.
ch = c.SelectCursor([]string{"foo"}, common.Cursor{Score: 35.9, Member: "nu"}, common.Cursor{Score: 21.5, Member: "kappa"}, 100)
expected = []common.KeyScoreMember{
{"foo", 34.8, "omicron"},
{"foo", 33.7, "sigma"},
}
e = <-ch
if e.Error != nil {
t.Fatalf("key %q: %s", e.Key, e.Error)
}
if got := e.KeyScoreMembers; !reflect.DeepEqual(expected, got) {
t.Fatalf("key %q: expected \n\t%+v, got \n\t%+v", e.Key, expected, got)
}
if _, ok := <-ch; ok {
t.Fatalf("key %q: expected 1 element on the channel, got multiple")
}
}

func TestCursorRetries(t *testing.T) {
Expand Down Expand Up @@ -555,23 +590,23 @@ func TestCursorRetries(t *testing.T) {
// a low limit. A hard-coded, low maxRetries means this will fail. Note
// that this is testing the specific implementation: not a great unit
// test.
element := <-c.SelectCursor([]string{"foo"}, common.Cursor{Score: 1.23, Member: "bbb"}, 2)
element := <-c.SelectCursor([]string{"foo"}, common.Cursor{Score: 1.23, Member: "bbb"}, common.Cursor{}, 2)
if element.Error == nil {
t.Error("expected error, got none")
} else {
t.Logf("got expected error (%s)", element.Error)
}

// If we choose a higher limit, it should work.
element = <-c.SelectCursor([]string{"foo"}, common.Cursor{Score: 1.23, Member: "bbb"}, 5)
element = <-c.SelectCursor([]string{"foo"}, common.Cursor{Score: 1.23, Member: "bbb"}, common.Cursor{}, 5)
if element.Error != nil {
t.Errorf("got unexpected error: %s", element.Error)
} else {
t.Logf("OK: %v", element.KeyScoreMembers)
}

// If we choose a slightly earlier cursor, it should also work.
element = <-c.SelectCursor([]string{"foo"}, common.Cursor{Score: 1.23, Member: "hhh"}, 2)
element = <-c.SelectCursor([]string{"foo"}, common.Cursor{Score: 1.23, Member: "hhh"}, common.Cursor{}, 2)
if element.Error != nil {
t.Errorf("got unexpected error: %s", element.Error)
} else {
Expand Down

0 comments on commit b8da21f

Please sign in to comment.