У меня есть сценарий использования, где мы создаем несколько потоков из Kafka DStream. Я хотел бы зафиксировать смещения только после успешной обработки обоих потоков. Возможно ли это?
Текущая стратегия:
1) create dstream one.
2) create dstream two.
3) process two streams in parallel by creating threads.
4) wait for all therads to complete using countdown latch.
5) finally commit all offsets.
Но в приведенной выше стратегии одна проблема состоит в том, как отследить смещения для записей, которые не удалось обработать полностью.
JavaInputDStream<ConsumerRecord<String, String>> telemetryStream = KafkaUtils.createDirectStream(
streamingContext, LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topics, kafkaParams));
JavaDStream<String> telemetryDStream = telemetryStream.map(record -> {
return record.value();
});
telemetryDStream.cache();
CountDownLatch latch = new CountDownLatch(2);
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
//processing logic here
} finally {
latch.countDown();
}
}
});
t1.start();
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
//processing logic here
} finally {
latch.countDown();
}
}
});
t2.start();
latch.await();
//now commit offsets here
Есть ли лучший способ справиться с этим.