Apache Проверка схемы Пульсара с помощью строки json - PullRequest
1 голос
/ 22 марта 2020

В моем случае у меня есть некоторые необработанные строковые данные JSON, отправленные в topi c, и я не могу жестко кодировать класс POJO, я хочу использовать функцию схемы пульсара для проверки структуры. У меня есть topi c "my-topi c" и связана со схемой JSON ниже, затем я пытаюсь передать какое-то сообщение.

var producer = client.newProducer(Schema.AUTO_PRODUCE_BYTES();
producer.send("{\"y\": 1}".getBytes()); // here! the value is 1(number) not string.

var reader = client.newReader(Schema.AUTO_CONSUME())
var message = reader.readNext();
I got {"y": 1}

мой вопрос, как работает схема pulsar? Сообщение должно быть отклонено.

{
  "version": 1,
  "schemaInfo": {
    "name": "my-topic",
    "schema": {
      "type": "record",
      "name": "Data",
      "namespace": "com.iot.test",
      "fields": [
        {
          "name": "y",
          "type": [
            "null",
            "string"
          ]
        }
      ]
    },
    "type": "JSON",
    "properties": {
      "__alwaysAllowNull": "true"
    }
  }
}

Ответы [ 2 ]

1 голос
/ 23 марта 2020

моя вина. просто нужно установить

v2.5.0
bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable iot/test

v2.4.2
bin/pulsar-admin namespaces set-schema-autoupdate-strategy --disable iot/test
0 голосов
/ 22 марта 2020

Параметр Schema.AUTO_PRODUCE_BYTES полезен для передачи данных от производителя в Pulsar topi c, который имеет схему, поскольку он гарантирует, что отправленное сообщение совместимо со схемой topi c. Однако я не вижу, где вы указали схему для topi c.

Топи c автоматически назначается схема при подключении типизированного производителя или потребителя, например,

Producer producer = client.newProducer(JSONSchema.of(SensorReading.class))
    .topic("sensor-data")
    .sendTimeout(3, TimeUnit.SECONDS)
    .create();

Но вы заявили, что не можете этого сделать, потому что «не можете» жесткий код POJO ". Поэтому единственная другая возможность назначить схему для topi c (чтобы обеспечить совместимость со схемой сообщений) - это использовать вызовы REST API для ручного управления схемой.

Исходя из вашей схемы, ваш файл определения схемы будет выглядеть примерно так:

{
  "type": "JSON",
  "schema": "{\"type\":\"record\",\"name\":\"Data\",\"namespace\":\"com.iot.test\",\"fields\":[{\"name\":\"y\",\"type\":[\"null\",\"string\"],\"default\":null}}",
  "properties": {}
}

HTH

...