В KafkaStream при реализации ValueTransformer или ValueTransformerWithKey при вызове transform () я планирую новый пунктуатор. Когда выполняется метод punctuate () из Punctuator , я хочу, чтобы он направлял событие в нисходящем направлении с использованием экземпляра контекста. Однако экземпляр контекста, по-видимому, не определен в составе топологии DSL.
Есть какие-нибудь подсказки, как это сделать с помощью Трансформера?
Используя ту же логику в процессоре, реализуя низкоуровневую топологию процессора, она работает.
В ValueTransformerWithKey:
@Override
public Event transform(final String key, final Event event) {
this.context.schedule(timeout.toMillis(), PunctuationType.WALL_CLOCK_TIME, new MyPunctuator(context, key, event));
return null;
}
В MyPunctuator:
private class MytPunctuator implements Punctuator {
private String key;
private ProcessorContext context;
private Event event;
MyPunctuator(ProcessorContext context, String key, Event event)
{
this.context = context;
this.key = key;
this.event = event;
}
@Override
public void punctuate(final long timestamp) {
context.forward(key, AlertEvent.builder().withSource(event).build());
context.commit();
}
}
При выполнении
myStream
.groupByKey(Serialized.with(Serdes.String(), Event.serde()))
.reduce((k, v) -> v)
.transformValues(() -> valueTransformerWithKey)
.toStream().to(ALARM_TOPIC, Produced.with(Serdes.String(), AlarmEvent.serde()));
Я ожидаю, что событие Alarm, созданное пунктуатором, будет перенаправлено в тему ALARM после истечения срока действия.
Вместо этого я получил следующее исключение: ProcessorContext.forward () не поддерживается.