Сервис Лагом, потребляющий информацию от Кафки - PullRequest
0 голосов
/ 04 февраля 2019

Я пытаюсь выяснить, как Lagom можно использовать для получения данных из внешних систем, обменивающихся данными через Kafka.

Я столкнулся с этим разделом документации Lagom , в котором описано, какСлужба Lagom может взаимодействовать с другой службой Lagom, подписавшись на ее тему.

helloService
  .greetingsTopic()
  .subscribe // <-- you get back a Subscriber instance
  .atLeastOnce(
  Flow.fromFunction(doSomethingWithTheMessage)
)

Однако, какова подходящая конфигурация, когда вы хотите подписаться на тему Kafka, которая содержит события, генерируемые какой-то случайной внешней системой?

Нужен ли какой-то адаптер для этой функции?Чтобы уточнить, у меня есть это на данный момент:

object Aggregator {
  val TOPIC_NAME = "my-aggregation"
}

trait Aggregator extends Service {
  def aggregate(correlationId: String): ServiceCall[Data, Done]

  def aggregationTopic(): Topic[DataRecorded]

  override final def descriptor: Descriptor = {
    import Service._

    named("aggregator")
      .withCalls(
        pathCall("/api/aggregate/:correlationId", aggregate _)
      )
      .withTopics(
        topic(Aggregator.TOPIC_NAME, aggregationTopic())
          .addProperty(
            KafkaProperties.partitionKeyStrategy,
            PartitionKeyStrategy[DataRecorded](_.sessionData.correlationId)
          )
      )
      .withAutoAcl(true)
  }
}

И я могу вызвать его с помощью простого запроса POST.Тем не менее, я хотел бы, чтобы его вызывали, потребляя Data сообщений из некоторой (внешней) темы Кафки.

Мне было интересно, существует ли такой способ настройки дескриптора способом, подобным этому макету:

