Как выполнять операции на Kstreams с нулевыми ключами? - PullRequest
0 голосов
/ 16 мая 2018

У меня есть объект потока kafka, подобный этому:

[KSTREAM-SOURCE-0000000000]: null, {"id": 1, "name": "john", "age": 26, "updated_at": 1525774480752}
[KSTREAM-SOURCE-0000000000]: null, {"id": 2, "name": "jane", "age": 24, "updated_at": 1525774480784}
[KSTREAM-SOURCE-0000000000]: null, {"id": 3, "name": "julia", "age": 25, "updated_at": 1525774480827}

Когда я пытался выполнить операцию с картой:

KStream<String, mysql> locationsStream = pgsql_users.map((k, v) -> new KeyValue<String, mysql>(k.toString(), new mysql ((Integer) v.get("id"), (String) v.get("name").toString(), (Integer) v.get("age")) ));

я получаю исключение как:

[KSTREAM-SOURCE-0000000000]: null, {"id": 1, "name": "john", "age": 26, "updated_at": 1525774480752}

Исключение в потоке "test-app-0d15a00c-51dc-4c1a-8293-82f47fc7ef90-StreamThread-1" java.lang.NullPointerException в com.aail.kafka_stream.lambda $ main $ 0 (kafka_stream.Java: 86) в org.apache.kafka.streams.kstream.internals.KStreamMap $ KStreamMapProcessor.process (KStreamMap.java:41) в org.apache.kafka.streams.processor.internals.ProcessorNode $ 1.run (ProcessorNode.java: 46) в org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs (StreamsMetricsImpl.java:208) в org.apache.kafka.streams.processor.internals.ProcessorNode.process (ProcessorNode.java:12)org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward (ProcessorContextImpl.java:85) в org.apache.kafka.streams.processor.internals.SourceNode.process (SourceNode.java:80) в org.apache.kafka.streams.processor.internals.StreamTask.process (StreamTask.java:216) в org.apache.kafka.streams.processor.internals.AssignedTasks.process (AssignedTasks.java:403) в org.apache.kafka.streams.processor.internals.TaskManager.процесс (TaskManager.java:317) в org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit (StreamThread.java:942) в org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.Java: 822) в org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:774) в org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:744).

Я думал, что получаю это, потому что ключ нулевой.Может ли кто-нибудь помочь мне в решении этой проблемы.

1 Ответ

0 голосов
/ 16 мая 2018

Вы используете k.toString(), но k равно нулю.

Возможно, вы захотите использовать либо k как есть, null снова в новом ключе, либо String.valueOf(k)если вам действительно нужно слово "null" (не рекомендуется, поскольку оно отправляет все сообщения в один раздел)

Кроме того, поскольку ключи имеют значение null, вы можете просто установить byte[] в качестве ключевого объекта для потока.,Нет необходимости использовать десериализатор

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...