В моем приложении я читаю 40 ГБ текстовых файлов, которые полностью распределены по 188 файлам.
Я разделяю эти файлы и создаю xml-файлы в каждой строке в spark, используя пару rdd.
Для 40 ГБ входных данных будет создано много миллионов небольших XML-файлов, и это мое требование.
Все работает нормально, но когда spark сохраняет файлы в S3, выдает ошибку, и задание не выполняется.
Вот исключение, которое я получаю
Причина: java.nio.file.FileSystemException:
/ mnt / s3 / emrfs-2408623010549537848/0000000000: слишком много открытых файлов в
sun.nio.fs.UnixException.translateToIOException (UnixException.java:91)
в
sun.nio.fs.UnixException.rethrowAsIOException (UnixException.java:102)
в
sun.nio.fs.UnixException.rethrowAsIOException (UnixException.java:107)
в
sun.nio.fs.UnixFileSystemProvider.newByteChannel (UnixFileSystemProvider.java:214)
в java.nio.file.Files.newByteChannel (Files.java:361) в
java.nio.file.Files.createFile (Files.java:632) в
com.amazon.ws.emr.hadoop.fs.files.TemporaryFiles.create (TemporaryFiles.java:70)
в
com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.openNewPart (MultipartUploadOutputStream.java:493)
... еще 21
Хост ApplicationMaster: 10.97.57.198 RPC-порт ApplicationMaster: 0
очередь: время начала по умолчанию: 1542344243252 конечный статус: СБОЙ
URL отслеживания:
http://ip -10-97-57-234.tr-fr-nonprod.aws-int.thomsonreuters.com: 20888 / прокси / application_1542343091900_0001 /
пользователь: hadoop Исключение в теме "основной"
org.apache.spark.SparkException: приложение
application_1542343091900_0001 завершено с ошибкой статуса
И это тоже
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
Пожалуйста, уменьшите частоту запросов. (Сервис: Amazon S3; Код статуса:
503; Код ошибки: SlowDown; Идентификатор запроса: D33581CA9A799F64; S3 Extended
Идентификатор запроса:
/ SlEplo + lCKQRVVH + zHiop0oh8q8WqwnNykK3Ga6 / VM2HENl / eKizbd1rg4vZD1BZIpp8lk6zwA =),
S3 Расширенный идентификатор запроса:
/ SlEplo + lCKQRVVH + zHiop0oh8q8WqwnNykK3Ga6 / VM2HENl / eKizbd1rg4vZD1BZIpp8lk6zwA =
Вот мой код для этого.
object TestAudit {
def main(args: Array[String]) {
val inputPath = args(0)
val output = args(1)
val noOfHashPartitioner = args(2).toInt
//val conf = new SparkConf().setAppName("AuditXML").setMaster("local");
val conf = new SparkConf().setAppName("AuditXML")
val sc = new SparkContext(conf);
val input = sc.textFile(inputPath)
val pairedRDD = input.map(row => {
val split = row.split("\\|")
val fileName = split(0)
val fileContent = split(1)
(fileName, fileContent)
})
import org.apache.hadoop.io.NullWritable
import org.apache.spark.HashPartitioner
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
class RddMultiTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String]
}
pairedRDD.partitionBy(new HashPartitioner(10000)).saveAsHadoopFile("s3://a205381-tr-fr-development-us-east-1-trf-auditabilty//AUDITOUTPUT", classOf[String], classOf[String], classOf[RddMultiTextOutputFormat], classOf[GzipCodec])
}
}
Даже если я попытался уменьшить HashPartitioner, то он также не работает