Встроенная функция карты вместо циклов - PullRequest
0 голосов
/ 12 мая 2019

У меня есть таблица в dataframe с тремя столбцами.city_name, driver_name, транспортные средства, из которых транспортное средство является списком.

У меня также есть некоторые другие детали, такие как часы водителя, контакт с водителем и т. Д. Для каждого водителя в mysql.Таблицы в базе данных имеют следующий формат: city_name.driver_name.

scala> val tables = """
[
                {"vehicles" : ["subaru","mazda"], "city_name" : "seattle", "driver_name" : "x"},
                {"city_name" : "seattle", "driver_name" : "y"},
                {"city_name" : "newyork", "driver_name" : "x"},
                {"city_name" : "dallas", "driver_name" : "y"}                         
]
"""     |      |      |      |      |      |      | 
tables: String =
"
[
                {"vehicles" : ["subaru","mazda"], "city_name" : "seattle", "driver_name" : "x"},
                {"city_name" : "seattle", "driver_name" : "y"},
                {"city_name" : "newyork", "driver_name" : "x"},
                {"city_name" : "dallas", "driver_name" : "y"}
]
"

scala> val metadataRDD = sc.parallelize(tables.split('\n').map(_.trim.filter(_ >= ' ')).mkString :: Nil)   
metadataRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:30

scala>     val metadataDF = spark.read.json(metadataRDD)
metadataDF: org.apache.spark.sql.DataFrame = [city_name: string, driver_name: string ... 1 more field]

scala> metadataDF.show
+---------+-----------+---------------+
|city_name|driver_name|       vehicles|
+---------+-----------+---------------+
|  seattle|          x|[subaru, mazda]|
|  seattle|          y|           null|
|  newyork|          x|           null|
|   dallas|          y|           null|
+---------+-----------+---------------+

Для каждого из этих драйверов мне нужно применить функцию и написать паркет.Я пытаюсь использовать встроенную функцию, как показано ниже, но я не могу заставить ее работать:

metadataDF.map((e) => {
        val path = "s3://test/"
        val df = sparkJdbcReader.option("dbtable",  
                 e.city_name + "." + e.driver_name).load()

        val dir = path + e.driver_name + e.city_name

        if (e.vehicles)
          do something
        else:
          df.write.mode("overwrite").format("parquet").save(dir)
  })

По сути, вопрос заключается в том, как использовать эту встроенную функцию.

1 Ответ

0 голосов
/ 12 мая 2019

Вызов функции map() всегда преобразует данную входную коллекцию типа A в другую коллекцию типа B, используя предоставленную функцию.При вызове функции карты вы сохраняете Dataframe на своем слое хранилища [предположительно HDFS].Метод save(), определенный в классе DataFrameWriter , имеет тип возврата Unit [в Java это воспринимается как void ].Следовательно, ваша функция не будет работать, поскольку она преобразует ваш DataFrame по существу в два типа: тип данных, возвращаемый из блока if, и Unit, возвращаемый из блока else.

Вы можете реорганизовать свой коди разбить его на два блока или около того:

import org.apache.spark.sql.functions.{concat,concat_ws,lit,col}
import org.apache.spark.sql.DataFrame
import org.apache.spark.rdd.RDD    

val metadataRDD: RDD[String] = sc.parallelize(tables.split('\n').map(_.trim.filter(_ >= ' ')).mkString :: Nil)

val metadataDF: DataFrame = spark.read.json(metadataRDD)

val df_new_col: DataFrame = metadataDF
.withColumn("city_driver",concat_ws(".",col("city_name"),col("driver_name")))
.withColumn("dir",concat(lit("s3://test/"),col("city_name"),col("driver_name")))

теперь у вас есть два столбца, где у вас есть имена таблиц и их пути рядом с ними.Вы можете собирать их и использовать их для чтения ваших кадров данных, которые будут храниться в формате Parquet.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...