Я пытаюсь прочитать все файлы из двух разных сегментов s3, которые находятся в разных регионах.
При чтении первого сегмента, который находится в us-east-1, где находится экземпляр EMR, похоже, что код выглядит счастливым, поскольку мое основное понимание не выходит.
При чтении из второго сегмента, который находится в us-west-2, код выдает следующее сообщение:
"Слияние не удалось: путь не существует: hdfs: //ip-10-240-15-43.bamtech.test.us-east-1.bamgrid.net: 8020 / user / hadoop / some-bucket-us-west -2; "
Я попытался установить конечные точки для обеих групп в конф. Спарк, как описано здесь https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.5/bk_cloud-data-access/content/s3-per-bucket-region-configs.html
val sparkConf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.set(
"fs.s3.bucket.some-bucket-us-east-1.endpoint",
"s3.amazonaws.com"
)
.set(
"fs.s3.bucket.some-bucket-us-west-2.endpoint",
"s3-us-west-2.amazonaws.com"
)
// Todo: Add european buckets.
.registerKryoClasses(Array(classOf[Event]))
Я попытался изменить протокол s3 на s3a (и приведенные выше ключи свойств с "fs.s3" на "fs.s3a"), но приложение, похоже, зависло перед чтением из первого сегмента.
Имена сегментов изменяются, поскольку они чувствительны, но настоящие имена сегментов существуют. Я подозреваю, что это как-то связано с кросс-регионами, но из моего исследования похоже, что это было исправлено в emr 5.1.0 (я использую emr 5.21.0).
Вот главное понимание:
for {
bucketOnePath <- buildBucketPath(config.bucketOne.name)
_ <- log(s"Reading events from $bucketOnePath")
bucketOneEvents: RDD[Event] <- parseEvents(bucketOnePath, config.service)
_ <- log(s"Enriching events from $bucketOnePath with originating region data")
bucketOneEventsWithRegion: RDD[Event] <- enrichEventsWithRegion(
bucketOneEvents,
config.bucketOne.region
)
bucketTwoPath <- buildBucketPath(config.bucketTwo.name)
_ <- log(s"Reading events from $bucketTwoPath")
bucketTwoEvents: RDD[Event] <- parseEvents(config.bucketTwo.name, config.service)
_ <- log(s"Enriching events from $bucketTwoPath with originating region data")
bucketTwoEventsWithRegion: RDD[Event] <- enrichEventsWithRegion(
bucketTwoEvents,
config.bucketTwo.region
)
_ <- log("Merging events")
mergedEvents: RDD[Event] <- merge(bucketOneEventsWithRegion, bucketTwoEventsWithRegion)
if mergedEvents.isEmpty() == false
_ <- log("Grouping merged events by partition key")
mergedEventsByPartitionKey: RDD[(EventsPartitionKey, Iterable[Event])] <- eventsByPartitionKey(
mergedEvents
)
_ <- log(s"Storing merged events to ${config.outputBucket.name}")
_ <- store(config.outputBucket.name, config.service, mergedEventsByPartitionKey)
} yield ()
А вот вывод stdout из журналов emr:
Reading events from s3://some-bucket-us-east-1/*/*/*/*/*.gz
Enriching events from s3://some-bucket-us-east-1/*/*/*/*/*.gz with originating region data
Reading events from s3://some-bucket-us-west-2/*/*/*/*/*.gz
Merge failed: Path does not exist: hdfs://ip-10-240-15-43.bamtech.test.us-east-1.bamgrid.net:8020/user/hadoop/some-bucket-us-west-2;