Сохранение Pyspark не работает при вызове изнутри foreach - PullRequest
1 голос
/ 27 июня 2019

Я строю конвейер, который получает сообщения из Azure EventHub и сохраняет их в дельта-таблицах блоков данных.

Все мои тесты со статическими данными прошли успешно, см. Код ниже:

body = 'A|B|C|D\n"False"|"253435564"|"14"|"2019-06-25 04:56:21.713"\n"True"|"253435564"|"13"|"2019-06-25 04:56:21.713"\n"
tableLocation = "/delta/tables/myTableName"

spark = SparkSession.builder.appName("CSV converter").getOrCreate()    
csvData = spark.sparkContext.parallelize(body.split('\n'))

df = spark.read \
.option("header", True) \
.option("delimiter","|") \
.option("quote", "\"") \
.option("nullValue", "\\N") \
.option("inferShema", "true") \
.option("mergeSchema", "true") \
.csv(csvData)

df.write.format("delta").mode("append").save(tableLocation)

Однако в моем случае каждое сообщение Eventhub представляет собой строку CSV, и они могут поступать из многих источников.Поэтому каждое сообщение должно обрабатываться отдельно, поскольку каждое сообщение может в конечном итоге сохраняться в разных дельта-таблицах.

Когда я пытаюсь выполнить этот же код внутри оператора foreach, это не работает.В журналах не отображаются ошибки, и я не могу найти ни одной сохраненной таблицы.

Так что, возможно, я что-то не так делаю при вызове foreach.См. Код ниже:

def SaveData(row):
   ...
   The same code above

dfEventHubCSV.rdd.foreach(SaveData)

Я пытался сделать это в контексте потоковой передачи, но, к сожалению, я столкнулся с той же проблемой.

Что в foreach делает его по-другому?

Ниже полного кода, который я запускаю:

import pyspark.sql.types as t
from pyspark.sql import SQLContext

--row contains the fields Body and SdIds
--Body: CSV string
--SdIds: A string ID 
def SaveData(row):

  --Each row data that is going to be added to different tables
  rowInfo = GetDestinationTableData(row['SdIds']).collect()  
  table = rowInfo[0][4]
  schema = rowInfo[0][3]
  database = rowInfo[0][2]     
  body = row['Body']

  tableLocation = "/delta/" + database + '/' + schema + '/' + table
  checkpointLocation = "/delta/" + database + '/' + schema + "/_checkpoints/" + table

  spark = SparkSession.builder.appName("CSV").getOrCreate()    
  csvData = spark.sparkContext.parallelize(body.split('\n'))

  df = spark.read \
  .option("header", True) \
  .option("delimiter","|") \
  .option("quote", "\"") \
  .option("nullValue", "\\N") \
  .option("inferShema", "true") \
  .option("mergeSchema", "true") \
  .csv(csvData)

  df.write.format("delta").mode("append").save(tableLocation)

dfEventHubCSV.rdd.foreach(SaveData)

1 Ответ

0 голосов
/ 02 июля 2019

Ну, в конце концов, как всегда, это что-то очень простое, но я не вижу этого раньше.

В основном, когда вы выполняете foreach, и фрейм данных, который вы хотите сохранить, встроен внутрипетля.Рабочий, в отличие от драйвера, не будет автоматически устанавливать путь "/ dbfs /" при сохранении, поэтому, если вы не добавите вручную "/ dbfs /", он сохранит данные локально на рабочем месте, и вы никогда не будетенайти сохраненные данные.

Вот почему мои циклы не работают.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...