From 5e75956d7263a772e947ea17e0f4ac768d8b3a60 Mon Sep 17 00:00:00 2001 From: tcocagne Date: Mon, 1 Apr 2013 07:25:47 -0500 Subject: [PATCH] started on practical implementation --- src/tom/cocagne/paxos/EssentialAcceptor.java | 8 +- src/tom/cocagne/paxos/PracticalAcceptor.java | 59 +++++++++++++ src/tom/cocagne/paxos/PracticalMessenger.java | 10 +++ src/tom/cocagne/paxos/PracticalProposer.java | 85 +++++++++++++++++++ 4 files changed, 158 insertions(+), 4 deletions(-) create mode 100644 src/tom/cocagne/paxos/PracticalAcceptor.java create mode 100644 src/tom/cocagne/paxos/PracticalMessenger.java create mode 100644 src/tom/cocagne/paxos/PracticalProposer.java diff --git a/src/tom/cocagne/paxos/EssentialAcceptor.java b/src/tom/cocagne/paxos/EssentialAcceptor.java index 919ec05..ec03c7f 100644 --- a/src/tom/cocagne/paxos/EssentialAcceptor.java +++ b/src/tom/cocagne/paxos/EssentialAcceptor.java @@ -2,10 +2,10 @@ public class EssentialAcceptor implements Acceptor { - private EssentialMessenger messenger; - private ProposalID promisedID; - private ProposalID acceptedID; - private Object acceptedValue; + protected EssentialMessenger messenger; + protected ProposalID promisedID; + protected ProposalID acceptedID; + protected Object acceptedValue; public EssentialAcceptor( EssentialMessenger messenger ) { this.messenger = messenger; diff --git a/src/tom/cocagne/paxos/PracticalAcceptor.java b/src/tom/cocagne/paxos/PracticalAcceptor.java new file mode 100644 index 0000000..7d75c1f --- /dev/null +++ b/src/tom/cocagne/paxos/PracticalAcceptor.java @@ -0,0 +1,59 @@ +package tom.cocagne.paxos; + +public class PracticalAcceptor extends EssentialAcceptor { + + protected String pendingAccepted = null; + protected String pendingPromise = null; + protected boolean active = true; + + public PracticalAcceptor(PracticalMessenger messenger) { + super(messenger); + } + + public boolean persistenceRequired() { + return pendingAccepted != null || pendingPromise != null; + } + + public void recover(ProposalID promisedID, ProposalID acceptedID, Object acceptedValue) { + this.promisedID = promisedID; + this.acceptedID = acceptedID; + this.acceptedValue = acceptedValue; + } + + @Override + public void receivePrepare(String fromUID, ProposalID proposalID) { + if (this.promisedID != null && proposalID.equals(promisedID)) { // duplicate message + if (active) + messenger.sendPromise(fromUID, proposalID, acceptedID, acceptedValue); + } + else if (this.promisedID == null || proposalID.isGreaterThan(promisedID)) { + if (pendingPromise == null) { + promisedID = proposalID; + if (active) + pendingPromise = fromUID; + } + } + else { + if (active) + ((PracticalMessenger)messenger).sendPrepareNACK(fromUID, proposalID, promisedID); + } + } + + @Override + public void receiveAcceptRequest(String fromUID, ProposalID proposalID, + Object value) { + if (acceptedID != null and proposalID.equals(acceptedID) && acceptedValue != null && acceptedValue.equals(value)) + if (active) + messenger.sendAccepted(proposalID, value); + + if (promisedID == null || proposalID.isGreaterThan(promisedID) || proposalID.equals(promisedID)) { + promisedID = proposalID; + acceptedID = proposalID; + acceptedValue = value; + + messenger.sendAccepted(acceptedID, acceptedValue); + } + } + + //messenger.sendPromise(fromUID, proposalID, acceptedID, acceptedValue); +} diff --git a/src/tom/cocagne/paxos/PracticalMessenger.java b/src/tom/cocagne/paxos/PracticalMessenger.java new file mode 100644 index 0000000..48e764c --- /dev/null +++ b/src/tom/cocagne/paxos/PracticalMessenger.java @@ -0,0 +1,10 @@ +package tom.cocagne.paxos; + +public interface PracticalMessenger extends EssentialMessenger { + + public void sendPrepareNACK(String proposerUID, ProposalID proposalID, ProposalID promisedID); + + public void sendAcceptNACK(String proposerUID, ProposalID proposalID, ProposalID promisedID); + + public void onLeadershipAcquired(); +} diff --git a/src/tom/cocagne/paxos/PracticalProposer.java b/src/tom/cocagne/paxos/PracticalProposer.java new file mode 100644 index 0000000..fbbceb6 --- /dev/null +++ b/src/tom/cocagne/paxos/PracticalProposer.java @@ -0,0 +1,85 @@ +package tom.cocagne.paxos; + +public class PracticalProposer extends EssentialProposer { + + private boolean leader = false; + private boolean active = true; + + public PracticalProposer(PracticalMessenger messenger, String proposerUID, + int quorumSize) { + super(messenger, proposerUID, quorumSize); + } + + @Override + public void setProposal(Object value) { + if ( proposedValue == null ) { + proposedValue = value; + + if (leader && active) + messenger.sendAccept(proposalID, proposedValue); + } + } + + @Override + public void prepare() { + prepare(true); + } + + public void prepare( boolean incrementProposalNumber ) { + if (incrementProposalNumber) { + leader = false; + + promisesReceived.clear(); + + proposalID.incrementNumber(); + } + + if (active) + messenger.sendPrepare(proposalID); + } + + public void observeProposal(String fromUID, ProposalID proposalID) { + if (proposalID.isGreaterThan(this.proposalID)) + this.proposalID.setNumber(proposalID.getNumber() + 1); + } + + public void receivePrepareNACK(String proposerUID, ProposalID proposalID, ProposalID promisedID) { + observeProposal(proposerUID, promisedID); + } + + public void receiveAcceptNACK(String proposerUID, ProposalID proposalID, ProposalID promisedID) { + + } + + public void resendAccept() { + if (leader && active && proposedValue != null) + messenger.sendAccept(proposalID, proposedValue); + } + + @Override + public void receivePromise(String fromUID, ProposalID proposalID, + ProposalID prevAcceptedID, Object prevAcceptedValue) { + + observeProposal(fromUID, proposalID); + + if ( leader || !proposalID.equals(this.proposalID) || promisesReceived.contains(fromUID) ) + return; + + promisesReceived.add( fromUID ); + + if (lastAcceptedID == null || prevAcceptedID.isGreaterThan(lastAcceptedID)) + { + lastAcceptedID = prevAcceptedID; + + if (prevAcceptedValue != null) + proposedValue = prevAcceptedValue; + } + + if (promisesReceived.size() == quorumSize) { + leader = true; + ((PracticalMessenger)messenger).onLeadershipAcquired(); + if (proposedValue != null) + messenger.sendAccept(this.proposalID, proposedValue); + } + } +}