Akka stream actor-conflation-ratelimit-actor отбрасывает первые несколько сообщений (иногда) - PullRequest
1 голос
/ 29 марта 2019

Простое комбинированное совмещение (ниже) иногда печатает отладочное сообщение при старте, говоря, что оно отбрасывает сообщения из-за нулевого спроса.Я ожидаю, что стадия слияния обеспечит бесконечный спрос, поэтому вышеупомянутое никогда не должно иметь место.Чего мне не хватает?

val sourceRef = Source.actorRef[KeyedHighFreqEvent](0, OverflowStrategy.fail)
.conflateWithSeed(...into hash map...)
.throttle(8, per = 1.second, maxBurst=24, ThrottleMode.shaping)
.mapConcat(...back to individual KeyedHighFreqEvent...)
.groupedWithin(1024, 1.millisecond)
.to(Sink.actorRef(networkPublisher, Nil))
.run()

system.eventStream.subscribe(sourceRef, classOf[KeyedHighFreqEvent])

1 Ответ

1 голос
/ 03 апреля 2019

Документация Source.actorRef совершенно ясна по этому поводу:

Буфер можно отключить, используя bufferSize из 0, а затем полученные сообщения отбрасываются, если нет потребности в нисходящем канале.Когда bufferSize равно 0, overflowStrategy не имеет значения.Асинхронная граница добавляется после этого источника;как таковое, никогда не безопасно предполагать, что нисходящий поток всегда будет генерировать спрос.

Проблема заключается в асинхронной границе между источником и стадией слияния.Стадия слияния действительно обеспечивает бесконечное требование, но асинхронный тип границы замедляет распространение к источнику.

Вы можете использовать буфер в своем источнике (увеличить bufferSize) или использовать другой источник, например, Source.queue при необходимости, поскольку он не вводит асинхронную границу

...