Кафки сравнивают последовательные значения для ключа - PullRequest
1 голос
/ 07 ноября 2019

Мы создаем приложение для получения данных с датчиков. Данные передаются в Kafka, откуда потребители будут публиковать их в разных хранилищах данных. Каждая точка данных будет иметь несколько атрибутов, представляющих состояние датчика.

В одном из потребителей мы хотим опубликовать данные в хранилище данных, только если значение изменилось. Например, если есть датчик температуры, который опрашивается для данных каждые 10 секунд, мы ожидаем получить данные типа

----------------------------------------------------------------------
Key                Value
----------------------------------------------------------------------
Sensor1            {timestamp: "10-10-2019 10:20:30", temperature: 10}
Sensor1            {timestamp: "10-10-2019 10:20:40", temperature: 10}
Sensor1            {timestamp: "10-10-2019 10:20:50", temperature: 11}

. В вышеприведенном случае должна быть опубликована только первая запись и третья запись.

Для этого нам нужен способ сравнить текущее значение ключа с предыдущим значением с тем же ключом. Я считаю, что это возможно с KTable или KStream, но не могу найти примеров.

Любая помощь будет отличной!

Ответы [ 4 ]

2 голосов
/ 07 ноября 2019

Вот пример, как решить эту проблему с помощью KStream#transformValues().

StreamsBuilder builder = new StreamsBuilder();
StoreBuilder<KeyValueStore<String, YourValueType>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
                                Serdes.String(),
                                YourValueTypeSerde());
builder.addStateStore(keyValueStoreBuilder);
stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), YourValueTypeSerde()))
    .transformValues(() -> new ValueTransformerWithKey<String, YourValueType, YourValueType>() {
        private KeyValueStore<String, YourValueType> state;

        @Override
        public void init(final ProcessorContext context) {
            state = (KeyValueStore<String, YourValueType>) context.getStateStore(stateStoreName);}

        @Override
        public YourValueType transform(final String key, final YourValueType value) {
            YourValueType prevValue = state.get(key);
            if (prevValue != null) {
                if (prevValue.temperature() != value.temperature()) {
                    return prevValue;
                }
            } else {
                state.put(key, value);
            }
            return null;
       }

       @Override
       public void close() {}
    }, stateStorName))
    .to(OUTPUT_TOPIC);

Вы сравниваете запись с предыдущей записью, хранящейся в хранилище состояний. Если температура отличается, вы возвращаете запись из хранилища состояний и сохраняете текущую запись в хранилище состояний. Если температура равна, вы отменяете текущую запись.

2 голосов
/ 07 ноября 2019

Вы можете использовать поток Кафки Процессор API . Вы можете настроить локальное хранилище значений ключей в качестве контекста состояния. Функция процесса вызывается для каждой выбранной записи.

В функции процесса вы можете проверить последнее сохраненное значение и принять или отклонить последнюю запись на основе бизнес-логики (в вашем случае, сравнивая значение температуры).

В функции пунктуации вы можете переслать запись потребителю по расписанию. Смотрите пример кода ниже (без пунктуации)

public class SensorProcessor implements Processor<String, String> {

  private ProcessorContext context;
  private KeyValueStore<String, String> kvStore;

  @Override
  @SuppressWarnings("unchecked")
  public void init(ProcessorContext context) {
      // keep the processor context locally because we need it in punctuate() and commit()
      this.context = context;

      // retrieve the key-value store named "SensorData"
      kvStore = (KeyValueStore) context.getStateStore("SensorData");

      // schedule a punctuate() method every second based on event-time

  }

  @Override
  public void process(String sensorName, String sensorData) {

          String oldValue = this.kvStore.get(sensorName);

          if (oldValue == null) {
              this.kvStore.put(sensorName, sensorData);
          } else {
              //Put the business logic for comparison
              //compare temperatures
              //if required put the value
              this.kvStore.put(sensorName, sensorData);

              //Forward it o consumer
              context.forward(sensorName, sensorData);


          }
          context.commit();
      }
  }

  @Override
  public void close() {
      // nothing to do
  }

}
0 голосов
/ 07 ноября 2019

Если вы хотите сделать это с Kafka Streams, вы должны использовать Processor API.

Вам необходимо реализовать свой пользовательский Transformer с State store. Для каждого сообщения вы должны искать значение в хранилище состояний, если оно изменилось или его нет, вы должны вернуть новое значение, в противном случае - null. Кроме того, вы должны также сохранить это значение в хранилище состояний (KeyValueStore::put(...))

Более подробную информацию о API процессора можно найти: здесь

0 голосов
/ 07 ноября 2019

Если у вас есть база данных в качестве источника данных вашего потребителя, вы можете сравнить свои текущие сохраненные данные с недавно использованными, а затем обновить свои значения при изменении данных, в противном случае игнорируйте использованные данные.

...