Как создать поток с несколькими выходами из одного потока ввода с помощью связывателя потоков Spring Cloud Kafka? - PullRequest
0 голосов
/ 09 июля 2020

Я пытаюсь создать несколько потоков вывода (в зависимости от временного окна) из одного потока ввода.

interface AnalyticsBinding {
        String PAGE_VIEWS_IN = "pvin";
        String PAGE_VIEWS _COUNTS_OUT_Last_5_Minutes = "pvcout_last_5_minutes";
        String PAGE_VIEWS _COUNTS_OUT_Last_30_Minutes = "pvcout_last_30_minutes";
        @Input(PAGE_VIEWS_IN)
        KStream<String, PageViewEvent> pageViewsIn();
        @Output(PAGE_VIEWS_COUNTS_OUT_Last_5_Minutes)
        KStream<String,Long> pageViewsCountOutLast5Minutes();
        @Output(PAGE_VIEWS_COUNTS_OUT_Last_30_Minutes)
        KStream<String,Long> pageViewsCountOutLast30Minutes();
    }

  @StreamListener
  @SendTo({ AnalyticsBinding.PAGE_VIEWS_COUNTS_OUT_Last_5_Minutes })
    public KStream<String, Long> processPageViewEventForLast5Mintues(
            @Input(AnalyticsBinding.PAGE_VIEWS_IN)KStream<String, PageViewEvent> stream) {
                  // aggregate by Duration.ofMinutes(5)
    }


  @StreamListener
  @SendTo({ AnalyticsBinding.PAGE_VIEWS_COUNTS_OUT_Last_30_Minutes })
    public KStream<String, Long> processPageViewEventForLast30Mintues(
            @Input(AnalyticsBinding.PAGE_VIEWS_IN)KStream<String, PageViewEvent> stream) {
                  // aggregate by Duration.ofMinutes(30)
}

Когда я запускаю приложение, будет работать только одна задача потока. Есть ли способ получить оба processPageViewEventForLast5Mintues и processPageViewEventForLast30Mintues работают одновременно

1 Ответ

0 голосов
/ 09 июля 2020

Вы используете одну и ту же привязку ввода в обоих процессорах, и поэтому вы видите, что только один работает. Добавьте еще одну привязку ввода в интерфейс привязки и установите ее назначение на тот же topi c. Кроме того, измените один из методов StreamListener, чтобы использовать это новое имя привязки.

С учетом сказанного, если вы используете последние версии Spring Cloud Stream, вам следует рассмотреть возможность перехода на функциональную модель. Например, должно работать следующее:

@Bean
public Function<KStream<String, PageViewEvent>, KStream<String, Long>> processPageViewEventForLast5Mintues() {
...
}

и

@Bean
public Function<KStream<String, PageViewEvent>, KStream<String, Long>> processPageViewEventForLast30Mintues() {
...
}

В этом случае связыватель автоматически создает две отдельные привязки ввода. Для этих привязок можно указать места назначения.

spring.cloud.stream.bindings.processPageViewEventForLast5Mintues-in-0.destination=<your Kafka topic>
spring.cloud.stream.bindings.processPageViewEventForLast30Mintues-in-0.destination=<your Kafka topic>
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...