Как использовать PySpark и структурированный поток для правильного анализа потока Kafka и удаления None из данных - PullRequest
0 голосов
/ 04 января 2019

Когда я использую структурированную потоковую передачу PySpark2.4 для анализа данных из Kafka, у меня возникает вопрос о функции from_json в pyspark.sql.functions модуле.

from pyspark.sql.functions import *
from pyspark.sql.types import *

schema = StructType([
    StructField("id", StringType(), True),
    StructField("mobile", StringType(), True),
    StructField("email", StringType()),
    StructField("created_time", TimestampType(), True),
    StructField("created_ip", StringType(), True),
])

data = {
    "id": "11111",
    "mobile": "18212341234",
    "created_time": '2019-01-03 15:40:27',
    "created_ip": "11.122.68.106",
}

data_list = [(1, str(data))]
df = spark.createDataFrame(data_list, ("key", "value"))
df.select(from_json("value", schema).alias("json")).collect()

[Row (json =Строка (id = '11111', мобильный телефон = '18212341234', электронная почта = нет, create_time = datetime.datetime (2019, 1, 3, 15, 40, 27), creation_ip = '11 .122.68.106 '))]

Вышеуказанный код верен и может работать.Но что-то меня смутило за кодом.

from pyspark.sql.functions import *
from pyspark.sql.types import *

schema = StructType([
    StructField("id", StringType(), True),
    StructField("mobile", StringType(), True),
    StructField("email", StringType()),
    StructField("created_time", TimestampType(), True),
    StructField("created_ip", StringType(), True),
])

data = {
    "id": "11111",
    "mobile": "18212341234",
    "email": None,
    "created_time": '2019-01-03 15:40:27',
    "created_ip": "11.122.68.106",
}

data_list = [(1, str(data))]
df = spark.createDataFrame(data_list, ("key", "value"))
df.select(from_json("value", schema).alias("json")).collect()

[Row (json = None)]

Я только добавляю "email": None в словарь данных,и функция from_json не может правильно проанализировать данные в DataFrame.Потому что я читаю эти данные напрямую от Кафки и не знаю, как с ними обращаться в первую очередь.Стоит ли сначала удалить значение None в данных или я могу использовать другие функции для правильного анализа данных?

Не могли бы вы мне помочь?Большое спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...