Skip to content

Commit

Permalink
started on practical implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
tcocagne committed Apr 1, 2013
1 parent 1e7238c commit 5e75956
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 4 deletions.
8 changes: 4 additions & 4 deletions src/tom/cocagne/paxos/EssentialAcceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

public class EssentialAcceptor implements Acceptor {

private EssentialMessenger messenger;
private ProposalID promisedID;
private ProposalID acceptedID;
private Object acceptedValue;
protected EssentialMessenger messenger;
protected ProposalID promisedID;
protected ProposalID acceptedID;
protected Object acceptedValue;

public EssentialAcceptor( EssentialMessenger messenger ) {
this.messenger = messenger;
Expand Down
59 changes: 59 additions & 0 deletions src/tom/cocagne/paxos/PracticalAcceptor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package tom.cocagne.paxos;

public class PracticalAcceptor extends EssentialAcceptor {

protected String pendingAccepted = null;
protected String pendingPromise = null;
protected boolean active = true;

public PracticalAcceptor(PracticalMessenger messenger) {
super(messenger);
}

public boolean persistenceRequired() {
return pendingAccepted != null || pendingPromise != null;
}

public void recover(ProposalID promisedID, ProposalID acceptedID, Object acceptedValue) {
this.promisedID = promisedID;
this.acceptedID = acceptedID;
this.acceptedValue = acceptedValue;
}

@Override
public void receivePrepare(String fromUID, ProposalID proposalID) {
if (this.promisedID != null && proposalID.equals(promisedID)) { // duplicate message
if (active)
messenger.sendPromise(fromUID, proposalID, acceptedID, acceptedValue);
}
else if (this.promisedID == null || proposalID.isGreaterThan(promisedID)) {
if (pendingPromise == null) {
promisedID = proposalID;
if (active)
pendingPromise = fromUID;
}
}
else {
if (active)
((PracticalMessenger)messenger).sendPrepareNACK(fromUID, proposalID, promisedID);
}
}

@Override
public void receiveAcceptRequest(String fromUID, ProposalID proposalID,
Object value) {
if (acceptedID != null and proposalID.equals(acceptedID) && acceptedValue != null && acceptedValue.equals(value))
if (active)
messenger.sendAccepted(proposalID, value);

if (promisedID == null || proposalID.isGreaterThan(promisedID) || proposalID.equals(promisedID)) {
promisedID = proposalID;
acceptedID = proposalID;
acceptedValue = value;

messenger.sendAccepted(acceptedID, acceptedValue);
}
}

//messenger.sendPromise(fromUID, proposalID, acceptedID, acceptedValue);
}
10 changes: 10 additions & 0 deletions src/tom/cocagne/paxos/PracticalMessenger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package tom.cocagne.paxos;

public interface PracticalMessenger extends EssentialMessenger {

public void sendPrepareNACK(String proposerUID, ProposalID proposalID, ProposalID promisedID);

public void sendAcceptNACK(String proposerUID, ProposalID proposalID, ProposalID promisedID);

public void onLeadershipAcquired();
}
85 changes: 85 additions & 0 deletions src/tom/cocagne/paxos/PracticalProposer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package tom.cocagne.paxos;

public class PracticalProposer extends EssentialProposer {

private boolean leader = false;
private boolean active = true;

public PracticalProposer(PracticalMessenger messenger, String proposerUID,
int quorumSize) {
super(messenger, proposerUID, quorumSize);
}

@Override
public void setProposal(Object value) {
if ( proposedValue == null ) {
proposedValue = value;

if (leader && active)
messenger.sendAccept(proposalID, proposedValue);
}
}

@Override
public void prepare() {
prepare(true);
}

public void prepare( boolean incrementProposalNumber ) {
if (incrementProposalNumber) {
leader = false;

promisesReceived.clear();

proposalID.incrementNumber();
}

if (active)
messenger.sendPrepare(proposalID);
}

public void observeProposal(String fromUID, ProposalID proposalID) {
if (proposalID.isGreaterThan(this.proposalID))
this.proposalID.setNumber(proposalID.getNumber() + 1);
}

public void receivePrepareNACK(String proposerUID, ProposalID proposalID, ProposalID promisedID) {
observeProposal(proposerUID, promisedID);
}

public void receiveAcceptNACK(String proposerUID, ProposalID proposalID, ProposalID promisedID) {

}

public void resendAccept() {
if (leader && active && proposedValue != null)
messenger.sendAccept(proposalID, proposedValue);
}

@Override
public void receivePromise(String fromUID, ProposalID proposalID,
ProposalID prevAcceptedID, Object prevAcceptedValue) {

observeProposal(fromUID, proposalID);

if ( leader || !proposalID.equals(this.proposalID) || promisesReceived.contains(fromUID) )
return;

promisesReceived.add( fromUID );

if (lastAcceptedID == null || prevAcceptedID.isGreaterThan(lastAcceptedID))
{
lastAcceptedID = prevAcceptedID;

if (prevAcceptedValue != null)
proposedValue = prevAcceptedValue;
}

if (promisesReceived.size() == quorumSize) {
leader = true;
((PracticalMessenger)messenger).onLeadershipAcquired();
if (proposedValue != null)
messenger.sendAccept(this.proposalID, proposedValue);
}
}
}

0 comments on commit 5e75956

Please sign in to comment.