Проблема при чтении JSON из Kinesis в Pyspark - PullRequest
0 голосов
/ 21 сентября 2018

Я пытаюсь прочитать потоковые данные JSON из Kinesis в PySpark. Мой JSON выглядит следующим образом:

{'installmentNo': '10', 'loanId': '1'}

Я указал схему, но я получаю «ноль», когда spark читает данные.Ниже приведен фрагмент кода.

from pyspark.sql.types import *
from pyspark.sql.functions import from_json

fields = [

  StructField("installmentNo", IntegerType(), True),
  StructField("loanId", IntegerType(), True)

]
pythonSchema = StructType(fields)

kinesisDf = spark.readStream \
.format("kinesis")\
.option("streamName", kinesisStreamName)\
.option("region", kinesisRegion)\
.option("initialPosition", "latest")\
.option("awsAccessKey", awsAccessKeyId)\
.option("awsSecretKey", awsSecretKey).load()

dataDevicesDF = kinesisDf.selectExpr("cast (data as STRING) my_json_data").select(from_json("my_json_data", pythonSchema).alias("yp_inst")).select("yp_inst.*")
display(dataDevicesDF)

Вывод:

enter image description here

Однако, когда я удаляю часть 'from_json', я получаюодин столбец со строкой JSON.Но я хочу разбить JSON на конкретные столбцы и получить данные как df.Может кто-нибудь предложить мне изменения?

1 Ответ

0 голосов
/ 21 сентября 2018

Схема неверна - ваши данные строковые, а вы объявляете целые числа.

Пожалуйста, измените определение на

pythonSchema = StructType([
    StructField("installmentNo", StringType(), True),
    StructField("loanId", StringType(), True)
])

и приведите к выводу:

from_json(
    "my_json_data", pythonSchema
).cast("struct<installmentNo: integer, loanId: integer>"))

Остальная часть кода должна оставаться как есть, хотя для ясности вы можете явно установить параметры (поскольку ввод не является стандартным JSON):

from_json(
    "my_json_data", pythonSchema, {"allowSingleQuotes": "true"}
).cast("struct<installmentNo: integer, loanId: integer>"))
...