Передача Azure DataBricks Stream foreach завершается с NotSerializableException - PullRequest
0 голосов
/ 26 марта 2019

Я хочу непрерывно обрабатывать строки потока набора данных (первоначально инициированные Kafka): основываясь на условии, что я хочу обновить хэш Radis. Это мой фрагмент кода (lastContacts - это результат предыдущей команды, которая представляет собой поток такого типа: org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: long]. Это расширяется до org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]):

class MyStreamProcessor extends ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = {
    true
  }

  override def process(record: Row) = {
    val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
    sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)
  }

  override def close(errorOrNull: Throwable): Unit = {}
}

val query = lastContacts
  .writeStream
  .foreach(new MyStreamProcessor())
  .start()

query.awaitTermination()

Я получаю огромную трассировку стека, соответствующая часть (я думаю) такова: java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter

Может ли кто-нибудь объяснить, почему возникает это исключение и как его избежать? Спасибо!

Этот вопрос относится к следующим двум:

1 Ответ

2 голосов
/ 26 марта 2019

Контекст Spark не сериализуем.

Любая реализация ForeachWriter должна быть сериализуемой, потому что каждая задача получит новую сериализованную десериализованную копию предоставленного объекта.Следовательно, настоятельно рекомендуется, чтобы любая инициализация для записи данных (например, открытие соединения или запуск транзакции) была выполнена после вызова метода open (...), что означает, что задача готова к генерации данных.

В своем коде вы пытаетесь использовать искровой контекст в методе процесса,

override def process(record: Row) = {
    val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
    *sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)*
  }

Чтобы отправить данные в Redis, вам нужно создать собственное соединение и открыть его в открытом виде.метод, а затем использовать его в методе процесса.

Посмотрите, как создать пул соединений Redis.https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala

...