Как создать соединение (я) с источником данных в Spark Streaming для поисков - PullRequest
1 голос
/ 15 марта 2019

У меня есть сценарий использования, когда мы транслируем события, и для каждого события мне нужно выполнить некоторые поиски. Поиски в Redis, и мне интересно, каков наилучший способ создания связей. При потоковой передаче искры будут работать 40 исполнителей, и у меня есть 5 таких потоковых заданий, подключенных к одному Redis Cluster. Поэтому я не понимаю, какой подход следует использовать для создания подключения Redis

  1. Создайте объект подключения в драйвере и передайте его исполнителям (не уверен, действительно ли он работает, поскольку я должен сделать этот объект сериализуемым). Могу ли я сделать это с широковещательными переменными?

  2. Создайте соединение Redis для каждого раздела, однако у меня есть код, написанный таким образом

    val update = umfRecsStream.transform(rdd => { // on driver if (MetaDataRefresh.isNewDay) { ..... } rdd }) update.foreachRDD(rdd => { rdd.foreachPartition(partition => { partition.foreach(kafkaKey_trans => { // perform some lookups logic here } } })

Так что теперь, если я создам соединение внутри каждого раздела, это будет означать, что для каждого RDD и для каждого раздела в этом RDD я буду создавать новое соединение.

Есть ли способ, которым я могу поддерживать одно соединение для каждого раздела и кэшировать этот объект, чтобы мне не приходилось создавать соединения снова и снова?

Я могу добавить больше контекста / информации, если требуется.

1 Ответ

2 голосов
/ 16 марта 2019

1.Создайте объект подключения в драйвере и передайте его исполнителям (не уверен, действительно ли он работает, поскольку я должен сделать этот объект сериализуемым).Могу ли я сделать это с широковещательными переменными?

Ответ - Нет. Большинство объектов соединения не сериализуемы из-за машинно-зависимых данных, связанных с соединением.

2.Есть ли способ сохранить одно соединение для каждого раздела и кэшировать этот объект, чтобы мне не приходилось создавать соединения снова и снова?

Ответ. Да, создайте пул соединений и используйте его враздел.вот стиль.Вы можете создать пул соединений, подобный этому https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala

, а затем использовать его

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

Пожалуйста, отметьте это: шаблон проектирования для использования foreachRDD

...