Spring Kafka, ручной коммит в разных темах - PullRequest
0 голосов

Добрый день, коллеги. Я использую Spring Kafka 2.2.5 У меня есть слушатель:

    @KafkaListener(topics = "${kafka.execution-task-topic}", containerFactory = "executionTaskObjectContainerFactory")
    public void protocolEventsHandle(ExecutionTask executionTask,
                                     Acknowledgment acknowledgment,
                                     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                     @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                     @Header(KafkaHeaders.OFFSET) long offset) {

        ResponseEntity<String> stringResponseEntity = airflowRestRunner.startDag(executionTask);
        JSONObject body = new JSONObject(stringResponseEntity.getBody());
        String message = body.getString("message");
        String runId = messageParser.getRunId(message);
        ExecutionTaskMessageInfo messageInfo = new ExecutionTaskMessageInfo(offset, partition, false, acknowledgment);
        kafkaAcknowledgeObject.putMessageInfo(messageInfo, partition);

        this.executorService.submit(kafkaAlertProducer.produceMessageAfterTaskSuccess(runId, executionTask, messageInfo));

    }

Я делаю некоторые операции, и если они успешны, я использую интерфейс Acknowledge для фиксации смещения.

У меня проблема. Пока вычисления создаются в созданном потоке, слушатель снова читает сообщение с того же смещения. Из-за этого при попытке подтверждения смещения происходит сбой приложения.

Какая лучшая практика для работы с Кафкой в ​​параллельном режиме? Я мог получить до 10 сообщений параллельно, и мне нужно зафиксировать их только после вычислений.

Update1

Я храню все свои сообщения от Кафки в: ключ - номер раздела значение - специальный класс модели, содержащий ссылку на необходимое подтверждение

@Data
@NoArgsConstructor
@AllArgsConstructor
public abstract class KafkaAcknowledgeObject<T extends Comparable> {

    protected ConcurrentHashMap<Integer, TreeSet<T>> hashMap = new ConcurrentHashMap<>();

    public abstract void doAck();

    public void putMessageInfo(T message, int partition){
        if (hashMap.containsKey(partition)) {
            hashMap.get(partition).add(message);
        } else {
            TreeSet<T> messageInfos = new TreeSet<>();
            messageInfos.add(message);
            hashMap.put(partition, messageInfos);
        }
    }

}

После вычислений я вызываю doAck (), например

    @Override
    public void doAck() {

        for (TreeSet<ExecutionTaskMessageInfo> messageInfoTreeSet : super.hashMap.values()) {
            checkHandledOffsets(messageInfoTreeSet);
        }
    }

    private void checkHandledOffsets(TreeSet<ExecutionTaskMessageInfo> messageInfoTreeSet) {
        ExecutionTaskMessageInfo first = getFirstMessageInfo(messageInfoTreeSet);
        if (first.isCompleted()) {
            first.getAcknowledgment().acknowledge();
            messageInfoTreeSet.remove(first);
            checkHandledOffsets(messageInfoTreeSet);
        }
        return;
    }

    private ExecutionTaskMessageInfo getFirstMessageInfo(TreeSet<ExecutionTaskMessageInfo> messageInfoTreeSet) {
        Iterator<ExecutionTaskMessageInfo> iterator = messageInfoTreeSet.iterator();
        return iterator.next();
    }

1 Ответ

1 голос
/ 17 мая 2019

То, что вы делаете, должно быть хорошо; Я только что проверил аналогичное устройство, и оно прекрасно работает для меня ...

@SpringBootApplication
public class So56190029Application {

    public static void main(String[] args) {
        SpringApplication.run(So56190029Application.class, args);
    }

    private final ExecutorService exec = Executors.newSingleThreadExecutor();

    private final AtomicInteger count = new AtomicInteger();

    @KafkaListener(id = "so56190029", topics = "so56190029")
    public void listen(String in, Acknowledgment ack) {
        this.exec.execute(runner(in, ack));
    }

    private Runnable runner(String payload, Acknowledgment ack) {
        return () -> {
            System.out.println(payload);
            if (this.count.incrementAndGet() % 3 == 0) {
                System.out.println("acking");
                ack.acknowledge();
            }
        };
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<?, String> template) {
        return args -> IntStream.range(0, 6).forEach(i -> template.send("so56190029", "foo" + i));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setCommitLogLevel(Level.INFO);
        return factory;
    }

}

и

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.max.poll.records=3
spring.kafka.listener.ack-mode=MANUAL

и

foo0
foo1
foo2
acking
foo3
foo4
foo5
acking
2019-05-17 14:46:28.790  INFO 62429 --- [o56190029-0-C-1] essageListenerContainer$ListenerConsumer 
    : Committing: {so56190029-0=OffsetAndMetadata{offset=36, leaderEpoch=null, metadata=''}}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...