Как переслать событие вниз по течению от экземпляра Punctuator в ValueTransformer? - PullRequest
0 голосов
/ 29 мая 2019

В KafkaStream при реализации ValueTransformer или ValueTransformerWithKey при вызове transform () я планирую новый пунктуатор. Когда выполняется метод punctuate () из Punctuator , я хочу, чтобы он направлял событие в нисходящем направлении с использованием экземпляра контекста. Однако экземпляр контекста, по-видимому, не определен в составе топологии DSL.

Есть какие-нибудь подсказки, как это сделать с помощью Трансформера?

Используя ту же логику в процессоре, реализуя низкоуровневую топологию процессора, она работает.

В ValueTransformerWithKey:

@Override 
    public Event transform(final String key, final Event event) { 
        this.context.schedule(timeout.toMillis(), PunctuationType.WALL_CLOCK_TIME, new MyPunctuator(context, key, event));
        return null;
}

В MyPunctuator:

private class MytPunctuator implements Punctuator {
    private String key;
    private ProcessorContext context;
    private Event event;

    MyPunctuator(ProcessorContext context, String key, Event event)
    {
        this.context = context;
        this.key = key;
        this.event = event;
    }

    @Override
    public void punctuate(final long timestamp) {
        context.forward(key, AlertEvent.builder().withSource(event).build());
        context.commit();
    }
}

При выполнении

myStream
    .groupByKey(Serialized.with(Serdes.String(), Event.serde()))
    .reduce((k, v) -> v)
    .transformValues(() -> valueTransformerWithKey)
    .toStream().to(ALARM_TOPIC, Produced.with(Serdes.String(), AlarmEvent.serde()));

Я ожидаю, что событие Alarm, созданное пунктуатором, будет перенаправлено в тему ALARM после истечения срока действия.

Вместо этого я получил следующее исключение: ProcessorContext.forward () не поддерживается.

1 Ответ

1 голос
/ 30 мая 2019

Как обычно, я нашел ответ в javadoc о ValueTransformerWithKey interface: https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/ValueTransformerWithKey.html

Обратите внимание, что используется ProcessorContext.forward (Object, Object) или ProcessorContext.forward (Object, Object, To) не разрешен внутри преобразования и приведет к исключению.

Однако реализация интерфейса Transformer вместо этого позволяет использовать контекст .forward () .Спасибо @Matthias J. Sax

https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html

Если более чем одна выходная запись должна быть переадресована вниз по потоку ProcessorContext.forward (Object, Object) и ProcessorContext.forward (Object, Object, К) можно использовать.Если запись не должна пересылаться вниз по потоку, преобразование может вернуть ноль.

...