Выберите BidiFlow на основе первого элемента ввода - PullRequest
0 голосов
/ 25 января 2020

У меня есть протокол типа protocol: Flow[ByteString, ByteString, NotUsed]. Элементы, поступающие в поток, представляют собой сообщения, отправленные пользователем, а элементы, выходящие из потока, являются ответами от сервера. Слои шифрования имеют тип encryption: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] (потоки Akka предоставляют, например, TLS в этом формате). При такой архитектуре передача протокола через уровень шифрования сводится к следующему:

protocol.join(tlsEncryptionLayer): Flow[ByteString, ByteString, NotUsed]
protocol.join(noEncryptionLayer): Flow[ByteString, ByteString, NotUsed]

Проблема заключается в том, что Я хочу иметь возможность выбрать уровень шифрования (он же BidiFlow, являющийся join -ed) основан на первом ByteString, полученном от клиента.

Мои мысли пока

Эта проблема выглядит очень похоже на def lazyInit[I, O, M](flowFactory: (I) ⇒ Future[Flow[I, O, M]], fallback: () ⇒ M): Flow[I, O, M] за Flow. Например, lazyInit на BidiFlow откроет меня:

object BidiFlow {

  // This method doesn't exist, but if it did, my problem would be solved
  def lazyInit[I1, O1, I2, O2](
    bidiFactory: (I1) ⇒ BidiFlow[I1, O1, I2, O2, NotUsed]
  ): BidiFlow[I1, O1, I2, O2, NotUsed] = ???
}

Учитывая такую ​​гипотетическую функцию, я мог бы написать код, который использует TLS, когда первый байт равен 0x16, и в противном случае не используйте шифрование. Следующие типы кода проверяют, принимая приведенную выше функцию:

val encryptionLayer = BidiFlow.lazyInit[ByteString, ByteString, ByteString, ByteString](
  bidiFactory = {
    // If the very first byte is `0x16`, we are dealing with TLS
    case bstr if bstr.head == 0x16 =>
      TLS(createSSLEngine = ???, closing = ???)
        .reversed
        .atop(BidiFlow.fromFunctions(
          (inbound: TLSProtocol.SslTlsInbound) => inbound match {
            case TLSProtocol.SessionBytes(_, bytes) => bytes
            case _ => ??? // TODO: good error handling
          },
          (byteString: ByteString) => TLSProtocol.SendBytes(byteString)
        ))

    // If the first byte is anything else, assume no-encryption
    case _ => BidiFlow.identity[ByteString, ByteString]
  }
) 

1 Ответ

0 голосов
/ 25 января 2020

После публикации и более подробного изучения документов я нашел полу-решение: поскольку в любом случае BidiFlow в конечном итоге будет join -обращенным к Flow, мы можем просто подумать о проблема с точки зрения Flow.

// The protocol that may need to be encrypted
val protocol: Flow[ByteString, ByteString, NotUsed] = ???

// A bidi that encrypts bytestrings coming through
val tlsBidi = TLS(createSSLEngine = ???, closing = ???)
  .reversed
  .atop(BidiFlow.fromFunctions(
    (inbound: TLSProtocol.SslTlsInbound) => inbound match {
      case TLSProtocol.SessionBytes(_, bytes) => bytes
      case _ => ??? // TODO: good error handling
    },
    (byteString: ByteString) => TLSProtocol.SendBytes(byteString)
  ))


// The encrypted protocol
val encryptedProtocol = Flow[ByteString]
  .dropWhile(_.isEmpty)
  .prefixAndTail(1)
  .flatMapConcat { case (bstr, restBstrs) =>
    if (bstr.head.head == 0x16)
      restBstrs.via(protocol.join(tlsBidi))
    else
      restBstrs.via(protocol)
  }

Это все еще довольно грубо, хотя - уровень шифрования уже не так аккуратно спрятан в BidiFlow. Вместо этого мне нужно реструктурировать окружающий код, чтобы он соответствовал уровню шифрования. (

...