Получить ссылку на оригинальный элемент после Akka Streams Sink? - PullRequest
0 голосов
/ 16 марта 2020

Я пытаюсь использовать соединитель Amqp alpakka в качестве источника и приемника.

Source<CommittableReadResult, NotUsed> AMQP_SOURCE  
      -> processAndGetResponse 
      -> Sink<ByteString, CompletionStage<Done>> AMQP_SINK 

Я хочу подтвердить сообщение, полученное из очереди Amqp, после успешной операции Sink. как это:

amqp_source(committableReadResult) 
    -> processAndGetResponse 
    -> amqp_sink 
    -> IfSinkOperationSuccess.Then(committableReadResult.ack())

Как мне этого добиться? По сути, я хочу пометить сообщение как подтверждающее только после успешного завершения операции приема.

1 Ответ

0 голосов
/ 17 марта 2020

В общем, если у вас есть Sink, который материализуется в Future[Done] (Scala API, я считаю, что Java эквивалент равен CompletionStage[Done]), вы можете просто запустить подпоток с этим Sink в пределах mapAsync стадии. Данные будут проходить только в случае успеха.

Из быстрого просмотра API Alpakka AMQP все Sink s материализуются таким образом, так что этот подход будет работать.

Предполагая, что ваш * Функция 1011 * дает как CommittableReadResult, так и WriteMessage, что-то вроде следующего в Scala будет работать:

def doSomethingWithMessage(in: CommittableReadResult): (CommittableReadResult, WriteMessage) = ???

amqpSource
  .map(doSomethingWithMessage)
  .mapAsync(parallelism) { tup =>
    val (crr, wm) = tup
    val fut = Source.single(crr, wm).runWith(amqpSink)
    fut.flatMap(_ => crr.ack().map(_ => crr.message))
  }.runWith(Sink.ignore)
...