Apache pulsar: потоки Akka - конфигурация потребителя - PullRequest
0 голосов
/ 12 июля 2020

Мне нужно написать простой сервис (сервис Record ingester), с помощью которого мне нужно потреблять сообщения, присутствующие на apache pulsar, и сохранять их в хранилище elasti c, и для этого я использую com.sksamuel.pulsar4s.akka .

Сообщения на пульсаре создаются другой службой, которая называется записывающим насосом. Обе эти службы должны быть развернуты отдельно в производственной среде.

Вот мой источник:

private val source = committableSource(consumerFn)

Приведенный выше код работает нормально и может принимать сообщения от пульсара и писать в ES. Однако я не уверен, следует ли использовать MessageId.earliest при создании источника

private val source = committableSource(consumerFn, Some(MessageId.earliest))

Во время тестирования я обнаружил плюсы и минусы обоих, без использования MessageId.earliest и с использованием MessageId.earliest, но ни один из них не подходит для производства (по моему мнению).

1. Без использования MessageId.earliest:

a. Это добавляет ограничение, что служба приема записи должна быть активирована до того, как мы запустим службу насоса записи.

b. Если моя служба приема записей выйдет из строя (из-за проблемы или из-за технического обслуживания), сообщения, созданные на Pulsar службой записи насосов, не будут использоваться после того, как служба приема записей будет восстановлена. Это означает, что сообщения, созданные в то время, когда служба ингейстера не работает, никогда не потребляются.

Итак, я думаю, что лог c состоит в том, что будут использоваться только те сообщения, которые будут отправлены на пульсар ПОСЛЕ того, как потребитель подписался на этот topi c.

Но я не думаю, что это приемлемо в производстве по причине, упомянутой в пунктах a и b.

2. С MessageId.earliest: Пункты a и b, упомянутые выше, решаются этим, но -

Когда мы используем это, каждый раз, когда моя служба приема записей возобновляет работу (после простоя или обслуживания), она начинает потреблять все сообщения с самого начала. У меня есть logi c, в котором записи с одинаковым идентификатором перезаписываются на стороне ES, так что это действительно не причиняет никакого вреда, но все же я не думаю, что это правильный путь - так как на этом топе будут миллионы сообщений c и каждый раз будет потреблять сообщения, которые уже потреблены (что является пустой тратой).

Это также неприемлемо для меня в продакшене.

Кто-нибудь может мне помочь, в какой конфигурации использовать который решает оба случая. Я пробовал различные конфигурации, например, с помощью subscriptionInitialPosition = Some (SubscriptionInitialPosition.Earliest), но безуспешно.

Полный код:

//consumer
  private val consumerFn = () =>
    pulsarClient.consumer(
      ConsumerConfig(
        subscriptionName = Subscription.generate,
        topics = Seq(statementTopic),
        subscriptionType = Some(SubscriptionType.Shared)
      )
    )
    
    //create source
    private val source = committableSource(consumerFn)
    
    
    //create intermediate flow
    private val intermediateFlow = Flow[CommittableMessage[Array[Byte]]].map {
    committableSourceMessage =>
      val message                     = committableSourceMessage.message
      val obj: MyObject = MyObject.parseFrom(message.value)
      WriteMessage.createIndexMessage(obj.id, JsonUtil.toJson(obj))
  }.via(
    ElasticsearchFlow.create(
      indexName = "myindex",
      typeName = "_doc",
      settings = ElasticsearchWriteSettings.Default,
      StringMessageWriter
    )
  )
    
    source.via(intermediateFlow).run()

1 Ответ

0 голосов
/ 13 июля 2020

То, что вам нужно, - это некоторая форма уплотнения. Подробнее см. в документации Pulsar . Вы можете сделать потребление с учетом уплотнения с помощью

ConsumerConfig(
  // other consumer config options as before
  readCompacted = Some(true)
)

В документах Pulsar есть обсуждение о механике уплотнения. Обратите внимание, что для включения уплотнения требуется, чтобы записи в topi c имели ключ, что могло или не могло происходить в прошлом. ключи для сжатия находятся в топах c, как часто они заменяются более поздними сообщениями и т. д. c. Основная идея c состояла бы в том, чтобы иметь statefulMapConcat, который сохраняет Map[String, T] в своем состоянии и некоторые средства очистки буфера.

Простая реализация:

Flow[CommittableMessage[Array[Byte]].map { csm =>
  Option(MyObject.parseFrom(csm.message.value))
}
.keepAlive(1.minute, () => None)
.statefulMapConcat { () =>
  var map: Map[String, MyObject] = Map.empty
  var count: Int = 0 
  { objOpt: Option[MyObject] =>
    objOpt.map { obj =>
      map = map.updated(obj.id, obj)
      count += 1
      if (count == 1000) {
        val toEmit = map.values.toList
        count = 0
        map = Map.empty
        toEmit
      } else Nil
    }.getOrElse {
      val toEmit = map.values.toList
      count = 0
      map = Map.empty
      toEmit
    }
  }

Более сложным ответом было бы создание актера, соответствующего каждому объекту (здесь может быть полезно сегментирование кластера, особенно если существует много объектов), и получение от Pulsar отправляет входящие сообщения в соответствующие Actor, который затем планирует запись последнего полученного сообщения в Elasticsearch.

Одна вещь, с которой нужно быть осторожной, - это не совершать смещения, пока вы не убедитесь, что сообщение (или его преемник) был написано в Elasticsearch. При использовании подхода «актор на объект» может оказаться полезным Akka Persistence: основная стратегия c будет заключаться в фиксации смещения после того, как актор подтвердит получение (что происходит после сохранения события, например, для Кассандры).

...