Как сделать ленивую загрузку Apache Spark Dataframe подключенной к REST API - PullRequest
0 голосов
/ 29 ноября 2018

Я очень рад, что Spark позволяет мне установить JDBC-соединение с таблицей базы данных, а затем создавать преобразования на ней до тех пор, пока не будет запущена оценка.Я хотел бы сделать то же самое с подключением REST API.Это теоретически обеспечит способ интеграции информации БД и API в логическом представлении.Можно ли привязать фрейм данных Spark к пользовательской функции, где функция вызывает API, используя параметры, сгенерированные ленивой оценкой?

Вот некоторый код pySpark, с которым можно поиграть:

import findspark, json, requests
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("basic test") \
    .getOrCreate()

url = 'http://worldclockapi.com/api/json/utc/now'
headers = {"accept": "application/json;charset=UTF-8"}
results = requests.get(url, headers=headers)
obj = json.loads(results.text)
mydict = {k:v for k,v in obj.items() if v is not None}
df = spark.createDataFrame([mydict])
df.show()

InВ этом примере вызов API не является ленивым и не вызывается функцией show ().

Я понимаю, что это легко может быть невозможно с помощью API pySpark.Можно ли это сделать в Scala?Существует ли программный пакет, позволяющий сделать это в Spark?

Может быть связано с В Apache Spark, как сделать операцию RDD / DataFrame ленивой?

1 Ответ

0 голосов
/ 29 ноября 2018

Ленивый вызов REST API возможен, но вам нужно поместить его в функцию map (при работе с RDD) или в UDF (в API Dataframe):

>>> from pyspark.sql import Row
>>> from pyspark.sql.functions import *
>>> import requests
>>> 
>>> urls = [Row(url='http://worldclockapi.com/api/json/utc/now')] * 10
>>> call_time_api = lambda url: requests.get(url).json()['currentFileTime']
>>> 
>>> spark.createDataFrame(urls) \
...     .withColumn('time', udf(call_time_api)('url')) \
...     .show(truncate=False)
+-----------------------------------------+------------------+                  
|url                                      |time              |
+-----------------------------------------+------------------+
|http://worldclockapi.com/api/json/utc/now|131879608910925580|
|http://worldclockapi.com/api/json/utc/now|131879608911081830|
|http://worldclockapi.com/api/json/utc/now|131879608911238454|
|http://worldclockapi.com/api/json/utc/now|131879608911550881|
|http://worldclockapi.com/api/json/utc/now|131879608911706855|
|http://worldclockapi.com/api/json/utc/now|131879608911706855|
|http://worldclockapi.com/api/json/utc/now|131879608911863229|
|http://worldclockapi.com/api/json/utc/now|131879608912019732|
|http://worldclockapi.com/api/json/utc/now|131879608912175607|
|http://worldclockapi.com/api/json/utc/now|131879608912175607|
+-----------------------------------------+------------------+

Фактически этодействительно полезно удалять результаты API с помощью подкачки страниц - сначала вы создаете массив URL-адресов (каждый для своей страницы результатов), затем вы можете параллельно извлекать данные внутри контекста spark и создавать Dataframe результатов.

...