Skip to content

Commit

Permalink
Implement epoch APIs/dashboard page (#2560)
Browse files Browse the repository at this point in the history
* Implement epoch webapis

* Add Epoch livefeed on dashboard

* Add Epoch Page

* Refactor fetch and update states

* Split locks between commitment and metadata

* Align websocket message types

* Align epoch jsonmodel naming to the protocol

* Create a workerpool for commitment

* Avoid global typesettings in commitment retainer

* Remove cachedCommitment

* Stream ouputs in retainer

* Remove unused const

* Expose Workers in retainer

* Refactor OutputsProvider

* Rename CommitmentByID to stick to camel case naming

* Evict blockMetadata with workerpool

* Process data fields in setCommitment

* Remove unreachable codes

* Minor fixes

---------

Co-authored-by: jonastheis <[email protected]>
  • Loading branch information
jkrvivian and jonastheis authored Feb 25, 2023
1 parent afe8e07 commit ca3a151
Show file tree
Hide file tree
Showing 30 changed files with 849 additions and 553 deletions.
22 changes: 12 additions & 10 deletions packages/app/jsonmodels/epochs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@ import (
)

type EpochInfo struct {
EI uint64 `json:"Index"`
ECR string `json:"RootsID"`
PrevEC string `json:"prevEC"`
ID string `json:"id"`
Index uint64 `json:"index"`
RootsID string `json:"rootsID"`
PrevID string `json:"prevID"`
CumulativeWeight int64 `json:"cumulativeWeight"`
}

func EpochInfoFromRecord(c *commitment.Commitment) *EpochInfo {
return &EpochInfo{
EI: uint64(c.Index()),
ECR: c.RootsID().Base58(),
PrevEC: c.PrevID().Base58(),
ID: c.ID().Base58(),
Index: uint64(c.Index()),
RootsID: c.RootsID().Base58(),
PrevID: c.PrevID().Base58(),
CumulativeWeight: c.CumulativeWeight(),
}
}

Expand All @@ -37,12 +41,10 @@ type EpochUTXOsResponse struct {

type EpochBlocksResponse struct {
Blocks []string `json:"blocks"`
Error string `json:"error,omitempty"`
}

type EpochTransactionsResponse struct {
Transactions []string `json:"transactions"`
}

type EpochPendingConflictCountResponse struct {
PendingConflictCount uint64 `json:"pendingConflictCount"`
Error string `json:"error,omitempty"`
}
80 changes: 80 additions & 0 deletions packages/app/retainer/commitment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package retainer

import (
"context"

"github.com/iotaledger/goshimmer/packages/core/commitment"
"github.com/iotaledger/goshimmer/packages/protocol/engine/notarization"
"github.com/iotaledger/goshimmer/packages/protocol/ledger"
"github.com/iotaledger/goshimmer/packages/protocol/ledger/utxo"
"github.com/iotaledger/goshimmer/packages/protocol/models"
"github.com/iotaledger/hive.go/objectstorage/generic/model"
"github.com/iotaledger/hive.go/serializer/v2/serix"
)

type CommitmentDetails struct {
model.Storable[commitment.ID, CommitmentDetails, *CommitmentDetails, commitmentDetailsModel] `serix:"0"`
}

type commitmentDetailsModel struct {
ID commitment.ID `serix:"0"`

Commitment *commitment.Commitment `serix:"1"`
AcceptedBlocks models.BlockIDs `serix:"2,lengthPrefixType=uint32"`
AcceptedTransactions utxo.TransactionIDs `serix:"3,lengthPrefixType=uint32"`
CreatedOutputs utxo.OutputIDs `serix:"4,lengthPrefixType=uint32"`
SpentOutputs utxo.OutputIDs `serix:"5,lengthPrefixType=uint32"`
}

func newCommitmentDetails() (c *CommitmentDetails) {
c = model.NewStorable[commitment.ID, CommitmentDetails](&commitmentDetailsModel{
AcceptedBlocks: make(models.BlockIDs),
AcceptedTransactions: utxo.NewTransactionIDs(),
SpentOutputs: utxo.NewOutputIDs(),
CreatedOutputs: utxo.NewOutputIDs(),
})
c.SetID(commitment.ID{})

return c
}

func (c *CommitmentDetails) setCommitment(e *notarization.EpochCommittedDetails) {
c.M.Commitment = e.Commitment
c.M.ID = e.Commitment.ID()
c.SetID(e.Commitment.ID())

_ = e.AcceptedBlocks.Stream(func(key models.BlockID) bool {
c.M.AcceptedBlocks.Add(key)
return true
})
_ = e.AcceptedTransactions.Stream(func(key utxo.TransactionID) bool {
c.M.AcceptedTransactions.Add(key)
return true
})

_ = e.SpentOutputs(func(owm *ledger.OutputWithMetadata) error {
c.M.SpentOutputs.Add(owm.ID())
return nil
})

_ = e.CreatedOutputs(func(owm *ledger.OutputWithMetadata) error {
c.M.CreatedOutputs.Add(owm.ID())
return nil
})
}

func (c *CommitmentDetails) Encode() ([]byte, error) {
return serix.DefaultAPI.Encode(context.Background(), c.M)
}

func (c *CommitmentDetails) Decode(bytes []byte) (int, error) {
return serix.DefaultAPI.Decode(context.Background(), bytes, &c.M)
}

func (c *CommitmentDetails) MarshalJSON() ([]byte, error) {
return serix.DefaultAPI.JSONEncode(context.Background(), c.M)
}

func (c *CommitmentDetails) UnmarshalJSON(bytes []byte) error {
return serix.DefaultAPI.JSONDecode(context.Background(), bytes, &c.M)
}
145 changes: 102 additions & 43 deletions packages/app/retainer/retainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package retainer
import (
"github.com/pkg/errors"

"github.com/iotaledger/goshimmer/packages/core/commitment"
"github.com/iotaledger/goshimmer/packages/core/database"
"github.com/iotaledger/goshimmer/packages/core/epoch"
"github.com/iotaledger/goshimmer/packages/core/memstorage"
"github.com/iotaledger/goshimmer/packages/protocol"
"github.com/iotaledger/goshimmer/packages/protocol/congestioncontrol/icca/scheduler"
"github.com/iotaledger/goshimmer/packages/protocol/engine/consensus/blockgadget"
"github.com/iotaledger/goshimmer/packages/protocol/engine/notarization"
"github.com/iotaledger/goshimmer/packages/protocol/engine/tangle/blockdag"
"github.com/iotaledger/goshimmer/packages/protocol/engine/tangle/booker"
"github.com/iotaledger/goshimmer/packages/protocol/engine/tangle/booker/virtualvoting"
Expand All @@ -22,33 +24,46 @@ import (
"github.com/iotaledger/hive.go/runtime/workerpool"
)

const (
prefixBlockMetadataStorage byte = iota

prefixCommitmentDetailsStorage
)

type Retainer struct {
workerPool *workerpool.WorkerPool
cachedMetadata *memstorage.EpochStorage[models.BlockID, *cachedMetadata]
blockStorage *database.PersistentEpochStorage[models.BlockID, BlockMetadata, *models.BlockID, *BlockMetadata]
Workers *workerpool.Group

dbManager *database.Manager
protocol *protocol.Protocol
evictionLock *syncutils.DAGMutex[epoch.Index]
blockWorkerPool *workerpool.WorkerPool
commitmentWorkerPool *workerpool.WorkerPool
cachedMetadata *memstorage.EpochStorage[models.BlockID, *cachedMetadata]
blockStorage *database.PersistentEpochStorage[models.BlockID, BlockMetadata, *models.BlockID, *BlockMetadata]
commitmentStorage *database.PersistentEpochStorage[commitment.ID, CommitmentDetails, *commitment.ID, *CommitmentDetails]

dbManager *database.Manager
protocol *protocol.Protocol
metadataEvictionLock *syncutils.DAGMutex[epoch.Index]

optsRealm kvstore.Realm
}

func NewRetainer(workers *workerpool.Group, protocol *protocol.Protocol, dbManager *database.Manager, opts ...options.Option[Retainer]) (r *Retainer) {
return options.Apply(&Retainer{
workerPool: workers.CreatePool("Retainer", 2),
cachedMetadata: memstorage.NewEpochStorage[models.BlockID, *cachedMetadata](),
protocol: protocol,
dbManager: dbManager,
optsRealm: []byte("retainer"),
Workers: workers,
blockWorkerPool: workers.CreatePool("RetainerBlock", 2),
commitmentWorkerPool: workers.CreatePool("RetainerCommitment", 1),
cachedMetadata: memstorage.NewEpochStorage[models.BlockID, *cachedMetadata](),
protocol: protocol,
dbManager: dbManager,
optsRealm: []byte("retainer"),
}, opts, (*Retainer).setupEvents, func(r *Retainer) {
r.blockStorage = database.NewPersistentEpochStorage[models.BlockID, BlockMetadata](dbManager, r.optsRealm)
r.evictionLock = syncutils.NewDAGMutex[epoch.Index]()
r.blockStorage = database.NewPersistentEpochStorage[models.BlockID, BlockMetadata](dbManager, append(r.optsRealm, []byte{prefixBlockMetadataStorage}...))
r.commitmentStorage = database.NewPersistentEpochStorage[commitment.ID, CommitmentDetails](dbManager, append(r.optsRealm, []byte{prefixCommitmentDetailsStorage}...))
r.metadataEvictionLock = syncutils.NewDAGMutex[epoch.Index]()
})
}

func (r *Retainer) Shutdown() {
r.workerPool.Shutdown()
r.Workers.Shutdown()
}

func (r *Retainer) Block(blockID models.BlockID) (block *models.Block, exists bool) {
Expand Down Expand Up @@ -77,20 +92,28 @@ func (r *Retainer) BlockMetadata(blockID models.BlockID) (metadata *BlockMetadat
return metadata, exists
}

func (r *Retainer) LoadAll(index epoch.Index) (ids *advancedset.AdvancedSet[*BlockMetadata]) {
r.evictionLock.RLock(index)
defer r.evictionLock.RUnlock(index)
func (r *Retainer) Commitment(index epoch.Index) (c *CommitmentDetails, exists bool) {
return r.getCommitmentDetails(index)
}

func (r *Retainer) CommitmentByID(id commitment.ID) (c *CommitmentDetails, exists bool) {
return r.getCommitmentDetails(id.Index())
}

func (r *Retainer) LoadAllBlockMetadata(index epoch.Index) (ids *advancedset.AdvancedSet[*BlockMetadata]) {
r.metadataEvictionLock.RLock(index)
defer r.metadataEvictionLock.RUnlock(index)

ids = advancedset.NewAdvancedSet[*BlockMetadata]()
r.Stream(index, func(id models.BlockID, metadata *BlockMetadata) {
r.StreamBlocksMetadata(index, func(id models.BlockID, metadata *BlockMetadata) {
ids.Add(metadata)
})
return
}

func (r *Retainer) Stream(index epoch.Index, callback func(id models.BlockID, metadata *BlockMetadata)) {
r.evictionLock.RLock(index)
defer r.evictionLock.RUnlock(index)
func (r *Retainer) StreamBlocksMetadata(index epoch.Index, callback func(id models.BlockID, metadata *BlockMetadata)) {
r.metadataEvictionLock.RLock(index)
defer r.metadataEvictionLock.RUnlock(index)

if epochStorage := r.cachedMetadata.Get(index, false); epochStorage != nil {
epochStorage.ForEach(func(id models.BlockID, cachedMetadata *cachedMetadata) bool {
Expand All @@ -111,9 +134,9 @@ func (r *Retainer) DatabaseSize() int64 {
return r.dbManager.TotalStorageSize()
}

// WorkerPool returns the worker pool of the retainer.
func (r *Retainer) WorkerPool() *workerpool.WorkerPool {
return r.workerPool
// BlockWorkerPool returns the block worker pool of the retainer.
func (r *Retainer) BlockWorkerPool() *workerpool.WorkerPool {
return r.blockWorkerPool
}

// PruneUntilEpoch prunes storage epochs less than and equal to the given index.
Expand All @@ -126,7 +149,7 @@ func (r *Retainer) setupEvents() {
if cm := r.createOrGetCachedMetadata(block.ID()); cm != nil {
cm.setBlockDAGBlock(block)
}
}, event.WithWorkerPool(r.workerPool))
}, event.WithWorkerPool(r.blockWorkerPool))

// TODO: missing blocks make the node fail due to empty strong parents
// r.protocol.Events.Engine.Tangle.BlockDAG.BlockMissing.AttachWithWorkerPool(event.NewClosure(func(block *blockdag.Block) {
Expand All @@ -138,7 +161,7 @@ func (r *Retainer) setupEvents() {
if cm := r.createOrGetCachedMetadata(block.ID()); cm != nil {
cm.setBlockDAGBlock(block)
}
}, event.WithWorkerPool(r.workerPool))
}, event.WithWorkerPool(r.blockWorkerPool))

r.protocol.Events.Engine.Tangle.Booker.BlockBooked.Hook(func(evt *booker.BlockBookedEvent) {
if cm := r.createOrGetCachedMetadata(evt.Block.ID()); cm != nil {
Expand All @@ -147,40 +170,50 @@ func (r *Retainer) setupEvents() {
cm.ConflictIDs = evt.ConflictIDs
cm.Unlock()
}
}, event.WithWorkerPool(r.workerPool))
}, event.WithWorkerPool(r.blockWorkerPool))

r.protocol.Events.Engine.Tangle.Booker.VirtualVoting.BlockTracked.Hook(func(block *virtualvoting.Block) {
if cm := r.createOrGetCachedMetadata(block.ID()); cm != nil {
cm.setVirtualVotingBlock(block)
}
}, event.WithWorkerPool(r.workerPool))
}, event.WithWorkerPool(r.blockWorkerPool))

congestionControl := func(block *scheduler.Block) {
if cm := r.createOrGetCachedMetadata(block.ID()); cm != nil {
cm.setSchedulerBlock(block)
}
}
r.protocol.Events.CongestionControl.Scheduler.BlockScheduled.Hook(congestionControl, event.WithWorkerPool(r.workerPool))
r.protocol.Events.CongestionControl.Scheduler.BlockDropped.Hook(congestionControl, event.WithWorkerPool(r.workerPool))
r.protocol.Events.CongestionControl.Scheduler.BlockSkipped.Hook(congestionControl, event.WithWorkerPool(r.workerPool))
r.protocol.Events.CongestionControl.Scheduler.BlockScheduled.Hook(congestionControl, event.WithWorkerPool(r.blockWorkerPool))
r.protocol.Events.CongestionControl.Scheduler.BlockDropped.Hook(congestionControl, event.WithWorkerPool(r.blockWorkerPool))
r.protocol.Events.CongestionControl.Scheduler.BlockSkipped.Hook(congestionControl, event.WithWorkerPool(r.blockWorkerPool))

r.protocol.Events.Engine.Consensus.BlockGadget.BlockAccepted.Hook(func(block *blockgadget.Block) {
cm := r.createOrGetCachedMetadata(block.ID())
cm.setAcceptanceBlock(block)
}, event.WithWorkerPool(r.workerPool))
}, event.WithWorkerPool(r.blockWorkerPool))

r.protocol.Events.Engine.Consensus.BlockGadget.BlockConfirmed.Hook(func(block *blockgadget.Block) {
if cm := r.createOrGetCachedMetadata(block.ID()); cm != nil {
cm.setConfirmationBlock(block)
}
}, event.WithWorkerPool(r.workerPool))
}, event.WithWorkerPool(r.blockWorkerPool))

