структурированное потоковое чтение на основе разделов kafka - PullRequest
0 голосов
/ 09 октября 2018

Я использую структурированную потоковую обработку для чтения входящих сообщений из темы Kafka и записи в несколько таблиц паркета на основе входящего сообщения. Поэтому я создал один readStream, поскольку источник Kafka является общим, и для каждой таблицы паркета был создан отдельный поток записи впетляЭто работает нормально, но поток чтения создает узкое место, так как для каждого writeStream он создает readStream, и нет никакого способа кэшировать уже прочитанный фрейм данных.

val kafkaDf=spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", conf.servers)
      .option("subscribe", conf.topics)
      //  .option("earliestOffset","true")
      .option("failOnDataLoss",false)
      .load()

foreach table   {  
//filter the data from source based on table name
//write to parquet
 parquetDf.writeStream.format("parquet")
        .option("path", outputFolder + File.separator+ tableName)
        .option("checkpointLocation", "checkpoint_"+tableName)
        .outputMode("append")
        .trigger(Trigger.Once())
       .start()
}

Теперь каждый поток записи создает новую группу потребителей и читает все данные из Kafka, а затем выполняет фильтрацию и запись в Parquet.Это создает огромные накладные расходы.Чтобы избежать этого, я могу разделить тему Kafka так, чтобы в ней было столько же разделов, сколько и таблиц, и тогда поток чтения должен читать только из данного раздела.Но я не вижу способа указать детали раздела как часть потока чтения Kafka.

1 Ответ

0 голосов
/ 14 октября 2018

если объем данных не так велик, напишите свой собственный приемник, соберите данные каждой микропакета, тогда вы сможете кэшировать этот фрейм данных и записывать их в разные места, хотя вам понадобятся некоторые настройки, но это будет работать

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...