Kafka KTable - соединение KTable с агрегацией, дающей TimeoutException: срок истекает 1 запись (и) - PullRequest
0 голосов
/ 21 января 2019

Это вся топология, представленная двумя входными темами (inputTopic1 и inputTopic2) и одной выходной темой (outputTopic). Сначала мы создаем таблицу из первой темы - сетевая таблица с ее serdes. Во-вторых, поскольку inputTopic2 имеет записи с ключом, отличным от inputTopic1, мы сначала сопоставляем поток с новым значением ключа (Long value from companyId), а после этого мы группируем все значения KeyByKey и агрегируем их в списке для всей компании, чтобы новый В возвращенной таблице ключом является companyId, а значением - список всех карточек этой компании.

После всех этих преобразований мы объединяем таблицу истории с сетевой таблицей, а результат объединений передается в новую тему.

Проблема в том, что когда я пытаюсь поместить 50 000 карт данных истории в inputTopic2, их объединение в списке занимает так много времени, и производитель выдает следующее сообщение об ошибке:

org.apache.kafka.common.errors.TimeoutException: Истекающие 1 запись (и) для service-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog-1: с момента последней попытки прошло плюс 30043 мсек. *

Весь код топологии:

KTable<Long, DefaultDirectedGraph<Card, DefaultEdge>> networkTable = builder.table(inputTopic1,
        Consumed.with(Serdes.Long(), new CompanyNetworkSeder()));
KTable<Long, List<Card>> historyListTable = builder
    .stream(inputTopic2, Consumed.with(new HistoryDataKeySeder(), new HistoryDataValueSeder()))
    .map(new KeyValueMapper<HistoryDataKey, Card, KeyValue<Long, Card>>() {
        public KeyValue<Long, Card> apply(HistoryDataKey key, Card value) {
        return new KeyValue<>(key.getCompanyId(), value);
        }
    }).groupByKey(Serialized.with(Serdes.Long(), new HistoryDataValueSeder())).aggregate(() -> {
        return new ArrayList<Card>();
    }, new Aggregator<Long, Card, List<Card>>() {
        @Override
        public List<Card> apply(Long key, Card newCard, List<Card> currentHistory) {
        if (!historyUtils.checkForDuplicateHistoryCard(newCard, currentHistory)
            && !historyUtils.checkIfCardHasNoEndDate(newCard)) {
            currentHistory.add(newCard);
        }
        return currentHistory;
        }
    }, Materialized.with(Serdes.Long(), new HistoryDataListValueSeder()));

KTable<Long, ForecastInput> joinedTable = historyListTable.leftJoin(networkTable, (history, network) -> {
    if (network == null) {
    return new ForecastInput();
    } else if (history == null) {
    return new ForecastInput(network, null);
    } else {
    return new ForecastInput(network, history);
    }
});

KTable<Long, DefaultDirectedGraph<Card, DefaultEdge>> outputTable = joinedTable
    .mapValues(new ValueMapperWithKey<Long, ForecastInput, DefaultDirectedGraph<Card, DefaultEdge>>() {

        public DefaultDirectedGraph<Card, DefaultEdge> apply(Long companyId,
            ForecastInput forecastInputData) {

        DefaultDirectedGraph<Card, DefaultEdge> network = forecastInputData.getNetwork();

        List<Card> history = forecastInputData.getHistory();


        if (network == null) {
            return new DefaultDirectedGraph<Card, DefaultEdge>(DefaultEdge.class);
        } else if (history == null) {
            return forecastInputData.getNetwork();
        } else {
            return (espService.updateForecastCalculationsForWholeCompany(forecastInputData.getNetwork(),
                forecastInputData.getHistory()));
        }
        }
    });

Я думаю, что проблема в том, что агрегация замедляет обработку целых потоков. Я использую версию kafka 2.0.1.

...