Как я могу использовать AvroParquetWriter и писать в S3 через API AmazonS3? - PullRequest
4 голосов
/ 25 февраля 2020

В настоящее время я использую код ниже, чтобы написать паркет через Avro. Этот код записывает его в файловую систему, но я хочу записать в S3.

try {
    StopWatch sw = StopWatch.createStarted();
    Schema avroSchema = AvroSchemaBuilder.build("pojo", message.getTransformedMessage().get(0));
    final String parquetFile = "parquet/data.parquet";
    final Path path = new Path(parquetFile);

    ParquetWriter writer = AvroParquetWriter.<GenericData.Record>builder(path)
        .withSchema(avroSchema)
        .withConf(new org.apache.hadoop.conf.Configuration())
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withWriteMode(Mode.OVERWRITE)//probably not good for prod. (overwrites files).
        .build();

    for (Map<String, Object> row : message.getTransformedMessage()) {
      StopWatch stopWatch = StopWatch.createStarted();
      final GenericRecord record = new GenericData.Record(avroSchema);
      row.forEach((k, v) -> {
        record.put(k, v);
      });
      writer.write(record);
    }
    //todo:  Write to S3.  We should probably write via the AWS objects.  This does not show that.
    //https://stackoverflow.com/questions/47355038/how-to-generate-parquet-file-using-pure-java-including-date-decimal-types-an
    writer.close();
    System.out.println("Total Time: " + sw);

  } catch (Exception e) {
    //do somethign here.  retryable?  non-retryable?  Wrap this excetion in one of these?
    transformedParquetMessage.getOriginalMessage().getMetaData().addException(e);
  }

Это нормально пишет в файл, но как мне получить его для потоковой передачи в API AmazonS3? Я нашел в Интернете некоторый код, использующий jar oop - aws, но для этого требуются некоторые exe-файлы Windows, и, конечно, мы хотим этого избежать. В настоящее время я использую только:

 <dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.9.2</version>
</dependency>
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-avro</artifactId>
  <version>1.8.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.2.1</version>
</dependency>

Итак, вопрос в том, есть ли способ перехватить поток вывода на AvroParquetWriter, чтобы я мог передать его на S3? Основная причина, по которой я хочу сделать это для повторов. S3 автоматически повторяется до 3 раз. Это очень нам поможет.

Ответы [ 2 ]

0 голосов
/ 15 апреля 2020

Это зависит от банка oop - aws, поэтому, если вы не хотите его использовать, я не уверен, что смогу вам помочь. Я, однако, работаю на ma c и у меня нет windows exe-файлов, поэтому я не уверен, откуда вы говорите, что они приходят. AvroParquetWriter уже зависит от Had oop, поэтому, даже если эта дополнительная зависимость для вас неприемлема, для других это может не иметь большого значения:

Вы можете использовать AvroParquetWriter для потоковой передачи непосредственно на S3, передав ему Имел oop Путь, который создается с параметром URI и установкой правильных настроек.

val uri = new URI("s3a://<bucket>/<key>")
val path = new Path(uri)

val config = new Configuration()
config.set("fs.s3a.access.key", key)
config.set("fs.s3a.secret.key", secret)
config.set("fs.s3a.session.token", sessionToken)
config.set("fs.s3a.aws.credentials.provider", credentialsProvider)

val writer = AvroParquetWriter.builder[GenericRecord](path).withConf(config).withSchema(schema).build()

Я использовал следующие зависимости (формат sbt):

"org.apache.avro" % "avro" % "1.8.1"
"org.apache.hadoop" % "hadoop-common" % "2.9.0"
"org.apache.hadoop" % "hadoop-aws" % "2.9.0"
"org.apache.parquet" % "parquet-avro" % "1.8.1"
0 голосов
/ 24 марта 2020

Надеюсь, я не ошибаюсь в этом вопросе, но, похоже, здесь вы конвертируете авро в паркет, и вы хотите загрузить паркет в s3

После того, как вы закроете ParquetWriter, вам следует вызовите метод, который выглядит следующим образом (при условии, что он не перехватывает поток записи из avro в паркет, он просто передает поток файла партера, в который больше не производится запись):

        AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("ACCESS_KEY", "SECRET_KEY"))).build();
        S3Path outputPath = new S3Path();
        outputPath.setBucket("YOUR_BUCKET");
        outputPath.setKey("YOUR_FOLDER_PATH");
        try {
            InputStream parquetStream = new FileInputStream(new File(parquetFile));
            s3Client.putObject(outputPath.getBucket(), outputPath.getKey(), parquetStream, null);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }

с использованием AWS SDK

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk</artifactId>
    <version>1.11.749</version>
</dependency>

Конечно, метод должен находиться в другом классе utils, и конструктор этого метода должен инициализировать s3Client AmazonS3 с учетными данными, поэтому все, что вам нужно сделать, это вызвать и получить доступ к его Член s3Client для размещения объектов

надеюсь, это поможет

...