Kafka - Перенаправлять сообщения из «Темы A» в «Темы B» на основе значения заголовка - PullRequest
0 голосов
/ 27 марта 2019

Я бы хотел перенаправить сообщения kafka из темы "all-topic" в тему с именем " headervalue -topic", где headervalue - это значение пользовательского заголовка каждогосообщение имеет.

В настоящее время я использую пользовательское консольное приложение, которое принимает сообщения и перенаправляет сообщения в нужную тему, но обрабатывает только 16 сообщений в секунду.

И kafka, иzookeeper работает в док-контейнере, настроенном так:

zookeeper:
  image: "wurstmeister/zookeeper:latest"
  restart: always
  ports:
    - "2181:2181"
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_SERVER_ID: 1

kafka:
  hostname: kafka
  image: "wurstmeister/kafka:latest"
  restart: always
  depends_on:
    - zookeeper
  ports:
    - "9092:9092"
  environment:
    KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
    KAFKA_ADVERTISED_HOST_NAME: kafka
    KAFKA_ADVERTISED_PORT: 9092

Каков наилучший и быстрый способ достижения моей цели?

Я знаю о существовании потоков Kafka, ноЯ не знаком с Java, поэтому, если вы захотите предложить Kafka Streams, будет полезен небольшой пример:)

Большое спасибо!

1 Ответ

0 голосов
/ 29 марта 2019

Вот решение, которое я придумал, используя библиотеку kafka-streams nodejs:

const {KafkaStreams} = require("kafka-streams");
const {nativeConfig: config} = require("./config.js");

const kafkaStreams = new KafkaStreams(config);
const myConsumerStream = kafkaStreams.getKStream("all-topic");

myConsumerStream
    .mapJSONConvenience()
    .filter((element) => {
        return element.value.type == "Article";
    })
    .tap((element) => {console.log("Got Article")})
    .mapWrapKafkaValue()
    .to("Article-topic", 1, "buffer");

myConsumerStream.start();
...