получение нового идентификатора строки из pySpark SQL для записи в удаленную базу данных mysql (JDBC) - PullRequest
0 голосов
/ 05 сентября 2018

Я использую pyspark-sql для создания строк в удаленной базе данных mysql, используя JDBC.

У меня есть две таблицы, parent_table(id, value) и child_table(id, value, parent_id), поэтому каждая строка parent_id может иметь столько строк в child_id, сколько необходимо.

Теперь я хочу создать новые данные и вставить их в базу данных. Я использую правила кода здесь для операции write, но я хотел бы иметь возможность сделать что-то вроде:

parentDf = sc.parallelize([5, 6, 7]).toDF(('value',))
parentWithIdDf = parentDf.write.mode('append') \
                         .format("jdbc") \
                         .option("url", "jdbc:mysql://" + host_name + "/"
                            + db_name).option("dbtable", table_name) \
                         .option("user", user_name).option("password", password_str) \
                         .save()
# The assignment at the previous line is wrong, as pyspark.sql.DataFrameWriter#save doesn't return anything.

Я бы хотел, чтобы последняя строка кода выше возвращала DataFrame с новыми идентификаторами строк для каждой строки, чтобы я мог сделать

childDf = parentWithIdDf.flatMap(lambda x: [[8, x[0]], [9, x[0]]])
childDf.write.mode('append')...

означает, что в конце у меня будет удаленная база данных

parent_table
 ____________
| id | value |
 ____________
| 1  |   5   |
| 2  |   6   |
| 3  |   7   |
 ____________

child_table
 ________________________
| id | value | parent_id |
 ________________________
| 1  |   8   |    1      |
| 2  |   9   |    1      |
| 3  |   8   |    2      |
| 4  |   9   |    2      |
| 5  |   8   |    3      |
| 6  |   9   |    3      |
 ________________________ 

Как я уже писал в первом фрагменте кода выше, pyspark.sql.DataFrameWriter#save ничего не возвращает, глядя на его документацию , так как мне этого добиться?

Я что-то делаю не так? Похоже, что нет способа вернуть данные из действия Spark (что является save), в то время как я хотел бы использовать это действие в качестве преобразования, shich заставляет меня думать, что я могу думать обо всем этом неправильно .

1 Ответ

0 голосов
/ 06 сентября 2018

Простой ответ - использовать метку времени + номер автоинкремента для создания уникального идентификатора. Это работает, только если в данный момент времени работает только один сервер. :)

...