В общем, если у вас есть Sink
, который материализуется в Future[Done]
(Scala API, я считаю, что Java эквивалент равен CompletionStage[Done]
), вы можете просто запустить подпоток с этим Sink
в пределах mapAsync
стадии. Данные будут проходить только в случае успеха.
Из быстрого просмотра API Alpakka AMQP все Sink
s материализуются таким образом, так что этот подход будет работать.
Предполагая, что ваш * Функция 1011 * дает как CommittableReadResult
, так и WriteMessage
, что-то вроде следующего в Scala будет работать:
def doSomethingWithMessage(in: CommittableReadResult): (CommittableReadResult, WriteMessage) = ???
amqpSource
.map(doSomethingWithMessage)
.mapAsync(parallelism) { tup =>
val (crr, wm) = tup
val fut = Source.single(crr, wm).runWith(amqpSink)
fut.flatMap(_ => crr.ack().map(_ => crr.message))
}.runWith(Sink.ignore)