Как обрабатывать дубликаты сообщений с использованием потоковых функций DSL Kafka - PullRequest
0 голосов
/ 23 апреля 2019

Мое требование - пропустить или избежать дублирования сообщений (имеющих одинаковый ключ), полученных от темы INPUT, с использованием API-интерфейса DSL kafka stream.

Существует возможность отправки исходной системой дублирующихся сообщений в тему INPUT в случае любых сбоев.

ПОТОК -

Исходная система -> Тема INPUT -> Поток Kafka -> Тема OUTPUT

В настоящее время я использую flatMap для генерации нескольких ключей из полезной нагрузки, но flatMap не имеет состояния, поэтому не может избежать обработки дублирующихся сообщений при получении из темы INPUT.

Я ищу DSL API, который может пропускать повторяющиеся записи, полученные из темы INPUT, а также генерировать несколько ключей / значений перед отправкой в ​​тему OUTPUT.

Мысли точно, как только конфигурация будет полезна здесь для дедупликации сообщений, полученных из темы INPUT на основе ключей, но похоже, что она не работает, возможно, я не понимал использование точно один раз.

Не могли бы вы пролить свет на это.

Ответы [ 3 ]

0 голосов
/ 23 апреля 2019

Мое требование - пропустить или избежать дублирования сообщений (имеющих одинаковый ключ), полученных из темы INPUT, с использованием API-интерфейса DSL kafka stream.

Взгляните на пример EventDeduplication на https://github.com/confluentinc/kafka-streams-examples,, который это делает. Затем вы можете адаптировать пример с требуемой flatMap функциональностью, специфичной для вашего варианта использования.

Вот суть примера:

final KStream<byte[], String> input = builder.stream(inputTopic);
final KStream<byte[], String> deduplicated = input.transform(
    // In this example, we assume that the record value as-is represents a unique event ID by
    // which we can perform de-duplication.  If your records are different, adapt the extractor
    // function as needed.
    () -> new DeduplicationTransformer<>(windowSize.toMillis(), (key, value) -> value),
    storeName);
deduplicated.to(outputTopic);

и

    /**
     * @param maintainDurationPerEventInMs how long to "remember" a known event (or rather, an event
     *                                     ID), during the time of which any incoming duplicates of
     *                                     the event will be dropped, thereby de-duplicating the
     *                                     input.
     * @param idExtractor extracts a unique identifier from a record by which we de-duplicate input
     *                    records; if it returns null, the record will not be considered for
     *                    de-duping but forwarded as-is.
     */
    DeduplicationTransformer(final long maintainDurationPerEventInMs, final KeyValueMapper<K, V, E> idExtractor) {
      if (maintainDurationPerEventInMs < 1) {
        throw new IllegalArgumentException("maintain duration per event must be >= 1");
      }
      leftDurationMs = maintainDurationPerEventInMs / 2;
      rightDurationMs = maintainDurationPerEventInMs - leftDurationMs;
      this.idExtractor = idExtractor;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
      this.context = context;
      eventIdStore = (WindowStore<E, Long>) context.getStateStore(storeName);
    }

    public KeyValue<K, V> transform(final K key, final V value) {
      final E eventId = idExtractor.apply(key, value);
      if (eventId == null) {
        return KeyValue.pair(key, value);
      } else {
        final KeyValue<K, V> output;
        if (isDuplicate(eventId)) {
          output = null;
          updateTimestampOfExistingEventToPreventExpiry(eventId, context.timestamp());
        } else {
          output = KeyValue.pair(key, value);
          rememberNewEvent(eventId, context.timestamp());
        }
        return output;
      }
    }

    private boolean isDuplicate(final E eventId) {
      final long eventTime = context.timestamp();
      final WindowStoreIterator<Long> timeIterator = eventIdStore.fetch(
          eventId,
          eventTime - leftDurationMs,
          eventTime + rightDurationMs);
      final boolean isDuplicate = timeIterator.hasNext();
      timeIterator.close();
      return isDuplicate;
    }

    private void updateTimestampOfExistingEventToPreventExpiry(final E eventId, final long newTimestamp) {
      eventIdStore.put(eventId, newTimestamp, newTimestamp);
    }

    private void rememberNewEvent(final E eventId, final long timestamp) {
      eventIdStore.put(eventId, timestamp, timestamp);
    }

    @Override
    public void close() {
      // Note: The store should NOT be closed manually here via `eventIdStore.close()`!
      // The Kafka Streams API will automatically close stores when necessary.
    }

  }

