Ведение журнала файла HDFS при чтении файла HDFS с использованием Spark readStream - PullRequest
0 голосов
/ 01 мая 2020

Я написал небольшую программу, которая будет постоянно читать новые поступающие файлы из каталога HDFS и передавать их на Kafka Topi c. Продолжая читать файл HDFS, я хочу записать этот файл в свой лог-файл, но я не нахожу никакого способа сделать это. И я также хочу зарегистрировать файл, как только он будет полностью загружен в Kafka topi c. И во время записи данных в Kafka topi c я хочу передать имя файла в качестве заголовка в kafka topi c.

Ниже приведен пример кода, который я использую для чтения файла из HDFS и отправки его в Кафка Топи c.

    StructType sch =  DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField("id", DataTypes.StringType, true),
            DataTypes.createStructField("cat", DataTypes.StringType, true)
    });

    Dataset<Row> jsonlines = spark.readStream().schema(sch).json("/data/json/") ;

    StreamingQuery query = jsonlines.selectExpr("to_json(struct(*)) AS value")
            .writeStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("checkpointLocation", "/checkpoint/topic2/")
            .option("batch.size", 10)
            .option("topic", "topic2").start();

    query.awaitTermination();

Можете ли вы помочь, по-настоящему оценят.

...