Конвертировать RDD в Dataframe в Spark Streaming Python - PullRequest
0 голосов
/ 13 декабря 2018

Я пытаюсь преобразовать RDD в DataFrame в Spark Streaming.Я слежу за процессом ниже.

socket_stream = ssc.socketTextStream("localhost", 9999)
def convert_to_df(rdd):
    schema = StructType([StructField("text", StringType(), True)])
    df =spark.createDataFrame(rdd, schema = schema)
    df.show(10)

socket_stream.foreachRDD(convert_to_df)

Я предоставляю ввод через сокет nc -lk 9999

Если я задаю «привет мир» в качестве ввода, он показывает мне ошибку ниже

StructType can not accept object 'hello world' in type <class 'str'>

ожидаемый результат

+-------=-+
|text     |
+---------+
hello world
+---------+

Ответы [ 2 ]

0 голосов
/ 13 декабря 2018

Поскольку вы используете RDD[str], вы должны либо указать соответствующий тип.Для атомарного значения это либо соответствующий AtomicType

from pyspark.sql.types import StringType, StructField, StructType

rdd = sc.parallelize(["hello world"])
spark.createDataFrame(rdd, StringType())

, либо описание его строки:

spark.createDataFrame(rdd, "string")

Если вы хотите использовать StructType конвертирование данных в tuples сначала:

schema = StructType([StructField("text", StringType(), True)])

spark.createDataFrame(rdd.map(lambda x: (x, )), schema)

Конечно, если вы собираетесь просто конвертировать каждую партию в DataFrame, имеет гораздо больше смысла использовать структурированную потоковую передачу полностью:

lines = (spark
    .readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load())
0 голосов
/ 13 декабря 2018

Попробуйте ArrayType(StringType())

Иначе, поскольку у вас есть только один столбец, попробуйте указать схему непосредственно как

df =spark.createDataFrame(rdd, StringType())

Проверьте udf для pyspark, так как вам нужно объявить udf для spark

...