Как запустить оператор SQL из кластера Databricks - PullRequest
1 голос
/ 08 апреля 2019

У меня есть кластер Azure Databricks, который обрабатывает различные таблицы, а затем на заключительном этапе я помещаю эти таблицы в Azure SQL Server, который будет использоваться некоторыми другими процессами. У меня есть ячейка в кирпичах данных, которая выглядит примерно так:

def generate_connection():
  jdbcUsername = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlUserName")
  jdbcPassword = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlPassword")
  connectionProperties = {
    "user" : jdbcUsername,
    "password" : jdbcPassword,
    "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
  }
  return connectionProperties

def generate_url():
  jdbcHostname = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlHostName")
  jdbcDatabase = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlDatabase")
  jdbcPort = 1433
  return "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)


def persist_table(table, sql_table, mode):
  jdbcUrl = generate_url();
  connectionProperties = generate_connection()
  table.write.jdbc(jdbcUrl, sql_table, properties=connectionProperties, mode=mode)

persist_table(spark.table("Sales.OpenOrders"), "Sales.OpenOrders", "overwrite")
persist_table(spark.table("Sales.Orders"), "Sales.Orders", "overwrite")

Это работает как ожидалось. Проблема, которая у меня есть, состоит в том, что таблица «Заказы» очень большая, и только небольшая часть строк может меняться каждый день, поэтому я хочу изменить режим перезаписи на режим добавления и изменить фрейм данных из всю таблицу только строки, которые могли бы измениться. Все это я знаю, как это сделать достаточно легко, но я хочу выполнить простой оператор SQL для базы данных SQL Azure, чтобы удалить уже существующие строки, чтобы они, возможно, изменили строки, будут вставлены обратно. .

Я хочу выполнить инструкцию SQL для базы данных SQL Azure, например

Delete From Sales.Orders Where CreateDate >= '01/01/2019'

1 Ответ

0 голосов
/ 09 апреля 2019

Вам необходимо использовать библиотеку pyodbc. Вы можете подключиться и использовать операторы SQL.

import pyodbc

conn = pyodbc.connect( 'DRIVER={ODBC Driver 17 for SQL Server};'
                       'SERVER=mydatabe.database.azure.net;'
                       'DATABASE=AdventureWorks;UID=jonnyFast;'
                       'PWD=MyPassword')

# Example doing a simple execute
conn.execute('INSERT INTO Bob (Bob1, Bob2) VALUES (?, ?)', ('A', 'B'))

К сожалению, заставить его работать на блоках данных немного больно. Некоторое время назад я написал сообщение в блоге, которое должно помочь. https://datathirst.net/blog/2018/10/12/executing-sql-server-stored-procedures-on-databricks-pyspark

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...