Skip to content

Commit

Permalink
chain: optimize removal of inputs in mempool
Browse files Browse the repository at this point in the history
This commit implements a new struct, `cachedInputs`, to provide faster
lookup when removing inputs based on a given transaction. Instead of
iterating all the inputs to decide whether they should be removed, we
now cache `txid -> inputs` so we can update our local mempool faster by
iterating much less data.
  • Loading branch information
yyforyongyu committed May 9, 2023
1 parent 68f7e23 commit a2f9224
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 28 deletions.
97 changes: 69 additions & 28 deletions chain/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,69 @@ import (
"golang.org/x/sync/errgroup"
)

// cachedInputs caches the inputs of the transactions in the mempool. This is
// used to provide fast lookup between txids and inputs.
type cachedInputs struct {
// inputs provides a fast lookup from input -> txid.
inputs map[wire.OutPoint]chainhash.Hash

// txids provides a fast lookup from txid -> inputs.
txids map[chainhash.Hash]map[wire.OutPoint]struct{}
}

// newCachedInputs creates a new cachedInputs.
func newCachedInputs() *cachedInputs {
return &cachedInputs{
inputs: make(map[wire.OutPoint]chainhash.Hash),
txids: make(map[chainhash.Hash]map[wire.OutPoint]struct{}),
}
}

// hasInput returns the txid and a boolean to indicate the given input is
// found.
func (c *cachedInputs) hasInput(op wire.OutPoint) (chainhash.Hash, bool) {
txid, ok := c.inputs[op]
return txid, ok
}

// addInput adds the given input to our cached inputs. If the input already
// exists, the `inputs` map will be overwritten and the `txids` map will be
// updated.
func (c *cachedInputs) addInput(op wire.OutPoint, txid chainhash.Hash) {
// Init the map for this txid if it doesn't exist.
if _, ok := c.txids[txid]; !ok {
c.txids[txid] = make(map[wire.OutPoint]struct{})
}

// Add the input under this txid.
c.txids[txid][op] = struct{}{}

// Check if the input already exists.
oldTxid, existed := c.inputs[op]

// If existed, update the old txid to remove the input.
if existed {
log.Tracef("Input %s was spent in tx %s, now spent in %s",
op, oldTxid, txid)

delete(c.txids[oldTxid], op)
}

// Add the input to the inputs map with the new txid.
c.inputs[op] = txid
}

// removeInputsFromTx removes the inputs of the given txid from our cached
// inputs maps.
func (c *cachedInputs) removeInputsFromTx(txid chainhash.Hash) {
// Remove the inputs stored of this tx.
for op := range c.txids[txid] {
delete(c.inputs, op)
}

delete(c.txids, txid)
}

