Расширить массив до столбцов в spark со структурированным потоковым - PullRequest
0 голосов
/ 28 октября 2018

У меня есть эта проблема:

Я читаю данные из Kafka с использованием структурированной потоковой передачи, данные представляют собой строки CSV.Когда я получаю данные от Кафки, у меня есть потоковый фрейм данных, где строка CSV находится внутри «значения», и это последовательность байтов.

 sDF2 = sDF.selectExpr("CAST(value as string)").select( split("value",","))

, используя это, у меня есть новый фрейм данных, где «значение» являетсястрока и это строка CSV.

Как получить новый кадр данных, где я проанализировал и разделил поля CSV на столбцы данных?

Пример: строка csv - это «abcd, 123, frgh, 1321»

sDF schema, which contains the data downloaded from Kafka, is  
key, value, topic, timestamp etc... and here value is a byte sequence with no type

sDF2.schema has only a column ( named value of type string )

Мне нравится, что новый фрейм данных

sDF3.col1 = abcd
sDF3.col2 = 123
sDF3.col3 = frgh ...etc

, где все столбцыСтрока.

Я все еще могу сделать это:

 sDF3 = sDF2.select( sDF2.csv[0].alias("EventId").cast("string"),
 sDF2.csv[1].alias("DOEntitlementId").cast("string"),               
 sDF2.csv[3].alias("AmazonSubscriptionId").cast("string"),
 sDF2.csv[4].alias("AmazonPlanId").cast("string"),
 ... etc ... 

но это выглядит некрасиво.

1 Ответ

0 голосов
/ 11 февраля 2019

Я не пробовал, но что-то вроде этого должно работать.

sDF2 = 
      sDF.selectExpr("CAST(value as string)")
       .alias("csv").select("csv.*")
       .select("split(value,',')[0] as DOEntitlementId", 
               "split(value,',')[1] as AmazonSubscriptionId", 
               "split(value,',')[2] as AmazonPlanId")
...