Это вся топология, представленная двумя входными темами (inputTopic1 и inputTopic2) и одной выходной темой (outputTopic).
Сначала мы создаем таблицу из первой темы - сетевая таблица с ее serdes.
Во-вторых, поскольку inputTopic2 имеет записи с ключом, отличным от inputTopic1, мы сначала сопоставляем поток с новым значением ключа (Long value from companyId), а после этого мы группируем все значения KeyByKey и агрегируем их в списке для всей компании, чтобы новый В возвращенной таблице ключом является companyId, а значением - список всех карточек этой компании.
После всех этих преобразований мы объединяем таблицу истории с сетевой таблицей, а результат объединений передается в новую тему.
Проблема в том, что когда я пытаюсь поместить 50 000 карт данных истории в inputTopic2, их объединение в списке занимает так много времени, и производитель выдает следующее сообщение об ошибке:
org.apache.kafka.common.errors.TimeoutException: Истекающие 1 запись (и) для service-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog-1: с момента последней попытки прошло плюс 30043 мсек. *
Весь код топологии:
KTable<Long, DefaultDirectedGraph<Card, DefaultEdge>> networkTable = builder.table(inputTopic1,
Consumed.with(Serdes.Long(), new CompanyNetworkSeder()));
KTable<Long, List<Card>> historyListTable = builder
.stream(inputTopic2, Consumed.with(new HistoryDataKeySeder(), new HistoryDataValueSeder()))
.map(new KeyValueMapper<HistoryDataKey, Card, KeyValue<Long, Card>>() {
public KeyValue<Long, Card> apply(HistoryDataKey key, Card value) {
return new KeyValue<>(key.getCompanyId(), value);
}
}).groupByKey(Serialized.with(Serdes.Long(), new HistoryDataValueSeder())).aggregate(() -> {
return new ArrayList<Card>();
}, new Aggregator<Long, Card, List<Card>>() {
@Override
public List<Card> apply(Long key, Card newCard, List<Card> currentHistory) {
if (!historyUtils.checkForDuplicateHistoryCard(newCard, currentHistory)
&& !historyUtils.checkIfCardHasNoEndDate(newCard)) {
currentHistory.add(newCard);
}
return currentHistory;
}
}, Materialized.with(Serdes.Long(), new HistoryDataListValueSeder()));
KTable<Long, ForecastInput> joinedTable = historyListTable.leftJoin(networkTable, (history, network) -> {
if (network == null) {
return new ForecastInput();
} else if (history == null) {
return new ForecastInput(network, null);
} else {
return new ForecastInput(network, history);
}
});
KTable<Long, DefaultDirectedGraph<Card, DefaultEdge>> outputTable = joinedTable
.mapValues(new ValueMapperWithKey<Long, ForecastInput, DefaultDirectedGraph<Card, DefaultEdge>>() {
public DefaultDirectedGraph<Card, DefaultEdge> apply(Long companyId,
ForecastInput forecastInputData) {
DefaultDirectedGraph<Card, DefaultEdge> network = forecastInputData.getNetwork();
List<Card> history = forecastInputData.getHistory();
if (network == null) {
return new DefaultDirectedGraph<Card, DefaultEdge>(DefaultEdge.class);
} else if (history == null) {
return forecastInputData.getNetwork();
} else {
return (espService.updateForecastCalculationsForWholeCompany(forecastInputData.getNetwork(),
forecastInputData.getHistory()));
}
}
});
Я думаю, что проблема в том, что агрегация замедляет обработку целых потоков. Я использую версию kafka 2.0.1.