«Путь не найден» при попытке прочитать файлы из корзины s3 в другом регионе - PullRequest
0 голосов
/ 08 апреля 2019

Я пытаюсь прочитать все файлы из двух разных сегментов s3, которые находятся в разных регионах.

При чтении первого сегмента, который находится в us-east-1, где находится экземпляр EMR, похоже, что код выглядит счастливым, поскольку мое основное понимание не выходит.

При чтении из второго сегмента, который находится в us-west-2, код выдает следующее сообщение: "Слияние не удалось: путь не существует: hdfs: //ip-10-240-15-43.bamtech.test.us-east-1.bamgrid.net: 8020 / user / hadoop / some-bucket-us-west -2; "

Я попытался установить конечные точки для обеих групп в конф. Спарк, как описано здесь https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.5/bk_cloud-data-access/content/s3-per-bucket-region-configs.html

val sparkConf = new SparkConf()
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          //.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
          .set(
            "fs.s3.bucket.some-bucket-us-east-1.endpoint",
            "s3.amazonaws.com"
          )
          .set(
            "fs.s3.bucket.some-bucket-us-west-2.endpoint",
            "s3-us-west-2.amazonaws.com"
          )
          // Todo: Add european buckets.
          .registerKryoClasses(Array(classOf[Event]))

Я попытался изменить протокол s3 на s3a (и приведенные выше ключи свойств с "fs.s3" на "fs.s3a"), но приложение, похоже, зависло перед чтением из первого сегмента.

Имена сегментов изменяются, поскольку они чувствительны, но настоящие имена сегментов существуют. Я подозреваю, что это как-то связано с кросс-регионами, но из моего исследования похоже, что это было исправлено в emr 5.1.0 (я использую emr 5.21.0).

Вот главное понимание:

for {
      bucketOnePath               <- buildBucketPath(config.bucketOne.name)
      _                           <- log(s"Reading events from $bucketOnePath")
      bucketOneEvents: RDD[Event] <- parseEvents(bucketOnePath, config.service)
      _                           <- log(s"Enriching events from $bucketOnePath with originating region data")
      bucketOneEventsWithRegion: RDD[Event] <- enrichEventsWithRegion(
        bucketOneEvents,
        config.bucketOne.region
      )

      bucketTwoPath               <- buildBucketPath(config.bucketTwo.name)
      _                           <- log(s"Reading events from $bucketTwoPath")
      bucketTwoEvents: RDD[Event] <- parseEvents(config.bucketTwo.name, config.service)
      _                           <- log(s"Enriching events from $bucketTwoPath with originating region data")
      bucketTwoEventsWithRegion: RDD[Event] <- enrichEventsWithRegion(
        bucketTwoEvents,
        config.bucketTwo.region
      )

      _                        <- log("Merging events")
      mergedEvents: RDD[Event] <- merge(bucketOneEventsWithRegion, bucketTwoEventsWithRegion)
      if mergedEvents.isEmpty() == false
      _ <- log("Grouping merged events by partition key")
      mergedEventsByPartitionKey: RDD[(EventsPartitionKey, Iterable[Event])] <- eventsByPartitionKey(
        mergedEvents
      )

      _ <- log(s"Storing merged events to ${config.outputBucket.name}")
      _ <- store(config.outputBucket.name, config.service, mergedEventsByPartitionKey)
    } yield ()

А вот вывод stdout из журналов emr:

Reading events from s3://some-bucket-us-east-1/*/*/*/*/*.gz
Enriching events from s3://some-bucket-us-east-1/*/*/*/*/*.gz with originating region data
Reading events from s3://some-bucket-us-west-2/*/*/*/*/*.gz
Merge failed: Path does not exist: hdfs://ip-10-240-15-43.bamtech.test.us-east-1.bamgrid.net:8020/user/hadoop/some-bucket-us-west-2;
...