Мое приложение успешно отправляет сообщения Kafka, но только после инициализации Kafka.Перед этим я получаю сообщение об ошибке «У диспетчера нет подписчиков».Как мне дождаться окончания регистрации подписчиков на каналы?
Вот след последовательности событий (хронирование в секундах):
- 17.165 Создан SenderClass
- 17.816 класс инициализации, @PostConstruct запускает PollingTask
- 24.781 PollingTask отправляет первое сообщение Kafka
- 24.816 Первая ошибка: «Диспетчер не имеет подписчиков»
- 25.778 Регистрация MessageChannel my-канал
- все еще видит ошибки диспетчера
- 27.067 Канал my-channel 'имеет 1 подписчика
- После этого больше нет ошибок, сообщения отправляются нормально
Я не уверен, как подойти к этому.В число необдуманных догадок входят:
- Поместите отправляющий код в @ PostConstruct
- Добавьте @AutoConfigureBefore (BindingServiceConfiguration.class) в Sender
- Добавьте @AutoConfigureAfter (BindingServiceConfiguration.class)в SenderClass
- Добавьте @AutoConfigureBefore (BindingServiceConfiguration.class) в Main
- Поместите @DependsOn ({"EnableBindingClass"}) в задачу
- Поместите @DependsOn ({"ApplicationLifeCycle"}) в SchedulerClass, где ApplicationLifeCycle - это класс, который ничего не делает, но реализует SmartLifecycle с getPhase, возвращая MAX_INT
- Убедитесь, что ComponentScan включен для всего пакета (предложение из других потоков SO)
- Различные комбинации
Создано новое приложение, максимально простое:
public interface Source {
@Output(channelName)
MessageChannel outboundChannel();
}
@EnableBinding(Source.class)
@Component
public class Sender {
@Autowired
private Source source;
public boolean send(SomeObject object) {
return source.outboundChannel().send(MessageBuilder.withPayload(object).build());
}
@Service
public class Scheduler {
@Autowired
Sender sender;
@Autowired
ThreadPoolTaskScheduler taskScheduler;
@PostConstruct
public void initialize() {
taskScheduler.schedule(new PollingTask(), nextTime);
}
private class PollingTask implements Runnable {
@Override
public void run() {
List<SomeObject> objects = getDummyData();
for(SomeObject object : objects)
{
sender.send(interval);
}
Instant nextTime = Instant.now().plusMillis(1_000L);
try {
taskScheduler.schedule(new PollingTask(), nextTime);
} catch (Exception e) {
logger.error(e);
}
}
}
Редактировать, чтобы добавить решение
Теперь работает!В моем планировщике, который запускает вещи, которые отправляют сообщения, я переключился с запуска вещей в @PostConstruct на SmartLifecycle :: start ().
@Service
public class Scheduler implements SmartLifecycle {
@Autowired
Sender sender;
@Autowired
ThreadPoolTaskScheduler taskScheduler;
@Override
public void start() {
taskScheduler.schedule(new PollingTask(), nextTime);
}
private class PollingTask implements Runnable {
@Override
public void run() {
List<SomeObject> objects = getDummyData();
for(SomeObject object : objects)
{
sender.send(interval);
}
Instant nextTime = Instant.now().plusMillis(1_000L);
try {
taskScheduler.schedule(new PollingTask(), nextTime);
} catch (Exception e) {
logger.error(e);
}
}
}