Skip to content

Commit

Permalink
Fixed #155
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Mar 22, 2023
1 parent 636a766 commit a4996ba
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,17 +174,29 @@ ValueTask IRaftClusterMember.CancelPendingRequestsAsync()
}

Task<Result<bool>> IRaftClusterMember.VoteAsync(long term, long lastLogIndex, long lastLogTerm, CancellationToken token)
=> IsRemote
{
return token.IsCancellationRequested
? Task.FromCanceled<Result<bool>>(token)
: IsRemote
? SendAsync<Result<bool>, RequestVoteMessage>(new RequestVoteMessage(context.LocalMemberId, term, lastLogIndex, lastLogTerm), token)
: Task.FromResult(new Result<bool>(term, true));
}

Task<Result<PreVoteResult>> IRaftClusterMember.PreVoteAsync(long term, long lastLogIndex, long lastLogTerm, CancellationToken token)
=> IsRemote
{
return token.IsCancellationRequested
? Task.FromCanceled<Result<PreVoteResult>>(token)
: IsRemote
? SendAsync<Result<PreVoteResult>, PreVoteMessage>(new PreVoteMessage(context.LocalMemberId, term, lastLogIndex, lastLogTerm), token)
: Task.FromResult(new Result<PreVoteResult>(term, PreVoteResult.Accepted));
}

Task<bool> IClusterMember.ResignAsync(CancellationToken token)
=> SendAsync<bool, ResignMessage>(new ResignMessage(context.LocalMemberId), token);
{
return token.IsCancellationRequested
? Task.FromCanceled<bool>(token)
: SendAsync<bool, ResignMessage>(new ResignMessage(context.LocalMemberId), token);
}

Task<Result<bool>> IRaftClusterMember.AppendEntriesAsync<TEntry, TList>(
long term,
Expand All @@ -196,15 +208,49 @@ Task<Result<bool>> IRaftClusterMember.AppendEntriesAsync<TEntry, TList>(
bool applyConfig,
CancellationToken token)
{
return IsRemote
? SendAsync<Result<bool>, AppendEntriesMessage<TEntry, TList>>(new AppendEntriesMessage<TEntry, TList>(context.LocalMemberId, term, prevLogIndex, prevLogTerm, commitIndex, entries, configuration, applyConfig) { UseOptimizedTransfer = context.UseEfficientTransferOfLogEntries }, token)
: Task.FromResult(new Result<bool>(term, true));
Task<Result<bool>> result;

if (token.IsCancellationRequested)
{
result = Task.FromCanceled<Result<bool>>(token);
}
else if (IsRemote)
{
AppendEntriesMessage<TEntry, TList> message;

// See bug https://github.com/dotnet/dotNext/issues/155
try
{
message = new(context.LocalMemberId, term, prevLogIndex, prevLogTerm, commitIndex, entries, configuration, applyConfig)
{
UseOptimizedTransfer = context.UseEfficientTransferOfLogEntries
};
}
catch (Exception e)
{
result = Task.FromException<Result<bool>>(e);
goto exit;
}

result = SendAsync<Result<bool>, AppendEntriesMessage<TEntry, TList>>(message, token);
}
else
{
result = Task.FromResult(new Result<bool>(term, true));
}

exit:
return result;
}

Task<Result<bool>> IRaftClusterMember.InstallSnapshotAsync(long term, IRaftLogEntry snapshot, long snapshotIndex, CancellationToken token)
=> IsRemote
{
return token.IsCancellationRequested
? Task.FromCanceled<Result<bool>>(token)
: IsRemote
? SendAsync<Result<bool>, InstallSnapshotMessage>(new InstallSnapshotMessage(context.LocalMemberId, term, snapshotIndex, snapshot), token)
: Task.FromResult(new Result<bool>(term, true));
}

async ValueTask<IReadOnlyDictionary<string, string>> IClusterMember.GetMetadataAsync(bool refresh, CancellationToken token)
{
Expand All @@ -218,7 +264,13 @@ async ValueTask<IReadOnlyDictionary<string, string>> IClusterMember.GetMetadataA
}

Task<long?> IRaftClusterMember.SynchronizeAsync(long commitIndex, CancellationToken token)
=> IsRemote ? SendAsync<long?, SynchronizeMessage>(new SynchronizeMessage(context.LocalMemberId, commitIndex), token) : Task.FromResult<long?>(null);
{
return token.IsCancellationRequested
? Task.FromCanceled<long?>(token)
: IsRemote
? SendAsync<long?, SynchronizeMessage>(new SynchronizeMessage(context.LocalMemberId, commitIndex), token)
: Task.FromResult<long?>(null);
}

EndPoint IPeer.EndPoint => EndPoint;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ private bool ProcessMemberResponse(Timestamp startTime, Task<Result<bool>> respo
}
catch (Exception e)
{
// treat any exception as faulty member
quorum -= 1;
commitQuorum -= 1;
Logger.LogError(e, ExceptionMessages.UnexpectedError);
}
finally
Expand Down

0 comments on commit a4996ba

Please sign in to comment.