У меня есть существующие паркет (скажем, p1) для чтения в информационный кадр, а затем после некоторого преобразования записать его в новый файл паркета (скажем, p2);
Процесс выглядит так:
val df1 = spark.read.parquet(s"path_to_p1_1")
df1.createOrReplaceTempView("table1")
val df2 = spark.read.parquet(s"path_to_p1_2")
df2.createOrReplaceTempView("table2")
val q = s"""
select
cast(ADDRESS as String) as ADDRESS,
cast(CITY as String) as CITY,
cast(STATE as String) as STATE,
.......80 fields.......
FROM
( SELECT * FROM table1
UNION
SELECT * FROM table2 ) A
"""
val result = spark.sql(q)
res.repartition(1).write.mode(SaveMode.Overwrite).parquet(s"path_to_p2")
Это связано с необходимостью извлечения географической информации (long и lat) из трех столбцов (адрес, город, штат) и добавления ее обратно в p2, или создания нового файла паркета p3.
Geoчасть, как показано ниже:
import requests
http_str = 'https://maps.googleapis.com/maps/api/geocode/json?address='
addr = '1600+Amphitheatre+Parkway,+Mountain+View,+CA'
#addr = '181 University Ave, Toronto, ON, CANADA'
response = requests.get(http_str + addr)
resp_json_payload = response.json()
latlong = resp_json_payload['results'][0]['geometry']['location']
lat = latlong.get('lat')
lng = latlong.get('lng')
широта и lng - это два производных значения, которые я хочу добавить к существующему паркету p2 (предпочтительно) или новому паркету p3.
Что лучшеспособ сделать это?
Большое спасибо.