spring-kafka обрабатывает одно и то же сообщение несколько раз с очень длинными задачами. - PullRequest
0 голосов
/ 15 мая 2018

Этот вопрос задавался и решался пару раз, но из-за моих очень ограниченных знаний я не могу найти ответ на свою проблему.

У меня есть продюсер, который отправляет рабочее задание потребителю, выполнение задачи занимает около двух часов. Мне нужно, чтобы задание было выполнено только один раз, однако оно заканчивается, затем запускается снова и снова

Самым полезным, что я нашел в своих журналах, является

2018-05-15 15:18:23.731  WARN 6888 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=anonymous.1ae85859-db41-4dc2-a7e2-ab4268256e00] Synchronous auto-commit of offsets {consumer-message-0=OffsetAndMetadata{offset=34, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

Что заставляет меня думать, что простое завершение задачи потребителя в потоке решит проблему, но это не так.

Код от моего потребителя

@Component
@Slf4j
public class KafkaConsumer {

    private final CommandRunnerService commandRunnerService;

    public KafkaConsumer(CommandRunnerService commandRunnerService) {
        this.commandRunnerService = commandRunnerService;
    }

    @StreamListener(KafkaStreams.INPUT)
    public void handleWorkUnit(@Payload Steak steak) {
        commandRunnerService.executeCreateSteak(steak);
    }
}

Это handleWorkUnit, который занимает несколько часов. Поэтому моя попытка исправить была

    @StreamListener(KafkaStreams.INPUT)
    public void handleWorkUnit(@Payload Steak steak) {
        Runnable task = () -> commandRunnerService.executeCreateSteak(steak);
        task.run();
    }

Что не имеет значения.

Я использую готовые конфигурации только с теми базовыми настройками, которые установлены для потребителя

spring:
  application:
  cloud:
    stream:
      kafka:
        binder:
          brokers: 192.168.0.100
      bindings:
        consumer-message:
          destination: consumer-message
          contentType: application/json
        consumer-response:
          destination: consumer-response
          contentType: application/json

И версии вещей, которые я использую:

ext {
    springCloudVersion = 'Finchley.RC1'
}

dependencies {
    compile('org.springframework.cloud:spring-cloud-stream')
    compile('org.springframework.cloud:spring-cloud-stream-binder-kafka')
    compile('org.springframework.kafka:spring-kafka')
}

Как уже упоминалось выше, я видел много сложных примеров как в документации, так и в SO, но я надеюсь на простое исправление конфигурации? Или еще несколько примеров для начинающих.

Приветствия,

1 Ответ

0 голосов
/ 15 мая 2018

Пожалуйста, попробуйте исправить ваш код, как показано ниже.

@StreamListener(KafkaStreams.INPUT)
public void handleWorkUnit(@Payload Steak steak) {
    Runnable task = () -> commandRunnerService.executeCreateSteak(steak);
    new Thread(task).start();
}

В вашем коде вы не создали ни одного потока. Ваш код просто вызывает run метод Runnable.

Связанное свойство - max.poll.interval.ms потребителя и его значение по умолчанию 5 минут . Если вы не вызываете метод poll() в течение этого периода, ваш брокер считает, что ваш потребитель потерпел неудачу. Вероятно, в этом причина вашей неудачи (перебалансировать и назначить)

...