Подключение нескольких исходных потоков к одному и тому же набору ветвей - PullRequest
0 голосов
/ 24 ноября 2018

Это обобщение этого вопроса .

Предположим, у меня есть несколько исходных потоков, для которых применяется один и тот же набор предикатов.Я хотел бы настроить потоки ветвления так, чтобы записи, которые удовлетворяли предикату, независимо от того, какой исходный поток обрабатывался одним и тем же потоком ветвления.Как показано на диаграмме ниже, каждый поток ветвления похож на универсальный процессор, который преобразует входящие записи.

enter image description here

Следующий кодовый блок не работает должным образомпоскольку он создает отдельный набор потоков ветвления для каждого исходного потока.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source1 = builder.stream("x");
KStream<String, String> source2 = builder.stream("y");

Predicate<String, String>[] branchPredicates = new Predicate[forkCount];
for (int i = 0; i < forkCount; ++i) {
    int idx = i;
    branchPredicates[i] = ((key, value) ->
        key.hashCode() % forkCount == idx);
}

Kstream<String, String>[] forkStreams = Arrays.asList(source1, source2)
    .map(srcStream -> srcStream.branch(branchPredicates)
    .flatMap(x -> Arrays.stream())
    .collect(Collectors.toList());

извините, я в основном разработчик scala :)

В приведенном выше примере, forkStreams.length == branchPredicates.length x 2 и, в общем, пропорционально количеству исходных потоков.Есть ли хитрость в потоке Кафки, которая позволяет мне поддерживать взаимно-однозначное отношение между предикатами и ветвлениями?

ОБНОВЛЕНИЕ 11/27/2018 В этом есть некоторый прогрессЯ могу:

  • Читать из всех исходных тем, используя один исходный поток
  • Подключать исходный поток к нескольким ветвям
  • Распределять сообщения равномерно по количеству ветвей.

Однако, как показывает следующий блок кода, ALL потоки ветвлений существуют в одном потоке.Чего я хотел бы добиться, так это поместить каждый поток ветвления в отдельный поток, чтобы обеспечить лучшую загрузку ЦП

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream(Arrays.asList("a", "b", "c")
// Create workers
// Need to have predicates for the branches
int totalPerdicates = Integer
    .parseInt(props.getProperty(WORKER_PROCESSOR_COUNT));
Predicate<String, String>[] predicates = new Predicate[totalPerdicates];
IntStream
    .range(0, totalPerdicates)
    .forEach(i -> {
        int idx = i;
        predicates[i] = (key, value) ->
            key.hashCode() % totalPerdicates == idx;
    });

forkStreams = Arrays.asList(sourceStreams.branch(predicates));

// Hack- Dump the number of messages processed every 10 seconds
forkStreams
    .forEach(fork -> {
        KStream<Windowed<String>, Long> tbl =
        fork.transformValues(new SourceTopicValueTransformerSupplier())
            .selectKey((key, value) -> "foobar")
            .groupByKey()
            .windowedBy(TimeWindows.of(2000L))
            .count()
            .toStream();

        tbl
            .foreach((key, count) -> {
                String fromTo = String.format("%d-%d",
                                              key.window().start(),
                                              key.window().end());
                System.out.printf("(Thread %d, Index %d) %s - %s: %d\n",
                                  Thread.currentThread().getId(),
                                  forkStreams.indexOf(fork),
                                  fromTo, key.key(), count);
            });

Вот фрагмент вывода

<snip>
(Thread 13, Index 1) 1542132126000-1542132128000 - foobar: 2870
(Thread 13, Index 1) 1542132024000-1542132026000 - foobar: 2955
(Thread 13, Index 1) 1542132106000-1542132108000 - foobar: 1914
(Thread 13, Index 1) 1542132054000-1542132056000 - foobar: 546
<snip>
(Thread 13, Index 2) 1542132070000-1542132072000 - foobar: 524
(Thread 13, Index 2) 1542132012000-1542132014000 - foobar: 2491
(Thread 13, Index 2) 1542132042000-1542132044000 - foobar: 261
(Thread 13, Index 2) 1542132022000-1542132024000 - foobar: 2823
<snip>
(Thread 13, Index 3) 1542132088000-1542132090000 - foobar: 2170
(Thread 13, Index 3) 1542132010000-1542132012000 - foobar: 2962
(Thread 13, Index 3) 1542132008000-1542132010000 - foobar: 2847
(Thread 13, Index 3) 1542132022000-1542132024000 - foobar: 2797
<snip>
(Thread 13, Index 4) 1542132046000-1542132048000 - foobar: 2846
(Thread 13, Index 4) 1542132096000-1542132098000 - foobar: 3216
(Thread 13, Index 4) 1542132108000-1542132110000 - foobar: 2696
(Thread 13, Index 4) 1542132010000-1542132012000 - foobar: 2881
<snip>

Любые предложения относительно того, какдля размещения каждого потока ветвлений в отдельном потоке.

1 Ответ

0 голосов
/ 30 ноября 2018

Обновление от 27.11.2008 ответило на вопрос.При этом решение не работает для меня, так как я хотел, чтобы каждый поток ветвления работал как отдельный поток.Вызов stream.branch() создает несколько подпотоков в одном и том же пространстве потока.Таким образом, все записи в разделе обрабатываются в одном и том же пространстве потоков.

Чтобы добиться обработки в подразделах, я использовал клиентский API kafka в сочетании с потоками Java и параллельными очередями.

...