df_insert.toJSON()
возвращает RDD
, что вы можете flatMap
больше. 1
source_rdd = df_insert.toJSON()
Выполните flatMap
над этим СДР и получите СДР, содержащий только ошибки.
headers = {
'Authorization': authorization,
'content-type': "application/json",
'cache-control': "no-cache"
}
def post_service_error(row):
# requests package may not be available in the node
# see about adding files to the spark context
response = requests.request("POST", url_insert, data=row, headers=headers)
response_result = response.json()
if response_result['message'] == 'success':
print ("INFO : Record inserted successfully")
return []
print ("ERROR : Error in the record")
status_code = response_result["status"]
error_message = response_result["error"]
return [(status_code, error_message, row)]
errors_rdd = source_rdd.flatMap(post_service_error)
Конвертироватьошибки приводят к искровому DataFrame и сохраняют это до таблицы.
errors_df = sc.createDataFrame(errors_rdd, ['status', 'error', 'json data'])
(errors_df.write.format(SNOWFLAKE_SOURCE_NAME)
.options(**sfOptions)
.option("dbtable", "error_table")
.option("header", "true")
.option("truncate_table", "on")
.mode("append")
.save())
Если у вас есть API, к которому вы делаете запрос, я предлагаю изучить реализацию, которая принимает пакет этих объектов / массивов.Таким образом, вы можете разбить RDD перед отображением каждого раздела на пакетный запрос и впоследствии обработать ошибку.