Сообщения опроса Kafka, только когда я использую опрос 5000ms - PullRequest
0 голосов

У меня есть приложение с блоками кода Spring-Kafka и Native Kafka.

краткая логика: My Spring Kafka Listener получает сообщения, запускает поток воздуха в отдельном потоке, код ожидает завершения задачи потока воздуха, затемпроверяет сообщение из второй темы и отправляет сообщение о результате в какую-либо систему.

Когда я использую любое значение вместо 5000 мс, мои потребители не могут назначить этот раздел.Любые значения выше 5000 мс работают правильно.Мой инженер devops не дает мне никакой информации о conf кроме версии сервера kafka.

мой код Native Kafka

  private List<ExecutionTaskResultMessageInfo> getAllTransformationStatusEvents(Consumer<String, ExecutionTaskResult> consumer, String topic) {
        synchronized (this.consumer) {
            if (!isInited.get()) {
                consumer.subscribe(Collections.singletonList(topic));
                consumer.poll(Duration.ofMillis(0));
                this.isInited.set(true);
            }
            List<ExecutionTaskResultMessageInfo> events = new ArrayList<>();
            try {
                ConsumerRecords<String, ExecutionTaskResult> records = ConsumerRecords.empty();
                boolean empty = false;
                LOG.info("Starting to poll messages from transformation.status topic");
                while (!empty) {
                    try {
                        records = consumer.poll(Duration.ofMillis(kafkaConf.getPollTimeout()));
                    } catch (SerializationException e) {
                        LOG.error(e.getMessage());
                    }
                    if (!records.isEmpty()) {
                        for (ConsumerRecord<String, ExecutionTaskResult> record : records) {
                            ExecutionTaskResult executionTaskResult = record.value();
                            ExecutionTaskResultMessageInfo executionTaskResultMessageInfo;
                            if (isExecutionTaskResultEmpty(executionTaskResult)) {
                                executionTaskResultMessageInfo = new ExecutionTaskResultMessageInfo(executionTaskResult, record.offset(), record.partition(), true);
                                CustomEndPoint.Feature.incrementMapStatusErrorCount();
                                CustomEndPoint.Feature.addTransErrOffset(record.partition(), record.offset());
                            } else {
                                executionTaskResultMessageInfo = new ExecutionTaskResultMessageInfo(executionTaskResult, record.offset(), record.partition(), false);
                            }
                            int partition = executionTaskResultMessageInfo.getPartition();
                            kafkaNativeAcknowledgeObject.putMessageInfo(executionTaskResultMessageInfo, partition);
                            events.add(executionTaskResultMessageInfo);
                        }
                    } else {
                        empty = true;
                    }
                }
            } catch (Exception e) {
                LOG.info(e.toString());
            }
            LOG.info("Events from transformations.status topic: {}", events);
            LOG.info("Events collection size: {}", events.size());
            return events;
        }
    }

Spring Kafka

 @KafkaListener(topics = "${kafka.execution-task-topic}", containerFactory = "executionTaskObjectContainerFactory")
    public void protocolEventsHandle(ExecutionTask executionTask,
                                     Acknowledgment acknowledgment,
                                     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                     @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                     @Header(KafkaHeaders.OFFSET) long offset) {

        ExecutionTaskMessageInfo messageInfo;
        if (isExecutionTaskEmpty(executionTask)) {
            LOG.info("Getting new empty message from offset: {} from partition: {} and topic: {}", offset, partition, topic);
            messageInfo = new ExecutionTaskMessageInfo(offset, partition, true, acknowledgment);
            kafkaAcknowledgeObject.putMessageInfo(messageInfo, partition);
            CustomEndPoint.Feature.incrementBDCPErrorCount();
            CustomEndPoint.Feature.addBDCPErrOffset(partition, offset);
        } else {
            LOG.info("Getting new message offset: {} from partition: {} and topic: {}", offset, partition, topic);
            try {
                ResponseEntity<String> stringResponseEntity = airflowRestRunner.startDag(executionTask);
                String runId = getRunId(stringResponseEntity);
                messageInfo = new ExecutionTaskMessageInfo(offset, partition, false, acknowledgment);
                kafkaAcknowledgeObject.putMessageInfo(messageInfo, partition);
                this.executorService.submit(kafkaAlertProducer.produceMessageAfterTaskSuccess(runId, executionTask, messageInfo));
            } catch (Exception e) {
                LOG.info(e.toString());
                messageInfo = new ExecutionTaskMessageInfo(offset, partition, true, acknowledgment);
                kafkaAcknowledgeObject.putMessageInfo(messageInfo, partition);
            }
        }
    }

Server kafkaверсия 1.0.0 приложение кафка версия 2.0.1

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...