Ошибка записи Spark DataFrame в Redshift с помощью Psycopg2: невозможно выбрать объекты psycopg2.extensions.cursor - PullRequest
0 голосов
/ 06 мая 2018

Я могу подключиться к Redshift с помощью psycopg2:

import psycopg2
conn = psycopg2.connect(host=__credential__.host_redshift, 
                        dbname=__credential__.dbname_redshift,
                        user=__credential__.user_redshift, 
                        password=__credential__.password_redshift,
                        port=__credential__.port_redshift)
cur = conn.cursor()

Кроме того, я могу обновить существующую таблицу в базе данных:

cur.execute("""
    UPDATE tb
    SET col2='updated_target_row'
    WHERE col1='target_row';
""")
conn.commit()

Теперь я хотел бы обновить таблицу в Redshift с Rows из Spark DataFrame. Я поднял голову и обнаружил довольно недавний вопрос об этом (который, я хотел бы обосновать, совсем не дублируется другим вопросом).

Решение кажется довольно простым. Однако я даже не могу передать объект Row методу, в который включен курсор.

Что я сейчас пытаюсь:

def update_info(row):
    cur.execute("""
        UPDATE tb
        SET col2='updated_target_row'
        WHERE col1='target_row';
    """)

df.rdd.foreach(update_info)
conn.commit()

И я получил ошибку:

TypeError: can't pickle psycopg2.extensions.cursor objects

Интересно, что это не является общей проблемой. Любая помощь приветствуется.

P.S:.

  1. Версии:

    python=3.6
    pyspark=2.2.0
    psycopg2=2.7.4
    
  2. Полный текст сообщения об ошибке можно найти в pastebin .

  3. Я попробовал rdd.map вместо rdd.foreach и не получил удачи.

1 Ответ

0 голосов
/ 07 мая 2018

Объекты подключения и курсоры не сериализуются и не могут быть отправлены рабочим. Вы должны использовать foreachPartition:

def update_info(rows):
    conn = psycopg2.connect(...)
    cur = conn.cursor()

    for row in rows:
        cur.execute(...)

df.rdd.foreachPartition(update_info)
...