Я написал небольшую программу, которая будет постоянно читать новые поступающие файлы из каталога HDFS и передавать их на Kafka Topi c. Продолжая читать файл HDFS, я хочу записать этот файл в свой лог-файл, но я не нахожу никакого способа сделать это. И я также хочу зарегистрировать файл, как только он будет полностью загружен в Kafka topi c. И во время записи данных в Kafka topi c я хочу передать имя файла в качестве заголовка в kafka topi c.
Ниже приведен пример кода, который я использую для чтения файла из HDFS и отправки его в Кафка Топи c.
StructType sch = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("id", DataTypes.StringType, true),
DataTypes.createStructField("cat", DataTypes.StringType, true)
});
Dataset<Row> jsonlines = spark.readStream().schema(sch).json("/data/json/") ;
StreamingQuery query = jsonlines.selectExpr("to_json(struct(*)) AS value")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("checkpointLocation", "/checkpoint/topic2/")
.option("batch.size", 10)
.option("topic", "topic2").start();
query.awaitTermination();
Можете ли вы помочь, по-настоящему оценят.