У меня есть таблица в dataframe с тремя столбцами.city_name, driver_name, транспортные средства, из которых транспортное средство является списком.
У меня также есть некоторые другие детали, такие как часы водителя, контакт с водителем и т. Д. Для каждого водителя в mysql.Таблицы в базе данных имеют следующий формат: city_name.driver_name.
scala> val tables = """
[
{"vehicles" : ["subaru","mazda"], "city_name" : "seattle", "driver_name" : "x"},
{"city_name" : "seattle", "driver_name" : "y"},
{"city_name" : "newyork", "driver_name" : "x"},
{"city_name" : "dallas", "driver_name" : "y"}
]
""" | | | | | | |
tables: String =
"
[
{"vehicles" : ["subaru","mazda"], "city_name" : "seattle", "driver_name" : "x"},
{"city_name" : "seattle", "driver_name" : "y"},
{"city_name" : "newyork", "driver_name" : "x"},
{"city_name" : "dallas", "driver_name" : "y"}
]
"
scala> val metadataRDD = sc.parallelize(tables.split('\n').map(_.trim.filter(_ >= ' ')).mkString :: Nil)
metadataRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:30
scala> val metadataDF = spark.read.json(metadataRDD)
metadataDF: org.apache.spark.sql.DataFrame = [city_name: string, driver_name: string ... 1 more field]
scala> metadataDF.show
+---------+-----------+---------------+
|city_name|driver_name| vehicles|
+---------+-----------+---------------+
| seattle| x|[subaru, mazda]|
| seattle| y| null|
| newyork| x| null|
| dallas| y| null|
+---------+-----------+---------------+
Для каждого из этих драйверов мне нужно применить функцию и написать паркет.Я пытаюсь использовать встроенную функцию, как показано ниже, но я не могу заставить ее работать:
metadataDF.map((e) => {
val path = "s3://test/"
val df = sparkJdbcReader.option("dbtable",
e.city_name + "." + e.driver_name).load()
val dir = path + e.driver_name + e.city_name
if (e.vehicles)
do something
else:
df.write.mode("overwrite").format("parquet").save(dir)
})
По сути, вопрос заключается в том, как использовать эту встроенную функцию.