Как добавить поля в существующий паркет и создать новый паркет (Scala или python) - PullRequest
0 голосов
/ 20 сентября 2018

У меня есть существующие паркет (скажем, p1) для чтения в информационный кадр, а затем после некоторого преобразования записать его в новый файл паркета (скажем, p2);

Процесс выглядит так:

val df1 = spark.read.parquet(s"path_to_p1_1")
df1.createOrReplaceTempView("table1")

val df2 = spark.read.parquet(s"path_to_p1_2")
df2.createOrReplaceTempView("table2")

val q = s"""
select 
       cast(ADDRESS as String) as ADDRESS,
       cast(CITY as String) as CITY,
       cast(STATE as String) as STATE,
.......80 fields.......
FROM 
    ( SELECT * FROM table1
        UNION
    SELECT * FROM table2 ) A
  """

val result = spark.sql(q)
res.repartition(1).write.mode(SaveMode.Overwrite).parquet(s"path_to_p2")

Это связано с необходимостью извлечения географической информации (long и lat) из трех столбцов (адрес, город, штат) и добавления ее обратно в p2, или создания нового файла паркета p3.

Geoчасть, как показано ниже:

import requests

http_str = 'https://maps.googleapis.com/maps/api/geocode/json?address='
addr = '1600+Amphitheatre+Parkway,+Mountain+View,+CA'
#addr = '181 University Ave, Toronto, ON, CANADA'
response = requests.get(http_str + addr)

resp_json_payload = response.json()
latlong = resp_json_payload['results'][0]['geometry']['location']
lat = latlong.get('lat')
lng = latlong.get('lng')

широта и lng - это два производных значения, которые я хочу добавить к существующему паркету p2 (предпочтительно) или новому паркету p3.

Что лучшеспособ сделать это?

Большое спасибо.

1 Ответ

0 голосов
/ 20 сентября 2018

Если у вас есть значения lat, то есть литерала, вы можете использовать существующую функцию с подсветкой при добавлении нового столбца на фрейме данных.

Если ваш фрейм данных является DF, вы можете сделать это с помощью импорта org.apache.spark.sql.functions._ DF.withColumn ("широта", горит (широта)). withColumn ("долгота", горит (долгота))

...