Как обработать вложенную пару ключ-значение в импорте данных Spark / Scala - PullRequest
0 голосов
/ 14 октября 2018

Я новичок в Spark и Scala, поэтому, пожалуйста, прости нубизм.У меня есть текстовый файл в следующем формате:

328;ADMIN HEARNG;[street#939 W El Camino,city#Chicago,state#IL]

Мне удалось создать RDD с помощью команды sc.textFile, и я могу обработать каждый раздел с помощью этой команды:

val department_record = department_rdd.map(record => record.split(";"))

Как видите, третий элемент является вложенной парой ключ / значение, и до сих пор я не смог с ней работать.То, что я ищу, - это способ преобразования данных из приведенного выше в RDD, который выглядит следующим образом:

|ID |NAME        |STREET         |CITY   |STATE|

|328|ADMIN HEARNG|939 W El Camino|Chicago|IL   |

Любая помощь приветствуется.

Ответы [ 2 ]

0 голосов
/ 15 октября 2018

DF решение:

scala> val df = Seq(("328;ADMIN HEARNG;[street#939 W El Camino,city#Chicago,state#IL]"),
     |   ("400;ADMIN HEARNG;[street#800 First Street,city#San Francisco,state#CA]")).toDF("dept")
df: org.apache.spark.sql.DataFrame = [dept: string]

scala> val df2 =df.withColumn("arr",split('dept,";")).withColumn("address",split(regexp_replace('arr(2),"\\[|\\]",""),"#"))
df2: org.apache.spark.sql.DataFrame = [dept: string, arr: array<string> ... 1 more field]

scala> df2.select('arr(0) as "id",'arr(1) as "name",split('address(1),",")(0) as "street",split('address(2),",")(0) as "city",'address(3) as "state").show
+---+------------+----------------+-------------+-----+
| id|        name|          street|         city|state|
+---+------------+----------------+-------------+-----+
|328|ADMIN HEARNG| 939 W El Camino|      Chicago|   IL|
|400|ADMIN HEARNG|800 First Street|San Francisco|   CA|
+---+------------+----------------+-------------+-----+


scala>
0 голосов
/ 14 октября 2018

Вы можете разделить поле адреса в , на массив, убрать скобку и снова разделить в #, чтобы извлечь нужные компоненты адреса, как показано ниже:

val department_rdd = sc.parallelize(Seq(
  "328;ADMIN HEARNG;[street#939 W El Camino,city#Chicago,state#IL]",
  "400;ADMIN HEARNG;[street#800 First Street,city#San Francisco,state#CA]"
))

val department_record = department_rdd.
  map(_.split(";")).
  map{ case Array(id, name, address) =>
    val addressArr = address.split(",").
      map(_.replaceAll("^\\[|\\]$", "").split("#"))
    (id, name, addressArr(0)(1), addressArr(1)(1), addressArr(2)(1))
  }

department_record.collect
// res1: Array[(String, String, String, String, String)] = Array(
//   (328,ADMIN HEARNG,939 W El Camino,Chicago,IL),
//   (400,ADMIN HEARNG,800 First Street,San Francisco,CA)
// )

InЕсли вы хотите преобразовать в DataFrame, просто примените toDF():

department_record.toDF("id", "name", "street", "city", "state").show
// +---+------------+----------------+-------------+-----+
// | id|        name|          street|         city|state|
// +---+------------+----------------+-------------+-----+
// |328|ADMIN HEARNG| 939 W El Camino|      Chicago|   IL|
// |400|ADMIN HEARNG|800 First Street|San Francisco|   CA|
// +---+------------+----------------+-------------+-----+
...