Kafka Stream-GlobalKTable присоединяется к указанному полю c - PullRequest
0 голосов
/ 03 августа 2020

Итак, у меня есть KStream, который десериализуется в POJO, вот так

public class FinancialMessage {

public String user_id;
public String stock_symbol;
public String exchange_id;

}

А вот как выглядит запись Global Ktable

public class CompanySectors {

public String company_id;
public String company_name;
public String tckr;
public String sector_cd;
}

Я хочу иметь возможность соедините поле KStream stock_symbol с полем Ktable tckr. Это возможно? Я хочу создать новый объект EnrichedMessage, прежде чем передавать его в другой topi c. У меня был код, как показано ниже, но я, кажется, получаю исключения с нулевым указателем.

Exception in thread "trade-enrichment-stream-0c7e7782-4217-4450-8086-21871b4ebc45-StreamThread-1" java.lang.NullPointerException
    at com.domain.EnrichedMessage.<init>(EnrichedMessage.java:51)
    at com.domain.TradeEnrichmentTopology.lambda$3(TradeEnrichmentTopology.java:73)
    at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:79)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:101)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:801)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)

Вот как выглядит фрагмент кода.

KStream<String, FinancialMessage> financialMessageStream =
        builder.stream(
            INCOMING_TOPIC,
            Consumed.with(Serdes.String(), financialMessageSerde)
        );

    GlobalKTable<String, CompanySectors> companySectorsStore = 
        builder.globalTable(
            KTABLE_TOPIC,
            Consumed.with(Serdes.String(), companySectorsSerde)
    );
    
    KStream<String, EnrichedMessage> enrichedStream = financialMessageStream.leftJoin(
        companySectorsStore,
        (financialMessageKey, financialMessageValue) -> financialMessageValue.stock_symbol,
        (financialMessageValue, companySectorsValue) -> new EnrichedMessage(financialMessageValue, companySectorsValue)
    );
    
    enrichedStream.to(
        OUTGOING_TOPIC,
        Produced.with(Serdes.String(), enrichedMessageSerde));

Я полагаю, что может быть какая-то ошибка у меня слева Присоединяйтесь к логам c.

1 Ответ

4 голосов
/ 03 августа 2020

При выполнении левого соединения вы можете предположить, что запись левого потока не равна нулю; однако вы не можете предположить, что правильный GlobalKTable будет иметь запись для соответствия данному ключу, и поэтому результирующая запись может быть нулевой. В вашем случае, когда вы создаете экземпляр new EnrichedMessage(financialMessageValue, companySectorsValue), уверены ли вы, что companySectorsValue не является нулевым? Если он равен нулю, правильно ли вы его обрабатываете? Похоже, что ваш NPE встречается в конструкторе EnrichedMessage, поэтому просто убедитесь, что вы знаете, что companySectorsValue может иметь значение null.

Кроме того, убедитесь, что ваш GlobalKTable предварительно заполнен перед любым присоединением logi c происходит.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...