Это обобщение этого вопроса .
Предположим, у меня есть несколько исходных потоков, для которых применяется один и тот же набор предикатов.Я хотел бы настроить потоки ветвления так, чтобы записи, которые удовлетворяли предикату, независимо от того, какой исходный поток обрабатывался одним и тем же потоком ветвления.Как показано на диаграмме ниже, каждый поток ветвления похож на универсальный процессор, который преобразует входящие записи.
Следующий кодовый блок не работает должным образомпоскольку он создает отдельный набор потоков ветвления для каждого исходного потока.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source1 = builder.stream("x");
KStream<String, String> source2 = builder.stream("y");
Predicate<String, String>[] branchPredicates = new Predicate[forkCount];
for (int i = 0; i < forkCount; ++i) {
int idx = i;
branchPredicates[i] = ((key, value) ->
key.hashCode() % forkCount == idx);
}
Kstream<String, String>[] forkStreams = Arrays.asList(source1, source2)
.map(srcStream -> srcStream.branch(branchPredicates)
.flatMap(x -> Arrays.stream())
.collect(Collectors.toList());
извините, я в основном разработчик scala :)
В приведенном выше примере, forkStreams.length == branchPredicates.length x 2 и, в общем, пропорционально количеству исходных потоков.Есть ли хитрость в потоке Кафки, которая позволяет мне поддерживать взаимно-однозначное отношение между предикатами и ветвлениями?
ОБНОВЛЕНИЕ 11/27/2018 В этом есть некоторый прогрессЯ могу:
- Читать из всех исходных тем, используя один исходный поток
- Подключать исходный поток к нескольким ветвям
- Распределять сообщения равномерно по количеству ветвей.
Однако, как показывает следующий блок кода, ALL потоки ветвлений существуют в одном потоке.Чего я хотел бы добиться, так это поместить каждый поток ветвления в отдельный поток, чтобы обеспечить лучшую загрузку ЦП
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream(Arrays.asList("a", "b", "c")
// Create workers
// Need to have predicates for the branches
int totalPerdicates = Integer
.parseInt(props.getProperty(WORKER_PROCESSOR_COUNT));
Predicate<String, String>[] predicates = new Predicate[totalPerdicates];
IntStream
.range(0, totalPerdicates)
.forEach(i -> {
int idx = i;
predicates[i] = (key, value) ->
key.hashCode() % totalPerdicates == idx;
});
forkStreams = Arrays.asList(sourceStreams.branch(predicates));
// Hack- Dump the number of messages processed every 10 seconds
forkStreams
.forEach(fork -> {
KStream<Windowed<String>, Long> tbl =
fork.transformValues(new SourceTopicValueTransformerSupplier())
.selectKey((key, value) -> "foobar")
.groupByKey()
.windowedBy(TimeWindows.of(2000L))
.count()
.toStream();
tbl
.foreach((key, count) -> {
String fromTo = String.format("%d-%d",
key.window().start(),
key.window().end());
System.out.printf("(Thread %d, Index %d) %s - %s: %d\n",
Thread.currentThread().getId(),
forkStreams.indexOf(fork),
fromTo, key.key(), count);
});
Вот фрагмент вывода
<snip>
(Thread 13, Index 1) 1542132126000-1542132128000 - foobar: 2870
(Thread 13, Index 1) 1542132024000-1542132026000 - foobar: 2955
(Thread 13, Index 1) 1542132106000-1542132108000 - foobar: 1914
(Thread 13, Index 1) 1542132054000-1542132056000 - foobar: 546
<snip>
(Thread 13, Index 2) 1542132070000-1542132072000 - foobar: 524
(Thread 13, Index 2) 1542132012000-1542132014000 - foobar: 2491
(Thread 13, Index 2) 1542132042000-1542132044000 - foobar: 261
(Thread 13, Index 2) 1542132022000-1542132024000 - foobar: 2823
<snip>
(Thread 13, Index 3) 1542132088000-1542132090000 - foobar: 2170
(Thread 13, Index 3) 1542132010000-1542132012000 - foobar: 2962
(Thread 13, Index 3) 1542132008000-1542132010000 - foobar: 2847
(Thread 13, Index 3) 1542132022000-1542132024000 - foobar: 2797
<snip>
(Thread 13, Index 4) 1542132046000-1542132048000 - foobar: 2846
(Thread 13, Index 4) 1542132096000-1542132098000 - foobar: 3216
(Thread 13, Index 4) 1542132108000-1542132110000 - foobar: 2696
(Thread 13, Index 4) 1542132010000-1542132012000 - foobar: 2881
<snip>
Любые предложения относительно того, какдля размещения каждого потока ветвлений в отдельном потоке.