У меня есть приложение с блоками кода Spring-Kafka и Native Kafka.
краткая логика: My Spring Kafka Listener получает сообщения, запускает поток воздуха в отдельном потоке, код ожидает завершения задачи потока воздуха, затемпроверяет сообщение из второй темы и отправляет сообщение о результате в какую-либо систему.
Когда я использую любое значение вместо 5000 мс, мои потребители не могут назначить этот раздел.Любые значения выше 5000 мс работают правильно.Мой инженер devops не дает мне никакой информации о conf кроме версии сервера kafka.
мой код Native Kafka
private List<ExecutionTaskResultMessageInfo> getAllTransformationStatusEvents(Consumer<String, ExecutionTaskResult> consumer, String topic) {
synchronized (this.consumer) {
if (!isInited.get()) {
consumer.subscribe(Collections.singletonList(topic));
consumer.poll(Duration.ofMillis(0));
this.isInited.set(true);
}
List<ExecutionTaskResultMessageInfo> events = new ArrayList<>();
try {
ConsumerRecords<String, ExecutionTaskResult> records = ConsumerRecords.empty();
boolean empty = false;
LOG.info("Starting to poll messages from transformation.status topic");
while (!empty) {
try {
records = consumer.poll(Duration.ofMillis(kafkaConf.getPollTimeout()));
} catch (SerializationException e) {
LOG.error(e.getMessage());
}
if (!records.isEmpty()) {
for (ConsumerRecord<String, ExecutionTaskResult> record : records) {
ExecutionTaskResult executionTaskResult = record.value();
ExecutionTaskResultMessageInfo executionTaskResultMessageInfo;
if (isExecutionTaskResultEmpty(executionTaskResult)) {
executionTaskResultMessageInfo = new ExecutionTaskResultMessageInfo(executionTaskResult, record.offset(), record.partition(), true);
CustomEndPoint.Feature.incrementMapStatusErrorCount();
CustomEndPoint.Feature.addTransErrOffset(record.partition(), record.offset());
} else {
executionTaskResultMessageInfo = new ExecutionTaskResultMessageInfo(executionTaskResult, record.offset(), record.partition(), false);
}
int partition = executionTaskResultMessageInfo.getPartition();
kafkaNativeAcknowledgeObject.putMessageInfo(executionTaskResultMessageInfo, partition);
events.add(executionTaskResultMessageInfo);
}
} else {
empty = true;
}
}
} catch (Exception e) {
LOG.info(e.toString());
}
LOG.info("Events from transformations.status topic: {}", events);
LOG.info("Events collection size: {}", events.size());
return events;
}
}
Spring Kafka
@KafkaListener(topics = "${kafka.execution-task-topic}", containerFactory = "executionTaskObjectContainerFactory")
public void protocolEventsHandle(ExecutionTask executionTask,
Acknowledgment acknowledgment,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.OFFSET) long offset) {
ExecutionTaskMessageInfo messageInfo;
if (isExecutionTaskEmpty(executionTask)) {
LOG.info("Getting new empty message from offset: {} from partition: {} and topic: {}", offset, partition, topic);
messageInfo = new ExecutionTaskMessageInfo(offset, partition, true, acknowledgment);
kafkaAcknowledgeObject.putMessageInfo(messageInfo, partition);
CustomEndPoint.Feature.incrementBDCPErrorCount();
CustomEndPoint.Feature.addBDCPErrOffset(partition, offset);
} else {
LOG.info("Getting new message offset: {} from partition: {} and topic: {}", offset, partition, topic);
try {
ResponseEntity<String> stringResponseEntity = airflowRestRunner.startDag(executionTask);
String runId = getRunId(stringResponseEntity);
messageInfo = new ExecutionTaskMessageInfo(offset, partition, false, acknowledgment);
kafkaAcknowledgeObject.putMessageInfo(messageInfo, partition);
this.executorService.submit(kafkaAlertProducer.produceMessageAfterTaskSuccess(runId, executionTask, messageInfo));
} catch (Exception e) {
LOG.info(e.toString());
messageInfo = new ExecutionTaskMessageInfo(offset, partition, true, acknowledgment);
kafkaAcknowledgeObject.putMessageInfo(messageInfo, partition);
}
}
}
Server kafkaверсия 1.0.0 приложение кафка версия 2.0.1