Как обработать KStream в пакете максимального размера или откатить до временного окна? - PullRequest
0 голосов
/ 05 декабря 2018

Я хотел бы создать приложение на основе потока Kafka, которое обрабатывает тему и принимает сообщения партиями размера X (т. Е. 50), но если поток имеет низкий поток, чтобы дать мне все, что имеет поток, в течение Y секунд (т.е.5).

Таким образом, вместо обработки сообщений одно за другим, я обрабатываю List[Record], где размер списка равен 50 (или, возможно, меньше).

Это сделано дляОбработка привязки ввода-вывода более эффективна.

Я знаю, что это может быть реализовано с помощью классического API Kafka, но я искал реализацию на основе потоков, которая может также обрабатывать фиксацию смещения непосредственно, принимая во внимание ошибки / сбои.Я не мог найти ничего, связанного с его документацией или поиском, и задавался вопросом, есть ли у кого-нибудь решение этой проблемы.

Ответы [ 2 ]

0 голосов
/ 09 декабря 2018

@ Matthias J. Sax, ответ хороший, я просто хочу добавить пример для этого, я думаю, что это может быть полезно для кого-то.скажем, мы хотим объединить входящие значения в следующий тип:

public class MultipleValues { private List<String> values; }

Чтобы собрать сообщения в пакеты с максимальным размером, нам нужно создать преобразователь:

public class MultipleValuesTransformer implements Transformer<String, String, KeyValue<String, MultipleValues>> {
    private ProcessorContext processorContext;
    private String stateStoreName;
    private KeyValueStore<String, MultipleValues> keyValueStore;

    public MultipleValuesTransformer(String stateStoreName) {
        this.stateStoreName = stateStoreName;
    }

    @Override
    public void init(ProcessorContext processorContext) {
        this.processorContext = processorContext;
        this.keyValueStore = (KeyValueStore) processorContext.getStateStore(stateStoreName);
        processorContext.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, this::doPunctuate);
    }

    @Override
    public KeyValue<String, MultipleValues> transform(String key, String value) {
        MultipleValues itemValueFromStore = keyValueStore.get(key);
        if (isNull(itemValueFromStore)) {
            itemValueFromStore = MultipleValues.builder().values(Collections.singletonList(value)).build();
        } else {
            List<String> values = new ArrayList<>(itemValueFromStore.getValues());
            values.add(value);
            itemValueFromStore = itemValueFromStore.toBuilder()
                    .values(values)
                    .build();
        }
        if (itemValueFromStore.getValues().size() >= 50) {
            processorContext.forward(key, itemValueFromStore);
            keyValueStore.put(key, null);
        } else {
            keyValueStore.put(key, itemValueFromStore);
        }
        return null;
    }

    private void doPunctuate(long timestamp) {
        KeyValueIterator<String, MultipleValues> valuesIterator = keyValueStore.all();
        while (valuesIterator.hasNext()) {
            KeyValue<String, MultipleValues> keyValue = valuesIterator.next();
            if (nonNull(keyValue.value)) {
                processorContext.forward(keyValue.key, keyValue.value);
                keyValueStore.put(keyValue.key, null);
            }
        }
    }

    @Override
    public void close() {
    }
}

и нам нужносоздать хранилище значений ключей, добавить его в StreamsBuilder и построить поток KStream, используя метод transform

Properties props = new Properties();
...
Serde<MultipleValues> multipleValuesSerge = Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(MultipleValues.class));
StreamsBuilder builder = new StreamsBuilder();
String storeName = "multipleValuesStore";
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(storeName);
StoreBuilder<KeyValueStore<String, MultipleValues>> storeBuilder =
        Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), multipleValuesSerge);
builder.addStateStore(storeBuilder);

builder.stream("source", Consumed.with(Serdes.String(), Serdes.String()))
        .transform(() -> new MultipleValuesTransformer(storeName), storeName)
        .print(Printed.<String, MultipleValues>toSysOut().withLabel("transformedMultipleValues"));
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
kafkaStreams.start();

, при таком подходе мы использовали входящий ключ, для которого мы провели агрегирование.если вам нужно собирать сообщения не по ключу, а по полям некоторых сообщений, вам необходим следующий поток для запуска перебалансировки в KStream (с использованием промежуточного раздела):

.selectKey(..)
.through(intermediateTopicName)
.transform( ..)
0 голосов
/ 05 декабря 2018

Самый простой способ - использовать операцию с состоянием transform().Каждый раз, когда вы получаете запись, вы кладете ее в магазин.Когда вы получили 50 записей, вы выполняете их обработку, выводите данные и удаляете записи из хранилища.

Для принудительной обработки, если вы не читаете лимит в течение определенного времени, вы можете зарегистрироватьсяпунктуация настенные часы.

...