Проблема, с которой я сталкиваюсь: в Mem sql есть таблица, подобная этой:
app_name | table_name | record_count | hdfs_path | отметка времени | write_success | memsql2write_status
My_App_name | My_tab_name. | 39162 | hdfs: // test ... | 2020-04-19 22-35-32 | 1 | НАЧАТЬ | My_App_name | My_tab_name. | 39162 | hdfs: // test ... | 2020-04-19 22-35-32 | 1 | FINISHED
Я хочу обновить memsql2write_status с «STARTED» до «FINISHED». Я занимаюсь извлечением hdfs_url, уникального для каждого необработанного столбца из hdfs_path, чтением файла с использованием hdfs_url и записью того же в Mem * 1059. * вторичная база данных.
Во-первых, // getMetaDataFromMemSqlServer - это функция, которую я написал для чтения из Mem sql
val fetch_meta_table = getMetaDataFromMemSqlServer (spark)
//// Я зарегистрировал это фрейм данных как временная таблица «meta_table_vw» и извлеченный hdfs_path, который является уникальным для каждой записи 1017 *
val hdfs_url = spark.sql("SELECT timestamp FROM meta_table_vw WHERE write_success = 1 AND memsql2write_status LIKE '%STARTED' LIMIT 1")
// hdfs_url.show (1, false), val columns = hdf_url.select ("hdfs_path"). Collect (). Map (_. GetString (0)). MkString ("")
// Написал обновление функции, чтобы обновить «STARTED» до «FINISHED» и записать его в mem sql. Я использую saveMode = overwrite, потому что append не заменяет значение STARTED, он добавляет старые и новые data
def update {val memsql2_write_status_change = spark. sql (s "SELECT * From meta_table_vw WHERE hdfs_path == '$ {row}'")
val new_memsql2_status = memsql2_write_status_change.withColumn("memsql2write_status", when(col("memsql2write_status").equalTo("STARTED"), "FINISHED") .otherwise(col("memsql2write_status"))).cache()
new_memsql2_status.count()
// - * // запись в Mem sql:
new_memsql2_status.write.option("trunacate", false).mode(SaveMode.Overwrite)
.jdbc(jdbc_url, "tablename", connectionProperties)}
Результат показан будет
имя_приложения | имя_таблицы | запись_счет | hdfs_path | отметка времени | write_success | memsql2write_status |
My_App_name | My_tab_name. | 39162 | hdfs: // test ... | 2020-04-19 22-35-32 | 1 | ЗАВЕРШЕНО
My_App_name | My_tab_name. | 39162 | hdfs: // test ... | 2020-04-19 22-35-32 | 1 | ЗАВЕРШЕНО
Но я получаю результат, как,
имя_приложения | имя_таблицы | запись_счет | hdfs_path | отметка времени | write_success | memsql2write_status |
My_App_name | My_tab_name. | 39162 | hdfs: // test ... | 2020-04-19 22-35-32 | 1 | FINISHED |
Все предыдущие записи удалены, и присутствует только последняя обновленная запись.
Tried
// val c = memsql2_write_status_change.union(new_memsql2_status),but getting a result like this:
app_name | table_name | record_count | hdfs_path | отметка времени | write_success | memsql2write_status
My_App_name | My_tab_name. | 39162 | hdfs: // test ... | 2020-04-19 22-35-32 | 1 | ЗАВЕРШЕНО My_App_name | My_tab_name. | 39162 | hdfs: // test ... | 2020-04-19 22-35-32 | 1 ЗАВЕРШЕНО
Одни и те же данные записываются дважды.
Если я попробую объединить вместо объединения, сама таблица будет удалена.
Если кто-нибудь знает какие-либо обходные пути, помогите