Я бы хотел помочь выбрать один из двух путей, которыми я могу следовать из тех, кто более опытен с потоками Kafka в JAVA.У меня есть два работающих приложения JAVA, которые могут принимать входящий поток целых чисел и выполнять различные вычисления и задачи, создавая четыре результирующих исходящих потока на разные темы.Фактический расчет / задачи не важен, я обеспокоен двумя возможными методами, которые я мог бы использовать, чтобы определить обработчик, который выполняет математику и любые связанные риски с моим любимым.
Метод 1 использует отдельно определенную функцию типа Итерируемая и возвращает тип Список .
Метод2 использует более распространенный интегральный метод, который помещает функцию в объявление KStream .
Я очень новичок в Kafka Streams и не хочу возглавлятьпо неверному пути.Мне нравится метод 1, потому что код очень читабелен, прост в использовании и позволяет тестировать обработчики в автономном режиме без необходимости вызывать трафик с потоками.
Метод 2 кажется более распространенным, но с ростом сложности код загрязняется в main ().Кроме того, я подключен к тестированию алгоритмов с использованием потокового трафика, что замедляет разработку.
Метод 1: Разделимые обработчики (частичные):
// Take inbound stream from math-input and perform transformations A-D, then write out to 4 streams.
KStream<String, String> source = src_builder.stream("math-input");
source.flatMapValues(value -> transformInput_A(Arrays.asList(value.split("\\W+"))) ).to("math-output-A");
source.flatMapValues(value -> transformInput_B(Arrays.asList(value.split("\\W+"))) ).to("math-output-B");
source.flatMapValues(value -> transformInput_C(Arrays.asList(value.split("\\W+"))) ).to("math-output-C");
source.flatMapValues(value -> transformInput_D(Arrays.asList(value.split("\\W+"))) ).to("math-output-D");
// More code here, removed for brevity.
// Transformation handlers A, B, C, and D.
// ******************************************************************
// Perform data transformation using method A
public static Iterable transformInput_A (List str_array) {
// Imagine some very complex math here using the integer
// values. This could be 50+ lines of code.
for (int i = 0; i < str_array.size(); i++) {
// grab values and perform ops
}
// Return results in string format
return math_results;
}
// End of Transformation Method A
// ******************************************************************
// Imagine similar handlers for methods B, C, and D below.
Метод 2: Внутренние обработчики для объявления KStream (частичное):
// Take inbound stream from math-input and perform transformations A-D, then write out to 4 streams.
KStream<String, String> inputStream = src_builder.stream("math-input");
KStream<String, String> outputStream_A = inputStream.mapValues(new ValueMapper<String, String>() {
@Override
public String apply(String s) {
// Imagine some very complex math here using the integer
// values. This could be 50+ lines of code.
for (int i = 0; i < str_array.length; i++) {
// grab values and perform ops
}
// Return results in Iterbale string format
return math_results;
}
});
// Send the data to the outbound topic A.
outputStream_A.to("math-output-A");
KStream<String, String> outputStream_B ....
// Use ValueMapper in the KStream declaration just like above. 50+ lines of code
outputStream_B.to("math-output-B");
KStream<String, String> outputStream_C ....
// Use ValueMapper in the KStream declaration just like above. 50+ lines of code
outputStream_C.to("math-output-C");
KStream<String, String> outputStream_D ....
// Use ValueMapper in the KStream declaration just like above. 50+ lines of code
outputStream_D.to("math-output-D");
Помимо моего желания держать main () опрятным и вывести сложность из виду, я направляюсь в неправильном направлении с помощью метода 1?