Spark структурированная потоковая передача - UNION два или более потоковых источника - PullRequest
0 голосов
/ 01 июля 2019

Я использую spark 2.3.2 и сталкиваюсь с проблемой создания объединения двух или более потоковых источников из Kafka. Каждый из них - потоковые источники из Kafka, которые я уже преобразовал и сохранил в Dataframes.

В идеале я бы хотел сохранить результаты этого UNIONed-фрейма данных в формате паркета в HDFS или, возможно, даже обратно в Kafka. Конечная цель - хранить эти объединенные события с минимально возможной задержкой.

val finalDF = flatDF1
      .union(flatDF2)
      .union(flatDF3)

val query = finalDF.writeStream
      .format("parquet")
      .outputMode("append")
      .option("path", hdfsLocation)
      .option("checkpointLocation", checkpointLocation)
      .option("failOnDataLoss", false)
      .start()

    query.awaitTermination()

при выполнении writeStream для консоли вместо паркета я получаю ожидаемые результаты, но приведенный выше пример вызывает ошибку подтверждения.

Caused by: java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:156)
    at org.apache.spark.sql.execution.streaming.OffsetSeq.toStreamProgress(OffsetSeq.scala:42)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:185)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:124)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)

вот класс и утверждение, которое терпит неудачу:

case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[OffsetSeqMetadata] = None) {

assert(sources.size == offsets.size)

Это потому, что контрольная точка хранит смещения только для одного из кадров данных? Просматривая документацию Spark Structured Streaming, выглядело, как будто можно было объединять / объединять потоковые источники в Spark 2.2 или>

.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...