Использование createDrainingControl в Consumer API? - PullRequest
0 голосов
/ 20 апреля 2019

Я просматривал документацию для Consumer API для Kafka в Alpakka.Я наткнулся на этот кусок кода.Насколько я понимаю, смещение фиксируется с помощью msg.committableOffset ().Тогда зачем нам нужны .toMat () и mapMaterializedValue ().Я не могу просто прикрепить его к Sink.Ignore ()?

Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
      .mapAsync(
          1,
          msg ->
              business(msg.record().key(), msg.record().value())
                  .thenApply(done -> msg.committableOffset()))
      .toMat(Committer.sink(committerSettings.withMaxBatch(1)), Keep.both())
      .mapMaterializedValue(Consumer::createDrainingControl)
      .run(materializer);

1 Ответ

1 голос
/ 20 апреля 2019

Вы не можете присоединиться к Sink.ignore, потому что вы уже подключили Commiter.Sink.Но вы можете отбросить материализованные значения.

В примере используется toMat с Keep.both для сохранения обоих материализованных значений: элемента управления из источника и будущего [Готово] из приемника.С обоими значениями он создает DrainingControl в mapMaterializedValue, который позволяет вам останавливать поток или сливать поток до его остановки или получать уведомления, когда поток останавливается.

Если вас не волнует этот элемент управления (хотя вам следует)Вы можете использовать

Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
      .mapAsync(
          1,
          msg ->
              business(msg.record().key(), msg.record().value())
                  .thenApply(done -> msg.committableOffset()))
      .to(Committer.sink(committerSettings.withMaxBatch(1)))
      .run(materializer);
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...