Ошибка компиляции foreachBatch не является членом DataStreamWriter, хотя в искровой оболочке он работает - PullRequest
0 голосов
/ 05 августа 2020

Я пытаюсь использовать foreachBatch с потоковой передачей со структурой искры. Я пробовал код на консоли spark-shell, и он работал без каких-либо проблем, но когда я пытаюсь скомпилировать код, я получаю ошибку ниже.

value foreachBatch не является членом организации. apache .spark. sql .streaming.DataStreamWriter [org. apache .spark. sql .Row] [ошибка] возможная причина: может быть, перед `value foreachBatch 'отсутствует точка с запятой? [error] .foreachBatch {(batchDf: DataFrame, batchId: Long) => batchDf

Мой код выглядит примерно так.

val query = finalStream
  .writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) => batchDf
      .write
      .format("com.databricks.spark.redshift")
      .option("url", StreamingCfg.redshiftJdbcUrl)
      .option("dbtable", redshiftTableName)
      .option("aws_iam_role", StreamingCfg.redshiftARN)
      .option("tempdir", redshiftTempDir)
      .mode(SaveMode.Append)
      .save()

    batchDf
      .write
      .mode(SaveMode.Append)
      .partitionBy("date_key", "hour")
      .parquet(outputLocation);
  }
  .trigger(Trigger.ProcessingTime(aggregationTime.seconds))
  .option("checkpointLocation", checkPointingLocation)
  .start()

Кто-нибудь знает, что мне не хватает здесь?

Еще немного о том, что я делаю. Чтение двух потоков из kafka -> Объединение потокового потока на них -> одновременная запись в красное смещение и S3. Спасибо.

1 Ответ

1 голос
/ 05 августа 2020

Попробуйте использовать его так:

finalStream
  .writeStream
  .foreachBatch( (batchDF: DataFrame, batchId: Long ) => {
      
  })

Если он работает в искровой оболочке, вам следует дважды проверить зависимости в вашей рабочей (dev) среде. Убедитесь, что он может загрузить все зависимости Spark и использует правильную версию.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...