строка в двойных кавычках после операции отображения в потоках Кафки - PullRequest
0 голосов
/ 03 сентября 2018

Я использую потоки Кафки DSL и карту для преобразования KStream<String, JsonNode> в KStream<String, String>.

В функции ValueMapper я просто возвращаю new ValueMapper("key", "some constant string"), но там, где значение отправляется обратно в Kafka, используется KStream.to("some topic"), я получил результат, добавляется двойные кавычки.

мой код такой:

KStream<String, JsonNode> views = builder.stream("fromTopic");
views.map(new ValueKMapper()).to("toTopic");

и ValueMapper просто реализуют KeyValueMapper, а код apply():

public KeyValue<String, String> apply(String key, JsonNode value) 
{
    return new KeyValue<String,String>("a", "hello");
}

и затем, когда я потребляю toTopic, я получаю ""hello"" с добавленными кавычками.

Может быть, это ошибка потоков Кафки?

1 Ответ

0 голосов
/ 04 сентября 2018

Я предполагаю, что метод apply(), который вы указали в своем вопросе

public KeyValue<String, String> apply(String key, JsonNode value) 
{
    return new KeyValue<String,String>("a", "hello");
}

не передает жестко закодированный ключ и значение конструктору KeyValue. Я предполагаю, что проблема как-то связана с JsonNode. Возможно, фактическая реализация вашего метода использует value.get(key), т.е.

public KeyValue<String, String> apply(String key, JsonNode value) 
{
    return new KeyValue<String,String>(key, value.get(key));
}

Однако value.get(key) вернет TextNode, а метод toString() вернет строковое представление TextNode, включая кавычки. Чтобы правильно проанализировать JsonNode, вам нужно использовать метод textValue(), чтобы ваш метод стал

public KeyValue<String, String> apply(String key, JsonNode value) 
{
    return new KeyValue<String,String>(key, value.get(key).textValue());
}

Пример: * * тысяча двадцать одна Предполагая, что у вас есть ключ a и значение hello,

json.get("a").toString())

вернет "hello", а

json.get("a").textValue();

вернет hello

...