// mempool represents our view of the mempool and helps to keep track of which
// mempool transactions we already know about. The boolean in the txs map is
// used to indicate if we should remove the tx from our local mempool due to
Expand All @@ -27,7 +90,7 @@ type mempool struct {
//
// TODO(yy): create similar maps to provide faster lookup for output
// scripts.
inputs map[wire.OutPoint]chainhash.Hash
inputs *cachedInputs

// client is the rpc client that we'll use to query for the mempool.
client *rpcclient.Client
Expand All @@ -41,7 +104,7 @@ type mempool struct {
func newMempool(client *rpcclient.Client) *mempool {
return &mempool{
txs: make(map[chainhash.Hash]bool),
inputs: make(map[wire.OutPoint]chainhash.Hash),
inputs: newCachedInputs(),
initFin: make(chan struct{}),
client: client,
}
Expand Down Expand Up @@ -99,8 +162,7 @@ func (m *mempool) containsTx(hash chainhash.Hash) bool {
//
// NOTE: must be used inside a lock.
func (m *mempool) containsInput(op wire.OutPoint) (chainhash.Hash, bool) {
txid, ok := m.inputs[op]
return txid, ok
return m.inputs.hasInput(op)
}

// add inserts the given hash into our mempool and marks it to indicate that it
Expand Down Expand Up @@ -189,15 +251,8 @@ func (m *mempool) deleteUnmarked() {
// mempool's inputs map.
//
// NOTE: must be used inside a lock.
//
// TODO(yy): create a txid -> [inputs] map to make this faster.
func (m *mempool) removeInputs(tx chainhash.Hash) {
for outpoint, txid := range m.inputs {
if txid.IsEqual(&tx) {
// NOTE: it's safe to delete while iterating go map.
delete(m.inputs, outpoint)
}
}
m.inputs.removeInputsFromTx(tx)
}

// updateInputs takes a txid and populates the inputs of the tx into the
Expand All @@ -215,21 +270,8 @@ func (m *mempool) updateInputs(tx *wire.MsgTx) {
for _, input := range tx.TxIn {
outpoint := input.PreviousOutPoint

// Check whether this input has been spent in an old tx.
oldTxid, ok := m.inputs[outpoint]

// If not, add it to the map and continue.
if !ok {
m.inputs[outpoint] = tx.TxHash()
continue
}

log.Tracef("Input %s was spent in tx %s, now spent in %s",
outpoint, oldTxid, tx.TxHash())

// If the input has been spent in an old tx, we need to
// overwrite it.
m.inputs[outpoint] = tx.TxHash()
// Add the input to the cache.
m.inputs.addInput(outpoint, tx.TxHash())
}
}

Expand Down Expand Up @@ -288,7 +330,6 @@ func (m *mempool) LoadMempool() error {
// use it to update its internal mempool. It returns a slice of transactions
// that's new to its internal mempool.
func (m *mempool) UpdateMempoolTxes(txids []*chainhash.Hash) []*wire.MsgTx {

// txesToNotify is a list of txes to be notified to the client.
var notixyMx sync.Mutex
txesToNotify := make([]*wire.MsgTx, 0, len(txids))
Expand Down
82 changes: 82 additions & 0 deletions chain/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,88 @@ import (
"github.com/stretchr/testify/require"
)

// TestCachedInputs tests that the cachedInputs works as expected.
func TestCachedInputs(t *testing.T) {
require := require.New(t)

// Create test inputs and tx.
op1 := wire.OutPoint{Hash: chainhash.Hash{1}}
op2 := wire.OutPoint{Hash: chainhash.Hash{2}}
tx := &wire.MsgTx{
LockTime: 1,
TxIn: []*wire.TxIn{
{PreviousOutPoint: op1},
{PreviousOutPoint: op2},
},
}

c := newCachedInputs()

// Lookup should give us nothing.
txid, ok := c.hasInput(op1)
require.False(ok)
require.Zero(txid)

// Add the input.
c.addInput(op1, tx.TxHash())

// Lookup should now give us the txid.
txid, ok = c.hasInput(op1)
require.True(ok)
require.Equal(tx.TxHash(), txid)

// Add another input.
c.addInput(op2, tx.TxHash())

// Delete the inputs.
c.removeInputsFromTx(txid)

// Lookup should now give us nothing.
txid, ok = c.hasInput(op1)
require.False(ok)
require.Zero(txid)

txid, ok = c.hasInput(op2)
require.False(ok)
require.Zero(txid)
}

func TestCachedInputsAddInput(t *testing.T) {
require := require.New(t)

// Create a test input and tx.
op := wire.OutPoint{Hash: chainhash.Hash{1}}
tx := &wire.MsgTx{
LockTime: 1,
TxIn: []*wire.TxIn{{PreviousOutPoint: op}},
}

// replacedTx spends the same input as tx.
replacedTx := &wire.MsgTx{
// Use a different locktime to ensure the txid is different.
LockTime: 2,
TxIn: []*wire.TxIn{{PreviousOutPoint: op}},
}

c := newCachedInputs()

// Add the input.
c.addInput(op, tx.TxHash())

// Lookup should now give us the txid.
txid, ok := c.hasInput(op)
require.True(ok)
require.Equal(tx.TxHash(), txid)

// Add the input again using the replacement tx.
c.addInput(op, replacedTx.TxHash())

// Lookup should now give us the replacement txid.
txid, ok = c.hasInput(op)
require.True(ok)
require.Equal(replacedTx.TxHash(), txid)
}

// TestMempool tests that each method of the mempool struct works as expected.
func TestMempool(t *testing.T) {
require := require.New(t)
Expand Down

0 comments on commit a2f9224

Please sign in to comment.