Переписать блокирующий код цикла в неблокирующий код в стиле SwiftNIO - PullRequest
1 голос
/ 30 января 2020

Я работаю над драйвером, который будет считывать данные из сети. Он не знает, сколько в ответе, кроме того, что когда он пытается прочитать и возвращает 0 байтов, это делается. Поэтому мой блокирующий код Swift выглядит наивно так:

func readAllBlocking() -> [Byte] {

  var buffer: [Byte] = []
  var fullBuffer: [Byte] = []

  repeat {
    buffer = read() // synchronous, blocking
    fullBuffer.append(buffer)
  } while buffer.count > 0

  return fullBuffer
}

Как я могу переписать это как обещание, которое будет продолжаться до тех пор, пока не будет прочитан весь результат? После попытки обернуть мой мозг вокруг него, я все еще застрял здесь:

func readAllNonBlocking() -> EventLoopFuture<[Byte]> {

  ///...?
}

Я должен добавить, что я могу переписать read (), чтобы вместо возврата [Byte] возвращать EventLoopFuture <[Byte] >

1 Ответ

2 голосов
/ 30 января 2020

Как правило, циклы в синхронном программировании превращаются в рекурсию, чтобы получить тот же эффект с асинхронным программированием, которое использует фьючерсы (а также в функциональном программировании).

Таким образом, ваша функция может выглядеть следующим образом:

func readAllNonBlocking(on eventLoop: EventLoop) -> EventLoopFuture<[Byte]> {
    // The accumulated chunks
    var accumulatedChunks: [Byte] = []

    // The promise that will hold the overall result
    let promise = eventLoop.makePromise(of: [Byte].self)

    // We turn the loop into recursion:
    func loop() {
        // First, we call `read` to read in the next chunk and hop
        // over to `eventLoop` so we can safely write to `accumulatedChunks`
        // without a lock.
        read().hop(to: eventLoop).map { nextChunk in
            // Next, we just append the chunk to the accumulation
            accumulatedChunks.append(contentsOf: nextChunk)
            guard nextChunk.count > 0 else {
                promise.succeed(accumulatedChunks)
                return
            }
            // and if it wasn't empty, we loop again.
            loop()
        }.cascadeFailure(to: promise) // if anything goes wrong, we fail the whole thing.
    }

    loop() // Let's kick everything off.

    return promise.futureResult
}

Я хотел бы добавить две вещи, однако:

Во-первых, то, что вы реализуете выше, это просто читать все, пока вы не увидите EOF, если эта часть программного обеспечения подвергается воздействию inte rnet, вам определенно следует добавить ограничение на максимальное количество байт в памяти.

Во-вторых, SwiftNIO - это система, управляемая событиями, поэтому, если бы вы читали эти байты с помощью SwiftNIO, программа на самом деле выглядела бы немного. иначе. Если вам интересно, как выглядит просто накапливать все байты до EOF в SwiftNIO, то это:

struct AccumulateUntilEOF: ByteToMessageDecoder {
    typealias InboundOut = ByteBuffer

    func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
        // `decode` will be called if new data is coming in.
        // We simply return `.needMoreData` because always need more data because our message end is EOF.
        // ByteToMessageHandler will automatically accumulate everything for us because we tell it that we need more
        // data to decode a message.
        return .needMoreData
    }

    func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer, seenEOF: Bool) throws -> DecodingState {
        // `decodeLast` will be called if NIO knows that this is the _last_ time a decode function is called. Usually,
        // this is because of EOF or an error.
        if seenEOF {
            // This is what we've been waiting for, `buffer` should contain all bytes, let's fire them through
            // the pipeline.
            context.fireChannelRead(self.wrapInboundOut(buffer))
        } else {
            // Odd, something else happened, probably an error or we were just removed from the pipeline. `buffer`
            // will now contain what we received so far but maybe we should just drop it on the floor.
        }
        buffer.clear()
        return .needMoreData
    }
}

Если вы хотите сделать из этого целую программу с SwiftNIO, вот пример, который сервер, который принимает все данные, пока не увидит EOF, а затем буквально просто записывает количество полученных байтов :). Конечно, в реальном мире вы бы никогда не держались за все полученные байты, чтобы подсчитать их (вы могли бы просто добавить каждый отдельный фрагмент), но я думаю, что это служит примером.

import NIO

let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
    try! group.syncShutdownGracefully()
}

struct AccumulateUntilEOF: ByteToMessageDecoder {
    typealias InboundOut = ByteBuffer

    func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
        // `decode` will be called if new data is coming in.
        // We simply return `.needMoreData` because always need more data because our message end is EOF.
        // ByteToMessageHandler will automatically accumulate everything for us because we tell it that we need more
        // data to decode a message.
        return .needMoreData
    }

    func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer, seenEOF: Bool) throws -> DecodingState {
        // `decodeLast` will be called if NIO knows that this is the _last_ time a decode function is called. Usually,
        // this is because of EOF or an error.
        if seenEOF {
            // This is what we've been waiting for, `buffer` should contain all bytes, let's fire them through
            // the pipeline.
            context.fireChannelRead(self.wrapInboundOut(buffer))
        } else {
            // Odd, something else happened, probably an error or we were just removed from the pipeline. `buffer`
            // will now contain what we received so far but maybe we should just drop it on the floor.
        }
        buffer.clear()
        return .needMoreData
    }
}

// Just an example "business logic" handler. It will wait for one message
// and just write back the length.
final class SendBackLengthOfFirstInput: ChannelInboundHandler {
    typealias InboundIn = ByteBuffer
    typealias OutboundOut = ByteBuffer

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        // Once we receive the message, we allocate a response buffer and just write the length of the received
        // message in there. We then also close the channel.
        let allData = self.unwrapInboundIn(data)
        var response = context.channel.allocator.buffer(capacity: 10)
        response.writeString("\(allData.readableBytes)\n")
        context.writeAndFlush(self.wrapOutboundOut(response)).flatMap {
            context.close(mode: .output)
        }.whenSuccess {
            context.close(promise: nil)
        }
    }

    func errorCaught(context: ChannelHandlerContext, error: Error) {
        print("ERROR: \(error)")
        context.channel.close(promise: nil)
    }
}

let server = try ServerBootstrap(group: group)
    // Allow us to reuse the port after the process quits.
    .serverChannelOption(ChannelOptions.socket(.init(SOL_SOCKET), .init(SO_REUSEADDR)), value: 1)
    // We should allow half-closure because we want to write back after having received an EOF on the input
    .childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true)
    // Our program consists of two parts:
    .childChannelInitializer { channel in
        channel.pipeline.addHandlers([
            // 1: The accumulate everything until EOF handler
            ByteToMessageHandler(AccumulateUntilEOF(),
                                 // We want 1 MB of buffering max. If you remove this parameter, it'll also
                                 // buffer indefinitely.
                                 maximumBufferSize: 1024 * 1024),
            // 2: Our "business logic"
            SendBackLengthOfFirstInput()
        ])
    }
    // Let's bind port 9999
    .bind(to: SocketAddress(ipAddress: "127.0.0.1", port: 9999))
    .wait()

// This will never return.
try server.closeFuture.wait()

Демонстрация:

$ echo -n "hello world" | nc localhost 9999
11
...