override final def descriptor: Descriptor = {
  ...
  kafkaTopic("my-input-topic")
    .subscribe(serviceCall(aggregate _)
    .withAtMostOnceDelivery
}

Я столкнулся с этим обсуждением групп Google , но в вопросах ОП я не вижу, что он действительно что-то делает с EventMessage сообщениями some-topic за исключением направления их в тему, определенную его службой.

РЕДАКТИРОВАТЬ # 1: Обновление прогресса

Глядя на документацию, я решил попробовать следующий подход.Я добавил еще 2 модуля, aggregator-kafka-proxy-api и aggregator-kafka-proxy-impl.

В новом модуле API я определил новый сервис, без методов, но с одной темой, которая представляла бы мою тему Кафки:

object DataKafkaPublisher {
  val TOPIC_NAME = "data-in"
}

trait DataKafkaPublisher extends Service {
  def dataInTopic: Topic[DataPublished]

  override final def descriptor: Descriptor = {
    import Service._
    import DataKafkaPublisher._

    named("data-kafka-in")
      .withTopics(
        topic(TOPIC_NAME, dataInTopic)
          .addProperty(
            KafkaProperties.partitionKeyStrategy,
            PartitionKeyStrategy[SessionDataPublished](_.data.correlationId)
          )
      )
      .withAutoAcl(true)
  }
}

В модуле impl я просто выполнил стандартную реализацию

class DataKafkaPublisherImpl(persistentEntityRegistry: PersistentEntityRegistry) extends DataKafkaPublisher {
  override def dataInTopic: Topic[api.DataPublished] =
    TopicProducer.singleStreamWithOffset {
      fromOffset =>
        persistentEntityRegistry.eventStream(KafkaDataEvent.Tag, fromOffset)
          .map(ev => (convertEvent(ev), ev.offset))
    }

  private def convertEvent(evt: EventStreamElement[KafkaDataEvent]): api.DataPublished = {
    evt.event match {
      case DataPublished(data) => api.DataPublished(data)
    }
  }
}

Теперь, чтобы фактически использовать эти события, в моем модуле aggregator-impl я добавил службу "подписчик", которая принимает этисобытия и вызывает соответствующие команды для сущности.

class DataKafkaSubscriber(persistentEntityRegistry: PersistentEntityRegistry, kafkaPublisher: DataKafkaPublisher) {

  kafkaPublisher.dataInTopic.subscribe.atLeastOnce(
    Flow[DataPublished].mapAsync(1) { sd =>
      sessionRef(sd.data.correlationId).ask(RecordData(sd.data))
    }
  )

  private def sessionRef(correlationId: String) =
    persistentEntityRegistry.refFor[Entity](correlationId)
}

Это фактически позволило мне опубликовать сообщение на тему "data-in" Kafka, которое затем было передано по прокси и преобразовано в команду RecordData перед отправкой вюридическое лицо, чтобы потреблять.

Однако, это кажется мне немного хакЯ связан с Кафкой внутренностями Лагома.Я не могу легко поменять источник моих данных.Например, как бы я использовал внешние сообщения от RabbitMQ, если бы захотел?Что, если я пытаюсь использовать другой Kafka (отличный от используемого Lagom)?

Редактировать # 2: Больше документов

Я нашел несколько статей о документах Lagom, особенно, это:

Использование тем от третьих лиц

Возможно, вы захотите, чтобы ваша служба Lagom использовала данные, полученные от служб, не реализованных в Lagom.В этом случае, как описано в разделе Service Clients, вы можете создать сторонний модуль service-api в своем проекте Lagom.Этот модуль будет содержать дескриптор службы, объявляющий тему, из которой вы будете использовать.После того, как вы внедрили свой интерфейс ThirdPartyService и связанные с ним классы, вы должны добавить сторонний-service-api как зависимость от вашего fancy-service-impl.Наконец, вы можете использовать тему, описанную в ThirdPartyService, как описано в разделе «Подписаться на тему».

Ответы [ 2 ]

0 голосов
/ 19 февраля 2019

Ответ предоставил Алан Кликич на форумах Lightbend здесь .

Часть 1:

Если вы используете только внешний кластер Kafka в вашембизнес-сервис, то вы можете реализовать это, используя только Lagom Broker API.Поэтому вам необходимо:

  1. создать API с дескриптором службы только с определением темы (этот API не реализован)
  2. в вашей бизнес-службе настроить kafka_native в зависимости от вашего развертывания (как яупоминается в предыдущем посте)
  3. в сервисе внедрения бизнес-сервисов из API, созданного в # 1, и подпишитесь на него с помощью подписчика API Lagom Broker

Офсетная фиксация, в подписчике API Lagom Broker естьобработано "из коробки".

Часть 2:

Для пользовательских реализаций Kafka и AMQP требуется постоянный поток akka.Так что вам нужно обрабатывать разрывы.Это можно сделать двумя способами:

  1. управлять постоянным потоком akka, помещая его в актера.Вы инициализируете свой поток Flow на предварительном запуске актора, и поток потока завершается для актера, который остановит его.Если поток завершается или происходит сбой, актер останавливается.Затем оберните актера в откат актера с помощью стратегии перезапуска, которая перезапустит актера в случае завершения или сбоя и повторно инициализирует потоки
  2. akka Отложенные перезапуски с этапом отката

Персонал, который я использую# 1 и еще не пробовал # 2.

Инициализация актера отката для # 1 или потока для # 2 может быть выполнена в вашей характеристике компонентов Lagom (в основном там же, где вы сейчас подписываетесь, используя Lagom Broker API).

Обязательно укажите группу потребителей при настройке потребителя, чтобы избежать потребления дубликатов.Вы можете использовать, как это делает Lagom, имя службы из дескриптора в качестве имени группы потребителей.

0 голосов
/ 06 февраля 2019

Я не использую lagom, так что, возможно, это просто идея.Но поскольку akka-streams является частью lagom (по крайней мере, я так полагаю) - получить из этого решения то, что вам нужно, должно быть легко.

Я использовал akka-stream-kafka и все прошло очень хорошо (я только сделал прототип)

Когда вы принимаете сообщения, вы делаете что-то:

     Consumer
      .committableSource(
          consumerSettings(..), // config of Kafka
          Subscriptions.topics("kafkaWsPathMsgTopic")) // Topic to subscribe
      .mapAsync(10) { msg =>
        business(msg.record) // do something
      }

Проверьте хорошо написанное документация

Весь мой пример вы найдете здесь: PathMsgConsumer

...