Используйте kafka для обнаружения изменений в значениях - PullRequest
4 голосов
/ 29 марта 2019

У меня есть потоковое приложение, которое непрерывно принимает поток координат вместе с некоторыми пользовательскими метаданными, которые также включают цепочку битов.Этот поток создается по теме kafka с использованием API производителя.Теперь другое приложение должно обработать этот поток [Streams API] и сохранить определенный бит из строки битов и генерировать оповещения, когда этот бит изменяется

Ниже представлен непрерывный поток сообщений, которые необходимо обработать

{"device_id":"1","status_bit":"0"}
{"device_id":"2","status_bit":"1"}
{"device_id":"1","status_bit":"0"}
{"device_id":"3","status_bit":"1"}
{"device_id":"1","status_bit":"1"} // need to generate alert with change: 0->1
{"device_id":"3","status_bits":"1"}
{"device_id":"2","status_bit":"1"}
{"device_id":"3","status_bits":"0"} // need to generate alert with change 1->0

Теперь я хотел бы написать эти оповещения в другую тему кафки, например

{"device_id":1,"init":0,"final":1,"timestamp":"somets"}
{"device_id":3,"init":1,"final":0,"timestamp":"somets"}

. Я могу сохранить текущий бит в хранилище состояний, используя что-то вроде

streamsBuilder
        .stream("my-topic")
        .mapValues((key, value) -> value.getStatusBit())
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
        .reduce((oldAggValue, newMessageValue) -> newMessageValue, Materialized.as("bit-temp-store"));

, ноЯ не могу понять, как я могу обнаружить это изменение из существующего бита.Нужно ли как-то запрашивать хранилище состояний внутри топологии процессора?Если да?Как?Если нет?Что еще можно сделать?

Любые предложения / идеи, которые я могу попробовать (возможно, полностью отличается от того, что я думаю), также приветствуются.Я новичок в Kafka и думаю, что с точки зрения событийно-ориентированных потоков ускользает от меня.

Заранее спасибо.

1 Ответ

4 голосов
/ 29 марта 2019

Я не уверен, что это лучший подход, но в аналогичной задаче я использовал промежуточную сущность, чтобы зафиксировать изменение состояния.В вашем случае это будет что-то вроде

streamsBuilder.stream("my-topic").groupByKey()
          .aggregate(DeviceState::new, new Aggregator<String, Device, DeviceState>() {
        public DeviceState apply(String key, Device newValue, DeviceState state) {
            if(!newValue.getStatusBit().equals(state.getStatusBit())){
                 state.setChanged(true);    
            }
            state.setStatusBit(newValue.getStatusBit());
            state.setDeviceId(newValue.getDeviceId());
            state.setKey(key);
            return state;
        }
    }, TimeWindows.of(…) …).filter((s, t) -> (t.changed())).toStream();

В появившейся теме у вас будут изменения.Вы также можете добавить некоторые атрибуты в DeviceState, чтобы сначала инициализировать его, в зависимости от того, хотите ли вы отправить событие, когда поступит первая запись устройства и т. Д.

...