У меня есть облачная эра, записывающая потоковую запись в папку HDFS. После этого у нас есть работа по уплотнению, которая будет выполняться ежедневно, сжиматься и помещаться в еще один каталог.
После выполнения сжатия я выполняю refre sh table-name [создан поверх целевой папки потокового приложения]
Иногда я получаю исключение удаленного ввода-вывода, потому что потоковое приложение по-прежнему выполнение и запись данных [это в режиме передачи]
Как переопределить sh пропуск частичных данных прохождения во внешней таблице HDFS impala refre sh.
скажем Потоковая целевая папка есть; - rawmessage и сжатие в; - rawmessagecompact
, когда запись происходит одновременно; задание сжатия выбирает и переносит в таблицу rawmessagecompact.
при выполнении refre sh rawmessage получает ava.io.IOException: соединение сбрасывается равноправным узлом
, потому что взятие основано на _success, как показано ниже.
Поток искры; -
eventDFCached
.write
.partitionBy(partitionCols: _*)
.mode("append")
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
.option("sep","|")
.csv(arguments("baseDirectory"))
Сжатие
logger.info("Scanning directory {} for files to compact", inputDirectory)
fs
.listStatus(new Path(inputDirectory), new PathFilter {
def accept(path: Path): Boolean = !path.getName.contains("_SUCCESS")
})
.toVector
.map(_.getPath.toString)
}
Common.refreshDataInImpalaTable(s"${databaseName}.${compactorOptions.reader.table}", arguments.environment.sds.impala.host, true, true, false)