r.protocol.Events.Engine.EvictionState.EpochEvicted.Hook(r.storeAndEvictEpoch, event.WithWorkerPool(r.blockWorkerPool))

r.protocol.Engine().NotarizationManager.Events.EpochCommitted.Hook(func(e *notarization.EpochCommittedDetails) {
if e.Commitment.Index() < r.protocol.Engine().EvictionState.LastEvictedEpoch() {
return
}

r.protocol.Events.Engine.EvictionState.EpochEvicted.Hook(r.storeAndEvictEpoch)
cd := newCommitmentDetails()
cd.setCommitment(e)
r.storeCommitmentDetails(cd)
}, event.WithWorkerPool(r.commitmentWorkerPool))
}

func (r *Retainer) createOrGetCachedMetadata(id models.BlockID) *cachedMetadata {
r.evictionLock.RLock(id.Index())
defer r.evictionLock.RUnlock(id.Index())
r.metadataEvictionLock.RLock(id.Index())
defer r.metadataEvictionLock.RUnlock(id.Index())

if id.EpochIndex < r.protocol.Engine().EvictionState.LastEvictedEpoch() {
return nil
Expand All @@ -201,15 +234,14 @@ func (r *Retainer) storeAndEvictEpoch(epochIndex epoch.Index) {
// Once everything is stored to disk, we evict it from cache.
// Therefore, we make sure that we can always first try to read BlockMetadata from cache and if it's not in cache
// anymore it is already written to disk.
r.evictionLock.Lock(epochIndex)
defer r.evictionLock.Unlock(epochIndex)

r.metadataEvictionLock.Lock(epochIndex)
r.cachedMetadata.Evict(epochIndex)
r.metadataEvictionLock.Unlock(epochIndex)
}

func (r *Retainer) createStorableBlockMetadata(epochIndex epoch.Index) (metas []*BlockMetadata) {
r.evictionLock.RLock(epochIndex)
defer r.evictionLock.RUnlock(epochIndex)
r.metadataEvictionLock.RLock(epochIndex)
defer r.metadataEvictionLock.RUnlock(epochIndex)

storage := r.cachedMetadata.Get(epochIndex)
if storage == nil {
Expand Down Expand Up @@ -240,9 +272,15 @@ func (r *Retainer) storeBlockMetadata(metas []*BlockMetadata) {
}
}

func (r *Retainer) storeCommitmentDetails(c *CommitmentDetails) {
if err := r.commitmentStorage.Set(c.ID(), *c); err != nil {
panic(errors.Wrapf(err, "could not save %s to commitment storage", c.ID()))
}
}

func (r *Retainer) blockMetadataFromCache(blockID models.BlockID) (storageExists bool, metadata *BlockMetadata, exists bool) {
r.evictionLock.RLock(blockID.Index())
defer r.evictionLock.RUnlock(blockID.Index())
r.metadataEvictionLock.RLock(blockID.Index())
defer r.metadataEvictionLock.RUnlock(blockID.Index())

storage := r.cachedMetadata.Get(blockID.Index())
if storage == nil {
Expand All @@ -253,6 +291,27 @@ func (r *Retainer) blockMetadataFromCache(blockID models.BlockID) (storageExists
return true, newBlockMetadata(cm), exists
}

func (r *Retainer) getCommitmentDetails(index epoch.Index) (c *CommitmentDetails, exists bool) {
if index < 0 {
return
}

// get from persistent storage
c = newCommitmentDetails()
err := r.commitmentStorage.Iterate(index, func(key commitment.ID, value CommitmentDetails) (advance bool) {
*c = value
c.SetID(c.M.ID)
exists = true

return false
})
if err != nil {
return nil, false
}

return c, exists
}

// region Options //////////////////////////////////////////////////////////////////////////////////////////////////////

func WithRealm(realm kvstore.Realm) options.Option[Retainer] {
Expand Down
Loading

0 comments on commit ca3a151

Please sign in to comment.