Не могу прочитать строку CSV, используя PySpark - PullRequest
0 голосов
/ 25 июня 2019

Сценарий: EventHub -> Azure Databricks (с использованием pyspark)

Формат файла: CSV (в кавычках, с разделителями труб и пользовательской схемой)

Я пытаюсь прочитать строки CSV, поступающие из eventhub.Spark успешно создает фрейм данных с правильной схемой, но фрейм данных заканчивается пустым после каждого сообщения.

Мне удалось провести несколько тестов вне потоковой среды, и при получении данных из файла все идет хорошо,но происходит сбой, когда данные поступают из строки.

Так что я нашел несколько ссылок, чтобы помочь мне в этом, но ни одна не работала:

can-i-read-a-csv-represented-as-a-string-in-apache-spark-using-spark-csv? rq = 1

Pyspark - преобразование строки json в DataFrame

Прямо сейчас у меня есть код ниже:

schema = StructType([StructField("Decisao",StringType(),True), StructField("PedidoID",StringType(),True), StructField("De_LastUpdated",StringType(),True)])
body = 'DECISAO|PEDIDOID|DE_LASTUPDATED\r\n"asdasdas"|"1015905177"|"sdfgsfgd"'
csvData = sc.parallelize([body])

df = spark.read \
.option("header", "true") \
.option("mode","FAILFAST") \
.option("delimiter","|") \
.schema(schema) \
.csv(csvData)

df.show()

Это вообще возможно сделать с файлами CSV?

1 Ответ

0 голосов
/ 25 июня 2019

Вы можете создать такую ​​схему с помощью Row и split на | разделитель

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Row
body = 'DECISAO|PEDIDOID|DE_LASTUPDATED\r\n"asdasdas"|"1015905177"|"sdfgsfgd"'
csvData = sc.parallelize([body])
schemaDF = csvData\
.map(lambda x: x.split("|"))\
.map(lambda x: Row(x[0],\
                   x[1],\
                   x[2],\
                   x[3],\
                   x[4]))\
.toDF(["Decisao", "PedidoID", "De_LastUpdated", "col4", "col5"])

for i in schemaDF.take(1): print(i)
Row(Decisao='DECISAO', PedidoID='PEDIDOID', De_LastUpdated='DE_LASTUPDATED\r\n"asdasdas"', col4='"1015905177"', col5='"sdfgsfgd"')

schemaDF.printSchema()
root
 |-- Decisao: string (nullable = true)
 |-- PedidoID: string (nullable = true)
 |-- De_LastUpdated: string (nullable = true)
 |-- col4: string (nullable = true)
 |-- col5: string (nullable = true)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...