Запись всех данных одновременно в файл паркета с использованием структурированной потоковой передачи - PullRequest
0 голосов
/ 29 мая 2019

Я хочу записать все агрегированные данные из темы Кафки в файл паркета одновременно (или, по крайней мере, в итоге получить один файл паркета в конце).

Я запускаю отдельное приложение производителя, которое помещает 50 сообщений в тему.Данные агрегируются по времени (1 день) в потребительском приложении, поэтому мне нужно собрать все данные за 1 день и посчитать их.Это работает и выполняется следующим образом:

Dataset<Row> df = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", BOOTSTRAP_SERVER)
                .option("subscribe", "test")
                .option("startingOffsets", "latest")
                .option("group.id", "test")
                .option("failOnDataLoss", false)
                .option("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer")
                .option("value.deserializer", "org.apache.kafka.common.serialization.StringSerializer")
                .load()

// LEFT OUT CODE FOR READABILITY

                .withWatermark("timestamp", "1 minutes")
                .groupBy(
                        functions.window(new Column("timestamp"), "1 day", "1 day"),
                        new Column("container_nummer"))
                .count();

Результаты затем записываются в файл партера следующим образом:

StreamingQuery query = df.writeStream()
                .format("parquet")
                .option("truncate", "false")
                .option("checkpointLocation", "/tmp/kafka-logs")
                .start("/Users/**/kafka-path");

query.awaitTermination();

Если я записываю это в консоль, я получаюправильные подсчеты для каждого дня в партии 1. При попытке записать его в паркет, я получаю только несколько пустых файлов паркета.Я читаю их так:

SparkSession spark = SparkSession
                .builder()
                .appName("test")
                .config("spark.master", "local")
                .config("spark.sql.session.timeZone", "UTC")
                .getOrCreate();

        Dataset<Row> df = spark.read()
                .parquet("/Users/**/kafka-path/part-00000-dd416263-8db1-4166-b243-caba470adac7-c000.snappy.parquet");

        df.explain();
        df.show(20);

все файлы паркета кажутся пустыми (в отличие от записи их в консоль), код выше выводит это:

+------+----------------+-----+
|window|container_nummer|count|
+------+----------------+-----+
+------+----------------+-----+

IУ меня есть два вопроса:

  • По каким / возможным причинам мои файлы паркета пусты?
  • Можно ли в конце иметь 1 полный файл паркета со всеми данными в нем?Я хочу использовать эти данные для подачи модели машинного обучения в другую программу.

Примечание. Не требуется запускать в производство.Я просто надеюсь, что кто-то знает, как это работает ..

Заранее спасибо!

1 Ответ

0 голосов
/ 30 мая 2019

Компонент, участвующий в чтении текущего потока, является структурированным потоковым.Итак, вы в основном читаете неограниченный поток сообщений / записей, которые нужно будет записывать по мере поступления данных.Исходя из этого, Spark продолжит назначать исполнителей для выполнения операции чтения / записи, и в рамках этого процесса будет создано несколько файлов.Таким образом, у вас не будет одного файла со всеми данными.Ниже приведен синтаксис, который можно использовать для записи в файл паркета:

df..writeStream.queryName("Loantxns_view").outputMode("append").format("parquet").option("path", "/user/root/Ln_sink2").option("checkpointLocation", "/user/root/Checkpoints2").start()

Попробуйте прочитать файл паркета, используя механизм ниже, и посмотрите, получите ли вы данные, которые вы ищете (ParquetDF - новый фрейм данныхкоторый читается путем чтения пути к каталогу паркета):

val ParquetDF = spark.read.parquet("/user/root/Ln_sink2")
    ParquetDF.createOrReplaceTempView("xxxx_view");
    spark.sql("select * from xxxx_view").show(false);

Попробуйте и посмотрите, видны ли данные.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...