Я ищу DSL API, который может пропускать повторяющиеся записи, полученные из темы INPUT, а также генерировать несколько ключей / значений перед отправкой в ​​тему OUTPUT.

DSL не включает в себя такую ​​функциональность "из коробки", но приведенный выше пример показывает, как вы можете легко создать собственную логику дедупликации, комбинируя DSL с API-интерфейсом процессора Kafka Streams с использованием * 1021. *.

Мысли точно, как только конфигурация будет полезна здесь для дедупликации сообщений, полученных из темы INPUT на основе ключей, но похоже, что она не работает, возможно, я не понял использование точно один раз.

Как отметил Матиас Дж. Сакс в своем ответе, с точки зрения Кафки, эти "дубликаты" не являются дубликатами с точки зрения семантики обработки, выполняемой в точности один раз. Кафка гарантирует, что не будет сама вводить такие дубликаты, но не может принимать такие решения «из коробки» для исходных источников данных, которые являются черным ящиком для Кафки.

0 голосов
/ 25 апреля 2019

Спасибо, Мэтт и Мишель за вашу помощь. Очень признателен.

Я собирался использовать комбинацию FlatMap и FilterNot API. Просто для сохранения состояния мы храним детали транзакции в canssandra.

FilterNot - Логика может включать в себя подключение Cassandra и проверку дубликатов. FlatMap - логика включает в себя создание нескольких ключей / значений и отправку их в тему OUTPUT.

Здесь речь идет о сбое подключения к Cassandra, а также о первом предложенном подходе - устойчивость хранилища состояний в случае миллионов транзакций в день, срок хранения и т. Д. И т. Д.

Пожалуйста, дайте мне знать, какой подход лучше.

0 голосов
/ 23 апреля 2019

Точно один раз можно использовать, чтобы гарантировать, что использование и обработка входной темы не приведет к дублированию в выходной теме.Однако с точной точки зрения дубликаты в теме ввода, которую вы описываете, на самом деле не являются дубликатами, а представляют собой два обычных сообщения ввода.

Для удаления дубликатов темы ввода можно использовать шаг transform()с подключенным хранилищем состояний (в DSL нет встроенного оператора, который делает то, что вы хотите).Для каждой входной записи вы сначала проверяете, нашли ли вы соответствующий ключ в магазине.Если нет, вы добавляете его в магазин и пересылаете сообщение.Если вы найдете его в магазине, вы отбрасываете ввод как дубликат.Обратите внимание, что это будет работать только с гарантией 100% правильности, если вы включите однократную обработку в своем приложении Kafka Streams.Другие, даже если вы попытаетесь выполнить дедупликацию, в случае сбоя Kafka Streams может снова ввести дублирование.

Кроме того, вам необходимо решить, как долго вы хотите хранить записи в хранилище.Вы можете использовать Punctuation для удаления старых данных из хранилища, если вы уверены, что в теме ввода больше не будет дубликатов.Один из способов сделать это - сохранить временную метку записи (или, возможно, смещение) в хранилище.Таким образом, вы можете сравнить текущее время с временем записи магазина в пределах punctuate() и удалить старые записи (то есть вы бы перебрали все записи в хранилище с помощью store#all()).

После transform() вы применяете flatMap() (или также можете объединить свой код flatMap() с transform() напрямую.

...