Skip to content

Commit

Permalink
it works
Browse files Browse the repository at this point in the history
  • Loading branch information
willemt committed May 4, 2016
1 parent 4287e8c commit 9e614fb
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 77 deletions.
27 changes: 17 additions & 10 deletions deps/raft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@
#ifndef RAFT_H_
#define RAFT_H_

#define RAFT_ERR_NOT_LEADER -2
#define RAFT_ERR_ONE_VOTING_CHANGE_ONLY -3
#define RAFT_ERR_SHUTDOWN -4
#define RAFT_ERR_NOT_LEADER -2
#define RAFT_ERR_ONE_VOTING_CHANGE_ONLY -3
#define RAFT_ERR_SHUTDOWN -4

#define RAFT_REQUESTVOTE_ERR_GRANTED 1
#define RAFT_REQUESTVOTE_ERR_NOT_GRANTED 0
#define RAFT_REQUESTVOTE_ERR_UNKNOWN_NODE -1
#define RAFT_REQUESTVOTE_ERR_GRANTED 1
#define RAFT_REQUESTVOTE_ERR_NOT_GRANTED 0
/**
* vote granting failed because this node is pending removal */
#define RAFT_REQUESTVOTE_ERR_PENDING_REMOVAL -1
#define RAFT_REQUESTVOTE_ERR_UNKNOWN_NODE -2

