Как расширить горизонтальные данные до вертикальных в кадре данных? - PullRequest
1 голос
/ 13 июня 2019

У меня есть текстовый файл.Теперь я хочу расширить горизонтальные данные, чтобы они были вертикальными.Используя поля из первого поля указанного файла в качестве ключа, горизонтально расположенные данные поля расширяются по вертикали и переупорядочиваются.Что мне делать?

Это мой ввод:

0000000 aa______ 50 F 91 59 20 76 
0000001 bb______ 50 F 46 39 8 5 
0000003 cc______ 26 F 30 50 71 36 
0000004 dd______ 40 M 58 71 20 10

Exp1: Теперь я хочу развернуть по вертикали для каждых 2 полей с первым значением, сохраненным в каждой строке.Я хочу получить желаемый результат вывода, как показано ниже.

0000000 aa______ 50 
0000000 F 91 
0000000 59 20 
0000000 76 
0000001 bb______ 50 
0000001 F 46 
0000001 39 8 
0000001 5 
0000003 cc______ 26 
0000003 F 30 
0000003 50 71 
0000003 36 
0000004 dd______ 40 
0000004 M 58 
0000004 71 20 
0000004 10

Exp2: То же, что и выше, но с сохранением первых двух значений в каждой строке.

0000000 aa______ 50 F 
0000000 aa______ 91 59 
0000000 aa______ 20 76 
0000001 bb______ 50 F 
0000001 bb______ 46 39 
0000001 bb______ 8 5 
0000003 cc______ 26 F 
0000003 cc______ 30 50 
0000003 cc______ 71 36 
0000004 dd______ 40 M 
0000004 dd______ 58 71 
0000004 dd______ 20 10

Это мой код,но это не работает правильно.

val df = sc.textFile("/home/ubuntu/spark-2.4.3-bin-hadoop2.6/data.txt"); 
val splitRdd = df.map{s => val a = s.split("[ |]") 
var i = 0; 
val date = Array(a(i) + " " + a(i+1) + " " + a(i+2) + " " + a(i+3) + " " + a(i+4)) 
(date ++ a.takeRight(0)).mkString(" ") 
} 
splitRdd.foreach(println)

1 Ответ

1 голос
/ 13 июня 2019

Это было бы проще всего решить с помощью UDF:

def splitValues(nKeys: Int, nGroup: Int) = udf((str: String) => {
  val vals = str.split(" ")
  val key = vals.take(nKeys)
  vals.drop(nKeys).grouped(nGroup).toSeq.map(e => (key ++ e).mkString(" "))
})

UDF принимает два ввода: nKeys - это количество значений, которое следует использовать в качестве ключа, и nGroup - это количество значений, которое следует хранить в каждой строке (в дополнение к ключам). Этот UDF вернет массив, поэтому вам нужно использовать explode после его применения.

Пример использования с одним значением ключа:

val df = spark.read.text("test.txt")
df.select(explode(splitValues(1, 2)($"value")))

+-------------------+
|col                |
+-------------------+
|0000000 aa______ 50|
|0000000 F 91       |
|0000000 59 20      |
|0000000 76         |
|0000001 bb______ 50|
|0000001 F 46       |
|0000001 39 8       |
|0000001 5          |
|0000003 cc______ 26|
|0000003 F 30       |
|0000003 50 71      |
|0000003 36         |
|0000004 dd______ 40|
|0000004 M 58       |
|0000004 71 20      |
|0000004 10         |
+-------------------+

С двумя ключевыми значениями:

df.select(explode(splitValues(2, 2)($"value")))

+----------------------+
|col                   |
+----------------------+
|0000000 aa______ 50 F |
|0000000 aa______ 91 59|
|0000000 aa______ 20 76|
|0000001 bb______ 50 F |
|0000001 bb______ 46 39|
|0000001 bb______ 8 5  |
|0000003 cc______ 26 F |
|0000003 cc______ 30 50|
|0000003 cc______ 71 36|
|0000004 dd______ 40 M |
|0000004 dd______ 58 71|
|0000004 dd______ 20 10|
+----------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...