Добрый день, коллеги. Я использую Spring Kafka 2.2.5
У меня есть слушатель:
@KafkaListener(topics = "${kafka.execution-task-topic}", containerFactory = "executionTaskObjectContainerFactory")
public void protocolEventsHandle(ExecutionTask executionTask,
Acknowledgment acknowledgment,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.OFFSET) long offset) {
ResponseEntity<String> stringResponseEntity = airflowRestRunner.startDag(executionTask);
JSONObject body = new JSONObject(stringResponseEntity.getBody());
String message = body.getString("message");
String runId = messageParser.getRunId(message);
ExecutionTaskMessageInfo messageInfo = new ExecutionTaskMessageInfo(offset, partition, false, acknowledgment);
kafkaAcknowledgeObject.putMessageInfo(messageInfo, partition);
this.executorService.submit(kafkaAlertProducer.produceMessageAfterTaskSuccess(runId, executionTask, messageInfo));
}
Я делаю некоторые операции, и если они успешны, я использую интерфейс Acknowledge для фиксации смещения.
У меня проблема.
Пока вычисления создаются в созданном потоке, слушатель снова читает сообщение с того же смещения. Из-за этого при попытке подтверждения смещения происходит сбой приложения.
Какая лучшая практика для работы с Кафкой в параллельном режиме?
Я мог получить до 10 сообщений параллельно, и мне нужно зафиксировать их только после вычислений.
Update1
Я храню все свои сообщения от Кафки в:
ключ - номер раздела
значение - специальный класс модели, содержащий ссылку на необходимое подтверждение
@Data
@NoArgsConstructor
@AllArgsConstructor
public abstract class KafkaAcknowledgeObject<T extends Comparable> {
protected ConcurrentHashMap<Integer, TreeSet<T>> hashMap = new ConcurrentHashMap<>();
public abstract void doAck();
public void putMessageInfo(T message, int partition){
if (hashMap.containsKey(partition)) {
hashMap.get(partition).add(message);
} else {
TreeSet<T> messageInfos = new TreeSet<>();
messageInfos.add(message);
hashMap.put(partition, messageInfos);
}
}
}
После вычислений я вызываю doAck (), например
@Override
public void doAck() {
for (TreeSet<ExecutionTaskMessageInfo> messageInfoTreeSet : super.hashMap.values()) {
checkHandledOffsets(messageInfoTreeSet);
}
}
private void checkHandledOffsets(TreeSet<ExecutionTaskMessageInfo> messageInfoTreeSet) {
ExecutionTaskMessageInfo first = getFirstMessageInfo(messageInfoTreeSet);
if (first.isCompleted()) {
first.getAcknowledgment().acknowledge();
messageInfoTreeSet.remove(first);
checkHandledOffsets(messageInfoTreeSet);
}
return;
}
private ExecutionTaskMessageInfo getFirstMessageInfo(TreeSet<ExecutionTaskMessageInfo> messageInfoTreeSet) {
Iterator<ExecutionTaskMessageInfo> iterator = messageInfoTreeSet.iterator();
return iterator.next();
}