Дублирующиеся промежуточные результаты KStream-KStream объединяются в Kafka Streams - PullRequest
1 голос
/ 10 мая 2019

У меня есть следующий сценарий:

  1. Таблица A и таблица B объединены с помощью FK.
  2. Транзакционная вставка / обновление как в A, так и в B.
  3. Debezium генерирует одно событие a для таблицы A и одно событие b для таблицы B.
  4. Kafka Streams создает KStream для таблиц A и B.
  5. Приложение Kafka Streams leftJoin KStreams Aи B. (Предположим, что записи a и b имеют одинаковые ключи и попадают в окно объединения).
  6. Выходные записи будут [a, null], [a, b].

Каквы отбрасываете [a, null]?

Можно выполнить innerJoin, но это все равно будет проблемой в случае update запросов.

Мы попытались использовать временную метку события для фильтрации(т.е. сохранить событие с последней отметкой времени), но уникальность отметки времени не гарантируется.

т.е.Конечная цель состоит в том, чтобы иметь возможность идентифицировать самые последние агрегаты, чтобы мы могли отфильтровывать промежуточные результаты во время запроса (в Athena / Presto или в некоторых RDBMS).

1 Ответ

0 голосов
/ 13 мая 2019

На данный момент лучший рабочий подход, который я нашел, - это использовать смещения Кафки из выходных записей.

Подход можно обобщить так:

  1. Выполните всю логику, которую вы хотите сделать, и не беспокойтесь о нескольких записях для одного и того же ключа.
  2. Запишите результаты в промежуточную тему с минимальным удержанием (1 час и т. Д.)
  3. Прочтите промежуточные разделы, используя Процессор и в Процессоре, обогатите сообщение с помощью Смещения Кафки, используя context.offset().
  4. Запишите сообщения в тему вывода.

Теперь ваша тема вывода содержит несколько сообщений для одного и того же ключа, но каждое с различным смещением.

Теперь во время запроса вы можете выбрать максимальное смещение для каждой клавиши, используя подзапрос.

Пример TransformerSupplier можно увидеть ниже

/**
 * @param <K> key type
 * @param <V> value type
 */
public class OutputTransformSupplier<K, V> implements TransformerSupplier<K, V, KeyValue<String, String>> {
  @Override
  public Transformer<K, V, KeyValue<String, String>> get() {
    return new OutputTransformer<>();
  }

  private class OutputTransformer<K, V> implements Transformer<K, V, KeyValue<String, String>> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
      this.context = context;
    }

    /**
     * @param key   the key for the record
     * @param value the value for the record
     */
    @Override
    public KeyValue<String, String> transform(K key, V value) {
      if (value != null) {
        value.setKafkaOffset(context.offset());
      }
      return new KeyValue<>(key, value);
    }

    @Override
    public KeyValue<String, String> punctuate(long timestamp) {
      return null;
    }

    @Override
    public void close() {
      // nothing to close
    }
  }
}
...