У меня есть эта проблема:
Я читаю данные из Kafka с использованием структурированной потоковой передачи, данные представляют собой строки CSV.Когда я получаю данные от Кафки, у меня есть потоковый фрейм данных, где строка CSV находится внутри «значения», и это последовательность байтов.
sDF2 = sDF.selectExpr("CAST(value as string)").select( split("value",","))
, используя это, у меня есть новый фрейм данных, где «значение» являетсястрока и это строка CSV.
Как получить новый кадр данных, где я проанализировал и разделил поля CSV на столбцы данных?
Пример: строка csv - это «abcd, 123, frgh, 1321»
sDF schema, which contains the data downloaded from Kafka, is
key, value, topic, timestamp etc... and here value is a byte sequence with no type
sDF2.schema has only a column ( named value of type string )
Мне нравится, что новый фрейм данных
sDF3.col1 = abcd
sDF3.col2 = 123
sDF3.col3 = frgh ...etc
, где все столбцыСтрока.
Я все еще могу сделать это:
sDF3 = sDF2.select( sDF2.csv[0].alias("EventId").cast("string"),
sDF2.csv[1].alias("DOEntitlementId").cast("string"),
sDF2.csv[3].alias("AmazonSubscriptionId").cast("string"),
sDF2.csv[4].alias("AmazonPlanId").cast("string"),
... etc ...
но это выглядит некрасиво.