Чтение сообщения от Kafka Topi c и выгрузка его в HDFS - PullRequest
1 голос
/ 05 мая 2020

Я пытаюсь использовать данные из Kafka Topi c, загрузить их в набор данных, а затем выполнить фильтрацию перед загрузкой в ​​файлы Hdf.

Я могу использовать данные из Kafka Topi c, загрузите его в набор данных и сохранить как паркетный файл в HDFS. Но не удается выполнить условие фильтрации. Подскажите, пожалуйста, как выполнить фильтр перед сохранением в hdfs? Я использую Java со Spark для потребления из kafka topi c. какая-то часть моего кода выглядит так:

DataframeDeserializer dataframe = new DataframeDeserializer(dataset);

 ds = dataframe.fromConfluentAvro("value", <your schema path>, <yourmap>, RETAIN_SELECTED_COLUMN_ONLY$.MODULE$);

StreamingQuery query = ds.coalesce(10)
                .writeStream()
                .format("parquet")
                .option("path", path.toString())
                .option("checkpointLocation", "<your path>")
                .trigger(Trigger.Once())
                .start();

Ответы [ 2 ]

1 голос
/ 05 мая 2020

Вместо того, чтобы заново изобретать колесо, я настоятельно рекомендую Kafka Connect . Все, что вам нужно, это HDFS Sink Connector, который реплицирует данные из Kafka topi c в HDFS.

1 голос
/ 05 мая 2020

Записать лог фильтра c перед coalesce т.е. ds.filter().coalesce()


DataframeDeserializer dataframe = new DataframeDeserializer(dataset);

 ds = dataframe.fromConfluentAvro("value", <your schema path>, <yourmap>, RETAIN_SELECTED_COLUMN_ONLY$.MODULE$);

StreamingQuery query = 
                ds
                .filter(...) // Write your filter condition here
                .coalesce(10)
                .writeStream()
                .format("parquet")
                .option("path", path.toString())
                .option("checkpointLocation", "<your path>")
                .trigger(Trigger.Once())
                .start();


Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...