Skip to content

Commit

Permalink
Collapse HTTP/2 channel handlers. (apple#52)
Browse files Browse the repository at this point in the history
Motivation:

We attempted to use the NIO channel pipeline to spread out the HTTP/2
implementation across a number of channel handlers. Unfortunately, due
to the concerns of reentrancy it's not really safely possible to do this:
it's too easy for the pipeline ordering to become so confused that the
implementation ends up incorrect.

For this reason, we are going to squash together the three handlers that
provide a complete HTTP/2 protocol implementation into just one.

Modifications:

- Move the ConcurrentStreamsHandler and FlowControlHandler into the
    HTTP2Handler.
- Update setup and test code accordingly.

Result:

More correct implementation
  • Loading branch information
Lukasa authored Mar 13, 2019
1 parent d2697cf commit 38516fe
Show file tree
Hide file tree
Showing 16 changed files with 1,639 additions and 1,274 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,150 +14,13 @@
import NIO


/// A `ChannelHandler` that buffers new stream creation attempts to avoid violating
/// An object that buffers new stream creation attempts to avoid violating
/// the HTTP/2 setting `SETTINGS_MAX_CONCURRENT_STREAMS`.
///
/// HTTP/2 provides tools for bounding the maximum amount of concurrent streams that a
/// given peer can create. This is used to limit the amount of state that a peer will need
/// to allocate for a given connection.
///
/// In an high-efficiency pipeline it will generally be better to limit the maximum number of
/// concurrent streams by avoiding creating new outbound streams. However, simpler applications
/// may prefer to delay stream creation instead, buffering frames until they can be delivered.
/// This `ChannelHandler` manages this transparently to the user by keeping track of the number
/// of active outbound streams and delaying any stream creation until safe.
///
/// Note that this `ChannelHandler` can and will re-order flushed frames. Having this `ChannelHandler`
/// in the pipeline means that frames written logically before others may nonetheless have their write
/// promises satisfied later. If this is a concern for your application, consider using the
/// `HTTP2StreamMultiplexer` to avoid a single `Channel` having its writes reordered.
public class NIOHTTP2ConcurrentStreamsHandler: ChannelDuplexHandler {
public typealias InboundIn = HTTP2Frame
public typealias InboundOut = HTTP2Frame
public typealias OutboundIn = HTTP2Frame
public typealias OutboundOut = HTTP2Frame

/// The buffer that will store frames as needed. Provides most of the logic of this
/// ChannelHandler.
private var frameBuffer: StreamFrameBuffer

/// The mode of the Channel we're operating in
private var mode: NIOHTTP2Handler.ParserMode {
get {
return self.frameBuffer.mode
}
}

public init(mode: NIOHTTP2Handler.ParserMode, initialMaxOutboundStreams: Int) {
self.frameBuffer = StreamFrameBuffer(mode: mode, initialMaxOutboundStreams: initialMaxOutboundStreams)
}

public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
switch event {
case let event as StreamClosedEvent:
// This event may make some frames writable. If it does, then we may be in write()
// (in which case we expect a pending flush), or we may be in flush() (in which case we're
// already looping) or we may be in a read cycle (in which case we'll get channelReadComplete
// soon). In any case, we will soon be writing frames, so we do not need to do so here.
// Instead, just keep track of the change internally.
self.frameBuffer.streamClosed(event.streamID)
case let event as NIOHTTP2StreamCreatedEvent:
self.frameBuffer.streamCreated(event.streamID)
default:
break
}

context.fireUserInboundEventTriggered(event)
}

public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let frame = self.unwrapInboundIn(data)
guard case .settings(.settings(let newSettings)) = frame.payload else {
// Either this is not a settings frame, or it's a settings ACK. Either way we don't care.
// TODO(cory): We should handle GOAWAY!
context.fireChannelRead(data)
return
}

guard let newMaxConcurrentStreams = newSettings.lazy.reversed().first(where: { $0.parameter == .maxConcurrentStreams }).map( { $0.value } ) else {
// This settings frame didn't change the value of SETTINGS_MAX_CONCURRENT_STREAMS
context.fireChannelRead(data)
return
}

// This is allowed to shrink maxConcurrentStreams.
self.frameBuffer.maxOutboundStreams = newMaxConcurrentStreams
context.fireChannelRead(data)
}

public func channelReadComplete(context: ChannelHandlerContext) {
if self.writeIfPossible(context: context) {
context.flush()
}

context.fireChannelReadComplete()
}

public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
let frame = self.unwrapOutboundIn(data)

let operation: StreamFrameBuffer.OutboundFrameAction
do {
operation = try self.frameBuffer.processOutboundFrame(frame, promise: promise)
} catch {
promise?.fail(error)
return
}

