Я использую структурированную потоковую обработку для чтения входящих сообщений из темы Kafka и записи в несколько таблиц паркета на основе входящего сообщения. Поэтому я создал один readStream, поскольку источник Kafka является общим, и для каждой таблицы паркета был создан отдельный поток записи впетляЭто работает нормально, но поток чтения создает узкое место, так как для каждого writeStream он создает readStream, и нет никакого способа кэшировать уже прочитанный фрейм данных.
val kafkaDf=spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", conf.servers)
.option("subscribe", conf.topics)
// .option("earliestOffset","true")
.option("failOnDataLoss",false)
.load()
foreach table {
//filter the data from source based on table name
//write to parquet
parquetDf.writeStream.format("parquet")
.option("path", outputFolder + File.separator+ tableName)
.option("checkpointLocation", "checkpoint_"+tableName)
.outputMode("append")
.trigger(Trigger.Once())
.start()
}
Теперь каждый поток записи создает новую группу потребителей и читает все данные из Kafka, а затем выполняет фильтрацию и запись в Parquet.Это создает огромные накладные расходы.Чтобы избежать этого, я могу разделить тему Kafka так, чтобы в ней было столько же разделов, сколько и таблиц, и тогда поток чтения должен читать только из данного раздела.Но я не вижу способа указать детали раздела как часть потока чтения Kafka.