Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coalesce flushes during the read cycle. #132

Merged
merged 2 commits into from
May 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Sources/NIOHTTP2/HTTP2StreamChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
self.state.beginClosing()
let resetFrame = HTTP2Frame(streamID: self.streamID, payload: .rstStream(.cancel))
self.receiveOutboundFrame(resetFrame, promise: nil)
self.parent?.flush()
self.multiplexer.childChannelFlush()
}

private func closedCleanly() {
Expand Down Expand Up @@ -514,7 +514,7 @@ private extension HTTP2StreamChannel {
let write = self.pendingWrites.removeFirst()
self.receiveOutboundFrame(write.0, promise: write.1)
}
self.parent?.flush()
self.multiplexer.childChannelFlush()
}

/// Fails all pending writes with the given error.
Expand Down
52 changes: 51 additions & 1 deletion Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
private var context: ChannelHandlerContext!
private var nextOutboundStreamID: HTTP2StreamID
private var connectionFlowControlManager: InboundWindowManager
private var flushState: FlushState = .notReading

public func handlerAdded(context: ChannelHandlerContext) {
// We now need to check that we're on the same event loop as the one we were originally given.
Expand All @@ -49,6 +50,8 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
let frame = self.unwrapInboundIn(data)
let streamID = frame.streamID

self.flushState.startReading()

guard streamID != .rootStream else {
// For stream 0 we forward all frames on to the main channel.
context.fireChannelRead(data)
Expand Down Expand Up @@ -80,6 +83,26 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
}
}

public func channelReadComplete(context: ChannelHandlerContext) {
if case .flushPending = self.flushState {
self.flushState = .notReading
context.flush()
} else {
self.flushState = .notReading
}

context.fireChannelReadComplete()
}

public func flush(context: ChannelHandlerContext) {
switch self.flushState {
case .reading, .flushPending:
self.flushState = .flushPending
case .notReading:
context.flush()
}
}

public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
/* for now just forward */
context.write(data, promise: promise)
Expand Down Expand Up @@ -170,6 +193,33 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
}
}


extension HTTP2StreamMultiplexer {
/// The state of the multiplexer for flush coalescing.
///
/// The stream multiplexer aims to perform limited flush coalescing on the read side by delaying flushes from the child and
/// parent channels until channelReadComplete is received. To do this we need to track what state we're in.
enum FlushState {
/// No channelReads have been fired since the last channelReadComplete, so we probably aren't reading. Let any
/// flushes through.
case notReading

/// We've started reading, but don't have any pending flushes.
case reading

/// We're in the read loop, and have received a flush.
case flushPending

mutating func startReading() {
if case .notReading = self {
self = .reading
}
}
}
}



extension HTTP2StreamMultiplexer {
/// Create a new `Channel` for a new stream initiated by this peer.
///
Expand Down Expand Up @@ -211,6 +261,6 @@ extension HTTP2StreamMultiplexer {
}

internal func childChannelFlush() {
self.context.flush()
self.flush(context: context)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ extension HTTP2StreamMultiplexerTests {
("testCreatedChildChannelCanBeClosedBeforeWritingHeaders", testCreatedChildChannelCanBeClosedBeforeWritingHeaders),
("testCreatedChildChannelCanBeClosedImmediatelyWhenBaseIsActive", testCreatedChildChannelCanBeClosedImmediatelyWhenBaseIsActive),
("testCreatedChildChannelCanBeClosedBeforeWritingHeadersWhenBaseIsActive", testCreatedChildChannelCanBeClosedBeforeWritingHeadersWhenBaseIsActive),
("testMultiplexerCoalescesFlushCallsDuringChannelRead", testMultiplexerCoalescesFlushCallsDuringChannelRead),
]
}
}
Expand Down
69 changes: 69 additions & 0 deletions Tests/NIOHTTP2Tests/HTTP2StreamMultiplexerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,41 @@ final class ReadCounter: ChannelOutboundHandler {
}