switch operation {
case .nothing:
break
case .forward:
context.write(data, promise: promise)
case .forwardAndDrop(let writesToDrop, let error):
// We forward *first*, drop *second*.
context.write(data, promise: promise)
for (_, promise) in writesToDrop {
promise?.fail(error)
}
case .succeedAndDrop(let writesToDrop, let error):
// We drop *first*, succeed *second*
for (_, promise) in writesToDrop {
promise?.fail(error)
}
promise?.succeed(())
}
}

public func flush(context: ChannelHandlerContext) {
self.frameBuffer.flushReceived()
self.writeIfPossible(context: context)
context.flush()
}

@discardableResult
private func writeIfPossible(context: ChannelHandlerContext) -> Bool {
// We need to spin our writing loop. How does this work?
//
// We may be buffering a number of frames for streams that are now ready to be unbuffered.
// These streams will have become ready to be unbuffered due to some combination of flush
// calls and stream closures. Here we loop forward over all buffers capable of being somewhat
// unbuffered and punt those frames out. The act of punting those frames out may in fact cause
// more streams to become able to be flushed, which is just fine: nothing bad should happen.
var didWrite = false

while let (frame, promise) = self.frameBuffer.nextFlushedWritableFrame() {
context.write(self.wrapOutboundOut(frame), promise: promise)
didWrite = true
}

return didWrite
}
}


