Spring Cloud Stream - первые сообщения Kafka получают ошибку «У диспетчера нет подписчиков» - PullRequest
0 голосов
/ 09 июля 2019

Мое приложение успешно отправляет сообщения Kafka, но только после инициализации Kafka.Перед этим я получаю сообщение об ошибке «У диспетчера нет подписчиков».Как мне дождаться окончания регистрации подписчиков на каналы?

Вот след последовательности событий (хронирование в секундах):

  • 17.165 Создан SenderClass
  • 17.816 класс инициализации, @PostConstruct запускает PollingTask
  • 24.781 PollingTask отправляет первое сообщение Kafka
  • 24.816 Первая ошибка: «Диспетчер не имеет подписчиков»
  • 25.778 Регистрация MessageChannel my-канал
  • все еще видит ошибки диспетчера
  • 27.067 Канал my-channel 'имеет 1 подписчика
  • После этого больше нет ошибок, сообщения отправляются нормально

Я не уверен, как подойти к этому.В число необдуманных догадок входят:

  1. Поместите отправляющий код в @ PostConstruct
  2. Добавьте @AutoConfigureBefore (BindingServiceConfiguration.class) в Sender
  3. Добавьте @AutoConfigureAfter (BindingServiceConfiguration.class)в SenderClass
  4. Добавьте @AutoConfigureBefore (BindingServiceConfiguration.class) в Main
  5. Поместите @DependsOn ({"EnableBindingClass"}) в задачу
  6. Поместите @DependsOn ({"ApplicationLifeCycle"}) в SchedulerClass, где ApplicationLifeCycle - это класс, который ничего не делает, но реализует SmartLifecycle с getPhase, возвращая MAX_INT
  7. Убедитесь, что ComponentScan включен для всего пакета (предложение из других потоков SO)
  8. Различные комбинации

Создано новое приложение, максимально простое:

public interface Source {
  @Output(channelName)
  MessageChannel outboundChannel();
}
@EnableBinding(Source.class)
@Component
public class Sender {
  @Autowired
  private Source source;

  public boolean send(SomeObject object) {
    return source.outboundChannel().send(MessageBuilder.withPayload(object).build());
  }
@Service
public class Scheduler {
  @Autowired
  Sender sender;
  @Autowired
  ThreadPoolTaskScheduler taskScheduler;

  @PostConstruct
  public void initialize() {
    taskScheduler.schedule(new PollingTask(), nextTime);
  }

  private class PollingTask implements Runnable {
    @Override
    public void run() {
      List<SomeObject> objects = getDummyData();
      for(SomeObject object : objects)
      {
        sender.send(interval);
      }

      Instant nextTime = Instant.now().plusMillis(1_000L);
      try {
        taskScheduler.schedule(new PollingTask(), nextTime);
      } catch (Exception e) {
        logger.error(e);
      }
    }
}

Редактировать, чтобы добавить решение

Теперь работает!В моем планировщике, который запускает вещи, которые отправляют сообщения, я переключился с запуска вещей в @PostConstruct на SmartLifecycle :: start ().

@Service
public class Scheduler implements SmartLifecycle {
  @Autowired
  Sender sender;
  @Autowired
  ThreadPoolTaskScheduler taskScheduler;

  @Override
  public void start() {
    taskScheduler.schedule(new PollingTask(), nextTime);
  }

  private class PollingTask implements Runnable {
    @Override
    public void run() {
      List<SomeObject> objects = getDummyData();
      for(SomeObject object : objects)
      {
        sender.send(interval);
      }

      Instant nextTime = Instant.now().plusMillis(1_000L);
      try {
        taskScheduler.schedule(new PollingTask(), nextTime);
      } catch (Exception e) {
        logger.error(e);
      }
    }
}

1 Ответ

0 голосов
/ 09 июля 2019

@ PostConstruct слишком рано отправлять сообщения; контекст все еще создается. Реализуйте цикл SmartLifecycle, поместите компонент в высокую фазу (Integer.MAX_VALUE) и выполните отправку в start ().

Или отправьте в ApplicationRunner.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...