Обнаружение брошенных процессов в Kafka Streams 2.0 - PullRequest
0 голосов
/ 20 февраля 2019

У меня есть такой случай: пользователи собирают заказы как строки заказа.Я реализовал это с темой Kafka, содержащей события с изменениями заказа, они объединяются, хранятся в локальном хранилище значений ключей и транслируются как вторая тема с версиями заказа.

Мне нужно как-то реагировать на оставленные заказы - те, которые былизапущен, но изменений не было, по крайней мере, за последние x часов.

Простым решением может быть сканирование локального хранилища каждые y минут и публикация события изменения статуса заказа на Abandoned.Кажется, я не могу получить доступ к хранилищу не с процессора ... Но это также не очень элегантное кодирование.Любые предложения приветствуются.

- edit

Я не могу просто добавить пунктуацию к преобразователю слияния / проверки, потому что его выходные данные отличаются и должны быть направлены в другое место, как на этом изображении (отдельные потоки kafka)app):

enter image description here

, поэтому «обработчик / преобразователь заброшенных заказов» будет недоступен для входа (единственный триггер здесь - время).Другое дело, что в таком случае (как на изображении) мой преобразователь получает ForwardingDisabledProcessorContext при инициализации, поэтому я не могу генерировать сообщения в пунктуаторе.Я мог бы просто передать туда bean-компонент kafkaTemplate и просто создать новые сообщения, но тогда весь процессор / преобразователь - просто пустая оболочка только для доступа к локальному хранилищу ...

это фрагмент кода, который я использовал:

public class AbandonedOrdersTransformer implements ValueTransformer<OrderEvent, OrderEvent> {

    @Override
    public void init(ProcessorContext processorContext) {
        this.context = processorContext;

        stateStore = (KeyValueStore)processorContext.getStateStore(KafkaConfig.OPENED_ORDERS_STORE);

        //main scheduler
        this.context.schedule(TimeUnit.MINUTES.toMillis(5), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {

            KeyValueIterator<String, Order> iter = this.stateStore.all();
            while (iter.hasNext()) {
                KeyValue<String, Order> entry = iter.next();
                if(OrderStatuses.NEW.equals(entry.value.getStatus()) &&
                    (timestamp - entry.value.getLastChanged().getTime()) > TimeUnit.HOURS.toMillis(4)) {

                    //SEND ABANDON EVENT "event"

                    context.forward(entry.key, event);
                }
            }
            iter.close();
            context.commit();
        });
    }

    @Override
    public OrderEvent transform(OrderEvent orderEvent) {
        //do nothing
        return null;
    }

    @Override
    public void close() {
        //do nothing
    }
}
...