Skip to content

Commit

Permalink
No Responders and Prepare for Release (#193)
Browse files Browse the repository at this point in the history
* Handle no-responders
* prep for release

Signed-off-by: Colin Sullivan <[email protected]>
  • Loading branch information
ColinSullivan1 committed Oct 13, 2020
1 parent 901c8fe commit b5c310d
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 26 deletions.
2 changes: 1 addition & 1 deletion doc/DoxyFile.STAN.Client
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ PROJECT_NAME = "NATS Streaming .NET Client"
# could be handy for archiving the generated documentation or if some version
# control system is used.

PROJECT_NUMBER = 0.2.1
PROJECT_NUMBER = 0.3.0

# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a
Expand Down
6 changes: 3 additions & 3 deletions src/STAN.Client/STAN.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.10.0" />
<PackageReference Include="Google.Protobuf.Tools" Version="3.10.0" PrivateAssets="All" />
<PackageReference Include="NATS.Client" Version="0.10.0" />
<PackageReference Include="Google.Protobuf" Version="3.13.0" />
<PackageReference Include="Google.Protobuf.Tools" Version="3.13.0" PrivateAssets="All" />
<PackageReference Include="NATS.Client" Version="0.11.0" />
</ItemGroup>

</Project>
43 changes: 31 additions & 12 deletions src/STAN.Client/StanConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ public class Connection : IStanConnection, IDisposable
private ISubscription pingSubscription = null;

private readonly object pingLock = new object();

#pragma warning disable 0618
private readonly NUID pubNUID = new NUID();
#pragma warning restore 0618

private Timer pingTimer;
private readonly byte[] pingBytes;
private readonly string pingRequests;
Expand Down Expand Up @@ -246,6 +250,8 @@ internal Connection(string stanClusterID, string clientID, StanOptions options)
}
catch (NATSTimeoutException)
{
// This also covers no-responders as the NATSNoRespondersException
// is a subclass of NATSTimeoutException
protoUnsubscribe();
if (ncOwned)
{
Expand Down Expand Up @@ -324,7 +330,7 @@ internal Connection(string stanClusterID, string clientID, StanOptions options)
pingMaxOut = response.PingMaxOut;
pingBytes = ProtocolSerializer.createPing(connID);

pingTimer = new Timer(pingServer, null, pingInterval, Timeout.Infinite);
pingTimer = new Timer(pingServer, null, pingInterval, pingInterval);
}
}
if (unsubPing)
Expand Down Expand Up @@ -353,21 +359,13 @@ private void pingServer(object state)
return;
}

// disable timer, will never be called.
pingTimer.Change(Timeout.Infinite, Timeout.Infinite);

pingOut++;
conn = nc;

if (pingOut > pingMaxOut)
{
lostConnection = true;
}
else
{
//reactivate timer.
pingTimer.Change(pingInterval, Timeout.Infinite);
}
}

if (lostConnection)
Expand All @@ -379,6 +377,7 @@ private void pingServer(object state)
try
{
conn.Publish(pingRequests, pingInbox, pingBytes);
conn.FlushBuffer();
}
catch (Exception ex)
when (
Expand Down Expand Up @@ -461,10 +460,18 @@ private void cleanupOnClose(Exception ex)
}
}

private void processPingResponse(object sender, MsgHandlerEventArgs e)
private bool IsNoResponseMsg(Msg m)
{
return m != null && m.Data.Length == 0 && m.HasHeaders && "503".Equals(m.Header["Status"]);
}

private void processPingResponse(object sender, MsgHandlerEventArgs args)
{
// No data means OK (no need to unmarshall)
var data = e.Message.Data;
if (IsNoResponseMsg(args.Message))
return;

// No data can be a valid message
var data = args.Message.Data;
if (data?.Length > 0)
{
var pingResp = new PingResponse();
Expand All @@ -482,6 +489,7 @@ private void processPingResponse(object sender, MsgHandlerEventArgs e)
if (err?.Length > 0)
{
closeDueToPing(new StanException(err));
return;
}
}

Expand All @@ -501,7 +509,10 @@ private void processHeartBeat(object sender, MsgHandlerEventArgs args)
}

