Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
kocubinski committed Aug 31, 2023
1 parent bc1d567 commit f25f645
Show file tree
Hide file tree
Showing 13 changed files with 312 additions and 92 deletions.
86 changes: 58 additions & 28 deletions core/core.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package core

import (
"bytes"
"context"
"crypto/sha256"
"fmt"
"os"
"time"

"github.com/dustin/go-humanize"
api "github.com/kocubinski/costor-api"
"github.com/kocubinski/costor-api/compact"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
Expand All @@ -18,46 +21,66 @@ type TreeContext struct {
Log zerolog.Logger
IndexDir string
LogDir string
Generator ChangesetGenerator
VersionLimit int64
MetricLeafCount prometheus.Counter
MetricTreeSize prometheus.Gauge
MetricsTreeHeight prometheus.Gauge
HashLog *os.File
}

type Tree interface {
Set(key, value []byte) (bool, error)
Get(key []byte) ([]byte, error)
Remove(key []byte) ([]byte, bool, error)
SaveVersion() ([]byte, int64, error)
Size() int64
Height() int8
type NodeIterator interface {
Valid() bool
Next() error
GetNode() *api.Node
}

type kvChange struct {
store string
key []byte
value []byte
delete bool
}

func (c *TreeContext) BuildLegacyIAVL(tree Tree) error {
func (c *TreeContext) BuildLegacyIAVL(multiTree *MultiTree) error {
cnt := 1
since := time.Now()
lastVersion := int64(1)
var (
itr NodeIterator
err error
changes []kvChange
)

stream := &compact.StreamingContext{}
itr, err := stream.NewIterator(c.LogDir)
var changes []kvChange
if err != nil {
return err
if c.LogDir != "" {
stream := &compact.StreamingContext{}
itr, err = stream.NewIterator(c.LogDir)
if err != nil {
return err
}
} else {
//itr, err = NewChangesetIterators(c.Generators)
//if err != nil {
// return err
//}
itr, err = c.Generator.Iterator()
if err != nil {
return err
}
}

for ; itr.Valid(); err = itr.Next() {
if err != nil {
return err
}
n := itr.Node
n := itr.GetNode()

sk := n.StoreKey
if sk == "" {
sk = "bank"
}
changes = append(changes, kvChange{
store: sk,
key: n.Key,
value: n.Value,
delete: n.Delete,
Expand All @@ -66,25 +89,32 @@ func (c *TreeContext) BuildLegacyIAVL(tree Tree) error {
if n.Block > lastVersion {
for _, change := range changes {
if !change.delete {
if _, err := tree.Set(change.key, change.value); err != nil {
if _, err := multiTree.Trees[change.store].Set(change.key, change.value); err != nil {
return err
}
} else {
_, ok, err := tree.Remove(change.key)
_, ok, err := multiTree.Trees[change.store].Remove(change.key)
if err != nil {
return err
}
if !ok {
return fmt.Errorf("failed to remove key %s", string(n.Key))
return fmt.Errorf("failed to remove key %x", n.Key)
}
}
}
h, v, err := tree.SaveVersion()
if err != nil {
return err
var hashes bytes.Buffer
var version int64
for _, tree := range multiTree.Trees {
h, v, err := tree.SaveVersion()
if err != nil {
return err
}
version = v
hashes.Write(h)
}
if v%20000 == 0 && c.HashLog != nil {
_, err = fmt.Fprintf(c.HashLog, "%d|%x\n", v, h)
if version%20000 == 0 && c.HashLog != nil {
h := sha256.Sum256(hashes.Bytes())
_, err = fmt.Fprintf(c.HashLog, "%d|%x\n", version, h)
if err != nil {
return err
}
Expand All @@ -97,12 +127,12 @@ func (c *TreeContext) BuildLegacyIAVL(tree Tree) error {
break
}

if c.MetricTreeSize != nil {
c.MetricTreeSize.Set(float64(tree.Size()))
}
if c.MetricsTreeHeight != nil {
c.MetricsTreeHeight.Set(float64(tree.Height()))
}
//if c.MetricTreeSize != nil {
// c.MetricTreeSize.Set(float64(tree.Size()))
//}
//if c.MetricsTreeHeight != nil {
// c.MetricsTreeHeight.Set(float64(tree.Height()))
//}

if cnt%100_000 == 0 {
c.Log.Info().Msgf("processed %s leaves in %s; %s leaves/s",
Expand Down
152 changes: 135 additions & 17 deletions core/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,22 @@ import (
api "github.com/kocubinski/costor-api"
)

func BankLikeGenerator(seed int64, versions int) ChangesetGenerator {
return ChangesetGenerator{
StoreKey: "bank",
Seed: seed,
KeyMean: 56,
KeyStdDev: 3,
ValueMean: 100,
ValueStdDev: 1200,
InitialSize: 35_000,
FinalSize: 2_200_200,
Versions: versions,
ChangePerVersion: 368_000_000 / versions,
DeleteFraction: 0.06,
}
}

type ChangesetGenerator struct {
StoreKey string
Seed int64
Expand All @@ -21,34 +37,41 @@ type ChangesetGenerator struct {
DeleteFraction float64
}

func (c ChangesetGenerator) Iterator() (*ChangesetGenIterator, error) {
func (c ChangesetGenerator) Iterator() (*ChangesetIterator, error) {
if c.FinalSize < c.InitialSize {
return nil, fmt.Errorf("final size must be greater than initial size")
}

itr := &ChangesetGenIterator{
gen: c,
rand: rand.New(rand.NewSource(c.Seed)),
itr := &ChangesetIterator{
gen: c,
rand: rand.New(rand.NewSource(c.Seed)),
// TODO
// this approximation must be padded with the expected number of deletes per version
// createsPerVersion needs to be flaot to account for avg creates per version < 1
// accumulate creates and only write when >= 1, then subtract what was created
createsPerVersion: (c.FinalSize - c.InitialSize) / (c.Versions - 1),
keysHashes: map[[16]byte]struct{}{},
}

err := itr.Next()
return itr, err
}

type ChangesetGenIterator struct {
type ChangesetIterator struct {
Node *api.Node
Version int

rand *rand.Rand
gen ChangesetGenerator
keys [][]byte
keysHashes map[[16]byte]struct{}
createsPerVersion int
versionNodes []*api.Node
versionIndex int
deletable [][]byte
}

func (itr *ChangesetGenIterator) nextVersion() {
func (itr *ChangesetIterator) nextVersion() {
itr.Version++
itr.versionIndex = 0
itr.versionNodes = nil
Expand All @@ -60,25 +83,34 @@ func (itr *ChangesetGenIterator) nextVersion() {
if itr.Version > 1 {
for i := 0; i < deletes; i++ {
j := itr.rand.Intn(len(itr.keys))
itr.versionNodes = append(itr.versionNodes, &api.Node{
Key: itr.keys[j],
Delete: true,
})
node := &api.Node{
StoreKey: itr.gen.StoreKey,
Block: int64(itr.Version),
Key: itr.keys[j],
Delete: true,
}

itr.keys = append(itr.keys[:j], itr.keys[j+1:]...)
itr.versionNodes = append(itr.versionNodes, node)
}
}

if itr.Version > 1 {
for i := 0; i < updates; i++ {
j := itr.rand.Intn(len(itr.keys))
itr.versionNodes = append(itr.versionNodes, &api.Node{
Key: itr.keys[j],
Value: itr.genBytes(itr.gen.ValueMean, itr.gen.ValueStdDev),
StoreKey: itr.gen.StoreKey,
Block: int64(itr.Version),
Key: itr.keys[j],
Value: itr.genBytes(itr.gen.ValueMean, itr.gen.ValueStdDev),
})
}
}

var creates int
var (
creates int
//createCollisions int
)
if itr.Version == 1 {
creates = itr.gen.InitialSize
} else {
Expand All @@ -100,7 +132,7 @@ func (itr *ChangesetGenIterator) nextVersion() {
})
}

func (itr *ChangesetGenIterator) Next() error {
func (itr *ChangesetIterator) Next() error {
if itr.versionIndex >= len(itr.versionNodes) {
if itr.Version == itr.gen.Versions {
itr.Node = nil
Expand All @@ -114,16 +146,102 @@ func (itr *ChangesetGenIterator) Next() error {
return nil
}

func (itr *ChangesetGenIterator) Valid() bool {
func (itr *ChangesetIterator) Valid() bool {
return itr.Node != nil
}

func (itr *ChangesetGenIterator) genBytes(mean, stdDev int) []byte {
func (itr *ChangesetIterator) genBytes(mean, stdDev int) []byte {
length := int(itr.rand.NormFloat64()*float64(stdDev) + float64(mean))
// length must be at least 1
// explanation: normal distribution is a poor approximation of certain data sets where std dev is skewed
// by outliers on the upper bound. mean - std dev can be negative, which is not a valid length.
// we could just clamp length at 1, but that would skew the distribution of lengths towards 0 which is
// not realistic. instead we just generate again closer to the mean with a std dev of mean / 3.
// this is not perfect but good enough for test sets.
if length < 1 {
length = 1
length = int(itr.rand.NormFloat64()*float64(mean/3) + float64(mean))
// much lower probability of this happening twice, but just in case
if length < 1 {
length = 1
}
}
b := make([]byte, length)
itr.rand.Read(b)
return b
}

func (itr *ChangesetIterator) GetNode() *api.Node {
return itr.Node
}

type ChangesetIterators struct {
iterators []ChangesetIterator
lastVersion int
idx int
Node *api.Node
}

func NewChangesetIterators(gens []ChangesetGenerator) (*ChangesetIterators, error) {
if len(gens) == 0 {
return nil, fmt.Errorf("must provide at least one generator")
}

var iterators []ChangesetIterator
version := gens[0].Versions
for _, gen := range gens {
if gen.Versions != version {
return nil, fmt.Errorf("all generators must have the same number of versions")
}
itr, err := gen.Iterator()
if err != nil {
return nil, err
}
iterators = append(iterators, *itr)
}

itr := &ChangesetIterators{
iterators: iterators,
}
err := itr.Next()
if err != nil {
return nil, err
}
return itr, nil
}

func (itr *ChangesetIterators) Next() error {
// terminal condition
if len(itr.iterators) == 0 {
itr.Node = nil
return nil
}

// reset index if we've reached the end of the list
if itr.idx >= len(itr.iterators) {
itr.idx = 0
}

curItr := itr.iterators[itr.idx]
err := curItr.Next()
if err != nil {
return err
}
// when we reach the end of an iterator, remove it from the list
if !curItr.Valid() {
itr.iterators = append(itr.iterators[:itr.idx], itr.iterators[itr.idx+1:]...)
return itr.Next()
}

// nominal case
itr.Node = curItr.Node
itr.idx++
return nil
}

func (itr *ChangesetIterators) Valid() bool {
return itr.Node != nil
}

func (itr *ChangesetIterators) GetNode() *api.Node {
return itr.Node
}
Loading

0 comments on commit f25f645

Please sign in to comment.