Используйте p6spy для регистрации источника данных Spark JDB C - PullRequest
2 голосов
/ 26 мая 2020

Я хотел бы регистрировать операторы JDB C, которые мое задание Spark выполняет с помощью p6spy .

Использование p6spy обычно просто: вставляется строка :p6spy: в URL-адрес jdb c и включает классы драйвера p6spy в путь к классам приложения. После этого все действия jdb c будут записаны в файл.

Например, если исходная (MySQL) строка подключения была

jdbc:mysql://172.17.0.2:3306/spark_test
, строка подключения с включенным ведением журнала будет
jdbc:<b>p6spy:</b>mysql://172.17.0.2:3306/spark_test

Я использую эту строку для записи фрейма данных в MySQL таблицу

df.write.mode(SaveMode.Overwrite).jdbc("jdbc:p6spy:mysql://172.17.0.2:3306/spark_test", "test_table", prop)

с prop, содержащим пользователя и пароль db.

Эта строка кода не работает с сообщением об ошибке

Exception in thread "main" java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax;
check the manual that corresponds to your MySQL server version for the right syntax to use near '"value" INTEGER NOT NULL)' at line 1

Без части :p6spy: в строке подключения все работает должным образом.

Мои результаты на данный момент

Причина ошибки в том, что Spark пытается выполнить оператор

CREATE TABLE test_table ("value" INTEGER NOT NULL)

, который включает " вокруг имени столбца. правильным символом будет ` для MySQL.

Spark может обрабатывать различные SQL диалекты. Диалекты реализованы в пакете org. apache .spark. sql .jdb c. Используемый диалект выбирается в зависимости от URL-адреса jdb c базы данных. Каждый объект диалекта реализует метод canHandle(url : String). MySQLDialect обрабатывает начинающиеся URL-адреса с jdbc:mysql, но не с jdbc:p6spy:mysql. К сожалению, Spark по умолчанию использует NoopDialect для неизвестного типа URL. Этот диалект добавляет " вокруг имени столбца.

Возможное решение

Можно зарегистрировать новые диалекты базы данных, вызвав JdbcDialects.registerDialect . Здесь можно было зарегистрировать новый диалект, реализующий метод canHandle как

override def canHandle(url: String): Boolean = url.startsWith("jdbc:p6spy:mysql")

, а затем делегировать все другие вызовы методов на исходный диалект MySQL.

К сожалению, объект MySQLDialect объявлен как

private case object MySQLDialect extends JdbcDialect {
  ...
}

, поэтому моя собственная реализация диалекта не может использовать MySQLDialect напрямую. Можно было бы скопировать код MySQLDialect в мой собственный объект диалекта (код небольшой), но я бы хотел избежать копирования кода.

Есть ли другие варианты?

Я использую Spark 2.4.5

...