Skip to content

Commit

Permalink
remove dummy arraysegment<byte>
Browse files Browse the repository at this point in the history
  • Loading branch information
karlovnv committed Sep 10, 2019
1 parent 82b728e commit 6d8a5c9
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/progaudi.tarantool/IRequestWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ internal interface IRequestWriter : IDisposable

bool IsConnected { get; }

void Write(ArraySegment<byte> header, ArraySegment<byte> body);
void Write(ArraySegment<byte> request);
}
}
6 changes: 1 addition & 5 deletions src/progaudi.tarantool/LogicalConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ internal class LogicalConnection : ILogicalConnection
private readonly IRequestWriter _requestWriter;

private readonly ILog _logWriter;
private readonly ArraySegment<byte> _dummyEmptyArray;
private bool _disposed;

public LogicalConnection(ClientOptions options, RequestIdCounter requestIdCounter)
Expand All @@ -42,7 +41,6 @@ public LogicalConnection(ClientOptions options, RequestIdCounter requestIdCounte
_physicalConnection = new NetworkStreamPhysicalConnection();
_responseReader = new ResponseReader(_clientOptions, _physicalConnection);
_requestWriter = new RequestWriter(_clientOptions, _physicalConnection);
_dummyEmptyArray = new ArraySegment<byte>(new byte[] { }, 0, 0);
}

public uint PingsFailedByTimeoutCount
Expand Down Expand Up @@ -172,9 +170,7 @@ private async Task<MemoryStream> SendRequestImpl<TRequest>(TRequest request, Tim
//keep API for the sake of backward comp.
_requestWriter.Write(
// merged header and body
buffer,
// dummy array
_dummyEmptyArray);
buffer);

try
{
Expand Down
42 changes: 22 additions & 20 deletions src/progaudi.tarantool/RequestWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ internal class RequestWriter : IRequestWriter
{
private readonly ClientOptions _clientOptions;
private readonly IPhysicalConnection _physicalConnection;
private readonly Queue<Tuple<ArraySegment<byte>, ArraySegment<byte>>> _buffer;
private readonly Queue<ArraySegment<byte>> _buffer;
private readonly object _lock = new object();
private readonly Thread _thread;
private readonly ManualResetEventSlim _exitEvent;
Expand All @@ -25,7 +25,7 @@ public RequestWriter(ClientOptions clientOptions, IPhysicalConnection physicalCo
{
_clientOptions = clientOptions;
_physicalConnection = physicalConnection;
_buffer = new Queue<Tuple<ArraySegment<byte>, ArraySegment<byte>>>();
_buffer = new Queue<ArraySegment<byte>>();
_thread = new Thread(WriteFunction)
{
IsBackground = true,
Expand All @@ -49,18 +49,18 @@ public void BeginWriting()

public bool IsConnected => !_disposed;

public void Write(ArraySegment<byte> header, ArraySegment<byte> body)
public void Write(ArraySegment<byte> request)
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(ResponseReader));
}

_clientOptions?.LogWriter?.WriteLine($"Enqueuing request: headers {header.Count} bytes, body {body.Count} bytes.");
_clientOptions?.LogWriter?.WriteLine($"Enqueuing request: {request.Count} bytes.");
bool shouldSignal;
lock (_lock)
{
_buffer.Enqueue(Tuple.Create(header, body));
_buffer.Enqueue(request);
shouldSignal = _buffer.Count == 1;
}

Expand Down Expand Up @@ -94,7 +94,7 @@ private void WriteFunction()
case 0:
return;
case 1:
WriteRequests(_connectionOptions.WriteStreamBufferSize,
WriteRequests(_connectionOptions.WriteStreamBufferSize,
_connectionOptions.MaxRequestsInBatch);

remaining = Interlocked.Read(ref _remaining);
Expand All @@ -119,34 +119,36 @@ void WriteBuffer(ArraySegment<byte> buffer)
_physicalConnection.Write(buffer.Array, buffer.Offset, buffer.Count);
}

Tuple<ArraySegment<byte>, ArraySegment<byte>> GetRequest()
bool GetRequest(out ArraySegment<byte> result)
{
lock (_lock)
{
if (_buffer.Count > 0)
{
_remaining = _buffer.Count + 1;
return _buffer.Dequeue();
result = _buffer.Dequeue();
return true;
}
}

return null;
}
result = default;
return false;
}

Tuple<ArraySegment<byte>, ArraySegment<byte>> request;
ArraySegment<byte> request;
var count = 0;
UInt64 length = 0;
List<Tuple<ArraySegment<byte>, ArraySegment<byte>>> list = new List<Tuple<ArraySegment<byte>, ArraySegment<byte>>>();
while ((request = GetRequest()) != null)
var list = new List<ArraySegment<byte>>();
while (GetRequest(out request))
{
_clientOptions?.LogWriter?.WriteLine($"Writing request: headers {request.Item1.Count} bytes, body {request.Item2.Count} bytes.");
length += (uint)request.Item1.Count;
_clientOptions?.LogWriter?.WriteLine($"Writing request: {request.Count} bytes.");
length += (uint)request.Count;

list.Add(request);
_clientOptions?.LogWriter?.WriteLine($"Wrote request: headers {request.Item1.Count} bytes, body {request.Item2.Count} bytes.");
_clientOptions?.LogWriter?.WriteLine($"Wrote request: {request.Count} bytes.");

count++;
if ((limit > 0 && count > limit ) || length > (ulong) bufferLength)
if ((limit > 0 && count > limit) || length > (ulong)bufferLength)
{
break;
}
Expand All @@ -160,8 +162,8 @@ Tuple<ArraySegment<byte>, ArraySegment<byte>> GetRequest()
int position = 0;
foreach (var r in list)
{
Buffer.BlockCopy(r.Item1.Array, r.Item1.Offset, result, position, r.Item1.Count);
position += r.Item1.Count;
Buffer.BlockCopy(r.Array, r.Offset, result, position, r.Count);
position += r.Count;
}

WriteBuffer(new ArraySegment<byte>(result));
Expand Down

0 comments on commit 6d8a5c9

Please sign in to comment.