Я пытаюсь найти лучший способ считывания данных паркета из хранилища S3. Первый подход :
BasicSessionCredentials cred = new BasicSessionCredentials(key,secret, "");
AmazonS3 client = AmazonS3ClientBuilder
.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("custom_endpoint", region))
.withCredentials(new AWSStaticCredentialsProvider(cred))
.build();
GetObjectRequest req = new GetObjectRequest("bucket_name", "relative_path", "");
S3Object obj = client.getObject(req);
S3ObjectInputStream cont = obj.getObjectContent();
Таким образом, я могу прочитать объект, но не смог найти способ чтения данных паркета из InputStream
Второй подход :
String SCHEMA_TEMPLATE = "{" +
"\"type\": \"record\",\n" +
" \"name\": \"schema\",\n" +
" \"fields\": [\n" +
" {\"name\": \"timeStamp\", \"type\": \"string\"},\n" +
" {\"name\": \"temperature\", \"type\": \"double\"},\n" +
" {\"name\": \"pressure\", \"type\": \"double\"}\n" +
" ]" +
"}";
String PATH_SCHEMA = "s3a";
Path internalPath = new Path(PATH_SCHEMA, bucketName, folderName);
Schema schema = new Schema.Parser().parse(SCHEMA_TEMPLATE);
Configuration configuration = new Configuration();
configuration.set("fs.s3a.access.key", "key");
configuration.set("fs.s3a.secret.key", "secret");
configuration.set("fs.s3a.endpoint", "custom_endpoint");
AvroReadSupport.setRequestedProjection(configuration, schema);
ParquetReader<GenericRecord> = AvroParquetReader.GenericRecord>builder(internalPath).withConf(configuration).build();
GenericRecord genericRecord = parquetReader.read();
while(genericRecord != null) {
Map<String, String> valuesMap = new HashMap<>();
genericRecord.getSchema().getFields().forEach(field -> valuesMap.put(field.name(), genericRecord.get(field.name()).toString()));
genericRecord = parquetReader.read();
}
Но во втором случае я не могу прочитать данные и получить SocketTimeoutException . Помогите мне найти правильный подход Спасибо