У меня есть такой случай: пользователи собирают заказы как строки заказа.Я реализовал это с темой Kafka, содержащей события с изменениями заказа, они объединяются, хранятся в локальном хранилище значений ключей и транслируются как вторая тема с версиями заказа.
Мне нужно как-то реагировать на оставленные заказы - те, которые былизапущен, но изменений не было, по крайней мере, за последние x часов.
Простым решением может быть сканирование локального хранилища каждые y минут и публикация события изменения статуса заказа на Abandoned.Кажется, я не могу получить доступ к хранилищу не с процессора ... Но это также не очень элегантное кодирование.Любые предложения приветствуются.
- edit
Я не могу просто добавить пунктуацию к преобразователю слияния / проверки, потому что его выходные данные отличаются и должны быть направлены в другое место, как на этом изображении (отдельные потоки kafka)app):
, поэтому «обработчик / преобразователь заброшенных заказов» будет недоступен для входа (единственный триггер здесь - время).Другое дело, что в таком случае (как на изображении) мой преобразователь получает ForwardingDisabledProcessorContext при инициализации, поэтому я не могу генерировать сообщения в пунктуаторе.Я мог бы просто передать туда bean-компонент kafkaTemplate и просто создать новые сообщения, но тогда весь процессор / преобразователь - просто пустая оболочка только для доступа к локальному хранилищу ...
это фрагмент кода, который я использовал:
public class AbandonedOrdersTransformer implements ValueTransformer<OrderEvent, OrderEvent> {
@Override
public void init(ProcessorContext processorContext) {
this.context = processorContext;
stateStore = (KeyValueStore)processorContext.getStateStore(KafkaConfig.OPENED_ORDERS_STORE);
//main scheduler
this.context.schedule(TimeUnit.MINUTES.toMillis(5), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
KeyValueIterator<String, Order> iter = this.stateStore.all();
while (iter.hasNext()) {
KeyValue<String, Order> entry = iter.next();
if(OrderStatuses.NEW.equals(entry.value.getStatus()) &&
(timestamp - entry.value.getLastChanged().getTime()) > TimeUnit.HOURS.toMillis(4)) {
//SEND ABANDON EVENT "event"
context.forward(entry.key, event);
}
}
iter.close();
context.commit();
});
}
@Override
public OrderEvent transform(OrderEvent orderEvent) {
//do nothing
return null;
}
@Override
public void close() {
//do nothing
}
}