Весенний поток облака: динамический выходной канал странное поведение - PullRequest
0 голосов
/ 19 января 2019

Я использую Spring Cloud Stream версии 2.1.0. RELEASE для отправки сообщений (в данном случае в Kafka) на каналы, динамически определяемые на основе полученных входных данных. Проблема в том, что только каждое другое сообщение попадает в правильный канал, а другая половина попадает в канал по умолчанию.

Я использовал этот образец в качестве отправной точки.

Я помещаю канал, который я хочу отправить, в определенный заголовок сообщения, затем использую HeaderValueRouter, чтобы проверить то же значение заголовка, чтобы увидеть, в какой канал выводить.

Я настраиваю свое приложение следующим образом:

@EnableBinding(CloudStreamConfig.DynamicSource.class)
public class CloudStreamConfig {

    @Autowired
    private BinderAwareChannelResolver resolver;

    public static final String CHANNEL_HEADER = "channelHeader";
    public static final String OUTPUT_CHANNEL = "outputChannel";

    private final String defaultChannel = "defaultChannel";

    @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
    @Bean
    public HeaderValueRouter router() {
        HeaderValueRouter router = new HeaderValueRouter(CHANNEL_HEADER);
        router.setDefaultOutputChannelName(defaultChannel);
        router.setChannelResolver(resolver);
        return router;
    }

    public interface DynamicSource {
        @Output(OUTPUT_CHANNEL)
        MessageChannel output();
    }

}

И в моем контроллере я принимаю объект, а также параметр, определяющий, на какой канал его отправлять, затем отправляю его в MessageChannel. Соответствующий код ниже:

    @Autowired
    @Qualifier(CloudStreamConfig.OUTPUT_CHANNEL)
    public MessageChannel localChannel;

    ...

    @GetMapping(path = "/error/{channel}")
    @ResponseStatus(HttpStatus.OK)
    public void error(@PathVariable String channel) {
        // build my object
        Message message = MessageBuilder.createMessage(myObject,
                new MessageHeaders(Collections.singletonMap(CloudStreamConfig.CHANNEL_HEADER, channel)));
        localChannel.send(message);
    }

Если я отправлю 10 сообщений на /error/someChannel, я ожидаю увидеть 10 сообщений в someChannel. Однако я вижу половину сообщений в someChannel, а другую половину в defaultChannel. Я добавил переменную счетчика отладки в мои сообщения, и она отправляет первое сообщение на правильный канал, а затем каждое второе сообщение на правильный канал, в то время как все остальные переходят на канал по умолчанию.

Что может быть причиной и как я могу это исправить? Я неправильно использую свой DynamicSource класс? Я предполагал, что он будет привязан к любой автопроводке MessageChannel с тем же именем (и, похоже, она есть), но мне интересно, есть ли что-то, что я пропускаю. Или есть непреднамеренное взаимодействие с BinderAwareChannelResolver? (Честно говоря, я не знаю, что это делает, я включил его только потому, что образцы делают)

1 Ответ

0 голосов
/ 19 января 2019

На выходном канале есть два подписчика - привязка канала (в связывателе) и ваш маршрутизатор.

Для DirectChannel с алгоритм диспетчеризации по умолчанию - циклический прием, поэтому вы отправляете сообщения поочередномаршрутизатор и непосредственно к связующему.

Вам нужен другой DirectChannel @Bean для активатора службы, чтобы все сообщения отправлялись туда, а затем в связующее после маршрутизации.

См. sourceChannel в этом примере.

...