Самое простое решение будет выглядеть так:
allEvents.selectExpr("topic", "CONCAT('m_oKafkaEvents_for_', Model, '_topic')")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save();
Пример вы можете увидеть здесь https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html#writing-the-output-of-batch-queries-to-kafka.Но после просмотра кода Spark кажется, что у нас может быть только 1 тема / запись, т.е. он выберет в качестве темы первую найденную строку:
def write(
sparkSession: SparkSession,
queryExecution: QueryExecution,
kafkaParameters: ju.Map[String, Object],
topic: Option[String] = None): Unit = {
val schema = queryExecution.analyzed.output
validateQuery(schema, kafkaParameters, topic)
queryExecution.toRdd.foreachPartition { iter =>
val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
finallyBlock = writeTask.close())
}
Вы можете попробовать этот подход и сказать здесь, еслиэто работает как сказано выше?Если это не работает, у вас есть альтернативные решения, такие как:
- Кэшируйте основной DataFrame и создайте 2 других DataFrames, отфильтрованных по атрибуту Model
- Используйте foreachPartition и средство записи Kafka для отправкисообщения без разделения основного набора данных
Первое решение довольно просто реализовать, и вы используете для этого все возможности Spark.С другой стороны, и, по крайней мере, теоретически, разделение набора данных должно быть немного медленнее, чем второе предложение.Но попробуйте измерить, прежде чем выбрать тот или иной вариант, возможно, разница будет очень мала, и всегда лучше использовать понятный и одобренный сообществом подход.
Ниже вы можете найти код, показывающий обе ситуации:
SparkSession spark = SparkSession
.builder()
.appName("JavaStructuredNetworkWordCount")
.getOrCreate();
Dataset<Row> allEvents = spark.readStream().format("kafka")
.option("kafka.bootstrap.servers", "")
.option("subscribe", "event")
.option("maxOffsetsPerTrigger", "100000")
.option("startingOffsets", "latest")
.option("failOnDataLoss", false)
.load()
.select(functions.from_json(functions.col("value").cast("string"), null).as("events"))
.select("events.*");
// First solution
Dataset<Row> opportunity1Events = allEvents.filter("Model = 'Opportunity_1'");
opportunity1Events.write().format("kafka").option("kafka.bootstrap.servers", "")
.option("topic", "m_oKafkaEvents_for_Opportunity_1_topic").save();
Dataset<Row> opportunity2Events = allEvents.filter("Model = 'Opportunity_2'");
opportunity2Events.write().format("kafka").option("kafka.bootstrap.servers", "")
.option("topic", "m_oKafkaEvents_for_Opportunity_2_topic").save();
// Note: Kafka writer was added in 2.2.0 https://github.com/apache/spark/commit/b0a5cd89097c563e9949d8cfcf84d18b03b8d24c
// Another approach with iteration throughout messages accumulated within each partition
allEvents.foreachPartition(new ForeachPartitionFunction<Row>() {
private KafkaProducer<String, Row> localProducer = new KafkaProducer<>(new HashMap<>());
private final Map<String, String> modelsToTopics = new HashMap<>();
{
modelsToTopics.put("Opportunity_1", "m_oKafkaEvents_for_Opportunity_1_topic");
modelsToTopics.put("Opportunity_2", "m_oKafkaEvents_for_Opportunity_2_topic");
}
@Override
public void call(Iterator<Row> rows) throws Exception {
// If your message is Opportunity1 => add to messagesOpportunity1
// otherwise it goes to Opportunity2
while (rows.hasNext()) {
Row currentRow = rows.next();
// you can reformat your row here or directly in Spark's map transformation
localProducer.send(new ProducerRecord<>(modelsToTopics.get(currentRow.getAs("Model")),
"some_message_key", currentRow));
}
// KafkaProducer accumulates messages in a in-memory buffer and sends when a threshold was reached
// Flush them synchronously here to be sure that every stored message was correctly
// delivered
// You can also play with features added in Kafka 0.11: the idempotent producer and the transactional producer
localProducer.flush();
}
});