Skip to content

Commit

Permalink
fraud proof processing using go-fraud package (cosmos#928)
Browse files Browse the repository at this point in the history
Update existing fraud proof processing logic to use ProofService from
go-fraud package. Also, removes the old fraud proof gossip logic.

Fixes cosmos#895 & cosmos#945

---------

Co-authored-by: Ganesha Upadhyaya <[email protected]>
  • Loading branch information
gupadhyaya and Ganesha Upadhyaya authored May 24, 2023
1 parent c769503 commit e90a07b
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 139 deletions.
90 changes: 57 additions & 33 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync/atomic"
"time"

"github.com/celestiaorg/go-fraud/fraudserv"
"github.com/libp2p/go-libp2p/core/crypto"
abci "github.com/tendermint/tendermint/abci/types"
tmcrypto "github.com/tendermint/tendermint/crypto"
Expand Down Expand Up @@ -60,8 +61,6 @@ type Manager struct {

HeaderCh chan *types.SignedHeader

FraudProofInCh chan *abci.FraudProof

blockInCh chan newBlockEvent
syncCache map[uint64]*types.Block

Expand Down Expand Up @@ -153,7 +152,6 @@ func NewManager(
// channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary
HeaderCh: make(chan *types.SignedHeader, 100),
blockInCh: make(chan newBlockEvent, 100),
FraudProofInCh: make(chan *abci.FraudProof, 100),
retrieveMtx: new(sync.Mutex),
lastStateMtx: new(sync.Mutex),
syncCache: make(map[uint64]*types.Block),
Expand Down Expand Up @@ -181,10 +179,6 @@ func (m *Manager) SetDALC(dalc da.DataAvailabilityLayerClient) {
m.retriever = dalc.(da.BlockRetriever)
}

func (m *Manager) GetFraudProofOutChan() chan *abci.FraudProof {
return m.executor.FraudProofOutCh
}

// AggregationLoop is responsible for aggregating transactions into rollup-blocks.
func (m *Manager) AggregationLoop(ctx context.Context, lazy bool) {
initialHeight := uint64(m.genesis.InitialHeight)
Expand Down Expand Up @@ -249,6 +243,57 @@ func (m *Manager) AggregationLoop(ctx context.Context, lazy bool) {
}
}

func (m *Manager) SetFraudProofService(fraudProofServ *fraudserv.ProofService) {
m.executor.SetFraudProofService(fraudProofServ)
}

func (m *Manager) ProcessFraudProof(ctx context.Context, cancel context.CancelFunc) {
// subscribe to state fraud proof
sub, err := m.executor.FraudService.Subscribe(types.StateFraudProofType)
if err != nil {
m.logger.Error("failed to subscribe to fraud proof gossip", "error", err)
return
}
defer sub.Cancel()

// continuously process the fraud proofs received via subscription
for {
// sub.Proof is a blocking call that only returns on proof received or context ended
proof, err := sub.Proof(ctx)
if err != nil {
m.logger.Error("failed to receive gossiped fraud proof", "error", err)
return
}

// only handle the state fraud proofs for now
fraudProof, ok := proof.(*types.StateFraudProof)
if !ok {
m.logger.Error("unexpected type received for state fraud proof", "error", err)
return
}
m.logger.Debug("fraud proof received",
"block height", fraudProof.BlockHeight,
"pre-state app hash", fraudProof.PreStateAppHash,
"expected valid app hash", fraudProof.ExpectedValidAppHash,
"length of state witness", len(fraudProof.StateWitness),
)
// TODO(light-client): Set up a new cosmos-sdk app
// TODO: Add fraud proof window validation

success, err := m.executor.VerifyFraudProof(&fraudProof.FraudProof, fraudProof.ExpectedValidAppHash)
if err != nil {
m.logger.Error("failed to verify fraud proof", "error", err)
continue
}
if success {
// halt chain
m.logger.Info("verified fraud proof, halting chain")
cancel()
return
}
}
}

// SyncLoop is responsible for syncing blocks.
//
// SyncLoop processes headers gossiped in P2p network to know what's the latest block height,
Expand Down Expand Up @@ -277,28 +322,6 @@ func (m *Manager) SyncLoop(ctx context.Context, cancel context.CancelFunc) {
if err != nil {
m.logger.Info("failed to sync next block", "error", err)
}
case fraudProof := <-m.FraudProofInCh:
m.logger.Debug("fraud proof received",
"block height", fraudProof.BlockHeight,
"pre-state app hash", fraudProof.PreStateAppHash,
"expected valid app hash", fraudProof.ExpectedValidAppHash,
"length of state witness", len(fraudProof.StateWitness),
)
// TODO(light-client): Set up a new cosmos-sdk app
// TODO: Add fraud proof window validation

success, err := m.executor.VerifyFraudProof(fraudProof, fraudProof.ExpectedValidAppHash)
if err != nil {
m.logger.Error("failed to verify fraud proof", "error", err)
continue
}
if success {
// halt chain
m.logger.Info("verified fraud proof, halting chain")
cancel()
return
}

case <-ctx.Done():
return
}
Expand Down Expand Up @@ -338,7 +361,6 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
if err != nil {
return fmt.Errorf("failed to Commit: %w", err)
}
m.store.SetHeight(uint64(b.SignedHeader.Header.Height()))

err = m.store.SaveBlockResponses(uint64(b.SignedHeader.Header.Height()), responses)
if err != nil {
Expand All @@ -351,6 +373,8 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
return err
}

m.store.SetHeight(uint64(b.SignedHeader.Header.Height()))

if daHeight > newState.DAHeight {
newState.DAHeight = daHeight
}
Expand Down Expand Up @@ -568,9 +592,6 @@ func (m *Manager) publishBlock(ctx context.Context) error {

blockHeight := uint64(block.SignedHeader.Header.Height())

// Only update the stored height after successfully submitting to DA layer and committing to the DB
m.store.SetHeight(blockHeight)

// Commit the new state and block which writes to disk on the proxy app
_, _, err = m.executor.Commit(ctx, newState, block, responses)
if err != nil {
Expand All @@ -589,6 +610,9 @@ func (m *Manager) publishBlock(ctx context.Context) error {
return err
}

// Only update the stored height after successfully submitting to DA layer and committing to the DB
m.store.SetHeight(blockHeight)

newState.DAHeight = atomic.LoadUint64(&m.daHeight)
// After this call m.lastState is the NEW state returned from ApplyBlock
m.lastState = newState
Expand Down
47 changes: 4 additions & 43 deletions node/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/rollkit/rollkit/state/txindex"
"github.com/rollkit/rollkit/state/txindex/kv"
"github.com/rollkit/rollkit/store"
"github.com/rollkit/rollkit/types"
)

// prefixes used in KV store to separate main node data from DALC data
Expand Down Expand Up @@ -173,7 +172,7 @@ func newFullNode(
},
mainKV,
true,
types.StateFraudProofType,
genesis.ChainID,
)

ctx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -203,7 +202,6 @@ func newFullNode(
node.BaseService = *service.NewBaseService(logger, "Node", node)

node.P2P.SetTxValidator(node.newTxValidator())
node.P2P.SetFraudProofValidator(node.newFraudProofValidator())

return node, nil
}
Expand Down Expand Up @@ -248,27 +246,6 @@ func (n *FullNode) headerPublishLoop(ctx context.Context) {
}
}

func (n *FullNode) fraudProofPublishLoop(ctx context.Context) {
for {
select {
case fraudProof := <-n.blockManager.GetFraudProofOutChan():
n.Logger.Info("generated fraud proof: ", fraudProof.String())
fraudProofBytes, err := fraudProof.Marshal()
if err != nil {
panic(fmt.Errorf("failed to serialize fraud proof: %w", err))
}
n.Logger.Info("gossipping fraud proof...")
err = n.P2P.GossipFraudProof(context.Background(), fraudProofBytes)
if err != nil {
n.Logger.Error("failed to gossip fraud proof", "error", err)
}
_ = n.Stop()
case <-ctx.Done():
return
}
}
}

// OnStart is a part of Service interface.
func (n *FullNode) OnStart() error {

Expand All @@ -292,16 +269,16 @@ func (n *FullNode) OnStart() error {
if err = n.fraudService.Start(n.ctx); err != nil {
return fmt.Errorf("error while starting fraud exchange service: %w", err)
}
n.blockManager.SetFraudProofService(n.fraudService)

if n.conf.Aggregator {
n.Logger.Info("working in aggregator mode", "block time", n.conf.BlockTime)
go n.blockManager.AggregationLoop(n.ctx, n.conf.LazyAggregator)
go n.headerPublishLoop(n.ctx)
}
go n.blockManager.ProcessFraudProof(n.ctx, n.cancel)
go n.blockManager.RetrieveLoop(n.ctx)
go n.blockManager.SyncLoop(n.ctx, n.cancel)
go n.fraudProofPublishLoop(n.ctx)

return nil
}

Expand All @@ -326,6 +303,7 @@ func (n *FullNode) OnStop() {
err := n.dalc.Stop()
err = multierr.Append(err, n.P2P.Close())
err = multierr.Append(err, n.hExService.Stop())
err = multierr.Append(err, n.fraudService.Stop(n.ctx))
n.Logger.Error("errors while stopping node:", "errors", err)
}

Expand Down Expand Up @@ -384,23 +362,6 @@ func (n *FullNode) newTxValidator() p2p.GossipValidator {
}
}

// newFraudProofValidator returns a pubsub validator that validates a fraud proof and forwards
// it to be verified
func (n *FullNode) newFraudProofValidator() p2p.GossipValidator {
return func(fraudProofMsg *p2p.GossipMessage) bool {
n.Logger.Debug("fraud proof received", "from", fraudProofMsg.From, "bytes", len(fraudProofMsg.Data))
fraudProof := abci.FraudProof{}
err := fraudProof.Unmarshal(fraudProofMsg.Data)
if err != nil {
n.Logger.Error("failed to deserialize fraud proof", "error", err)
return false
}
// TODO(manav): Add validation checks for fraud proof here
n.blockManager.FraudProofInCh <- &fraudProof
return true
}
}

func newPrefixKV(kvStore ds.Datastore, prefix string) ds.TxnDatastore {
return (ktds.Wrap(kvStore, ktds.PrefixTransform{Prefix: ds.NewKey(prefix)}).Children()[0]).(ds.TxnDatastore)
}
Expand Down
114 changes: 113 additions & 1 deletion node/full_node_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,118 @@ func testSingleAggreatorSingleFullNodeSingleLightNode(t *testing.T) {
assert.Equal(n1h, n3h, "heights must match")
}

func testSingleAggreatorSingleFullNodeFraudProofGossip(t *testing.T) {
assert := assert.New(t)
require := require.New(t)

var wg sync.WaitGroup
aggCtx, aggCancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
clientNodes := 1
nodes, apps := createNodes(aggCtx, ctx, clientNodes+1, true, &wg, t)

for _, app := range apps {
app.On("VerifyFraudProof", mock.Anything).Return(abci.ResponseVerifyFraudProof{Success: true}).Run(func(args mock.Arguments) {
wg.Done()
}).Once()
}

aggNode := nodes[0]
fullNode := nodes[1]

wg.Add(clientNodes + 1)
require.NoError(aggNode.Start())
time.Sleep(2 * time.Second)
require.NoError(fullNode.Start())

wg.Wait()
// aggregator should have 0 GenerateFraudProof calls and 1 VerifyFraudProof calls
apps[0].AssertNumberOfCalls(t, "GenerateFraudProof", 0)
apps[0].AssertNumberOfCalls(t, "VerifyFraudProof", 1)
// fullnode should have 1 GenerateFraudProof calls and 1 VerifyFraudProof calls
apps[1].AssertNumberOfCalls(t, "GenerateFraudProof", 1)
apps[1].AssertNumberOfCalls(t, "VerifyFraudProof", 1)

n1Frauds, err := aggNode.fraudService.Get(aggCtx, types.StateFraudProofType)
require.NoError(err)
aggCancel()
require.NoError(aggNode.Stop())

n2Frauds, err := fullNode.fraudService.Get(aggCtx, types.StateFraudProofType)
require.NoError(err)
cancel()
require.NoError(fullNode.Stop())

assert.Equal(len(n1Frauds), 1, "number of fraud proofs received via gossip should be 1")
assert.Equal(len(n2Frauds), 1, "number of fraud proofs received via gossip should be 1")
assert.Equal(n1Frauds, n2Frauds, "the received fraud proofs after gossip must match")
}

func testSingleAggreatorTwoFullNodeFraudProofSync(t *testing.T) {
assert := assert.New(t)
require := require.New(t)

var wg sync.WaitGroup
aggCtx, aggCancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
clientNodes := 2
nodes, apps := createNodes(aggCtx, ctx, clientNodes+1, true, &wg, t)

for _, app := range apps {
app.On("VerifyFraudProof", mock.Anything).Return(abci.ResponseVerifyFraudProof{Success: true}).Run(func(args mock.Arguments) {
wg.Done()
}).Once()
}

aggNode := nodes[0]
fullNode1 := nodes[1]
fullNode2 := nodes[2]

wg.Add(clientNodes)
require.NoError(aggNode.Start())
time.Sleep(2 * time.Second)
require.NoError(fullNode1.Start())

wg.Wait()
// aggregator should have 0 GenerateFraudProof calls and 1 VerifyFraudProof calls
apps[0].AssertNumberOfCalls(t, "GenerateFraudProof", 0)
apps[0].AssertNumberOfCalls(t, "VerifyFraudProof", 1)
// fullnode1 should have 1 GenerateFraudProof calls and 1 VerifyFraudProof calls
apps[1].AssertNumberOfCalls(t, "GenerateFraudProof", 1)
apps[1].AssertNumberOfCalls(t, "VerifyFraudProof", 1)

n1Frauds, err := aggNode.fraudService.Get(aggCtx, types.StateFraudProofType)
require.NoError(err)

n2Frauds, err := fullNode1.fraudService.Get(aggCtx, types.StateFraudProofType)
require.NoError(err)
assert.Equal(n1Frauds, n2Frauds, "number of fraud proofs gossiped between nodes must match")

wg.Add(1)
// delay start node3 such that it can sync the fraud proof from peers, instead of listening to gossip
require.NoError(fullNode2.Start())

wg.Wait()
// fullnode2 should have 1 GenerateFraudProof calls and 1 VerifyFraudProof calls
apps[2].AssertNumberOfCalls(t, "GenerateFraudProof", 1)
apps[2].AssertNumberOfCalls(t, "VerifyFraudProof", 1)

n3Frauds, err := fullNode2.fraudService.Get(aggCtx, types.StateFraudProofType)
require.NoError(err)
assert.Equal(n1Frauds, n3Frauds, "number of fraud proofs gossiped between nodes must match")

aggCancel()
require.NoError(aggNode.Stop())
cancel()
require.NoError(fullNode1.Stop())
require.NoError(fullNode2.Stop())
}

func TestFraudProofService(t *testing.T) {
testSingleAggreatorSingleFullNodeFraudProofGossip(t)
testSingleAggreatorTwoFullNodeFraudProofSync(t)
}

// TODO: rewrite this integration test to accommodate gossip/halting mechanism of full nodes after fraud proof generation (#693)
// TestFraudProofTrigger setups a network of nodes, with single malicious aggregator and multiple producers.
// Aggregator node should produce malicious blocks, nodes should detect fraud, and generate fraud proofs
Expand Down Expand Up @@ -542,7 +654,7 @@ func createNode(ctx context.Context, n int, isMalicious bool, aggregator bool, i
}

if isMalicious && !aggregator {
app.On("GenerateFraudProof", mock.Anything).Return(abci.ResponseGenerateFraudProof{FraudProof: &abci.FraudProof{}})
app.On("GenerateFraudProof", mock.Anything).Return(abci.ResponseGenerateFraudProof{FraudProof: &abci.FraudProof{BlockHeight: 1, FraudulentBeginBlock: &abci.RequestBeginBlock{Hash: []byte("123")}, ExpectedValidAppHash: nonMaliciousAppHash}})
}
app.On("DeliverTx", mock.Anything).Return(abci.ResponseDeliverTx{}).Run(func(args mock.Arguments) {
wg.Done()
Expand Down
Loading

0 comments on commit e90a07b

Please sign in to comment.