Записать в Spark фрейм данных Spark с обновленным необработанным текстом sql удаляет все предыдущие данные из таблицы - PullRequest
0 голосов
/ 24 апреля 2020

Проблема, с которой я сталкиваюсь: в Mem sql есть таблица, подобная этой:

app_name | table_name | record_count | hdfs_path | отметка времени | write_success | memsql2write_status

My_App_name | My_tab_name. | 39162 | hdfs: // test ... | 2020-04-19 22-35-32 | 1 | НАЧАТЬ | My_App_name | My_tab_name. | 39162 | hdfs: // test ... | 2020-04-19 22-35-32 | 1 | FINISHED

Я хочу обновить memsql2write_status с «STARTED» до «FINISHED». Я занимаюсь извлечением hdfs_url, уникального для каждого необработанного столбца из hdfs_path, чтением файла с использованием hdfs_url и записью того же в Mem * 1059. * вторичная база данных.

Во-первых, // getMetaDataFromMemSqlServer - это функция, которую я написал для чтения из Mem sql

val fetch_meta_table = getMetaDataFromMemSqlServer (spark)

//// Я зарегистрировал это фрейм данных как временная таблица «meta_table_vw» и извлеченный hdfs_path, который является уникальным для каждой записи 1017 *

val hdfs_url = spark.sql("SELECT timestamp FROM meta_table_vw WHERE write_success = 1 AND memsql2write_status LIKE '%STARTED' LIMIT 1")

// hdfs_url.show (1, false), val columns = hdf_url.select ("hdfs_path"). Collect (). Map (_. GetString (0)). MkString ("")

// Написал обновление функции, чтобы обновить «STARTED» до «FINISHED» и записать его в mem sql. Я использую saveMode = overwrite, потому что append не заменяет значение STARTED, он добавляет старые и новые data

def update {val memsql2_write_status_change = spark. sql (s "SELECT * From meta_table_vw WHERE hdfs_path == '$ {row}'")

  val new_memsql2_status = memsql2_write_status_change.withColumn("memsql2write_status", when(col("memsql2write_status").equalTo("STARTED"), "FINISHED") .otherwise(col("memsql2write_status"))).cache()
  new_memsql2_status.count() 

// - * // запись в Mem sql:

  new_memsql2_status.write.option("trunacate", false).mode(SaveMode.Overwrite)
    .jdbc(jdbc_url, "tablename", connectionProperties)}

Результат показан будет

имя_приложения | имя_таблицы | запись_счет | hdfs_path | отметка времени | write_success | memsql2write_status |

My_App_name | My_tab_name. | 39162 | hdfs: // test ... | 2020-04-19 22-35-32 | 1 | ЗАВЕРШЕНО

My_App_name | My_tab_name. | 39162 | hdfs: // test ... | 2020-04-19 22-35-32 | 1 | ЗАВЕРШЕНО

Но я получаю результат, как,

имя_приложения | имя_таблицы | запись_счет | hdfs_path | отметка времени | write_success | memsql2write_status |

My_App_name | My_tab_name. | 39162 | hdfs: // test ... | 2020-04-19 22-35-32 | 1 | FINISHED |

Все предыдущие записи удалены, и присутствует только последняя обновленная запись.

Tried

  // val c = memsql2_write_status_change.union(new_memsql2_status),but getting a result like this:

app_name | table_name | record_count | hdfs_path | отметка времени | write_success | memsql2write_status

My_App_name | My_tab_name. | 39162 | hdfs: // test ... | 2020-04-19 22-35-32 | 1 | ЗАВЕРШЕНО My_App_name | My_tab_name. | 39162 | hdfs: // test ... | 2020-04-19 22-35-32 | 1 ЗАВЕРШЕНО

Одни и те же данные записываются дважды.

Если я попробую объединить вместо объединения, сама таблица будет удалена.

Если кто-нибудь знает какие-либо обходные пути, помогите

...