Я пытаюсь использовать foreachBatch
с потоковой передачей со структурой искры. Я пробовал код на консоли spark-shell
, и он работал без каких-либо проблем, но когда я пытаюсь скомпилировать код, я получаю ошибку ниже.
value foreachBatch не является членом организации. apache .spark. sql .streaming.DataStreamWriter [org. apache .spark. sql .Row] [ошибка] возможная причина: может быть, перед `value foreachBatch 'отсутствует точка с запятой? [error] .foreachBatch {(batchDf: DataFrame, batchId: Long) => batchDf
Мой код выглядит примерно так.
val query = finalStream
.writeStream
.foreachBatch { (batchDf: DataFrame, batchId: Long) => batchDf
.write
.format("com.databricks.spark.redshift")
.option("url", StreamingCfg.redshiftJdbcUrl)
.option("dbtable", redshiftTableName)
.option("aws_iam_role", StreamingCfg.redshiftARN)
.option("tempdir", redshiftTempDir)
.mode(SaveMode.Append)
.save()
batchDf
.write
.mode(SaveMode.Append)
.partitionBy("date_key", "hour")
.parquet(outputLocation);
}
.trigger(Trigger.ProcessingTime(aggregationTime.seconds))
.option("checkpointLocation", checkPointingLocation)
.start()
Кто-нибудь знает, что мне не хватает здесь?
Еще немного о том, что я делаю. Чтение двух потоков из kafka -> Объединение потокового потока на них -> одновременная запись в красное смещение и S3. Спасибо.