/// An object that manages buffering stream frames to avoid violating SETTINGS_MAX_CONCURRENT_STREAMS.
struct StreamFrameBuffer {
struct ConcurrentStreamBuffer {
fileprivate struct FrameBuffer {
var frames: MarkedCircularBuffer<(HTTP2Frame, EventLoopPromise<Void>?)>
var streamID: HTTP2StreamID
Expand Down Expand Up @@ -201,11 +64,21 @@ struct StreamFrameBuffer {
///
/// Notes that the current number of outbound streams may have gone down, which is useful information
/// when flushing writes.
mutating func streamClosed(_ streamID: HTTP2StreamID) {
mutating func streamClosed(_ streamID: HTTP2StreamID) -> MarkedCircularBuffer<(HTTP2Frame, EventLoopPromise<Void>?)>? {
// We only care about outbound streams.
if streamID.mayBeInitiatedBy(self.mode) {
self.currentOutboundStreams -= 1

// We should check whether we have frames here. We shouldn't, but we might, and we need to return them if we do.
if let bufferIndex = self.bufferedFrames.binarySearch(key: { $0.streamID }, needle: streamID) {
let buffer = self.bufferedFrames.remove(at: bufferIndex)
if buffer.frames.count > 0 {
return buffer.frames
}
}
}

return nil
}

mutating func streamCreated(_ streamID: HTTP2StreamID) {
Expand All @@ -224,25 +97,6 @@ struct StreamFrameBuffer {
self.bufferedFrames.markFlushPoint()
}

/// The result of receiving a frame that is about to be sent.
enum OutboundFrameAction {
/// The caller should forward the frame on.
case forward

/// This object has buffered the frame, no action should be taken.
case nothing

/// A number of frames have to be dropped on the floor due to a RST_STREAM frame being emitted, and the RST_STREAM
/// frame itself must be forwarded.
/// This cannot be done automatically without potentially causing exclusive access violations.
case forwardAndDrop(MarkedCircularBuffer<(HTTP2Frame, EventLoopPromise<Void>?)>, NIOHTTP2Errors.StreamClosed)

/// A number of frames have to be dropped on the floor due to a RST_STREAM frame being emitted, and the RST_STREAM
/// frame itself should be succeeded and not forwarded.
/// This cannot be done automatically without potentially causing exclusive access violations.
case succeedAndDrop(MarkedCircularBuffer<(HTTP2Frame, EventLoopPromise<Void>?)>, NIOHTTP2Errors.StreamClosed)
}

mutating func processOutboundFrame(_ frame: HTTP2Frame, promise: EventLoopPromise<Void>?) throws -> OutboundFrameAction {
// If this frame is not for a locally initiated stream, then that's fine, just pass it on.
guard frame.streamID != .rootStream && frame.streamID.mayBeInitiatedBy(self.mode) else {
Expand Down Expand Up @@ -379,7 +233,7 @@ struct StreamFrameBuffer {
/// implementation of binarySearch written against the RandomAccessCollection protocol to make it more
/// portable in the future.
private struct SortedCircularBuffer {
private var _base: CircularBuffer<StreamFrameBuffer.FrameBuffer>
private var _base: CircularBuffer<ConcurrentStreamBuffer.FrameBuffer>

init(initialRingCapacity: Int) {
self._base = CircularBuffer(initialCapacity: initialRingCapacity)
Expand Down Expand Up @@ -446,10 +300,10 @@ private struct SortedCircularBuffer {
}

extension SortedCircularBuffer: RandomAccessCollection {
typealias Element = CircularBuffer<StreamFrameBuffer.FrameBuffer>.Element
typealias Index = CircularBuffer<StreamFrameBuffer.FrameBuffer>.Index
typealias SubSequence = CircularBuffer<StreamFrameBuffer.FrameBuffer>.SubSequence
typealias Indices = CircularBuffer<StreamFrameBuffer.FrameBuffer>.Indices
typealias Element = CircularBuffer<ConcurrentStreamBuffer.FrameBuffer>.Element
typealias Index = CircularBuffer<ConcurrentStreamBuffer.FrameBuffer>.Index
typealias SubSequence = CircularBuffer<ConcurrentStreamBuffer.FrameBuffer>.SubSequence
typealias Indices = CircularBuffer<ConcurrentStreamBuffer.FrameBuffer>.Indices

var startIndex: Index {
return self._base.startIndex
Expand Down
88 changes: 79 additions & 9 deletions Sources/NIOHTTP2/HTTP2ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler {
/// user events and frames when re-entrant operations occur.
private var inboundEventBuffer: InboundEventBuffer = InboundEventBuffer()

/// A buffer for outbound frames. In some cases it is necessary to buffer outbound frames before
/// sending, if sending them would trigger a protocol violation. Those buffered frames live here.
private var outboundBuffer: CompoundOutboundBuffer

/// This flag is set to false each time we get a channelReadComplete or flush, and set to true
/// each time we write a frame automatically from this handler. If set to true in channelReadComplete,
/// we will choose to flush automatically ourselves.
Expand Down Expand Up @@ -88,6 +92,7 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler {
self.stateMachine = HTTP2ConnectionStateMachine(role: .init(mode))
self.mode = mode
self.initialSettings = initialSettings
self.outboundBuffer = CompoundOutboundBuffer(mode: mode, initialMaxOutboundStreams: 100)
}

public func handlerAdded(context: ChannelHandlerContext) {
Expand Down Expand Up @@ -123,22 +128,44 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler {
}

public func channelReadComplete(context: ChannelHandlerContext) {
if self.wroteAutomaticFrame {
self.wroteAutomaticFrame = false
context.flush()
}

self.unbufferAndFlushAutomaticFrames(context: context)
context.fireChannelReadComplete()
}

public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
let frame = self.unwrapOutboundIn(data)
self.processOutboundFrame(context: context, frame: frame, promise: promise)

do {
switch try self.outboundBuffer.processOutboundFrame(frame, promise: promise) {
case .nothing:
// Nothing to do, got buffered.
break
case .forward:
self.processOutboundFrame(context: context, frame: frame, promise: promise)
case .forwardAndDrop(let framesToDrop, let error):
// We need to forward this frame, and then fail these promises.
self.processOutboundFrame(context: context, frame: frame, promise: promise)
for (_, promise) in framesToDrop {
promise?.fail(error)
}
case .succeedAndDrop(let framesToDrop, let error):
// We need to succeed this frame promise and fail the others. We fail the others first to keep the
// promises in order.
for (_, promise) in framesToDrop {
promise?.fail(error)
}
promise?.succeed(())
}
} catch {
promise?.fail(error)
}
}

public func flush(context: ChannelHandlerContext) {
self.wroteAutomaticFrame = false
context.flush()
// We need to always flush here, so we'll pretend we wrote an automatic frame even if we didn't.
self.wroteAutomaticFrame = true
self.outboundBuffer.flushReceived()
self.unbufferAndFlushAutomaticFrames(context: context)
}
}

Expand Down Expand Up @@ -393,23 +420,66 @@ extension NIOHTTP2Handler {
switch stateChange {
case .streamClosed(let streamClosedData):
self.inboundEventBuffer.pendingUserEvent(StreamClosedEvent(streamID: streamClosedData.streamID, reason: streamClosedData.reason))

let failedWrites = self.outboundBuffer.streamClosed(streamClosedData.streamID)
let error = NIOHTTP2Errors.StreamClosed(streamID: streamClosedData.streamID, errorCode: streamClosedData.reason ?? .cancel)
for promise in failedWrites {
promise?.fail(error)
}
case .streamCreated(let streamCreatedData):
self.outboundBuffer.streamCreated(streamCreatedData.streamID, initialWindowSize: streamCreatedData.localStreamWindowSize.map(UInt32.init) ?? 0)
self.inboundEventBuffer.pendingUserEvent(NIOHTTP2StreamCreatedEvent(streamID: streamCreatedData.streamID,
localInitialWindowSize: streamCreatedData.localStreamWindowSize.map(UInt32.init),
remoteInitialWindowSize: streamCreatedData.remoteStreamWindowSize.map(UInt32.init)))
case .bulkStreamClosure(let streamClosureData):
for droppedStream in streamClosureData.closedStreams {
self.inboundEventBuffer.pendingUserEvent(StreamClosedEvent(streamID: droppedStream, reason: .cancel))

let failedWrites = self.outboundBuffer.streamClosed(droppedStream)
let error = NIOHTTP2Errors.StreamClosed(streamID: droppedStream, errorCode: .cancel)
for promise in failedWrites {
promise?.fail(error)
}
}
case .flowControlChange(let change):
self.outboundBuffer.connectionWindowSize = change.localConnectionWindowSize
self.inboundEventBuffer.pendingUserEvent(NIOHTTP2WindowUpdatedEvent(streamID: .rootStream, inboundWindowSize: change.remoteConnectionWindowSize, outboundWindowSize: change.localConnectionWindowSize))
if let streamSize = change.localStreamWindowSize {
self.outboundBuffer.updateStreamWindow(streamSize.streamID, newSize: streamSize.localStreamWindowSize.map(Int32.init) ?? 0)
self.inboundEventBuffer.pendingUserEvent(NIOHTTP2WindowUpdatedEvent(streamID: streamSize.streamID, inboundWindowSize: streamSize.remoteStreamWindowSize, outboundWindowSize: streamSize.localStreamWindowSize))
}
case .settingsChanged, .streamCreatedAndClosed:
case .streamCreatedAndClosed(let cAndCData):
self.outboundBuffer.streamCreated(cAndCData.streamID, initialWindowSize: 0)
let failedWrites = self.outboundBuffer.streamClosed(cAndCData.streamID)
let error = NIOHTTP2Errors.StreamClosed(streamID: cAndCData.streamID, errorCode: .cancel)
for promise in failedWrites {
promise?.fail(error)
}
case .settingsChanged(let settingsChange):
// TODO(cory): update all stream windows, also max concurrent streams.
break
}
}

private func unbufferAndFlushAutomaticFrames(context: ChannelHandlerContext) {
// Two jobs: we have to unbuffer any buffered frames that can be written, and also potentially flush.
loop: while true {
switch self.outboundBuffer.nextFlushedWritableFrame() {
case .noFrame:
break loop
case .error(let promise, let error):
promise?.fail(error)
case .frame(let frame, let promise):
self.processOutboundFrame(context: context, frame: frame, promise: promise)
self.wroteAutomaticFrame = true
}
}

if self.wroteAutomaticFrame {
self.wroteAutomaticFrame = false
context.flush()
}
}
}


Expand Down
11 changes: 1 addition & 10 deletions Sources/NIOHTTP2/HTTP2PipelineHelpers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -96,18 +96,9 @@ extension Channel {
initialLocalSettings: [HTTP2Setting] = nioDefaultSettings,
position: ChannelPipeline.Position = .last,
inboundStreamStateInitializer: ((Channel, HTTP2StreamID) -> EventLoopFuture<Void>)? = nil) -> EventLoopFuture<HTTP2StreamMultiplexer> {
// We need to determine the initial settings to apply to some of the later handlers in the pipeline, based on the initial settings provided
// by the user. We default these when they are mandatory and we have no alternative. In most of these cases the default is in the spec.
// RFC 7540 defaults SETTINGS_MAX_CONCURRENT_STREAMS to unbounded, which we read as Int32.max.
let initialMaxOutboundStreams = initialLocalSettings.lazy.reversed().first(where: { $0.parameter == .maxConcurrentStreams })?.value ?? Int(Int32.max)

var handlers = [ChannelHandler]()
handlers.reserveCapacity(4) // Update this if we need to add more handlers, to avoid unnecessary reallocation.

handlers.reserveCapacity(2) // Update this if we need to add more handlers, to avoid unnecessary reallocation.
handlers.append(NIOHTTP2Handler(mode: mode, initialSettings: initialLocalSettings))
handlers.append(NIOHTTP2FlowControlHandler())
handlers.append(NIOHTTP2ConcurrentStreamsHandler(mode: mode, initialMaxOutboundStreams: initialMaxOutboundStreams))

let multiplexer = HTTP2StreamMultiplexer(mode: mode, channel: self, inboundStreamStateInitializer: inboundStreamStateInitializer)
handlers.append(multiplexer)

Expand Down
Loading

0 comments on commit 38516fe

Please sign in to comment.