У меня есть задание Kafka Spark Job, которое представляет собой ежедневное пакетное задание, которое последовательно считывает файлы деталей HDFS из местоположения has oop и последовательно создает их. последняя запись в пакете и создание нового фрейма данных (df2) поверх него, а остальные записи переходят в другой фрейм данных (df1).
Сначала я создаю df1, а затем создаю df2, так как я хочу потребитель (который является потоковым заданием) для использования df2 в последнем пакете среди всех пакетов, отправленных от производителя. Но этого не происходит. На стороне потребителя запись в df2 не используется в последней партии, вместо этого она входит во вторую или третью последнюю партию, но не в последнюю партию.
Ниже приведен пример кода:
class XYZ{
create new Kafka Instance
.
ABC.getInstance(topic,<required Kafka Parameters>);
.
.
f(!(df2.toJavaRDD().isEmpty()) && !(df1.toJavaRDD().isEmpty())) {
calling produceDF method(df1, <required Kafka Parameters>);
calling produceDF method(df2, <required Kafka Parameters>);
}
public void produceDF(Dataset<Row> df, Broadcast<ABC> abc>){
df.foreachPartition(partitionOfRecords ->{
final ABC kafkaProducer = abc.value();
while (partitionOfRecords.hasNext()) {
Row row = partitionOfRecords.next();
kafkaProducer.produceDF(<key>, <value>);
}
kafkaProducer.flush();
});
}
}
class ABC implements Serializable{
private transient KafkaProducer<String,String> kafkaProducer = null;
private static ABC abcInstance;
public static ABC getInstance(String topic, Map<String,Object> kafkaParams) {
if(abcInstance==null){
synchronized (ABC.class){
if(abcInstance==null){
abcInstance = new ABC(topic,kafkaParams);
}
}
}
return abcInstance;
}
public void produceDF(String key,String val) {
if( null==kafkaProducer) {
kafkaProducer = new KafkaProducer<String, String>(<Kafka Parameters>);
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
kafkaProducer.close();
}
});
}
if(key!=null) {
kafkaProducer.send(new ProducerRecord(topic, key, val), new ProducerCallback(key, val));
}
public void flush(){
if(null!=kafkaProducer)
kafkaProducer.flush();
}
}
}
Посоветуйте, пожалуйста, какой подход мне нужно использовать, чтобы использовать df2 в последней партии