Я могу подключиться к Redshift с помощью psycopg2
:
import psycopg2
conn = psycopg2.connect(host=__credential__.host_redshift,
dbname=__credential__.dbname_redshift,
user=__credential__.user_redshift,
password=__credential__.password_redshift,
port=__credential__.port_redshift)
cur = conn.cursor()
Кроме того, я могу обновить существующую таблицу в базе данных:
cur.execute("""
UPDATE tb
SET col2='updated_target_row'
WHERE col1='target_row';
""")
conn.commit()
Теперь я хотел бы обновить таблицу в Redshift с Rows
из Spark DataFrame
. Я поднял голову и обнаружил довольно недавний вопрос об этом (который, я хотел бы обосновать, совсем не дублируется другим вопросом).
Решение кажется довольно простым. Однако я даже не могу передать объект Row
методу, в который включен курсор.
Что я сейчас пытаюсь:
def update_info(row):
cur.execute("""
UPDATE tb
SET col2='updated_target_row'
WHERE col1='target_row';
""")
df.rdd.foreach(update_info)
conn.commit()
И я получил ошибку:
TypeError: can't pickle psycopg2.extensions.cursor objects
Интересно, что это не является общей проблемой. Любая помощь приветствуется.
P.S:.
Версии:
python=3.6
pyspark=2.2.0
psycopg2=2.7.4
Полный текст сообщения об ошибке можно найти в pastebin .
Я попробовал rdd.map
вместо rdd.foreach
и не получил удачи.