Как я могу преобразовать столбец df [JSON_Format] в несколько столбцов в PySpark? - PullRequest
0 голосов
/ 13 января 2020

Я получил данные в формате JSON от Kafka и считал данные в виде DataFrame в PySpark.

После того, как я получил данные из Kafka, они появились в формате DataFrame:

DataFrame[value: string]

Однако значение содержит формат JSON / DICT.

Печать устава и возврат:

def print_row(row):
    print(row)
    pass

testing.writeStream.foreach(print_row).start()
Row(value='{col_1 =80.0, timestamp=2020-01-13T08:58:58.164Z}')

Как преобразовать значение (JSON) в столбцы DATAFRAME, например:

col_1  timestamp
80.0   2020-01-13T08:58:58.164Z

Ответы [ 2 ]

0 голосов
/ 13 января 2020

Определить схему и проанализировать JSON.

Скопировано из https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

# value schema: { "a": 1, "b": "string" }
schema = StructType().add("a", IntegerType()).add("b", StringType())
df.select( \
  col("key").cast("string"),
  from_json(col("value").cast("string"), schema))
0 голосов
/ 13 января 2020

DataFrame может быть создан для набора данных JSON, представленного RDD [String], хранящего один JSON объект на строку.

jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()
...