Как обновить sh внешнюю таблицу Hdfs одновременно в версии cloudera 5.x, потоковое приложение Spark записывает в каталог HDFS - PullRequest
0 голосов
/ 03 апреля 2020

У меня есть облачная эра, записывающая потоковую запись в папку HDFS. После этого у нас есть работа по уплотнению, которая будет выполняться ежедневно, сжиматься и помещаться в еще один каталог.

После выполнения сжатия я выполняю refre sh table-name [создан поверх целевой папки потокового приложения]

Иногда я получаю исключение удаленного ввода-вывода, потому что потоковое приложение по-прежнему выполнение и запись данных [это в режиме передачи]

Как переопределить sh пропуск частичных данных прохождения во внешней таблице HDFS impala refre sh.

скажем Потоковая целевая папка есть; - rawmessage и сжатие в; - rawmessagecompact

, когда запись происходит одновременно; задание сжатия выбирает и переносит в таблицу rawmessagecompact.

при выполнении refre sh rawmessage получает ava.io.IOException: соединение сбрасывается равноправным узлом

, потому что взятие основано на _success, как показано ниже.

Поток искры; -

eventDFCached
          .write
          .partitionBy(partitionCols: _*)
          .mode("append")
          .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
          .option("sep","|")
          .csv(arguments("baseDirectory"))

Сжатие

    logger.info("Scanning directory {} for files to compact", inputDirectory)

    fs
      .listStatus(new Path(inputDirectory), new PathFilter {
        def accept(path: Path): Boolean = !path.getName.contains("_SUCCESS")
      })
      .toVector
      .map(_.getPath.toString)
  }


 Common.refreshDataInImpalaTable(s"${databaseName}.${compactorOptions.reader.table}", arguments.environment.sds.impala.host, true, true, false)
...