/// A handler that tracks the number of times flush() was called on the channel.
final class FlushCounter: ChannelOutboundHandler {
typealias OutboundIn = Any
typealias OutboundOut = Any

var flushCount = 0

func flush(context: ChannelHandlerContext) {
self.flushCount += 1
context.flush()
}
}


/// A channel handler that sends a response in response to a HEADERS frame.
final class QuickResponseHandler: ChannelInboundHandler {
typealias InboundIn = HTTP2Frame
typealias OutboundOut = HTTP2Frame

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let frame = self.unwrapInboundIn(data)

guard case .headers = frame.payload else {
context.fireChannelRead(data)
return
}

let responseHeaders = HPACKHeaders([(":status", "200"), ("content-length", "0")])
let responseFrame = HTTP2Frame(streamID: frame.streamID, payload: .headers(.init(headers: responseHeaders, endStream: true)))
context.writeAndFlush(self.wrapOutboundOut(responseFrame), promise: nil)
context.fireChannelRead(data)
}
}


/// A channel handler that succeeds a promise when removed from
/// a pipeline.
final class HandlerRemovedHandler: ChannelInboundHandler {
Expand Down Expand Up @@ -1428,4 +1463,38 @@ final class HTTP2StreamMultiplexerTests: XCTestCase {
XCTAssertTrue(closed)
XCTAssertNoThrow(XCTAssertTrue(try self.channel.finish().isClean))
}

func testMultiplexerCoalescesFlushCallsDuringChannelRead() throws {
// We need to activate the underlying channel here.
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 80)).wait())

// Add a flush counter.
let flushCounter = FlushCounter()
XCTAssertNoThrow(try self.channel.pipeline.addHandler(flushCounter).wait())

// Add a server-mode multiplexer that will add an auto-response handler.
let multiplexer = HTTP2StreamMultiplexer(mode: .server, channel: self.channel) { (channel, _) in
channel.pipeline.addHandler(QuickResponseHandler())
}
XCTAssertNoThrow(try self.channel.pipeline.addHandler(multiplexer).wait())

// We're going to send in 10 request frames.
let requestHeaders = HPACKHeaders([(":path", "/"), (":method", "GET"), (":authority", "localhost"), (":scheme", "https")])
XCTAssertEqual(flushCounter.flushCount, 0)

let framesToSend = stride(from: 1, through: 19, by: 2).map { HTTP2Frame(streamID: HTTP2StreamID($0), payload: .headers(.init(headers: requestHeaders, endStream: true))) }
for frame in framesToSend {
self.channel.pipeline.fireChannelRead(NIOAny(frame))
}
self.channel.embeddedEventLoop.run()

// Response frames should have been written, but no flushes, so they aren't visible.
XCTAssertEqual(try self.channel.sentFrames().count, 0)
XCTAssertEqual(flushCounter.flushCount, 0)

// Now send channel read complete. The frames should be flushed through.
self.channel.pipeline.fireChannelReadComplete()
XCTAssertEqual(try self.channel.sentFrames().count, 10)
XCTAssertEqual(flushCounter.flushCount, 1)
}
}
11 changes: 11 additions & 0 deletions Tests/NIOHTTP2Tests/TestUtilities.swift
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,17 @@ extension EmbeddedChannel {
let content: HTTP2Frame? = try? assertNoThrowWithValue(self.readInbound())
XCTAssertNil(content, "Received unexpected content: \(content!)", file: file, line: line)
}

/// Retrieve all sent frames.
func sentFrames(file: StaticString = #file, line: UInt = #line) throws -> [HTTP2Frame] {
var receivedFrames: [HTTP2Frame] = Array()

while let frame = try assertNoThrowWithValue(self.readOutbound(as: HTTP2Frame.self), file: file, line: line) {
receivedFrames.append(frame)
}

return receivedFrames
}
}

extension HTTP2Frame {
Expand Down