Skip to content

Commit

Permalink
Respect changes to INITIAL_WINDOW_SIZE in outbound buffer. (#64)
Browse files Browse the repository at this point in the history
Motivation:

When the remote peer changes SETTINGS_INITIAL_WINDOW_SIZE, this change
affects the flow control window sizes of all existing streams (as though
they were created with this new initial window size). We should respect
those flow control changes.

Modifications:

- Propagated changes in SETTINGS_INITIAL_WINDOW_SIZE.
- Made NIOHTTP2ConnectionStateChange internal.
- Added support for changing initial window size.

Result:

SETTINGS frames that change initial window size now work properly.
  • Loading branch information
Lukasa authored and weissi committed Mar 15, 2019
1 parent 348a10e commit 9e07639
Show file tree
Hide file tree
Showing 10 changed files with 173 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ extension HasRemoteSettings {
swap(&temporarySettings, &self.remoteSettings)
}

var effect = NIOHTTP2ConnectionStateChange.RemoteSettingsChanged()

do {
try temporarySettings.receiveSettings(settings) { (setting, originalValue, newValue) in
switch setting {
Expand All @@ -56,6 +58,10 @@ extension HasRemoteSettings {
try self.streamState.forAllStreams {
try $0.remoteInitialWindowSizeChanged(by: delta)
}

// We do a += here because the value may change multiple times in one settings block. This way, we correctly
// respect that possibility.
effect.streamWindowSizeChange += Int(delta)
case .maxFrameSize:
// TODO(cory): Implement!
break
Expand All @@ -64,7 +70,7 @@ extension HasRemoteSettings {
return
}
}
return (.init(result: .succeed, effect: nil), .sendAck)
return (.init(result: .succeed, effect: .remoteSettingsChanged(effect)), .sendAck)
} catch let err where err is NIOHTTP2Errors.InvalidFlowControlWindowSize {
return (.init(result: .connectionError(underlyingError: err, type: .flowControlError), effect: nil), .nothing)
} catch {
Expand Down
8 changes: 5 additions & 3 deletions Sources/NIOHTTP2/HTTP2ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,11 @@ extension NIOHTTP2Handler {
for promise in failedWrites {
promise?.fail(error)
}
case .settingsChanged(let settingsChange):
// TODO(cory): update all stream windows, also max concurrent streams.
break
case .remoteSettingsChanged(let settingsChange):
// TODO(cory): also max concurrent streams.
if settingsChange.streamWindowSizeChange != 0 {
self.outboundBuffer.initialWindowSizeChanged(settingsChange.streamWindowSizeChange)
}
}
}

Expand Down
70 changes: 33 additions & 37 deletions Sources/NIOHTTP2/HTTP2ConnectionStateChange.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
/// This enumeration allows users to avoid needing to replicate the complete HTTP/2
/// state machine. Instead, users can use this enumeration to determine the new state
/// of the connection and affected streams.
public enum NIOHTTP2ConnectionStateChange: Hashable {
internal enum NIOHTTP2ConnectionStateChange: Hashable {
/// A stream has been created.
case streamCreated(StreamCreated)

Expand All @@ -36,41 +36,41 @@ public enum NIOHTTP2ConnectionStateChange: Hashable {
/// being sent or received.
case bulkStreamClosure(BulkStreamClosure)

/// Settings have been changed.
case settingsChanged(SettingsChanged)
/// The remote peer's settings have been changed.
case remoteSettingsChanged(RemoteSettingsChanged)

/// A stream has been created.
public struct StreamCreated: Hashable {
public var streamID: HTTP2StreamID
internal struct StreamCreated: Hashable {
internal var streamID: HTTP2StreamID

/// The initial local stream window size. This may be nil if there is no local stream window.
/// This occurs if the stream has been pushed by the remote peer, in which case we will never be able
/// to send on it.
public var localStreamWindowSize: Int?
internal var localStreamWindowSize: Int?

/// The initial remote stream window size. This may be nil if there is no remote stream window.
/// This occurs if the stream has been pushed by the local peer, in which case tje remote peer will never be able
/// to send on it.
public var remoteStreamWindowSize: Int?
internal var remoteStreamWindowSize: Int?

public init(streamID: HTTP2StreamID, localStreamWindowSize: Int?, remoteStreamWindowSize: Int?) {
internal init(streamID: HTTP2StreamID, localStreamWindowSize: Int?, remoteStreamWindowSize: Int?) {
self.streamID = streamID
self.localStreamWindowSize = localStreamWindowSize
self.remoteStreamWindowSize = remoteStreamWindowSize
}
}

/// A stream has been closed.
public struct StreamClosed: Hashable {
public var streamID: HTTP2StreamID
internal struct StreamClosed: Hashable {
internal var streamID: HTTP2StreamID

public var localConnectionWindowSize: Int
internal var localConnectionWindowSize: Int

public var remoteConnectionWindowSize: Int
internal var remoteConnectionWindowSize: Int

public var reason: HTTP2ErrorCode?
internal var reason: HTTP2ErrorCode?

public init(streamID: HTTP2StreamID, localConnectionWindowSize: Int, remoteConnectionWindowSize: Int, reason: HTTP2ErrorCode?) {
internal init(streamID: HTTP2StreamID, localConnectionWindowSize: Int, remoteConnectionWindowSize: Int, reason: HTTP2ErrorCode?) {
self.streamID = streamID
self.localConnectionWindowSize = localConnectionWindowSize
self.remoteConnectionWindowSize = remoteConnectionWindowSize
Expand All @@ -81,10 +81,10 @@ public enum NIOHTTP2ConnectionStateChange: Hashable {
/// A stream has been created and immediately closed. In this case, the only relevant bit of information
/// is the stream ID: flow control windows are not relevant as this frame is not flow controlled and does
/// not change window sizes.
public struct StreamCreatedAndClosed: Hashable {
public var streamID: HTTP2StreamID
internal struct StreamCreatedAndClosed: Hashable {
internal var streamID: HTTP2StreamID

public init(streamID: HTTP2StreamID) {
internal init(streamID: HTTP2StreamID) {
self.streamID = streamID
}
}
Expand All @@ -97,56 +97,52 @@ public enum NIOHTTP2ConnectionStateChange: Hashable {
/// both the connection and stream window sizes: window update frames change
/// only one. To avoid ambiguity, we report the current window size of the connection
/// on all such events, and the relevant stream if there is one (which there usually is).
public struct FlowControlChange: Hashable {
public var localConnectionWindowSize: Int
internal struct FlowControlChange: Hashable {
internal var localConnectionWindowSize: Int

public var remoteConnectionWindowSize: Int
internal var remoteConnectionWindowSize: Int

public var localStreamWindowSize: StreamWindowSizeChange?
internal var localStreamWindowSize: StreamWindowSizeChange?

/// The information about the stream window size. Either the local or remote
/// stream window information may be nil, if there is no flow control window
/// for that direction (e.g. if the stream is half-closed).
public struct StreamWindowSizeChange: Hashable {
public var streamID: HTTP2StreamID
internal struct StreamWindowSizeChange: Hashable {
internal var streamID: HTTP2StreamID

public var localStreamWindowSize: Int?
internal var localStreamWindowSize: Int?

public var remoteStreamWindowSize: Int?
internal var remoteStreamWindowSize: Int?

public init(streamID: HTTP2StreamID, localStreamWindowSize: Int?, remoteStreamWindowSize: Int?) {
internal init(streamID: HTTP2StreamID, localStreamWindowSize: Int?, remoteStreamWindowSize: Int?) {
self.streamID = streamID
self.localStreamWindowSize = localStreamWindowSize
self.remoteStreamWindowSize = remoteStreamWindowSize
}
}

public init(localConnectionWindowSize: Int, remoteConnectionWindowSize: Int, localStreamWindowSize: StreamWindowSizeChange?) {
internal init(localConnectionWindowSize: Int, remoteConnectionWindowSize: Int, localStreamWindowSize: StreamWindowSizeChange?) {
self.localConnectionWindowSize = localConnectionWindowSize
self.remoteConnectionWindowSize = remoteConnectionWindowSize
self.localStreamWindowSize = localStreamWindowSize
}
}

/// A large number of streams have been closed at once.
public struct BulkStreamClosure: Hashable {
public var closedStreams: [HTTP2StreamID]
internal struct BulkStreamClosure: Hashable {
internal var closedStreams: [HTTP2StreamID]

public init(closedStreams: [HTTP2StreamID]) {
internal init(closedStreams: [HTTP2StreamID]) {
self.closedStreams = closedStreams
}
}

/// Settings have changed in a way that is not trivial to decode.
/// The remote peer's settings have changed in a way that is not trivial to decode.
///
/// This object keeps track of the change on all stream window sizes via
/// SETTINGS frame.
public struct SettingsChanged: Hashable {
public var streamWindowSizeChange: Int

public init(streamWindowSizeChange: Int) {
self.streamWindowSizeChange = streamWindowSizeChange
}
internal struct RemoteSettingsChanged: Hashable {
internal var streamWindowSizeChange: Int = 0
}
}

Expand Down
16 changes: 16 additions & 0 deletions Sources/NIOHTTP2/OutboundFlowControlBuffer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,22 @@ internal struct OutboundFlowControlBuffer {
let frame = HTTP2Frame(streamID: nextStreamID, payload: payload)
return (frame, promise)
}

internal mutating func initialWindowSizeChanged(_ delta: Int) {
self.streamDataBuffers.mutatingForEachValue {
let hadPendingData = $0.hasPendingData
$0.currentWindowSize += delta
let hasPendingData = $0.hasPendingData

if !hadPendingData && hasPendingData {
assert(!self.flushableStreams.contains($0.streamID))
self.flushableStreams.insert($0.streamID)
} else if hadPendingData && !hasPendingData {
assert(self.flushableStreams.contains($0.streamID))
self.flushableStreams.remove($0.streamID)
}
}
}
}


Expand Down
4 changes: 4 additions & 0 deletions Sources/NIOHTTP2/OutboundFrameBuffer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ extension CompoundOutboundBuffer {
self.flowControlBuffer.updateWindowOfStream(streamID, newSize: newSize)
}

mutating func initialWindowSizeChanged(_ delta: Int) {
self.flowControlBuffer.initialWindowSizeChanged(delta)
}

var maxFrameSize: Int {
get {
return self.flowControlBuffer.maxFrameSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ extension OutboundFlowControlBufferTests {
("testOverlargeFramesAreSplitOnMaxFrameSizeByteBuffer", testOverlargeFramesAreSplitOnMaxFrameSizeByteBuffer),
("testOverlargeFramesAreSplitOnMaxFrameSizeFileRegion", testOverlargeFramesAreSplitOnMaxFrameSizeFileRegion),
("testChangingStreamWindowSizeToZeroAndBack", testChangingStreamWindowSizeToZeroAndBack),
("testStreamWindowChanges", testStreamWindowChanges),
]
}
}
Expand Down
34 changes: 34 additions & 0 deletions Tests/NIOHTTP2Tests/OutboundFlowControlBufferTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,40 @@ class OutboundFlowControlBufferTests: XCTestCase {

self.receivedFrames().assertFramesMatch([dataFrame], dataFileRegionToByteBuffer: false)
}

func testStreamWindowChanges() {
let streamOne = HTTP2StreamID(1)
let streamThree = HTTP2StreamID(3)
self.buffer.streamCreated(streamOne, initialWindowSize: 15)
self.buffer.streamCreated(streamThree, initialWindowSize: 15)

var oneFrame = self.createDataFrame(streamOne, byteBufferSize: 30)
let threeFrame = self.createDataFrame(streamThree, byteBufferSize: 15)
XCTAssertNoThrow(try self.buffer.processOutboundFrame(oneFrame, promise: nil).assertNothing())
XCTAssertNoThrow(try self.buffer.processOutboundFrame(threeFrame, promise: nil).assertNothing())
XCTAssertNil(self.buffer.nextFlushedWritableFrame())

// Let the window be consumed.
self.buffer.flushReceived()
self.receivedFrames().sorted(by: { $0.streamID < $1.streamID }).assertFramesMatch([oneFrame.sliceDataFrame(length: 15), threeFrame])

// Ok, now we can increase the window size for all streams. This makes more data available.
self.buffer.initialWindowSizeChanged(10)
self.receivedFrames().assertFramesMatch([oneFrame.sliceDataFrame(length: 10)])

let secondThreeFrame = self.createDataFrame(streamThree, byteBufferSize: 5)
XCTAssertNoThrow(try self.buffer.processOutboundFrame(secondThreeFrame, promise: nil).assertNothing())
self.buffer.flushReceived()
self.receivedFrames().assertFramesMatch([secondThreeFrame])

// Now we shrink the window.
self.buffer.initialWindowSizeChanged(-10)

// And attempt to send another frame on stream three. This should be buffered.
XCTAssertNoThrow(try self.buffer.processOutboundFrame(secondThreeFrame, promise: nil).assertNothing())
self.buffer.flushReceived()
XCTAssertNil(self.buffer.nextFlushedWritableFrame())
}
}


Expand Down
1 change: 1 addition & 0 deletions Tests/NIOHTTP2Tests/SimpleClientServerTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ extension SimpleClientServerTests {
("testStreamCloseEventForGoawayFiresAfterFrame", testStreamCloseEventForGoawayFiresAfterFrame),
("testManyConcurrentInactiveStreams", testManyConcurrentInactiveStreams),
("testBadClientMagic", testBadClientMagic),
("testOpeningWindowsViaSettingsInitialWindowSize", testOpeningWindowsViaSettingsInitialWindowSize),
]
}
}
Expand Down
61 changes: 61 additions & 0 deletions Tests/NIOHTTP2Tests/SimpleClientServerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1032,4 +1032,65 @@ class SimpleClientServerTests: XCTestCase {
XCTAssertNoThrow(try XCTAssertFalse(self.clientChannel.finish()))
XCTAssertNoThrow(try XCTAssertFalse(self.serverChannel.finish()))
}

func testOpeningWindowsViaSettingsInitialWindowSize() throws {
try self.basicHTTP2Connection()

// Start by having the client shrink the server's initial window size to 0. We should get an ACK as well.
try self.assertSettingsUpdateWithAck([HTTP2Setting(parameter: .initialWindowSize, value: 0)], sender: self.clientChannel, receiver: self.serverChannel)
self.clientChannel.assertNoFramesReceived()
self.serverChannel.assertNoFramesReceived()

// Now open a stream.
let headers = HPACKHeaders([(":path", "/"), (":method", "POST"), (":scheme", "https"), (":authority", "localhost")])
let clientStreamID = HTTP2StreamID(1)
let reqFrame = HTTP2Frame(streamID: clientStreamID, payload: .headers(.init(headers: headers, endStream: true)))
try self.assertFramesRoundTrip(frames: [reqFrame], sender: self.clientChannel, receiver: self.serverChannel)

// Confirm there's no bonus frame sitting around.
self.clientChannel.assertNoFramesReceived()
self.serverChannel.assertNoFramesReceived()

// The server can respond with a headers frame and a DATA frame.
let serverHeaders = HPACKHeaders([(":status", "200")])
let respFrame = HTTP2Frame(streamID: clientStreamID, payload: .headers(.init(headers: serverHeaders)))
try self.assertFramesRoundTrip(frames: [respFrame], sender: self.serverChannel, receiver: self.clientChannel)

// Confirm there's no bonus frame sitting around.
self.clientChannel.assertNoFramesReceived()
self.serverChannel.assertNoFramesReceived()

// Now we're going to send in a DATA frame. This will not be sent, as the window size is 0.
var buffer = self.clientChannel.allocator.buffer(capacity: 5)
buffer.writeStaticString("hello")
let dataFrame = HTTP2Frame(streamID: clientStreamID, payload: .data(.init(data: .byteBuffer(buffer), endStream: true)))
self.serverChannel.writeAndFlush(dataFrame, promise: nil)
self.interactInMemory(self.clientChannel, self.serverChannel)

// No frames should be produced.
self.clientChannel.assertNoFramesReceived()
self.serverChannel.assertNoFramesReceived()

// Now the client will send a new SETTINGS frame. This will produce a SETTINGS ACK, and widen the flow control window a bit.
// We make this one a bit tricky to confirm that we do the math properly: we set the window size to 5, then to 3. The new value
// should be 3.
try self.assertSettingsUpdateWithAck([HTTP2Setting(parameter: .initialWindowSize, value: 5), HTTP2Setting(parameter: .initialWindowSize, value: 3)], sender: self.clientChannel, receiver: self.serverChannel)
try self.clientChannel.assertReceivedFrame().assertFrameMatches(this: HTTP2Frame(streamID: clientStreamID, payload: .data(.init(data: .byteBuffer(buffer.readSlice(length: 3)!)))))
self.clientChannel.assertNoFramesReceived()
self.serverChannel.assertNoFramesReceived()

// Sending the same SETTINGS frame again does not produce more data.
try self.assertSettingsUpdateWithAck([HTTP2Setting(parameter: .initialWindowSize, value: 3)], sender: self.clientChannel, receiver: self.serverChannel)
self.clientChannel.assertNoFramesReceived()
self.serverChannel.assertNoFramesReceived()

// Now we can widen the window again, and get the rest of the frame.
try self.assertSettingsUpdateWithAck([HTTP2Setting(parameter: .initialWindowSize, value: 6)], sender: self.clientChannel, receiver: self.serverChannel)
try self.clientChannel.assertReceivedFrame().assertFrameMatches(this: HTTP2Frame(streamID: clientStreamID, payload: .data(.init(data: .byteBuffer(buffer.readSlice(length: 2)!), endStream: true))))
self.clientChannel.assertNoFramesReceived()
self.serverChannel.assertNoFramesReceived()

XCTAssertNoThrow(try self.clientChannel.finish())
XCTAssertNoThrow(try self.serverChannel.finish())
}
}
11 changes: 11 additions & 0 deletions Tests/NIOHTTP2Tests/TestUtilities.swift
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,17 @@ extension XCTestCase {

return receivedFrames
}

/// Asserts that sending new settings from `sender` to `receiver` leads to an appropriate settings ACK. Does not assert that no other frames have been
/// received.
func assertSettingsUpdateWithAck(_ newSettings: HTTP2Settings, sender: EmbeddedChannel, receiver: EmbeddedChannel, file: StaticString = #file, line: UInt = #line) throws {
let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings(newSettings)))
sender.writeAndFlush(frame, promise: nil)
self.interactInMemory(sender, receiver, file: file, line: line)

try receiver.assertReceivedFrame(file: file, line: line).assertFrameMatches(this: frame)
try sender.assertReceivedFrame(file: file, line: line).assertFrameMatches(this: HTTP2Frame(streamID: .rootStream, payload: .settings(.ack)))
}
}

extension EmbeddedChannel {
Expand Down

0 comments on commit 9e07639

Please sign in to comment.