Ошибка в структурированной потоковой передаче Spark с источником файла и приемником файла - PullRequest
0 голосов
/ 11 июня 2018

Моя команда сейчас вступает в сферу структурированного потокового вещания.Я относительно новичок в структурированном потоке.

У меня есть требование с

Источник - CSV
Раковина - JSON

Подробности Env:

Кластер: Spark 2.2.1
Язык программирования: Scala
Инструмент сборки: Gradle

Область применения:

Я реализовал этот простой код

val schema = StructType(
    Array(StructField("customer_id", StringType),
        StructField("name", StringType),
        StructField("pid", StringType),
        StructField("product_name", StringType)))

val fileData = spark.readStream
    .option("header", "true")
    .schema(schema)
    .csv(args(0))

ЗатемЯ применяю простое агрегирование как

// The actual business logic is more complex than this
val customerCount = fileData.groupBy("customer_id").count()

Наконец, пишите в JSON

val query = customerCount
    .writeStream
    .format("json")
    .option("path", "src/main/resources/output/myjson_dir")
    .option("checkpointLocation", "src/main/resources/chkpoint_dir")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .start()

Вопросы:

  1. Это работает, как и ожидалось, когда я использую .format("console"),Но это вызывает исключение, когда я использую .format("json") -

Исключение в потоке "main" org.apache.spark.sql.AnalysisException: добавление режима вывода не поддерживается при наличии потоковых агрегатовпри потоковой передаче DataFrames / DataSets без водяного знака ;;Aggregate [customer_id # 0], [customer_id # 0, count (1) AS count # 18L] + - Источник данных StreamingRelation (org.apache.spark.sql.SparkSession @ 4b56b031, csv, List (), Some (StructType (StructField (customer_id, StringType, true), StructField (имя, StringType, true), StructField (product_id, StringType, true), StructField (product_name, StringType, true))), List (), Нет, Карта (заголовок -> true, путь-> / Users / Underwood / Documents / workspace / Spark_Streaming_Examples / src / main / resources / input), Нет), FileSource [/ Users / Underwood / Documents / workspace / Spark_Streaming_Examples / src / main / resources / input], [customer_id #0, имя № 1, идентификатор продукта № 2, имя продукта № 3, дата № 4] в org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $ .org $ apache $ spark $ sql $ катализатор анализ $ UnsupportedOperationChecker $$ throwError(UnsupportedOperationChecker.scala: 297)

Я пробовал использовать другие комбинации outputMode = "update" и outputMode = "complete".Но эти ошибки также выбрасывают.Почему это так?Это ожидаемое поведение?Как мне записать вывод в приемник JSON?

Вышеуказанное исключение говорит об использовании водяных знаков .AFAIK, водяные знаки используются с полем Timestamp, но в моих входных данных у меня нет поля timestamp или date.Пожалуйста, дайте мне знать, если я ошибаюсь здесь.Как добавление водяного знака будет иметь значение здесь?

Моя следующая попытка была написать пользовательский ForEachSink.Я ссылался на этот пост .Но это мне тоже не помогло.Проблема была в том, что я получал 200 каталогов с 0-байтовым файлом в каждом.

Как выбрать не-группу по столбцам в конечном выводе?В простой пакетной обработке я обычно достигаю этого, соединяя агрегированный DF с исходным DF и выбирая необходимые строки.Но структурированной потоковой передаче, похоже, не нравится этот подход.Вот мой пример кода

val customerCount = fileData.groupBy("customer_id").count()
val finalDF = fileData.join(customerCount, Seq("customer_id"))
    .select("customer_id", "count", "product_name" )

Пожалуйста, дайте мне знать, если я пропустил какие-либо детали.

1 Ответ

0 голосов
/ 09 июля 2018

Прочтите официальную документацию Spark Structured Streaming , относящуюся к водяным знакам .

В основном, когда вы агрегируете, вы должны установить outputMode = "complete", потому что добавлять новые данные не имеет смыслабез сохранения в памяти обработки, выполненной ранее (например, подсчет слов).

Из-за этого вы должны указать, используя водяной знак или оконную функцию, когда программа должна начать новое агрегирование, и когдаданные слишком поздние.

Если у вас нет столбца с отметкой времени, вы можете создать столбец, используя функцию now(), и это будет время обработки.

Если естьчто-то неясно или есть вопросы, прокомментируйте, и я обновлю свой ответ.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...