Как контролировать количество записей при записи Spark Dataframe в Kafka Producer с помощью Spark Java - PullRequest
1 голос
/ 27 апреля 2020

У меня есть искровой фрейм данных с двумя столбцами, столбцом 'keyCol' и столбцом 'valCol'. Фрейм данных имеет огромный размер, около 100 миллионов строк. Я хочу записать / произвести кадр данных для kafka topi c в мини-пакетах, т.е. 10000 записей в минуту. Это искровое задание будет выполняться один раз в день, что создает этот фрейм данных

Как реализовать запись в мини-пакетах по 10000 записей в минуту в приведенном ниже коде, или, пожалуйста, предложите, если есть какой-либо лучший / эффективный способ реализовать это .

spark_df.foreachPartition(partitions ->{
            Producer<String, String> producer= new KafkaProducer<String, String>(allKafkaParamsMapObj);
            while (partitions) {
                Row row =  partitions.next();
                producer.send(new ProducerRecord<String, String>("topicName", row.getAs("keyCol"), row.getAs("valCol")), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                       //Callback code goes here
                    }
                });
            }
            return;
        });

1 Ответ

1 голос
/ 27 апреля 2020

Вы можете использовать функцию grouped(10000), как показано ниже, и выполнять поток ожидания в течение минуты

config.foreachPartition(f => {
      f.grouped(10000).foreach( (roqSeq : Seq[Row]) => { // Run 10000 in batch

        roqSeq.foreach( row => {
          producer.send(new Nothing("topicName", row.getAs("keyCol"), row.getAs("valCol")), new Nothing() {
            def onCompletion(recordMetadata: Nothing, e: Exception): Unit = {
              //Callback code goes here
            }
          })
        })
          Thread.sleep(60000) // Sleep for 1 minute
        }
      )
    })
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...