Мне нужно написать простой сервис (сервис Record ingester), с помощью которого мне нужно потреблять сообщения, присутствующие на apache pulsar, и сохранять их в хранилище elasti c, и для этого я использую com.sksamuel.pulsar4s.akka .
Сообщения на пульсаре создаются другой службой, которая называется записывающим насосом. Обе эти службы должны быть развернуты отдельно в производственной среде.
Вот мой источник:
private val source = committableSource(consumerFn)
Приведенный выше код работает нормально и может принимать сообщения от пульсара и писать в ES. Однако я не уверен, следует ли использовать MessageId.earliest при создании источника
private val source = committableSource(consumerFn, Some(MessageId.earliest))
Во время тестирования я обнаружил плюсы и минусы обоих, без использования MessageId.earliest и с использованием MessageId.earliest, но ни один из них не подходит для производства (по моему мнению).
1. Без использования MessageId.earliest:
a. Это добавляет ограничение, что служба приема записи должна быть активирована до того, как мы запустим службу насоса записи.
b. Если моя служба приема записей выйдет из строя (из-за проблемы или из-за технического обслуживания), сообщения, созданные на Pulsar службой записи насосов, не будут использоваться после того, как служба приема записей будет восстановлена. Это означает, что сообщения, созданные в то время, когда служба ингейстера не работает, никогда не потребляются.
Итак, я думаю, что лог c состоит в том, что будут использоваться только те сообщения, которые будут отправлены на пульсар ПОСЛЕ того, как потребитель подписался на этот topi c.
Но я не думаю, что это приемлемо в производстве по причине, упомянутой в пунктах a и b.
2. С MessageId.earliest: Пункты a и b, упомянутые выше, решаются этим, но -
Когда мы используем это, каждый раз, когда моя служба приема записей возобновляет работу (после простоя или обслуживания), она начинает потреблять все сообщения с самого начала. У меня есть logi c, в котором записи с одинаковым идентификатором перезаписываются на стороне ES, так что это действительно не причиняет никакого вреда, но все же я не думаю, что это правильный путь - так как на этом топе будут миллионы сообщений c и каждый раз будет потреблять сообщения, которые уже потреблены (что является пустой тратой).
Это также неприемлемо для меня в продакшене.
Кто-нибудь может мне помочь, в какой конфигурации использовать который решает оба случая. Я пробовал различные конфигурации, например, с помощью subscriptionInitialPosition = Some (SubscriptionInitialPosition.Earliest), но безуспешно.
Полный код:
//consumer
private val consumerFn = () =>
pulsarClient.consumer(
ConsumerConfig(
subscriptionName = Subscription.generate,
topics = Seq(statementTopic),
subscriptionType = Some(SubscriptionType.Shared)
)
)
//create source
private val source = committableSource(consumerFn)
//create intermediate flow
private val intermediateFlow = Flow[CommittableMessage[Array[Byte]]].map {
committableSourceMessage =>
val message = committableSourceMessage.message
val obj: MyObject = MyObject.parseFrom(message.value)
WriteMessage.createIndexMessage(obj.id, JsonUtil.toJson(obj))
}.via(
ElasticsearchFlow.create(
indexName = "myindex",
typeName = "_doc",
settings = ElasticsearchWriteSettings.Default,
StringMessageWriter
)
)
source.via(intermediateFlow).run()