JSONParseException - чтение данных с помощью API отдыха Kafka - PullRequest
0 голосов
/ 31 декабря 2018

KAFKA TOPIC (test3)

$ kafka-console-consumer --bootstrap-server broker:9092 --topic test3 --from-beginning

"Can we write to a topic that does not exist?"
"Can we write to a topic that does not exist?"
{"foo":"bar"}
["foo","bar"]
confluent
confluent
confluent
kafka
logs


0

0

Потребитель (API-интерфейс kafka-rest на localhost:8082)

  1. Создатьпотребитель POST запрос к http://localhost:8082/consumers/rested

Тело запроса:

 {
   "format": "json",
   "auto.offset.reset": "earliest",
   "auto.commit.enable": "false"
 }

Тело ответа:

{
   "instance_id": "rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff",

   "base_uri": "http://rest-proxy:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff"

}
Создание подписки с использованием POST на http://localhost:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff/subscription

с использованием заголовков:

Host: http://localhost:8082
Content-Type: application/vnd.kafka.v2+json

и тела запроса:

{
    "topics": [
      "test3"
    ]
}

возвращаетОтвет 204 No Content.

Считайте записи, сделав запрос GET на http://localhost:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff/records

, используя Заголовки:

Host: http://localhost:8082
Accept: application/vnd.kafka.json.v2+json

возвращает Ответ:

{
    "error_code": 50002,
    "message": "Kafka error: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')\n at [Source: (byte[])\"key\"; line: 1, column: 7]"
}

Как мы можем решить эту проблему и обеспечить получение данных?

Исключение (на Kafka)

В журнале работающего прокси-сервера Kafka Rest Pro есть следующее исключение:

rest-proxy         | [2018-12-31 03:09:27,232] INFO 172.25.0.1 - - [31/Dec/2018:03:09:26 +0000] "GET /consumers/rest-consumer/instances/rest-consumer-8e49873e-13ce-46a5-be1f-0237a0369efe/records HTTP/1.1" 500 211  341 (io.confluent.rest-utils.requests)
rest-proxy         | [2018-12-31 03:09:27,235] ERROR Unexpected exception in consumer read task id=io.confluent.kafkarest.v2.KafkaConsumerReadTask@59611e28  (io.confluent.kafkarest.v2.KafkaConsumerReadTask)
rest-proxy         | org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
rest-proxy         |  at [Source: (byte[])"key"; line: 1, column: 7]
rest-proxy         | Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
rest-proxy         |  at [Source: (byte[])"key"; line: 1, column: 7]
rest-proxy         |    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
rest-proxy         |    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679)
rest-proxy         |    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526)
rest-proxy         |    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621)
rest-proxy         |    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826)
rest-proxy         |    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723)
rest-proxy         |    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141)
rest-proxy         |    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000)
rest-proxy         |    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091)
rest-proxy         |    at io.confluent.kafkarest.v2.JsonKafkaConsumerState.deserialize(JsonKafkaConsumerState.java:79)
rest-proxy         |    at io.confluent.kafkarest.v2.JsonKafkaConsumerState.createConsumerRecord(JsonKafkaConsumerState.java:64)
rest-proxy         |    at io.confluent.kafkarest.v2.KafkaConsumerReadTask.maybeAddRecord(KafkaConsumerReadTask.java:158)
rest-proxy         |    at io.confluent.kafkarest.v2.KafkaConsumerReadTask.addRecords(KafkaConsumerReadTask.java:142)
rest-proxy         |    at io.confluent.kafkarest.v2.KafkaConsumerReadTask.doPartialRead(KafkaConsumerReadTask.java:99)
rest-proxy         |    at io.confluent.kafkarest.v2.KafkaConsumerManager$RunnableReadTask.run(KafkaConsumerManager.java:370)
rest-proxy         |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
rest-proxy         |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
rest-proxy         |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
rest-proxy         |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
rest-proxy         |    at java.lang.Thread.run(Thread.java:748)

CLI групп потребителей

Я могу просмотреть группу потребителей в CLI, но у нее нет активных членов:

$ kafka-consumer-groups --bootstrap-server broker:9092 --list

имеет результат:

console-consumer-60695
console-consumer-62259
console-consumer-19307
console-consumer-47906
console-consumer-40838
rested

Однако, когда я пытаюсь получить members:

$ kafka-consumer-groups --bootstrap-server localhost:29092 --group rest-consumer --describe --members

Consumer group 'rested' has no active members.

1 Ответ

0 голосов
/ 01 января 2019

TL; DR

Вам нужно заключить ключ в двойные кавычки, не потому, что все ключи должны быть заключены в кавычки, но с помощью синтаксического анализатора JSON вам нужно сделать свой ключдопустимый JSON и строка, заключенная в двойные кавычки, является допустимым JSON.

Если вам действительно нужно обработать это сообщение, вам необходимо прочитать его в формате, отличном от JSON.

Длинный ответ

У вас есть запись с ключом, который не имеет кавычек, что делает значение JSON недопустимым, поэтому, когда анализатор JSON Jackson пытается проанализировать ключ, это недопустимый JSON (что не ясноиз сообщения об ошибке, но когда он не видит кавычку, квадратную или фигурную скобку, он начинает предполагать, что это логическое или нулевое значение).

Вы можете видеть, где он захватывает их ключ и пытается расшифровать его какJSON здесь

https://github.com/confluentinc/kafka-rest/blob/a9b7cc527a26fdf09db27d148f2e71bfe3d87a6a/kafka-rest/src/main/java/io/confluent/kafkarest/v2/JsonKafkaConsumerState.java#L69

Я смог воспроизвести вашу ошибку, используя этот метод

curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "latest"}' \
      http://localhost:8082/consumers/my_json_consumer

curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["testjsontopic"]}' \
 http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription


./bin/kafka-console-producer \
  --broker-list :9092 \
  --topic testjsontopic \
  --property parse.key=true \
  --property key.separator="&"

>"key"&{"foo":"bar"}

*Ctrl-C

curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
      http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records

На данный момент я могу прочитатьзапись, но когда я добавляю ключ безя получаю ту же ошибку, что и вы

./bin/kafka-console-producer \
  --broker-list :9092 \
  --topic testjsontopic \
  --property parse.key=true \
  --property key.separator="&"

>key&{"foo":"bar"}

Теперь, когда я вызываю этот код

curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
      http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records

Теперь я получаю эту ошибку

com.fasterxml.jackson.core.JsonParseException: нераспознанный токен «ключ»: ожидался («истина», «ложь» или «ноль»)

Используйте это, чтобы читать ключи вашей темы также

./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic testjsontopic --property print.key=true --from-beginning
...