Мое требование - пропустить или избежать дублирования сообщений (имеющих одинаковый ключ), полученных из темы INPUT, с использованием API-интерфейса DSL kafka stream.
Взгляните на пример EventDeduplication
на https://github.com/confluentinc/kafka-streams-examples,, который это делает. Затем вы можете адаптировать пример с требуемой flatMap
функциональностью, специфичной для вашего варианта использования.
Вот суть примера:
final KStream<byte[], String> input = builder.stream(inputTopic);
final KStream<byte[], String> deduplicated = input.transform(
// In this example, we assume that the record value as-is represents a unique event ID by
// which we can perform de-duplication. If your records are different, adapt the extractor
// function as needed.
() -> new DeduplicationTransformer<>(windowSize.toMillis(), (key, value) -> value),
storeName);
deduplicated.to(outputTopic);
и
/**
* @param maintainDurationPerEventInMs how long to "remember" a known event (or rather, an event
* ID), during the time of which any incoming duplicates of
* the event will be dropped, thereby de-duplicating the
* input.
* @param idExtractor extracts a unique identifier from a record by which we de-duplicate input
* records; if it returns null, the record will not be considered for
* de-duping but forwarded as-is.
*/
DeduplicationTransformer(final long maintainDurationPerEventInMs, final KeyValueMapper<K, V, E> idExtractor) {
if (maintainDurationPerEventInMs < 1) {
throw new IllegalArgumentException("maintain duration per event must be >= 1");
}
leftDurationMs = maintainDurationPerEventInMs / 2;
rightDurationMs = maintainDurationPerEventInMs - leftDurationMs;
this.idExtractor = idExtractor;
}
@Override
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
this.context = context;
eventIdStore = (WindowStore<E, Long>) context.getStateStore(storeName);
}
public KeyValue<K, V> transform(final K key, final V value) {
final E eventId = idExtractor.apply(key, value);
if (eventId == null) {
return KeyValue.pair(key, value);
} else {
final KeyValue<K, V> output;
if (isDuplicate(eventId)) {
output = null;
updateTimestampOfExistingEventToPreventExpiry(eventId, context.timestamp());
} else {
output = KeyValue.pair(key, value);
rememberNewEvent(eventId, context.timestamp());
}
return output;
}
}
private boolean isDuplicate(final E eventId) {
final long eventTime = context.timestamp();
final WindowStoreIterator<Long> timeIterator = eventIdStore.fetch(
eventId,
eventTime - leftDurationMs,
eventTime + rightDurationMs);
final boolean isDuplicate = timeIterator.hasNext();
timeIterator.close();
return isDuplicate;
}
private void updateTimestampOfExistingEventToPreventExpiry(final E eventId, final long newTimestamp) {
eventIdStore.put(eventId, newTimestamp, newTimestamp);
}
private void rememberNewEvent(final E eventId, final long timestamp) {
eventIdStore.put(eventId, timestamp, timestamp);
}
@Override
public void close() {
// Note: The store should NOT be closed manually here via `eventIdStore.close()`!
// The Kafka Streams API will automatically close stores when necessary.
}
}
Я ищу DSL API, который может пропускать повторяющиеся записи, полученные из темы INPUT, а также генерировать несколько ключей / значений перед отправкой в тему OUTPUT.
DSL не включает в себя такую функциональность "из коробки", но приведенный выше пример показывает, как вы можете легко создать собственную логику дедупликации, комбинируя DSL с API-интерфейсом процессора Kafka Streams с использованием * 1021. *.
Мысли точно, как только конфигурация будет полезна здесь для дедупликации сообщений, полученных из темы INPUT на основе ключей, но похоже, что она не работает, возможно, я не понял использование точно один раз.
Как отметил Матиас Дж. Сакс в своем ответе, с точки зрения Кафки, эти "дубликаты" не являются дубликатами с точки зрения семантики обработки, выполняемой в точности один раз. Кафка гарантирует, что не будет сама вводить такие дубликаты, но не может принимать такие решения «из коробки» для исходных источников данных, которые являются черным ящиком для Кафки.