if (lnc != null)
{
lnc.Publish(args.Message.Reply, null);
lnc.FlushBuffer();
}
}

internal PublishAck removeAck(string guid)
Expand Down Expand Up @@ -569,7 +580,9 @@ internal void processMsg(object sender, MsgHandlerEventArgs args)

static public string newGUID()
{
#pragma warning disable 0618
return NUID.NextGlobal;
#pragma warning restore 0618
}

public void Publish(string subject, byte[] data)
Expand Down Expand Up @@ -799,6 +812,12 @@ public void Close()
// and gracefully close the streaming connection. The streaming server
// will cleanup this client on its own.
}
catch (NATSTimeoutException)
{
// Also covers NATSNoRespondersException. Either way,
// Assume that there 's no streaming server available and
// close the connection below if required.
}

if (reply != null)
{
Expand Down
3 changes: 3 additions & 0 deletions src/Samples/RxSample/RxSample.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,7 @@
<ProjectReference Include="..\..\STAN.Client\STAN.Client.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="NATS.Client" Version="0.11.0" />
</ItemGroup>
</Project>
3 changes: 3 additions & 0 deletions src/Samples/StanPub/StanPub.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@
<ProjectReference Include="..\..\STAN.Client\STAN.Client.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="NATS.Client" Version="0.11.0" />
</ItemGroup>
</Project>
3 changes: 3 additions & 0 deletions src/Samples/StanSub/StanSub.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@
<ProjectReference Include="..\..\STAN.Client\STAN.Client.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="NATS.Client" Version="0.11.0" />
</ItemGroup>
</Project>
7 changes: 5 additions & 2 deletions src/Tests/IntegrationTests/IntegrationTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.4.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.0-preview-20200921-01" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3"><IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="NATS.Client" Version="0.11.0" />
</ItemGroup>

<ItemGroup>
Expand Down
14 changes: 8 additions & 6 deletions src/Tests/IntegrationTests/PingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public void TestPingsNatsConnGone()
{
nc.SubscribeAsync(StanConsts.DefaultDiscoverPrefix + "." + Context.ClusterId + ".pings", (obj, args) =>
{
if (args.Message.Data.Length == 0)
{
return;
}
count++;
if (count > StanConsts.DefaultPingMaxOut)
{
Expand Down Expand Up @@ -99,7 +104,7 @@ public void TestPingStreamingServerGone()
AutoResetEvent ev = new AutoResetEvent(false);

var so = Context.GetStanTestOptions(Context.Server1);
so.PingInterval = 200;
so.PingInterval = 100;
so.PingMaxOutstanding = 3;
so.ConnectionLostEventHandler = (obj, args) =>
{
Expand All @@ -108,8 +113,9 @@ public void TestPingStreamingServerGone()

using (var sc = Context.GetStanConnection(so))
{
sc.NATSConnection.FlushBuffer();
nss.Shutdown();
Assert.True(ev.WaitOne(20000));
Assert.True(ev.WaitOne(2000));
}
}
}
Expand All @@ -124,10 +130,6 @@ public void TestPingCloseUnlockPubCalls()
using (var nss = Context.StartStreamingServerWithExternal(Context.Server1))
{
var ev = new AutoResetEvent(false);
//var so = StanOptions.GetDefaultOptions();
//so.PingInterval = 50;
//so.PingMaxOutstanding = 10;
//so.PubAckWait = 100;

using (var sc = Context.GetStanConnection(Context.Server1))
{
Expand Down
7 changes: 5 additions & 2 deletions src/Tests/UnitTests/UnitTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.4.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3"><IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.7.1" />
<PackageReference Include="NATS.Client" Version="0.11.0" />
</ItemGroup>

<ItemGroup>
Expand Down

0 comments on commit b5c310d

Please sign in to comment.