Я отправил postgres wallog на kafka, каждая таблица - тема.Схемы тем похожи на следующие (вложенный ключ данных отличается от каждой темы):
topic1: { tablename, data{id,name,age,created_at} }
topic2: { tablename, data{id,school,address,created_at} }
Теперь я хочу использовать spark, чтобы сохранить их в другой файл hdfs (имя таблицы в качестве имени файла).
Легко использовать способ множественного ввода.Но я обнаружил, что должен установить количество одновременных заданий на количество тем, иначе это не сработает.Но у меня есть около ста тем.Это проблема?
Я также попробовал метод с одним входным потоком, например, следующий:
stream.foreachRDD(rdd => {
rdd.foreachPartition(partition => {
val ds = partition.toList.toDS
val df = spark.read.json(ds)
df.write.mode("append").parquet("the_file_path")
})
})
Но у меня возникло много проблем.Кто-нибудь может дать здесь несколько баллов?
И есть ли идеи о том, использовать ли один или несколько потоков?