У меня есть один исходный CSV-файл, содержащий записи разных размеров, который помещает каждую запись в один исходный файл topi c. Я хочу разделить записи на разные KStreams / KTables из этого источника topi c. У меня есть конвейер для одной загрузки таблицы, где я помещаю запись из источника topi c в stream1 в формате с разделителями, а затем помещаю записи в другой поток в формате AVRO, который затем помещается в соединитель приемника JDB C, который помещает запись в базу данных MySQL. Трубопровод должен быть таким же. Но я хотел собрать sh записей разных таблиц в один источник topi c, а затем разбить записи на разные потоки по одному значению. Это возможно? Я пытался найти способы сделать это, но не смог. Можно ли как-то улучшить конвейер или использовать KTable вместо KStreams или каких-либо других модификаций?
Мой текущий поток - один исходный файл CSV (source.csv
) -> исходная топи c (имя - sourcetopi c, содержащая записи test1) -> поток 1 (формат значений с разделителями) -> поток 2 (в формате значения AVRO) -> end topi c (имя - sink-db-test1
) -> JDB C разъем приемника -> MySQL DB (имя - test1
)
У меня есть другая MySQL таблица test2
с другой схемой и записи для этой таблицы также присутствуют в файле source.csv
. Поскольку схема отличается, я не могу следовать текущему конвейеру test1
для вставки данных в таблицу test2
.
Пример - в исходном файле CSV,
line 1 - 9,atm,mun,ronaldo
line 2- 10,atm,mun,bravo,num2
line 3 - 11,atm,sign,bravo,sick
здесь, в этом примере, значение, под которым оно должно быть разделено, равно column 4
(ronaldo
или bravo
), все эти данные должны быть загружены в table 1
, table 2
, table 3
соответственно. ключ к столбцу 4.
if col4==ronaldo, go to table 1
if col4==bravo and col3==mun, go to table 2
if col4==bravo and col3 ==sign go to table 3
Я новичок в Kafka, начал разработку Kafka с предыдущей недели.