Допустим, у меня есть список из множества тем Kafka, которые нужно слушать, и имена таблиц, в которые нужно добавить. Итак, я пишу это:
def upsert(df, tableName):
df.createOrReplaceTempView("mergeTable")
spark.sql(f'''
MERGE INTO {tableName}
USING mergeTable
ON {tableName}.Key = mergeTable.Key
WHEN MATCHED THEN
UPDATE SET
{tableName}.Value = mergeTable.Value
WHEN NOT MATCHED
THEN INSERT (Key, Value) VALUES
(mergeTable.Key, mergeTable.Value)
''')
def curryUpsert(df, tableName):
def F(df, batchId):
return upsert(df,tableName)
return F
, а затем называю это так:
for (topic, tableName) in a_big_list_of_tables_to_upsert_into:
(
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topic)
.load()
.writeStream
.foreachBatch(curryUpsert(tableName))
.outputMode("update")
.start()
)
Я получаю эту ошибку: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation.
Есть ли способ создать генератор фреймов данных искрового потока? Я хочу разделить коллекцию на равное количество потоковых носителей данных. Я мог бы сделать это, написав каждую из них от руки, но похоже, что я смогу написать функцию, которая будет их выводить за меня, но Spark magi c не позволяет мне работать.