Как ждать конечного массового результата потока - PullRequest
0 голосов
/ 28 марта 2020

У меня есть приложение для обработки потоков, созданное с использованием потоков весенних облаков и потоков kafka. Эта система берет журналы из приложения и сравнивает их с наблюдениями, выполненными другим потоковым процессором, и создает оценку, после чего поток журнала разделяется на оценку ( выше и ниже некоторого порога).

Топология:

enter image description here

Проблема:

Поэтому моя проблема заключается в том, как правильно реализовать «Журнал наилучшего процессора селектора наблюдений». На момент обработки журнала существует конечное количество наблюдений, но их может быть много.

Итак, я предложил 2 решения ...

  1. Наблюдения за журналами групп и окон topi c по идентификатору журнала и затем уменьшаем, чтобы получить наивысший балл. (Проблема: для оценки всех наблюдений может потребоваться больше времени, чем для окна)

  2. Извещать завершенное сообщение оценки после каждой оценки, объединяться с журналом релевантных наблюдений, использовать глобальную таблицу журналов наблюдений наблюдений & интерактивный запрос для проверки того, что каждый идентификатор наблюдения находится в хранилище глобальной таблицы, когда все идентификаторы в магазине сопоставляются с наблюдением с наибольшим количеством очков. (Проблема: глобальная таблица не работает, когда используется только для интерактивного запроса)

Каков наилучший способ достичь того, что я пытаюсь?

  • Я надеюсь не создавать узких мест для раздела, диска или памяти.

  • У всех есть уникальные идентификаторы и кортежи соответствующих идентификаторов, когда значение объединяется из журнала и наблюдения .

(Редактировать: переключенное текстовое описание топологии с диаграммой и изменить заголовок)

1 Ответ

0 голосов
/ 01 апреля 2020

Решение № 2, похоже, работает, но оно генерирует предупреждения, потому что интерактивные запросы требуют некоторого времени для подготовки - поэтому я реализовал то же решение с Transformer:

@Slf4j
@Configuration
@RequiredArgsConstructor
@SuppressWarnings("unchecked")
public class LogBestObservationsSelectorProcessorConfig {
    private String logScoredObservationsStore = "log-scored-observations-store";

    private final Serde<LogEntryRelevantObservationIdTuple> logEntryRelevantObservationIdTupleSerde;
    private final Serde<LogRelevantObservationIdsTuple> logRelevantObservationIdsTupleSerde;
    private final Serde<LogEntryObservationMatchTuple> logEntryObservationMatchTupleSerde;
    private final Serde<LogEntryObservationMatchIdsRelevantObservationsTuple> logEntryObservationMatchIdsRelevantObservationsTupleSerde;

    @Bean
    public Function<
            GlobalKTable<LogEntryRelevantObservationIdTuple, LogEntryObservationMatchTuple>,
                Function<
                    KStream<LogEntryRelevantObservationIdTuple, LogEntryRelevantObservationIdTuple>,
                    Function<
                            KTable<String, LogRelevantObservationIds>,
                            KStream<String, LogEntryObservationMatchTuple>
                    >
                >
            >
    logBestObservationSelectorProcessor() {
        return (GlobalKTable<LogEntryRelevantObservationIdTuple, LogEntryObservationMatchTuple> logScoredObservationsTable) ->
                (KStream<LogEntryRelevantObservationIdTuple, LogEntryRelevantObservationIdTuple> logScoredObservationProcessedStream) ->
                        (KTable<String, LogRelevantObservationIdsTuple> logRelevantObservationIdsTable) -> {
            return logScoredObservationProcessedStream
                    .selectKey((k, v) -> k.getLogId())
                    .leftJoin(
                            logRelevantObservationIdsTable,
                            LogEntryObservationMatchIdsRelevantObservationsTuple::new,
                            Joined.with(
                                    Serdes.String(),
                                    logEntryRelevantObservationIdTupleSerde,
                                    logRelevantObservationIdsTupleSerde
                            )
                    )
                    .transform(() -> new LogEntryObservationMatchTransformer(logScoredObservationsStore))
                    .groupByKey(
                            Grouped.with(
                                Serdes.String(),
                                logEntryObservationMatchTupleSerde
                            )
                    )
                    .reduce(
                            (match1, match2) -> Double.compare(match1.getScore(), match2.getScore()) != -1 ? match1 : match2,
                            Materialized.with(
                                    Serdes.String(),
                                    logEntryObservationMatchTupleSerde
                            )
                    )
                    .toStream()
                    ;
        };
    }

    @RequiredArgsConstructor
    private static class LogEntryObservationMatchTransformer implements Transformer<String, LogEntryObservationMatchIdsRelevantObservationsTuple, KeyValue<String, LogEntryObservationMatchTuple>> {
        private final String stateStoreName;
        private ProcessorContext context;
        private TimestampedKeyValueStore<LogEntryRelevantObservationIdTuple, LogEntryObservationMatchTuple> kvStore;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            this.kvStore = (TimestampedKeyValueStore<LogEntryRelevantObservationIdTuple, LogEntryObservationMatchTuple>) context.getStateStore(stateStoreName);
        }

        @Override
        public KeyValue<String, LogEntryObservationMatchTuple> transform(String logId, LogEntryObservationMatchIdsRelevantObservationsTuple value) {
            val observationIds = value.getLogEntryRelevantObservationsTuple().getRelevantObservations().getObservationIds();
            val allObservationsProcessed = observationIds.stream()
                    .allMatch((observationId) -> {
                        val key = LogEntryRelevantObservationIdTuple.newBuilder()
                                .setLogId(logId)
                                .setRelevantObservationId(observationId)
                                .build();
                        return kvStore.get(key) != null;
                    });
            if (!allObservationsProcessed) {
                return null;
            }

            val observationId = value.getLogEntryRelevantObservationIdTuple().getObservationId();
            val key = LogEntryRelevantObservationIdTuple.newBuilder()
                    .setLogId(logId)
                    .setRelevantObservationId(observationId)
                    .build();
            ValueAndTimestamp<LogEntryObservationMatchTuple> observationMatchValueAndTimestamp = kvStore.get(key);
            return new KeyValue<>(logId, observationMatchValueAndTimestamp.value());
        }

        @Override
        public void close() {

        }
    }
}
...