Topic1 to Topic2 с использованием потоков kafka - PullRequest
1 голос
/ 14 марта 2019

Я новичок в потоках kafka и хотел бы прочитать тему и написать ее часть в новой теме, используя API-интерфейсы kafka streams. Мой ключ - строка, а значение - Avro. Есть ли документация / пример, который я могу использовать?

Редактировать:

    final StreamsBuilder builder = new StreamsBuilder();
    final KStream<String, GenericRecord> inputStream = builder.stream("Test_CX_TEST_KAFKA_X");
    final KStream<String, String> newStream = inputStream.mapValues(value -> value.get("ID").toString());
    newStream.to("SUB_TOPIC",Produced.with(Serdes.String(),Serdes.String()));
    final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
    streams.start();

В SUB_TOPIC у меня есть:

Key: { "ID": "145" } Отметка: 14 марта 2019 г. 17: 52: 23.43 смещение: 12 Раздел: 0

Моя тема ввода:

{ «ID»: «145», "TIMESTAMP": 1552585938545, "НЕДЕЛЯ": "\ u0000", "ИСТОЧНИК": { "строка": "ТМП" }, "ТЕЛО": { "string": "{\" operation_type \ ": \" INSERT \ ", \" old \ ": {\" ROW_ID \ ": null, \" LAST_UPD \ ": null, \" DENOMINATION \ ": null, \ "SIREN_SIRET \": нулевой}, \ "новый \": {\ "ROW_ID \": \ "170309 - ******** \", \ "LAST_UPD \": \ "2019-03-14T17: 52:18 \ "\ "ОБОЗНАЧЕНИЕ \": \ "1 - ****** \", \ "SIREN_SIRET \": нулевая}}" }, "TYPE_ACTION": { "string": "INSERT" } }

Как я могу добавить другие поля из тела в новой теме? пример:

{ «ID»: «145», "TIMESTAMP": 1552585938545, "НЕДЕЛЯ": "\ u0000", "ИСТОЧНИК": { "строка": "ТМП" }, "ТЕЛО": { "string": "{\" operation_type \ ": \" INSERT \ ", \" old \ ": {\" ROW_ID \ ": null, \" LAST_UPD \ ": null}, \" new \ ": {\ "ROW_ID \": \ "170309 - ******** \", \ "LAST_UPD \": \ "2019-03-14T17: 52: 18 \"}}» }, "TYPE_ACTION": { "string": "INSERT" } }

1 Ответ

1 голос
/ 14 марта 2019

Вы можете просто использовать тему в виде потока и изменить значение / KeyValues, используя функции .map () /. MapValues ​​().

Пример. Допустим, вы хотите выбрать столбец из записи avro и опубликовать его в новой теме вывода.

// If you are using Schema registry, make sure to add the schema registry url 
// in streamConfiguration. Also specify the AvroSerde for VALUE_SERDE

final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, GenericRecord> inputStream = builder.stream("inputTopic");
final KStream<String, String> newStream = userProfiles.mapValues(value -> value.get("fieldName").toString());
subStream.to("outputTopic",Produced.with(Serdes.String(),Serdes.String());
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);

Также вы можете посмотреть примеры на github:
https://github.com/confluentinc/kafka-streams-examples/blob/5.1.2-post/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExample.java

...