Десериализовать сообщение Kafka json с помощью потоковой передачи PySpark - PullRequest
0 голосов
/ 09 марта 2019

У меня есть приложение pyspark, которое принимает сообщения из раздела Kafka, эти сообщения сериализуются как org.apache.kafka.connect.json.JsonConverter.Для этого я использую соединяющий коннектор Kafka JDBC

. Проблема заключается в том, что когда я принимаю сообщения, столбец идентификатора появляется в виде некоторого закодированного текста, такого как "ARM =", когда он должен быть числовым типом..

Вот код, который у меня сейчас есть

spark = SparkSession.builder.appName("my app").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('WARN')
ssc = StreamingContext(sc, 5)

kafka_params = {
    "bootstrap.servers": "kafkahost:9092",
    "group.id": "Deserialize"
}

kafka_stream = KafkaUtils.createDirectStream(ssc, ['mytopic'], kafka_params)
kafka_stream.foreachRDD(lambda rdd: rdd.foreach(lambda x: print(x))

ssc.start()
ssc.awaitTermination()

Я знаю, что createDirectStream имеет параметр valueDecoder, который я могу установить, проблема в том, что я не знаю, как использовать это для декодирования,Я также знаю о схеме заранее, поэтому я смогу создать ее, если это будет необходимо.

Для справки: это JSON, который я получаю при распечатке rdd.foreach

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "bytes",
        "optional": False,
        "name": "org.apache.kafka.connect.data.Decimal",
        "version": 1,
        "parameters": {
          "scale": "0"
        },
        "field": "ID"
      },
      {
        "type": "string",
        "optional": True,
        "field": "COLUMN1"
      }
    ],
    "optional": False
  },
  "payload": {
    "ID": "AOo=",
    "COLUMN1": "some string"
  }
}

Ответы [ 2 ]

1 голос
/ 22 марта 2019

Итак, как упоминалось в cricket_007, в вашей конфигурации Kafka вы должны установить эту настройку как value.converter.schema.enable=false. Это избавит от поля Schema и оставит вас только с полезной нагрузкой json. Теперь по какой-то причине у меня возникла проблема, когда все мои числовые столбцы кодировались в этом странном формате AOo=. Теперь при использовании Json для сериализации ваших данных, confluent преобразует ваши числовые столбцы, используя base64, но реальная проблема еще до этого. По какой-то причине все мои числовые столбцы были преобразованы в байты. Не знаю точно, почему он это делает, но это как-то связано с тем, как слияния обрабатывают базы данных Oracle. В любом случае, это можно исправить, установив декодер значений в createDirectStream, например

kafka_stream = KafkaUtils.createDirectStream(ssc, ['mytopic'], kafka_params, valueDecoder=decoder)

и в вашем методе декодера вы должны декодировать ваше сообщение из UTF-8, проанализировать json, а затем декодировать ваш числовой столбец из base64, а затем из байтов, например, так:

def decoder(s):
    if s is None:
        return None

    loaded_json = json.loads(s.decode('utf-8'))
    loaded_json["ID"] = int.from_bytes(base64.b64decode(loaded_json['ID']), "big")
    return loaded_json
1 голос
/ 21 марта 2019

В вашей конфигурации Connect вы можете установить value.converter.schema.enable=false, и тогда вы получите только данные "полезной нагрузки" этой записи JSON.

Оттуда я предполагаю, что вы сможете обработать сообщение в соответствии с любым другим примером чтения потокового JSON в PySpark.

В противном случае, поскольку вы не используете структурированную потоковую передачу, для вас нет схемы для определения.Скорее всего, вам придется хотя бы сделать что-то подобное, чтобы просто разобрать записи

rdd.map(lambda x: json.loads(x))\
    .map(lambda x: x['payload'])\
    .foreach(lambda x: print(x))
...