Я читаю данные из Kafka, используя структурированную потоковую передачу, и мне нужно сохранить данные в InfluxDB.В обычном подходе, основанном на Dstreams, я сделал это следующим образом:
val messages:DStream[(String, String)] = kafkaStream.map(record =>
(record.topic, record.value))
messages.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val influxService = new InfluxService()
val connection = influxService.createInfluxDBConnectionWithParams(
host,
port,
username,
password,
database
)
partitionOfRecords.foreach(record => {
ABCService.handleData(connection, record._1, record._2)
}
)
}
}
ssc.start()
logger.info("Started Spark-Kafka streaming session")
ssc.awaitTermination()
Примечание :
Я создаю объект соединения внутри foreachpartition
.Как мне сделать это в структурированном потоке?
Я попробовал подход пула соединений (где я создаю пул соединений на главном узле и передаю его рабочим узлам) здесь Пул соединений Spark - это правильный подход
ирабочие не смогли получить объект пула соединений.Что-нибудь очевидное, что я здесь скучаю?