Как разбить записи на разные потоки, от одной топи c до разных потоков? - PullRequest
0 голосов
/ 13 апреля 2020

У меня есть один исходный CSV-файл, содержащий записи разных размеров, который помещает каждую запись в один исходный файл topi c. Я хочу разделить записи на разные KStreams / KTables из этого источника topi c. У меня есть конвейер для одной загрузки таблицы, где я помещаю запись из источника topi c в stream1 в формате с разделителями, а затем помещаю записи в другой поток в формате AVRO, который затем помещается в соединитель приемника JDB C, который помещает запись в базу данных MySQL. Трубопровод должен быть таким же. Но я хотел собрать sh записей разных таблиц в один источник topi c, а затем разбить записи на разные потоки по одному значению. Это возможно? Я пытался найти способы сделать это, но не смог. Можно ли как-то улучшить конвейер или использовать KTable вместо KStreams или каких-либо других модификаций?

Мой текущий поток - один исходный файл CSV (source.csv) -> исходная топи c (имя - sourcetopi c, содержащая записи test1) -> поток 1 (формат значений с разделителями) -> поток 2 (в формате значения AVRO) -> end topi c (имя - sink-db-test1) -> JDB C разъем приемника -> MySQL DB (имя - test1)

У меня есть другая MySQL таблица test2 с другой схемой и записи для этой таблицы также присутствуют в файле source.csv. Поскольку схема отличается, я не могу следовать текущему конвейеру test1 для вставки данных в таблицу test2.

Пример - в исходном файле CSV,

line 1 - 9,atm,mun,ronaldo line 2- 10,atm,mun,bravo,num2 line 3 - 11,atm,sign,bravo,sick

здесь, в этом примере, значение, под которым оно должно быть разделено, равно column 4 (ronaldo или bravo), все эти данные должны быть загружены в table 1, table 2, table 3 соответственно. ключ к столбцу 4.

if col4==ronaldo, go to table 1 if col4==bravo and col3==mun, go to table 2 if col4==bravo and col3 ==sign go to table 3

Я новичок в Kafka, начал разработку Kafka с предыдущей недели.

Ответы [ 2 ]

0 голосов
/ 14 апреля 2020

Я могу разделить данные и использовал K SQL для подхода, которым я поделюсь ниже. 1. Входной поток создается с value_format='JSON' и столбцом payload как STRING 2. Полезная нагрузка будет содержать всю запись как STRING 3. Затем запись будет разбита на разные потоки с помощью оператора LIKE в предложении WHERE при размещении полезной нагрузки в разные потоки согласно требованию. Здесь я использовал SPLIT оператор K SQL для получения записей из полезной нагрузки в формате с разделителями-запятыми

0 голосов
/ 13 апреля 2020

Вы можете написать отдельное приложение Kafka Streams для разделения записей из ввода topi c на разные темы KStream или вывода, используя оператор KStream#branch() :

KStream<K, V>[] branches = streamsBuilder.branch(
        (key, value) -> {filter logic for topic 1 here},
        (key, value) -> {filter logic for topic 2 here},
        (key, value) -> true//get all messages for this branch
);

// KStream branches[0] records for logic 1
// KStream branches[1] records for logic 2
// KStream branches[2] records for logic 3

Или вы может вручную разветвлять ваш KStream так:

KStream<K, V> inputKStream = streamsBuilder.stream("your_input_topic", Consumed.with(keySerde, valueSerdes));

inputKStream
        .filter((key, value) -> {filter logic for topic 1 here})
        .to("your_1st_output_topic");

inputKStream
        .filter((key, value) -> {filter logic for topic 2 here})
        .to("your_2nd_output_topic");
...
...