Я пытаюсь выяснить, как Lagom можно использовать для получения данных из внешних систем, обменивающихся данными через Kafka.
Я столкнулся с этим разделом документации Lagom , в котором описано, какСлужба Lagom может взаимодействовать с другой службой Lagom, подписавшись на ее тему.
helloService
.greetingsTopic()
.subscribe // <-- you get back a Subscriber instance
.atLeastOnce(
Flow.fromFunction(doSomethingWithTheMessage)
)
Однако, какова подходящая конфигурация, когда вы хотите подписаться на тему Kafka, которая содержит события, генерируемые какой-то случайной внешней системой?
Нужен ли какой-то адаптер для этой функции?Чтобы уточнить, у меня есть это на данный момент:
object Aggregator {
val TOPIC_NAME = "my-aggregation"
}
trait Aggregator extends Service {
def aggregate(correlationId: String): ServiceCall[Data, Done]
def aggregationTopic(): Topic[DataRecorded]
override final def descriptor: Descriptor = {
import Service._
named("aggregator")
.withCalls(
pathCall("/api/aggregate/:correlationId", aggregate _)
)
.withTopics(
topic(Aggregator.TOPIC_NAME, aggregationTopic())
.addProperty(
KafkaProperties.partitionKeyStrategy,
PartitionKeyStrategy[DataRecorded](_.sessionData.correlationId)
)
)
.withAutoAcl(true)
}
}
И я могу вызвать его с помощью простого запроса POST.Тем не менее, я хотел бы, чтобы его вызывали, потребляя Data
сообщений из некоторой (внешней) темы Кафки.
Мне было интересно, существует ли такой способ настройки дескриптора способом, подобным этому макету:
override final def descriptor: Descriptor = {
...
kafkaTopic("my-input-topic")
.subscribe(serviceCall(aggregate _)
.withAtMostOnceDelivery
}
Я столкнулся с этим обсуждением групп Google , но в вопросах ОП я не вижу, что он действительно что-то делает с EventMessage
сообщениями some-topic
за исключением направления их в тему, определенную его службой.
РЕДАКТИРОВАТЬ # 1: Обновление прогресса
Глядя на документацию, я решил попробовать следующий подход.Я добавил еще 2 модуля, aggregator-kafka-proxy-api
и aggregator-kafka-proxy-impl
.
В новом модуле API я определил новый сервис, без методов, но с одной темой, которая представляла бы мою тему Кафки:
object DataKafkaPublisher {
val TOPIC_NAME = "data-in"
}
trait DataKafkaPublisher extends Service {
def dataInTopic: Topic[DataPublished]
override final def descriptor: Descriptor = {
import Service._
import DataKafkaPublisher._
named("data-kafka-in")
.withTopics(
topic(TOPIC_NAME, dataInTopic)
.addProperty(
KafkaProperties.partitionKeyStrategy,
PartitionKeyStrategy[SessionDataPublished](_.data.correlationId)
)
)
.withAutoAcl(true)
}
}
В модуле impl я просто выполнил стандартную реализацию
class DataKafkaPublisherImpl(persistentEntityRegistry: PersistentEntityRegistry) extends DataKafkaPublisher {
override def dataInTopic: Topic[api.DataPublished] =
TopicProducer.singleStreamWithOffset {
fromOffset =>
persistentEntityRegistry.eventStream(KafkaDataEvent.Tag, fromOffset)
.map(ev => (convertEvent(ev), ev.offset))
}
private def convertEvent(evt: EventStreamElement[KafkaDataEvent]): api.DataPublished = {
evt.event match {
case DataPublished(data) => api.DataPublished(data)
}
}
}
Теперь, чтобы фактически использовать эти события, в моем модуле aggregator-impl
я добавил службу "подписчик", которая принимает этисобытия и вызывает соответствующие команды для сущности.
class DataKafkaSubscriber(persistentEntityRegistry: PersistentEntityRegistry, kafkaPublisher: DataKafkaPublisher) {
kafkaPublisher.dataInTopic.subscribe.atLeastOnce(
Flow[DataPublished].mapAsync(1) { sd =>
sessionRef(sd.data.correlationId).ask(RecordData(sd.data))
}
)
private def sessionRef(correlationId: String) =
persistentEntityRegistry.refFor[Entity](correlationId)
}
Это фактически позволило мне опубликовать сообщение на тему "data-in" Kafka, которое затем было передано по прокси и преобразовано в команду RecordData
перед отправкой вюридическое лицо, чтобы потреблять.
Однако, это кажется мне немного хакЯ связан с Кафкой внутренностями Лагома.Я не могу легко поменять источник моих данных.Например, как бы я использовал внешние сообщения от RabbitMQ, если бы захотел?Что, если я пытаюсь использовать другой Kafka (отличный от используемого Lagom)?
Редактировать # 2: Больше документов
Я нашел несколько статей о документах Lagom, особенно, это:
Использование тем от третьих лиц
Возможно, вы захотите, чтобы ваша служба Lagom использовала данные, полученные от служб, не реализованных в Lagom.В этом случае, как описано в разделе Service Clients, вы можете создать сторонний модуль service-api в своем проекте Lagom.Этот модуль будет содержать дескриптор службы, объявляющий тему, из которой вы будете использовать.После того, как вы внедрили свой интерфейс ThirdPartyService и связанные с ним классы, вы должны добавить сторонний-service-api как зависимость от вашего fancy-service-impl.Наконец, вы можете использовать тему, описанную в ThirdPartyService, как описано в разделе «Подписаться на тему».