Как отправить сообщение Кафки из одной топи c в другую топи c? - PullRequest
0 голосов
/ 23 января 2020

предположим, что мой продюсер пишет сообщение в Topi c A ... после того, как сообщение отправлено в Topi c A, я хочу скопировать то же сообщение в Topi c B. Возможно ли это в kafka?

Ответы [ 3 ]

4 голосов
/ 23 января 2020

Если я правильно понимаю, вы просто хотите stream.to("topic-b"), хотя это кажется странным без что-то с данными.

Примечание:

Указанный topi c должен быть создан вручную перед использованием

0 голосов
/ 26 января 2020

Существует два немедленных варианта перенаправления содержимого одной топи c в другую:

  1. с помощью функции потоковой передачи Kafka для создания пересылочной ссылки между двумя темами.
  2. путем создания пары потребитель / производитель и использования ее для получения, а затем пересылки сообщений

У меня есть небольшой фрагмент кода, который показывает оба (в Scala):

  def topologyPlan(): StreamsBuilder = {
    val builder = new StreamsBuilder
    val inputTopic: KStream[String, String] = builder.stream[String, String]("topic2")
    inputTopic.to("topic3")
    builder
  }

  def run() = {
    val kafkaStreams = createStreams(topologyPlan())
    kafkaStreams.start()

    val kafkaConsumer = createConsumer()
    val kafkaProducer = createProducer()
    kafkaConsumer.subscribe(List("topic1").asJava)
    while (true) {
      val record = kafkaConsumer.poll(Duration.ofSeconds(5)).asScala
      for (data <- record.iterator) {
        kafkaProducer.send(new ProducerRecord[String, String]("topic2", data.value()))
      }
    }
  }

Глядя на метод run, первые две строки устанавливают объект streams, который использует topologyPlan () для прослушивания сообщений в 'topic2' и перенаправления затем в 'topic3'.

Остальные строки показывают, как потребитель может прослушать «topic1» и использовать продюсера для отправки их в «topic2».

Последний пример этого примера: Кафка достаточно гибок, чтобы позволить вам смешивать параметры в зависимости от на то, что вам нужно, поэтому приведенный выше код будет принимать сообщения в «topic1» и отправлять их в «topic3» через «topic2».

Если вы хотите увидеть код, который устанавливает потребителя, производителя и потоки см. полный класс здесь .

0 голосов
/ 24 января 2020

Мне не ясно, какой именно вариант использования вы пытаетесь достичь, просто копируя данные из одной топи c в другую топи c. Если обе темы находятся в одном и том же кластере Kafka, то не очень хорошая идея иметь две темы с одинаковым сообщением / содержанием.

Я полагаю, что разрыв заключается в том, что, вероятно, вы не совсем понимаете концепцию Группы потребителей в Кафке . Вероятно, вам нужно выполнить два действия, используя сообщение от Kafka topi c. И вы верите, что если первое приложение будет использовать сообщение от Kafka topi c, будет ли оно доступно для второго приложения для того же сообщения или нет. Kafka позволяет вам решить этот тип общего случая использования с помощью группы потребителей.

Давайте попробуем провести различие между другой очередью сообщений и Kafka, и вы поймете, что вам не нужно копировать те же данные / сообщение между двумя темами.

В других очередях сообщений, таких как SQS (Simple Queue Service), где, если сообщение потребляется потребителем, то же сообщение недоступно для использования другими потребителями. Ответственность за безопасное удаление сообщения лежит на клиенте после его обработки. Делая это, мы гарантируем, что одно и то же сообщение не должно обрабатываться двумя потребителями, что приводит к несогласованности.

Но в Kafka вполне нормально иметь несколько групп потребителей, потребляющих одну и ту же топику c. Группа потребителей образует группу, обычно называемую группой потребителей. Здесь один из потребителей из группы потребителей может обработать сообщение на основе раздела Kafka topi c, из которого получено сообщение.

Теперь уловка в том, что мы можем иметь несколько группы потребителей, потребляющих из той же кафки топи c. Каждая группа потребителей будет обрабатывать сообщение так, как они хотят. нет помех между потребителями двух разных групп потребителей .

Для выполнения вашего варианта использования, я полагаю, вам могут понадобиться две группы потребителей, которые могут просто обработать сообщение так, как они хотят. По сути, вам не нужно копировать данные между двумя темами.

Надеюсь, это поможет.

...