Моя команда сейчас вступает в сферу структурированного потокового вещания.Я относительно новичок в структурированном потоке.
У меня есть требование с
Источник - CSV
Раковина - JSON
Подробности Env:
Кластер: Spark 2.2.1
Язык программирования: Scala
Инструмент сборки: Gradle
Область применения:
Я реализовал этот простой код
val schema = StructType(
Array(StructField("customer_id", StringType),
StructField("name", StringType),
StructField("pid", StringType),
StructField("product_name", StringType)))
val fileData = spark.readStream
.option("header", "true")
.schema(schema)
.csv(args(0))
ЗатемЯ применяю простое агрегирование как
// The actual business logic is more complex than this
val customerCount = fileData.groupBy("customer_id").count()
Наконец, пишите в JSON
val query = customerCount
.writeStream
.format("json")
.option("path", "src/main/resources/output/myjson_dir")
.option("checkpointLocation", "src/main/resources/chkpoint_dir")
.trigger(Trigger.ProcessingTime("10 seconds"))
.start()
Вопросы:
- Это работает, как и ожидалось, когда я использую
.format("console")
,Но это вызывает исключение, когда я использую .format("json")
-
Исключение в потоке "main" org.apache.spark.sql.AnalysisException: добавление режима вывода не поддерживается при наличии потоковых агрегатовпри потоковой передаче DataFrames / DataSets без водяного знака ;;Aggregate [customer_id # 0], [customer_id # 0, count (1) AS count # 18L] + - Источник данных StreamingRelation (org.apache.spark.sql.SparkSession @ 4b56b031, csv, List (), Some (StructType (StructField (customer_id, StringType, true), StructField (имя, StringType, true), StructField (product_id, StringType, true), StructField (product_name, StringType, true))), List (), Нет, Карта (заголовок -> true, путь-> / Users / Underwood / Documents / workspace / Spark_Streaming_Examples / src / main / resources / input), Нет), FileSource [/ Users / Underwood / Documents / workspace / Spark_Streaming_Examples / src / main / resources / input], [customer_id #0, имя № 1, идентификатор продукта № 2, имя продукта № 3, дата № 4] в org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $ .org $ apache $ spark $ sql $ катализатор анализ $ UnsupportedOperationChecker $$ throwError(UnsupportedOperationChecker.scala: 297)
Я пробовал использовать другие комбинации outputMode = "update"
и outputMode = "complete"
.Но эти ошибки также выбрасывают.Почему это так?Это ожидаемое поведение?Как мне записать вывод в приемник JSON?
Вышеуказанное исключение говорит об использовании водяных знаков .AFAIK, водяные знаки используются с полем Timestamp, но в моих входных данных у меня нет поля timestamp или date.Пожалуйста, дайте мне знать, если я ошибаюсь здесь.Как добавление водяного знака будет иметь значение здесь?
Моя следующая попытка была написать пользовательский ForEachSink.Я ссылался на этот пост .Но это мне тоже не помогло.Проблема была в том, что я получал 200 каталогов с 0-байтовым файлом в каждом.
Как выбрать не-группу по столбцам в конечном выводе?В простой пакетной обработке я обычно достигаю этого, соединяя агрегированный DF с исходным DF и выбирая необходимые строки.Но структурированной потоковой передаче, похоже, не нравится этот подход.Вот мой пример кода
val customerCount = fileData.groupBy("customer_id").count()
val finalDF = fileData.join(customerCount, Seq("customer_id"))
.select("customer_id", "count", "product_name" )
Пожалуйста, дайте мне знать, если я пропустил какие-либо детали.