PySpark: как использовать функцию MySQL с подключением JDBC? - PullRequest
0 голосов
/ 20 сентября 2019

Я использую этот запрос в базе данных MySQL

select *,
    UNIX_TIMESTAMP(CONVERT_TZ(
        SUBSTRING(input_date, 1, 19),
        SUBSTRING(input_date, 20),
        'SYSTEM'
    )) as timestamp
from my_table

, которая используется для преобразования выборки input_date отметки времени со смещением UTC (например, 2018-12-15T13:48:16-08:00) в время эпохи.

Теперь мне нужно сделать то же самое, используя PySpark и получить доступ к этой таблице через соединение JDBC, но при попытке получить следующую ошибку

Py4JJavaError: An error occurred while calling o62.sql.
: org.apache.spark.sql.AnalysisException: Undefined function: 'CONVERT_TZ'. This function is neither a registered temporary function nor a permanent function registered in the database ...

Что я делаю неправильно?Есть ли лучший способ сделать это в PySpark?

Спасибо

1 Ответ

1 голос
/ 21 сентября 2019

Вы можете использовать эту функцию для подключения к базе данных MySQL:

def connect_to_sql(
    spark, jdbc_hostname, jdbc_port, database, data_table, username, password
):
    jdbc_url = "jdbc:mysql://{0}:{1}/{2}".format(jdbc_hostname, jdbc_port, database)

    connection_details = {
        "user": username,
        "password": password,
        "driver": "com.mysql.cj.jdbc.Driver",
    }

    df = spark.read.jdbc(url=jdbc_url, table=data_table, properties=connection_details)
    return df

Что касается преобразования часового пояса, этот вопрос поможет вам:

Как преобразовать строку даты изUTC для определенного часового пояса в HIVE?

...