Kafka Spark Streaming: как создавать и использовать запись в последней партии - PullRequest
0 голосов
/ 09 июля 2020

У меня есть задание Kafka Spark Job, которое представляет собой ежедневное пакетное задание, которое последовательно считывает файлы деталей HDFS из местоположения has oop и последовательно создает их. последняя запись в пакете и создание нового фрейма данных (df2) поверх него, а остальные записи переходят в другой фрейм данных (df1).

Сначала я создаю df1, а затем создаю df2, так как я хочу потребитель (который является потоковым заданием) для использования df2 в последнем пакете среди всех пакетов, отправленных от производителя. Но этого не происходит. На стороне потребителя запись в df2 не используется в последней партии, вместо этого она входит во вторую или третью последнюю партию, но не в последнюю партию.

Ниже приведен пример кода:

class XYZ{

create new Kafka Instance
.
                    ABC.getInstance(topic,<required Kafka Parameters>);
.
.
f(!(df2.toJavaRDD().isEmpty()) && !(df1.toJavaRDD().isEmpty())) {
                    calling produceDF method(df1, <required Kafka Parameters>);
                    calling produceDF method(df2, <required Kafka Parameters>);
                } 


    public void produceDF(Dataset<Row> df, Broadcast<ABC> abc>){
        df.foreachPartition(partitionOfRecords ->{
                    final ABC kafkaProducer = abc.value();
            while (partitionOfRecords.hasNext()) {
                Row row = partitionOfRecords.next();
                kafkaProducer.produceDF(<key>, <value>);
            }
            kafkaProducer.flush();
        });
}
}


class ABC implements Serializable{
        private transient KafkaProducer<String,String> kafkaProducer = null;
                private static ABC abcInstance;

                public static ABC getInstance(String topic, Map<String,Object> kafkaParams) {
        if(abcInstance==null){
            synchronized (ABC.class){
                if(abcInstance==null){
                    abcInstance = new ABC(topic,kafkaParams);
                }
            }
        }
        return abcInstance;
    }
    
public void produceDF(String key,String val) {
        if( null==kafkaProducer) {
            kafkaProducer = new KafkaProducer<String, String>(<Kafka Parameters>);
            Runtime.getRuntime().addShutdownHook(new Thread() {
                public void run() {
                    kafkaProducer.close();
                }
            });

        }
        if(key!=null) {
    kafkaProducer.send(new ProducerRecord(topic, key, val), new ProducerCallback(key, val));
}
        public void flush(){
        if(null!=kafkaProducer)
            kafkaProducer.flush();
    }
}
}

Посоветуйте, пожалуйста, какой подход мне нужно использовать, чтобы использовать df2 в последней партии

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...