Skip to content

Commit

Permalink
raft: Fix infinite election loop
Browse files Browse the repository at this point in the history
It looks like the election loops on startup are caused by something
proposing a value before all entries in the log have become committed.
This would cause ApplyStoreActions to try to acquire a lock on the raft
store while processInternalRaftRequest is holding the lock (since it's
called by the memory store with the lock held). This situation is a
deadlock. It blocks the loop in Node.Run and prevents further
participation in the cluster.

To solve the issue, don't signal that the local node has become the
leader until it has committed all of the previously uncommitted entries
in its log. Also, block proposals until the node is caught up.

Signed-off-by: Aaron Lehmann <[email protected]>
  • Loading branch information
aaronlehmann committed Aug 4, 2016
1 parent f3bcaff commit e3031a9
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 18 deletions.
55 changes: 37 additions & 18 deletions manager/state/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,18 @@ type Node struct {
StateDir string
Error error

raftStore *raft.MemoryStorage
memoryStore *store.MemoryStore
Config *raft.Config
opts NewNodeOptions
reqIDGen *idutil.Generator
wait *wait
wal *wal.WAL
snapshotter *snap.Snapshotter
wasLeader bool
restored bool
isMember uint32
joinAddr string
raftStore *raft.MemoryStorage
memoryStore *store.MemoryStore
Config *raft.Config
opts NewNodeOptions
reqIDGen *idutil.Generator
wait *wait
wal *wal.WAL
snapshotter *snap.Snapshotter
restored bool
signalledLeadership uint32
isMember uint32
joinAddr string

// waitProp waits for all the proposals to be terminated before
// shutting down the node.
Expand Down Expand Up @@ -329,6 +329,8 @@ func (n *Node) Run(ctx context.Context) error {
close(n.doneCh)
}()

wasLeader := false

for {
select {
case <-n.ticker.C():
Expand Down Expand Up @@ -389,13 +391,24 @@ func (n *Node) Run(ctx context.Context) error {
// if that happens we will apply them as any
// follower would.
if rd.SoftState != nil {
if n.wasLeader && rd.SoftState.RaftState != raft.StateLeader {
n.wasLeader = false
if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
wasLeader = false
n.wait.cancelAll()
n.leadershipBroadcast.Write(IsFollower)
} else if !n.wasLeader && rd.SoftState.RaftState == raft.StateLeader {
n.wasLeader = true
if atomic.LoadUint32(&n.signalledLeadership) == 1 {
n.leadershipBroadcast.Write(IsFollower)
atomic.StoreUint32(&n.signalledLeadership, 0)
}
} else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
wasLeader = true
}
}

if wasLeader && atomic.LoadUint32(&n.signalledLeadership) != 1 {
// If all the entries in the log have become
// committed, broadcast our leadership status.
if n.caughtUp() {
n.leadershipBroadcast.Write(IsLeader)
atomic.StoreUint32(&n.signalledLeadership, 1)
}
}

Expand Down Expand Up @@ -506,6 +519,12 @@ func (n *Node) Leader() uint64 {
return n.Node.Status().Lead
}

// caughtUp reports whether the local node has applied every entry
// currently present in the raft log, i.e. there are no committed-but-
// unapplied entries outstanding.
func (n *Node) caughtUp() bool {
	// MemoryStorage.LastIndex is documented to never fail, so the
	// error value is deliberately discarded here.
	last, _ := n.raftStore.LastIndex()
	return n.appliedIndex >= last
}

// Join asks to a member of the raft to propose
// a configuration change and add us as a member thus
// beginning the log replication process. This method
Expand Down Expand Up @@ -1093,7 +1112,7 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa
ch := n.wait.register(r.ID, cb)

// Do this check after calling register to avoid a race.
if !n.IsLeader() {
if atomic.LoadUint32(&n.signalledLeadership) != 1 {
n.wait.cancel(r.ID)
return nil, ErrLostLeadership
}
Expand Down
6 changes: 6 additions & 0 deletions manager/state/raft/testutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,14 @@ func NewInitNode(t *testing.T, tc *cautils.TestCA, raftConfig *api.RaftConfig, o
err := n.Node.JoinAndStart()
require.NoError(t, err, "can't join cluster")

leadershipCh, cancel := n.SubscribeLeadership()
defer cancel()

go n.Run(ctx)

// Wait for the node to become the leader.
<-leadershipCh

if raftConfig != nil {
assert.NoError(t, n.MemoryStore().Update(func(tx store.Tx) error {
return store.CreateCluster(tx, &api.Cluster{
Expand Down

0 comments on commit e3031a9

Please sign in to comment.