Почему происходит сбой ksqldb-сервера при обновлении с v0.6.0 до v0.8.1? - PullRequest
1 голос
/ 07 апреля 2020

У меня есть существующий ksqldb-сервер, который я хочу обновить с v0.6.0 с подключением в качестве отдельного узла, до v0.8.1 со встроенным подключением. Однако ksqldb-сервер падает после запуска следующего журнала:

ksqldb-server      | [2020-04-07 07:24:37,266] ERROR Error during restore (io.confluent.ksql.rest.server.computation.CommandRunner:191)
ksqldb-server      | org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition _confluent-ksql-default__command_topic-0 at offset 0. If needed, please seek past the record to continue consumption.
ksqldb-server      | Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "useOffsetAsQueryID" (class io.confluent.ksql.rest.server.computation.Command), not marked as ignorable (4 known properties: "plan", "originalProperties", "streamsProperties", "statement"])
ksqldb-server      |  at [Source: (byte[])"{"statement":"CREATE STREAM KSQL_PROCESSING_LOG (logger VARCHAR, level VARCHAR, time BIGINT, message STRUCT<type INT, deserializationError STRUCT<errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>>, recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, productionError STRUCT<errorMessage VARCHAR>>) WITH(KAFKA_TOPIC='default_ksql_processing_log', VALUE_FORMAT='JSON');","useOffsetAsQueryID":true,"streamsProperties":{},"originalProperties":{"ksql.exten"[truncated 1813 bytes]; line: 1, column: 2314] (through reference chain: io.confluent.ksql.rest.server.computation.Command["useOffsetAsQueryID"])
ksqldb-server      | Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "useOffsetAsQueryID" (class io.confluent.ksql.rest.server.computation.Command), not marked as ignorable (4 known properties: "plan", "originalProperties", "streamsProperties", "statement"])
ksqldb-server      |  at [Source: (byte[])"{"statement":"CREATE STREAM KSQL_PROCESSING_LOG (logger VARCHAR, level VARCHAR, time BIGINT, message STRUCT<type INT, deserializationError STRUCT<errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>>, recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, productionError STRUCT<errorMessage VARCHAR>>) WITH(KAFKA_TOPIC='default_ksql_processing_log', VALUE_FORMAT='JSON');","useOffsetAsQueryID":true,"streamsProperties":{},"originalProperties":{"ksql.exten"[truncated 1813 bytes]; line: 1, column: 2314] (through reference chain: io.confluent.ksql.rest.server.computation.Command["useOffsetAsQueryID"])
ksqldb-server      |    at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
ksqldb-server      |    at com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:840)
ksqldb-server      |    at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1206)
ksqldb-server      |    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1592)
ksqldb-server      |    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1542)
ksqldb-server      |    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:504)
ksqldb-server      |    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1287)
ksqldb-server      |    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
ksqldb-server      |    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159)
ksqldb-server      |    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4202)
ksqldb-server      |    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3266)
ksqldb-server      |    at io.confluent.ksql.rest.server.computation.InternalTopicSerdes$InternalTopicDeserializer.deserialize(InternalTopicSerdes.java:59)
ksqldb-server      |    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
ksqldb-server      |    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1309)
ksqldb-server      |    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128)
ksqldb-server      |    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1540)
ksqldb-server      |    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1376)
ksqldb-server      |    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:676)
ksqldb-server      |    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:631)
ksqldb-server      |    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1315)
ksqldb-server      |    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
ksqldb-server      |    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
ksqldb-server      |    at io.confluent.ksql.rest.server.CommandTopic.getRestoreCommands(CommandTopic.java:88)
ksqldb-server      |    at io.confluent.ksql.rest.server.computation.CommandStore.getRestoreCommands(CommandStore.java:211)
ksqldb-server      |    at io.confluent.ksql.rest.server.computation.CommandRunner.processPriorCommands(CommandRunner.java:155)
ksqldb-server      |    at io.confluent.ksql.rest.server.KsqlRestApplication.initialize(KsqlRestApplication.java:424)
ksqldb-server      |    at io.confluent.ksql.rest.server.KsqlRestApplication.startKsql(KsqlRestApplication.java:354)
ksqldb-server      |    at io.confluent.ksql.rest.server.KsqlRestApplication.startAsync(KsqlRestApplication.java:338)
ksqldb-server      |    at io.confluent.ksql.rest.server.ExecutableServer.startAsync(ExecutableServer.java:47)
ksqldb-server      |    at io.confluent.ksql.rest.server.MultiExecutable.doAction(MultiExecutable.java:63)
ksqldb-server      |    at io.confluent.ksql.rest.server.MultiExecutable.startAsync(Mult iExecutable.java:42)
ksqldb-server      |    at io.confluent.ksql.rest.server.KsqlServerMain.tryStartApp(KsqlServerMain.java:75)
ksqldb-server      |    at io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:61)
ksqldb-server      | [2020-04-07 07:24:37,286] INFO Server shutting down (io.confluent.ksql.rest.server.KsqlServerMain:79)

Ключевое сообщение здесь: Нераспознанное поле «useOffsetAsQueryID» (класс io.confluent.k sql .rest.server .computation.Command), не помечен как игнорируемый (4 известных свойства: "plan", "originalProperties", "streamsProperties", "Statement"])

Единственный ресурс, который я смог найти, - это эта функция но это было уже в v0.6.0: https://github.com/confluentinc/ksql/pull/3343

Есть идеи? Спасибо за вашу помощь.

1 Ответ

3 голосов
/ 07 апреля 2020

Произошли критические изменения с 0,6 до 0,8,1. Вы видите ошибку, потому что K SQL 0.8.1 не может использовать потоки 0.6, прочитанные из команды topi c. Чтобы исправить это, выполните следующие инструкции по обновлению: https://github.com/confluentinc/ksql/blob/master/docs-md/operate-and-deploy/installation/upgrading.md

Было бы проще, если бы вы могли go вернуться к своей версии 0.6, чтобы получить все потоки / таблицы / типы из K SQL, затем воссоздайте их в 0.8.1.

...