Я рассматриваю возможность использования Alpakka для обработки подписчиков на сервере MQTT, и я читал эту документацию!
https://doc.akka.io/docs/alpakka/current/mqtt.html
В одном из примеров они имеют следующее:
val mqttSource: Source[MqttMessage, Future[Done]] =
MqttSource.atMostOnce(
connectionSettings.withClientId(clientId = "source-spec/source"),
MqttSubscriptions(Map(topic1 -> MqttQoS.AtLeastOnce, topic2 -> MqttQoS.AtLeastOnce)),
bufferSize = 8
)
val (subscribed, streamResult) = mqttSource
.take(messages.size)
.toMat(Sink.seq)(Keep.both)
.run()
Итак, из того, что я понимаю, я подписываюсь на несколько тем в моем источнике и в моем Sink, я беру только определенный размер входных сообщений и передаю его как Seq, которыйзатем агрегируется в streamResult Future.Пока это хорошо.Я заглянул в Sink.seq и из Scaladoc, там написано:
A `Sink` that keeps on collecting incoming elements until upstream terminates.
Звучит хорошо, но в то же время написано:
`Seq` is limited to `Int.MaxValue` elements, this Sink will cancel the stream
* after having received that many elements.
Итак, теперь мой вопрос, как я могу лучше использовать свой собственный Материализатор, чтобы я мог передать свою логику обработки как Sink?