typedef enum {
RAFT_STATE_NONE,
Expand Down Expand Up @@ -190,8 +193,9 @@ typedef int (
* This triggers only when there are no pending configuration changes.
* @param[in] raft The Raft server making this callback
* @param[in] user_data User data that is passed from Raft server
* @param[in] node The node */
typedef void (
* @param[in] node The node
* @return 0 does not want to be notified again; otherwise -1 */
typedef int (
*func_node_has_sufficient_logs_f
) (
raft_server_t* raft,
Expand Down Expand Up @@ -374,7 +378,10 @@ raft_node_t* raft_add_node(raft_server_t* me, void* user_data, int id, int is_se

/** Add a node which does not participate in voting.
* If a node already exists the call will fail.
* Parameters are identical to raft_add_node */
* Parameters are identical to raft_add_node
* @return
* node if it was successfully added;
* NULL if the node already exists */
raft_node_t* raft_add_non_voting_node(raft_server_t* me_, void* udata, int id, int is_self);

/** Remove node.
Expand Down Expand Up @@ -485,7 +492,7 @@ int raft_recv_entry(raft_server_t* me,
msg_entry_response_t *r);

/**
* @return the server's node ID */
* @return server's node ID; -1 if it doesn't know what it is */
int raft_get_nodeid(raft_server_t* me);

/**
Expand Down
18 changes: 18 additions & 0 deletions deps/raft/raft_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#define RAFT_NODE_VOTED_FOR_ME 1
#define RAFT_NODE_VOTING 1 << 1
#define RAFT_NODE_HAS_SUFFICIENT_LOG 1 << 2
#define RAFT_NODE_PENDING_REMOVAL 1 << 3

typedef struct
{
Expand Down Expand Up @@ -113,15 +114,32 @@ int raft_node_is_voting(raft_node_t* me_)
return (me->flags & RAFT_NODE_VOTING) != 0;
}

void raft_node_set_pending_removal(raft_node_t* me_, int pending_removal)
{
raft_node_private_t* me = (raft_node_private_t*)me_;
if (pending_removal)
me->flags |= RAFT_NODE_PENDING_REMOVAL;
else
me->flags &= ~RAFT_NODE_PENDING_REMOVAL;
}

int raft_node_is_pending_removal(raft_node_t* me_)
{
raft_node_private_t* me = (raft_node_private_t*)me_;
return (me->flags & RAFT_NODE_PENDING_REMOVAL) != 0;
}

void raft_node_set_has_sufficient_logs(raft_node_t* me_)
{
raft_node_private_t* me = (raft_node_private_t*)me_;
me->flags |= RAFT_NODE_HAS_SUFFICIENT_LOG;
printf("%d\n", me->flags);
}

int raft_node_has_sufficient_logs(raft_node_t* me_)
{
raft_node_private_t* me = (raft_node_private_t*)me_;
printf("%d\n", me->flags);
return (me->flags & RAFT_NODE_HAS_SUFFICIENT_LOG) != 0;
}

Expand Down
114 changes: 81 additions & 33 deletions deps/raft/raft_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ void raft_become_candidate(raft_server_t* me_)
raft_set_current_term(me_, raft_get_current_term(me_) + 1);
for (i = 0; i < me->num_nodes; i++)
raft_node_vote_for_me(me->nodes[i], 0);

/* TODO: Shouldn't vote for itself it if isn't a voting node */
raft_vote(me_, me->node);
me->current_leader = NULL;
raft_set_state(me_, RAFT_STATE_CANDIDATE);
Expand All @@ -144,8 +146,9 @@ void raft_become_candidate(raft_server_t* me_)
me->timeout_elapsed = rand() % me->election_timeout;

for (i = 0; i < me->num_nodes; i++)
if (me->node != me->nodes[i] && raft_node_is_voting(me->nodes[i]))
raft_send_requestvote(me_, me->nodes[i]);
if (me->node != me->nodes[i])
if (raft_node_is_voting(me->nodes[i]))
raft_send_requestvote(me_, me->nodes[i]);
}

void raft_become_follower(raft_server_t* me_)
Expand Down Expand Up @@ -183,6 +186,12 @@ raft_entry_t* raft_get_entry_from_idx(raft_server_t* me_, int etyidx)
return log_get_at_idx(me->log, etyidx);
}

int raft_voting_change_is_in_progress(raft_server_t* me_)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
return -1 != me->voting_cfg_change_log_idx;
}

int raft_recv_appendentries_response(raft_server_t* me_,
raft_node_t* node,
msg_appendentries_response_t* r)
Expand Down Expand Up @@ -245,14 +254,16 @@ int raft_recv_appendentries_response(raft_server_t* me_,
raft_node_set_match_idx(node, r->current_idx);

if (!raft_node_is_voting(node) &&
-1 == me->voting_cfg_change_log_idx &&
!raft_voting_change_is_in_progress(me_) &&
raft_get_current_idx(me_) <= r->current_idx + 1 &&
me->cb.node_has_sufficient_logs &&
0 == raft_node_has_sufficient_logs(node)
0 == raft_node_has_sufficient_logs(node) &&
!raft_node_is_pending_removal(node)
)
{
raft_node_set_has_sufficient_logs(node);
me->cb.node_has_sufficient_logs(me_, me->udata, node);
int e = me->cb.node_has_sufficient_logs(me_, me->udata, node);
if (0 == e)
raft_node_set_has_sufficient_logs(node);
}

/* Update commit idx */
Expand Down Expand Up @@ -431,10 +442,18 @@ int raft_already_voted(raft_server_t* me_)

static int __should_grant_vote(raft_server_private_t* me, msg_requestvote_t* vr)
{
/* TODO: 4.2.3 Raft Dissertation:
* if a server receives a RequestVote request within the minimum election
* timeout of hearing from a current leader, it does not update its term or
* grant its vote */

if (!raft_node_is_voting(raft_get_my_node((void*)me)))
return 0;

if (vr->term < raft_get_current_term((void*)me))
return 0;

/* TODO: if voted for is candiate return 1 (if below checks pass) */
/* TODO: if voted for is candidate return 1 (if below checks pass) */
if (raft_already_voted((void*)me))
return 0;

Expand Down Expand Up @@ -463,6 +482,23 @@ int raft_recv_requestvote(raft_server_t* me_,
{
raft_server_private_t* me = (raft_server_private_t*)me_;

if (!node)
node = raft_get_node(me_, vr->candidate_id);

/* It's possible the candidate node has been removed from the cluster but
* hasn't received the appendentries that confirms the removal. Therefore
* the node is partitioned and still thinks its part of the cluster. It
* will eventually send a requestvote. This is error response tells the
* node that it might be removed. */
if (node && raft_node_is_pending_removal(node))
{
r->vote_granted = RAFT_REQUESTVOTE_ERR_PENDING_REMOVAL;
printf("unknown node %d %d\n",
raft_get_nodeid(me_),
node ? raft_node_get_id(node) : -1);
goto done;
}

if (raft_get_current_term(me_) < vr->term)
{
raft_set_current_term(me_, vr->term);
Expand All @@ -485,22 +521,24 @@ int raft_recv_requestvote(raft_server_t* me_,
}
else
{
if (raft_get_node(me_, vr->candidate_id))
r->vote_granted = RAFT_REQUESTVOTE_ERR_NOT_GRANTED;
else
/* It's possible the candidate node has been removed from the
* cluster but hasn't received the appendentries that confirms the
* removal. Therefore the node is partitioned and still thinks its
* part of the cluster. It will eventually send a requestvote. This
* is error response tells the node that it might be removed. */
if (!node)
{
r->vote_granted = RAFT_REQUESTVOTE_ERR_UNKNOWN_NODE;
printf("unknown node %d %d\n",
raft_get_nodeid(me_),
node ? raft_node_get_id(node) : -1);
goto done;
}
else
r->vote_granted = 0;
}

__log(me_, node, "node requested vote: %d replying: %s",
node,
r->vote_granted == 1 ? "granted" :
r->vote_granted == 0 ? "not granted" : "unknown");

done:
r->term = raft_get_current_term(me_);
return 0;
}
Expand Down Expand Up @@ -550,24 +588,25 @@ int raft_recv_requestvote_response(raft_server_t* me_,

switch (r->vote_granted)
{
case 1:
if (node)
raft_node_vote_for_me(node, 1);
int votes = raft_get_nvotes_for_me(me_);
if (raft_votes_is_majority(me->num_nodes, votes))
raft_become_leader(me_);
break;
case RAFT_REQUESTVOTE_ERR_UNKNOWN_NODE:
case RAFT_REQUESTVOTE_ERR_PENDING_REMOVAL:
{
raft_node_t* node = raft_get_my_node(me_);
printf("XXXX %p\n", node);
if (node && !raft_node_is_voting(node))
if (raft_node_is_voting(node))
{
printf("Time to shutdown %d XXXX %p\n", raft_get_nodeid(me_), node);
printf("SHUTDOWN\n");
return RAFT_ERR_SHUTDOWN;
}
}
break;
case RAFT_REQUESTVOTE_ERR_GRANTED:
if (node)
raft_node_vote_for_me(node, 1);
int votes = raft_get_nvotes_for_me(me_);
if (raft_votes_is_majority(me->num_nodes, votes))
raft_become_leader(me_);
break;
case RAFT_REQUESTVOTE_ERR_NOT_GRANTED:
break;
default:
Expand All @@ -586,7 +625,7 @@ int raft_recv_entry(raft_server_t* me_,

/* Only one voting cfg change at a time */
if (raft_entry_is_voting_cfg_change(e))
if (-1 != me->voting_cfg_change_log_idx)
if (raft_voting_change_is_in_progress(me_))
return RAFT_ERR_ONE_VOTING_CHANGE_ONLY;

if (!raft_is_leader(me_))
Expand All @@ -603,8 +642,11 @@ int raft_recv_entry(raft_server_t* me_,
raft_append_entry(me_, &ety);
for (i = 0; i < me->num_nodes; i++)
{
if (me->node == me->nodes[i] || !me->nodes[i] ||
!raft_node_is_voting(me->nodes[i]))
if (me->node == me->nodes[i] ||
!me->nodes[i] ||
!raft_node_is_voting(me->nodes[i]) ||
!raft_node_is_pending_removal(me->nodes[i])
)
continue;

/* Only send new entries.
Expand Down Expand Up @@ -744,7 +786,8 @@ void raft_send_appendentries_all(raft_server_t* me_)

me->timeout_elapsed = 0;
for (i = 0; i < me->num_nodes; i++)
if (me->node != me->nodes[i])
if (me->node != me->nodes[i] &&
!raft_node_is_pending_removal(me->nodes[i]))
raft_send_appendentries(me_, me->nodes[i]);
}

Expand Down Expand Up @@ -793,19 +836,24 @@ void raft_remove_node(raft_server_t* me_, raft_node_t* node)
{
raft_server_private_t* me = (raft_server_private_t*)me_;

raft_node_t* new_array, *new_node;
raft_node_t* new_array, *new_nodes;
new_array = (raft_node_t*)calloc((me->num_nodes - 1), sizeof(raft_node_t*));
new_node = new_array;
new_nodes = new_array;

int i;
int i, found = 0;
for (i = 0; i<me->num_nodes; i++)
{
if (me->nodes[i] == node)
{
found = 1;
continue;
*new_node = me->nodes[i];
new_node++;
}
*new_nodes = me->nodes[i];
new_nodes++;
}

assert(found);

me->num_nodes--;
free(me->nodes);
me->nodes = new_array;
Expand Down
5 changes: 4 additions & 1 deletion deps/raft/raft_server_properties.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ void raft_set_request_timeout(raft_server_t* me_, int millisec)

int raft_get_nodeid(raft_server_t* me_)
{
return raft_node_get_id(((raft_server_private_t*)me_)->node);
raft_server_private_t* me = (raft_server_private_t*)me_;
if (!me->node)
return -1;
return raft_node_get_id(me->node);
}

int raft_get_election_timeout(raft_server_t* me_)
Expand Down
Loading

0 comments on commit 9e614fb

Please sign in to comment.