Этот вопрос задавался и решался пару раз, но из-за моих очень ограниченных знаний я не могу найти ответ на свою проблему.
У меня есть продюсер, который отправляет рабочее задание потребителю, выполнение задачи занимает около двух часов. Мне нужно, чтобы задание было выполнено только один раз, однако оно заканчивается, затем запускается снова и снова
Самым полезным, что я нашел в своих журналах, является
2018-05-15 15:18:23.731 WARN 6888 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=anonymous.1ae85859-db41-4dc2-a7e2-ab4268256e00] Synchronous auto-commit of offsets {consumer-message-0=OffsetAndMetadata{offset=34, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
Что заставляет меня думать, что простое завершение задачи потребителя в потоке решит проблему, но это не так.
Код от моего потребителя
@Component
@Slf4j
public class KafkaConsumer {
private final CommandRunnerService commandRunnerService;
public KafkaConsumer(CommandRunnerService commandRunnerService) {
this.commandRunnerService = commandRunnerService;
}
@StreamListener(KafkaStreams.INPUT)
public void handleWorkUnit(@Payload Steak steak) {
commandRunnerService.executeCreateSteak(steak);
}
}
Это handleWorkUnit
, который занимает несколько часов. Поэтому моя попытка исправить была
@StreamListener(KafkaStreams.INPUT)
public void handleWorkUnit(@Payload Steak steak) {
Runnable task = () -> commandRunnerService.executeCreateSteak(steak);
task.run();
}
Что не имеет значения.
Я использую готовые конфигурации только с теми базовыми настройками, которые установлены для потребителя
spring:
application:
cloud:
stream:
kafka:
binder:
brokers: 192.168.0.100
bindings:
consumer-message:
destination: consumer-message
contentType: application/json
consumer-response:
destination: consumer-response
contentType: application/json
И версии вещей, которые я использую:
ext {
springCloudVersion = 'Finchley.RC1'
}
dependencies {
compile('org.springframework.cloud:spring-cloud-stream')
compile('org.springframework.cloud:spring-cloud-stream-binder-kafka')
compile('org.springframework.kafka:spring-kafka')
}
Как уже упоминалось выше, я видел много сложных примеров как в документации, так и в SO, но я надеюсь на простое исправление конфигурации? Или еще несколько примеров для начинающих.
Приветствия,