Skip to content

Commit

Permalink
started on heartbeat nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
tcocagne committed Apr 3, 2013
1 parent 0b8f0c5 commit dc853c6
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 0 deletions.
7 changes: 7 additions & 0 deletions src/tom/cocagne/paxos/HeartbeatCallback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package tom.cocagne.paxos;

public interface HeartbeatCallback {

public void execute();

}
12 changes: 12 additions & 0 deletions src/tom/cocagne/paxos/HeartbeatMessenger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package tom.cocagne.paxos;

public interface HeartbeatMessenger extends PracticalMessenger {

public void sendHeartbeat( ProposalID leaderProposalID);

public void schedule(int millisecondDelay, HeartbeatCallback callback);

public void onLeadershipLost();

public void onLeadershipChange(String previousLeaderUID, String newLeaderUID);
}
65 changes: 65 additions & 0 deletions src/tom/cocagne/paxos/HeartbeatNode.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package tom.cocagne.paxos;

import java.util.HashSet;

public class HeartbeatNode extends PracticalNode {

protected String leaderUID;
protected ProposalID leaderProposalID;
protected long lastHeartbeatTimestamp;
protected long lastPrepareTimestamp;
protected long heartbeatPeriod = 1000; // Milliseconds
protected long livenessWindow = 5000; // Milliseconds
protected boolean acquiringLeadership = false;
protected HashSet<String> acceptNACKs = new HashSet<String>();


public HeartbeatNode(PracticalMessenger messenger, String proposerUID,
int quorumSize, String leaderUID, int heartbeatPeriod, int livenessWindow) {
super(messenger, proposerUID, quorumSize);

this.leaderUID = leaderUID;
this.heartbeatPeriod = heartbeatPeriod;
this.livenessWindow = livenessWindow;

leaderProposalID = new ProposalID(1, leaderUID);
lastHeartbeatTimestamp = timestamp();
lastPrepareTimestamp = timestamp();

if (leaderUID != null && proposerUID.equals(leaderUID))
setLeader(true);
}

public long timestamp() {
return System.currentTimeMillis();
}

@Override
public void prepare(boolean incrementProposalNumber) {
if (incrementProposalNumber)
acceptNACKs.clear();
super.prepare(incrementProposalNumber);
}

public boolean leaderIsAlive() {
return timestamp() - lastHeartbeatTimestamp <= livenessWindow;
}

public boolean observedRecentPrepare() {
return timestamp() - lastPrepareTimestamp <= livenessWindow * 1.5;
}

public void pollLiveness() {
if (!leaderIsAlive() && !observedRecentPrepare()) {
if (acquiringLeadership)
prepare();
else
acquireLeadership();
}
}

private void acquireLeadership() {


}
}

0 comments on commit dc853c6

Please sign in to comment.