Я пытаюсь прочитать потоковые данные JSON из Kinesis в PySpark. Мой JSON выглядит следующим образом:
{'installmentNo': '10', 'loanId': '1'}
Я указал схему, но я получаю «ноль», когда spark читает данные.Ниже приведен фрагмент кода.
from pyspark.sql.types import *
from pyspark.sql.functions import from_json
fields = [
StructField("installmentNo", IntegerType(), True),
StructField("loanId", IntegerType(), True)
]
pythonSchema = StructType(fields)
kinesisDf = spark.readStream \
.format("kinesis")\
.option("streamName", kinesisStreamName)\
.option("region", kinesisRegion)\
.option("initialPosition", "latest")\
.option("awsAccessKey", awsAccessKeyId)\
.option("awsSecretKey", awsSecretKey).load()
dataDevicesDF = kinesisDf.selectExpr("cast (data as STRING) my_json_data").select(from_json("my_json_data", pythonSchema).alias("yp_inst")).select("yp_inst.*")
display(dataDevicesDF)
Вывод:
Однако, когда я удаляю часть 'from_json', я получаюодин столбец со строкой JSON.Но я хочу разбить JSON на конкретные столбцы и получить данные как df.Может кто-нибудь предложить мне изменения?