Dstream Spark Streaming - PullRequest
       8

Dstream Spark Streaming

0 голосов
/ 29 октября 2018

Здравствуйте, я написал следующий код

val receivedStream: DStream[PubsubSparkMessage] =
      PubsubSource.createStream(ssc, pubsubConfig, pubsubSubscriber)
    @volatile var receivedMessages: List[PubsubSparkMessage] = List() //scalastyle:ignore
    receivedStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        receivedMessages = List(rdd.collect)
        receivedMessages
      }
    }

Я получаю ошибку из-за несоответствия типов; У меня вопрос, как изменить List [Array [PubsubSparkMessage]] на List [PubsubSparkMessage]

1 Ответ

0 голосов
/ 29 октября 2018

Что-то вроде:

val list : List[Array[PubsubSparkMessage]] = List()

val result = list.flatMap(x => x)
...