Когда я использую структурированную потоковую передачу PySpark2.4 для анализа данных из Kafka, у меня возникает вопрос о функции from_json
в pyspark.sql.functions
модуле.
from pyspark.sql.functions import *
from pyspark.sql.types import *
schema = StructType([
StructField("id", StringType(), True),
StructField("mobile", StringType(), True),
StructField("email", StringType()),
StructField("created_time", TimestampType(), True),
StructField("created_ip", StringType(), True),
])
data = {
"id": "11111",
"mobile": "18212341234",
"created_time": '2019-01-03 15:40:27',
"created_ip": "11.122.68.106",
}
data_list = [(1, str(data))]
df = spark.createDataFrame(data_list, ("key", "value"))
df.select(from_json("value", schema).alias("json")).collect()
[Row (json =Строка (id = '11111', мобильный телефон = '18212341234', электронная почта = нет, create_time = datetime.datetime (2019, 1, 3, 15, 40, 27), creation_ip = '11 .122.68.106 '))]
Вышеуказанный код верен и может работать.Но что-то меня смутило за кодом.
from pyspark.sql.functions import *
from pyspark.sql.types import *
schema = StructType([
StructField("id", StringType(), True),
StructField("mobile", StringType(), True),
StructField("email", StringType()),
StructField("created_time", TimestampType(), True),
StructField("created_ip", StringType(), True),
])
data = {
"id": "11111",
"mobile": "18212341234",
"email": None,
"created_time": '2019-01-03 15:40:27',
"created_ip": "11.122.68.106",
}
data_list = [(1, str(data))]
df = spark.createDataFrame(data_list, ("key", "value"))
df.select(from_json("value", schema).alias("json")).collect()
[Row (json = None)]
Я только добавляю "email": None
в словарь данных,и функция from_json не может правильно проанализировать данные в DataFrame.Потому что я читаю эти данные напрямую от Кафки и не знаю, как с ними обращаться в первую очередь.Стоит ли сначала удалить значение None в данных или я могу использовать другие функции для правильного анализа данных?
Не могли бы вы мне помочь?Большое спасибо.