У меня есть приложение pyspark, которое принимает сообщения из раздела Kafka, эти сообщения сериализуются как org.apache.kafka.connect.json.JsonConverter
.Для этого я использую соединяющий коннектор Kafka JDBC
. Проблема заключается в том, что когда я принимаю сообщения, столбец идентификатора появляется в виде некоторого закодированного текста, такого как "ARM =", когда он должен быть числовым типом..
Вот код, который у меня сейчас есть
spark = SparkSession.builder.appName("my app").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('WARN')
ssc = StreamingContext(sc, 5)
kafka_params = {
"bootstrap.servers": "kafkahost:9092",
"group.id": "Deserialize"
}
kafka_stream = KafkaUtils.createDirectStream(ssc, ['mytopic'], kafka_params)
kafka_stream.foreachRDD(lambda rdd: rdd.foreach(lambda x: print(x))
ssc.start()
ssc.awaitTermination()
Я знаю, что createDirectStream имеет параметр valueDecoder, который я могу установить, проблема в том, что я не знаю, как использовать это для декодирования,Я также знаю о схеме заранее, поэтому я смогу создать ее, если это будет необходимо.
Для справки: это JSON, который я получаю при распечатке rdd.foreach
{
"schema": {
"type": "struct",
"fields": [
{
"type": "bytes",
"optional": False,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "0"
},
"field": "ID"
},
{
"type": "string",
"optional": True,
"field": "COLUMN1"
}
],
"optional": False
},
"payload": {
"ID": "AOo=",
"COLUMN1": "some string"
}
}