Spark структурированный потоковый раздел foreachpartition / проблема с пулами соединений - PullRequest
0 голосов
/ 17 мая 2018

Я читаю данные из Kafka, используя структурированную потоковую передачу, и мне нужно сохранить данные в InfluxDB.В обычном подходе, основанном на Dstreams, я сделал это следующим образом:

val messages:DStream[(String, String)] =  kafkaStream.map(record => 
(record.topic, record.value)) 
messages.foreachRDD { rdd => 
  rdd.foreachPartition { partitionOfRecords => 
    val influxService = new InfluxService() 
    val connection = influxService.createInfluxDBConnectionWithParams( 
        host, 
        port, 
        username, 
        password, 
        database 
        ) 
    partitionOfRecords.foreach(record => { 
      ABCService.handleData(connection, record._1, record._2) 
    } 
    ) 
  } 
} 
ssc.start() 
logger.info("Started Spark-Kafka streaming session") 
ssc.awaitTermination() 

Примечание :

Я создаю объект соединения внутри foreachpartition.Как мне сделать это в структурированном потоке?

Я попробовал подход пула соединений (где я создаю пул соединений на главном узле и передаю его рабочим узлам) здесь Пул соединений Spark - это правильный подход
ирабочие не смогли получить объект пула соединений.Что-нибудь очевидное, что я здесь скучаю?

1 Ответ

0 голосов
/ 17 мая 2018

Структурированная потоковая передача имеет совершенно другой дизайн, и старые шаблоны на основе RDD там на самом деле не применимы.

Вместо этого вы должны реализовать свой собственный ForeachWriter .Требуется три метода:

  • open

    abstract def open(partitionId: Long, version: Long): Boolean Вызывается при запуске обработки одного раздела новых данных в исполнителе.

    Здесь вы инициализируете объекты подключения.В общем, это не должно зависеть от объектов, переданных с замыканием (ошибка, которую вы допустили во втором вопросе).

    Если вы хотите ограничить количество соединений, вы можете использовать объекты синглтонов, если:все компоненты являются поточно-ориентированными.

  • process

    abstract def process(value: T): Unit
    

    Вызывается для обработки данных на стороне исполнителя.

    Это эквивалентно foreach.

  • close

    abstract def close(errorOrNull: Throwable): Unit

    Вызывается при остановке для обработки одного разделановых данных на стороне исполнителя.

    Здесь вы можете закрывать соединения и погружать другие временные объекты.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...