Я использую pyspark-sql для создания строк в удаленной базе данных mysql, используя JDBC.
У меня есть две таблицы, parent_table(id, value)
и child_table(id, value, parent_id)
, поэтому каждая строка parent_id
может иметь столько строк в child_id
, сколько необходимо.
Теперь я хочу создать новые данные и вставить их в базу данных. Я использую правила кода здесь для операции write
, но я хотел бы иметь возможность сделать что-то вроде:
parentDf = sc.parallelize([5, 6, 7]).toDF(('value',))
parentWithIdDf = parentDf.write.mode('append') \
.format("jdbc") \
.option("url", "jdbc:mysql://" + host_name + "/"
+ db_name).option("dbtable", table_name) \
.option("user", user_name).option("password", password_str) \
.save()
# The assignment at the previous line is wrong, as pyspark.sql.DataFrameWriter#save doesn't return anything.
Я бы хотел, чтобы последняя строка кода выше возвращала DataFrame с новыми идентификаторами строк для каждой строки, чтобы я мог сделать
childDf = parentWithIdDf.flatMap(lambda x: [[8, x[0]], [9, x[0]]])
childDf.write.mode('append')...
означает, что в конце у меня будет удаленная база данных
parent_table
____________
| id | value |
____________
| 1 | 5 |
| 2 | 6 |
| 3 | 7 |
____________
child_table
________________________
| id | value | parent_id |
________________________
| 1 | 8 | 1 |
| 2 | 9 | 1 |
| 3 | 8 | 2 |
| 4 | 9 | 2 |
| 5 | 8 | 3 |
| 6 | 9 | 3 |
________________________
Как я уже писал в первом фрагменте кода выше, pyspark.sql.DataFrameWriter#save
ничего не возвращает, глядя на его документацию , так как мне этого добиться?
Я что-то делаю не так? Похоже, что нет способа вернуть данные из действия Spark (что является save
), в то время как я хотел бы использовать это действие в качестве преобразования, shich заставляет меня думать, что я могу думать обо всем этом неправильно .