Передайте дополнительные аргументы foreachBatch в pyspark - PullRequest
1 голос
/ 03 мая 2019

Я использую foreachBatch в структурированной потоковой передаче pyspark, чтобы записывать каждую микробатку в SQL Server с использованием JDBC.Мне нужно использовать один и тот же процесс для нескольких таблиц, и я хотел бы повторно использовать одну и ту же функцию записи, добавив дополнительный аргумент для имени таблицы, но я не уверен, как передать аргумент имени таблицы.

Пример здесь довольно полезен, но в примере с python имя таблицы жестко закодировано, и похоже, что в примере с scala они ссылаются на глобальную переменную (?) Я хотел бы передать имятаблицы в функцию.

Функция, приведенная в примере с Python по ссылке выше:

def writeToSQLWarehose(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

Я хотел бы использовать что-то вроде этого:

def writeToSQLWarehose(df, epochId, tableName):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", tableName) \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

Но я не уверен, как передать дополнительный аргумент через foreachBatch.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...