Функции высшего порядка в pyspark для создания потоковых операций с фреймами данных? - PullRequest
1 голос
/ 16 июня 2020

Допустим, у меня есть список из множества тем Kafka, которые нужно слушать, и имена таблиц, в которые нужно добавить. Итак, я пишу это:

def upsert(df, tableName):
  df.createOrReplaceTempView("mergeTable")
  spark.sql(f'''
MERGE INTO {tableName}
USING mergeTable
ON {tableName}.Key = mergeTable.Key
WHEN MATCHED THEN
UPDATE SET
{tableName}.Value = mergeTable.Value
WHEN NOT MATCHED
THEN INSERT (Key, Value) VALUES
(mergeTable.Key, mergeTable.Value)
  ''')
def curryUpsert(df, tableName):
  def F(df, batchId):
    return upsert(df,tableName)
  return F

, а затем называю это так:

for (topic, tableName) in a_big_list_of_tables_to_upsert_into:
  (
      spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .load()  
      .writeStream
      .foreachBatch(curryUpsert(tableName))
      .outputMode("update")
      .start()
    )

Я получаю эту ошибку: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation.

Есть ли способ создать генератор фреймов данных искрового потока? Я хочу разделить коллекцию на равное количество потоковых носителей данных. Я мог бы сделать это, написав каждую из них от руки, но похоже, что я смогу написать функцию, которая будет их выводить за меня, но Spark magi c не позволяет мне работать.

...