Akka Alpakka Mqtt Обработка сообщений - PullRequest
0 голосов
/ 11 июня 2019

Я рассматриваю возможность использования Alpakka для обработки подписчиков на сервере MQTT, и я читал эту документацию!

https://doc.akka.io/docs/alpakka/current/mqtt.html

В одном из примеров они имеют следующее:

val mqttSource: Source[MqttMessage, Future[Done]] =
  MqttSource.atMostOnce(
    connectionSettings.withClientId(clientId = "source-spec/source"),
    MqttSubscriptions(Map(topic1 -> MqttQoS.AtLeastOnce, topic2 -> MqttQoS.AtLeastOnce)),
    bufferSize = 8
  )

val (subscribed, streamResult) = mqttSource
  .take(messages.size)
  .toMat(Sink.seq)(Keep.both)
  .run()

Итак, из того, что я понимаю, я подписываюсь на несколько тем в моем источнике и в моем Sink, я беру только определенный размер входных сообщений и передаю его как Seq, которыйзатем агрегируется в streamResult Future.Пока это хорошо.Я заглянул в Sink.seq и из Scaladoc, там написано:

 A `Sink` that keeps on collecting incoming elements until upstream terminates.

Звучит хорошо, но в то же время написано:

`Seq` is limited to `Int.MaxValue` elements, this Sink will cancel the stream
   * after having received that many elements.

Итак, теперь мой вопрос, как я могу лучше использовать свой собственный Материализатор, чтобы я мог передать свою логику обработки как Sink?

...