Проблема при записи файла паркета - PullRequest
0 голосов
/ 27 августа 2018

Я пытаюсь записать файл Parquet, используя схему Avro, но каждый раз получаю следующую ошибку.

Exception in thread "main" java.lang.NoSuchFieldError: DEFAULT_WRITER_VERSION
at org.apache.parquet.hadoop.ParquetWriter.<clinit>(ParquetWriter.java:46)
at com.ice.practice.AvroToParquet.main(AvroToParquet.java:52)

Моя программа-пример выглядит следующим образом: я создал схему Avro, затем преобразовал её в схему Parquet, а затем с помощью ParquetWriter пытаюсь записать объекты GenericRecord.

 import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.schema.MessageType;

import org.apache.parquet.avro.*;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

/**
 * Demo that builds a few Avro records in memory and writes them to a Parquet file.
 *
 * <p>The Avro schema is parsed from {@code d:\emp.avsc}, converted to a Parquet
 * {@link MessageType}, and the records are written to {@code d:\emp.parquet} with
 * Snappy compression.
 *
 * <p>NOTE(review): a {@code NoSuchFieldError} raised from {@code ParquetWriter}'s static
 * initializer usually points at mismatched parquet-* jar versions on the classpath rather
 * than at this code — confirm that all Parquet artifacts share one version.
 */
public class AvroToParquet {

    /** Number of sample records generated and written. */
    private static final int RECORD_COUNT = 10;

    /**
     * Generates {@link #RECORD_COUNT} sample records and writes them to the output file.
     *
     * @param args ignored
     * @throws IOException if the schema cannot be read or the Parquet file cannot be written
     */
    @SuppressWarnings("deprecation") // uses the deprecated AvroWriteSupport/ParquetWriter constructors
    public static void main(String[] args) throws IOException {

        Schema avroSchema = new Schema.Parser().parse(new File("d:\\emp.avsc"));

        List<GenericData.Record> sourceRecords = new ArrayList<>(RECORD_COUNT);
        for (int i = 0; i < RECORD_COUNT; i++)
        {
            GenericData.Record recordHolder = new GenericData.Record(avroSchema);
            recordHolder.put("name", "emp" + i);
            // salary is declared as a string in the schema, hence the conversion
            recordHolder.put("salary", String.valueOf(10000 + (i * 1000)));
            recordHolder.put("dept", "java" + i);
            sourceRecords.add(recordHolder);
        }

        MessageType parquetSchema = new AvroSchemaConverter().convert(avroSchema);
        AvroWriteSupport<GenericRecord> writeSupport = new AvroWriteSupport<>(parquetSchema, avroSchema);
        CompressionCodecName codec = CompressionCodecName.SNAPPY;

        int blockSize = 256 * 1024 * 1024;
        int pageSize = 64 * 1024;

        Path outputPath = new Path("d:\\emp.parquet");

        // try-with-resources guarantees the writer is closed even if a write fails
        try (ParquetWriter<GenericRecord> writer =
                new ParquetWriter<>(outputPath, writeSupport, codec, blockSize, pageSize))
        {
            for (GenericRecord record : sourceRecords)
            {
                writer.write(record);
            }
        }
    }

}

Схема Avro:

{
"type":"record",
"name":"employee",
"namespace":"ice.report",
"fields":[
    {
        "name":"name",
        "type":"string"
    },
    {
        "name":"salary",
        "type":"string"
    },
    {
        "name":"dept",
        "type":"string"
    }

]

}

Пожалуйста, дайте мне знать, как обойти эту проблему.

1 Ответ

0 голосов
/ 10 января 2019

Я бы порекомендовал вам не использовать устаревшие конструкторы: они объявлены устаревшими не без причины. Вместо этого попробуйте классы AvroParquetReader и AvroParquetWriter. За подробным объяснением, пожалуйста, обратитесь к этой теме. А пока позвольте мне предложить вам следующее решение:

Java-код: ParquetAvroHandler.java

package com.parquet.avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

/**
 * Reads and writes Avro-based Parquet files using the builder APIs of
 * {@link AvroParquetReader} and {@link AvroParquetWriter}.
 *
 * <p>The Avro schema is loaded once, at class-initialization time, from the classpath
 * resource named by {@code SCHEMA_PATH}.
 */
public class ParquetAvroHandler
{
    private static final Logger LOGGER = LoggerFactory.getLogger(ParquetAvroHandler.class);
    private static final String SCHEMA_PATH = "path/to/your/schema.avsc";
    private static final Path OUTPUT_PATH = new Path("result.parquet");
    private static final Schema SCHEMA;

    static
    {
        try (InputStream inStream = ParquetAvroHandler.class.getResourceAsStream(SCHEMA_PATH))
        {
            // getResourceAsStream returns null for a missing resource; fail with a clear message
            if (inStream == null)
            {
                throw new IOException("Resource not found on classpath: " + SCHEMA_PATH);
            }
            SCHEMA = new Schema.Parser().parse(IOUtils.toString(inStream, "UTF-8"));
        }
        catch (Exception e)
        {
            LOGGER.error("Can't read SCHEMA file from {}", SCHEMA_PATH, e);
            throw new IllegalStateException("Can't read SCHEMA file from " + SCHEMA_PATH, e);
        }
    }

    /**
     * Reads an existing Apache Avro-based Parquet file from the
     * specified location and prints every record to standard output.
     *
     * @param filePath path to the input file
     * @throws IOException if the file cannot be opened or read
     **/
    public void read(Path filePath) throws IOException
    {
        Configuration configuration = new Configuration();
        HadoopInputFile inputFile = HadoopInputFile.fromPath(filePath, configuration);

        try (ParquetReader<GenericData.Record> reader = AvroParquetReader
                .<GenericData.Record>builder(inputFile)
                .withConf(configuration)
                .build())
        {
            GenericData.Record record;
            // read() returns null once the end of the file is reached
            while ((record = reader.read()) != null)
            {
                System.out.println(record);
            }
        }
    }

    /**
     * Creates a new Apache Avro-based Parquet file or overwrites the existing one.
     * Records are written with the class-level schema and Snappy compression.
     *
     * @param records set of records to write to the file
     * @param filePath path to the output file
     * @throws IOException if the file cannot be created or written
     **/
    public void write(List<GenericData.Record> records, Path filePath) throws IOException
    {
        try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter
                .<GenericData.Record>builder(filePath)
                .withSchema(SCHEMA)
                .withConf(new Configuration())
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                .build())
        {
            for (GenericData.Record record : records)
            {
                writer.write(record);
            }
        }
    }

    /**
     * Writes two sample records to {@code result.parquet} and reads them back.
     *
     * @param args ignored
     */
    public static void main(String[] args)
    {
        try
        {
            List<GenericData.Record> records = new ArrayList<>();

            GenericData.Record record = new GenericData.Record(SCHEMA);
            record.put("Name", "John");
            record.put("Id", 1);
            record.put("PhoneNumber", "555-555-5551");
            record.put("ZipCode", 88888);
            record.put("isAlive", true);
            records.add(record);

            record = new GenericData.Record(SCHEMA);
            record.put("Name", "Jane");
            record.put("Id", 2);
            record.put("PhoneNumber", "555-555-5552");
            record.put("ZipCode", 99999);
            record.put("isAlive", false);
            records.add(record);

            ParquetAvroHandler handler = new ParquetAvroHandler();
            handler.write(records, OUTPUT_PATH);
            handler.read(OUTPUT_PATH);
        }
        catch (Exception e)
        {
            // top-level boundary: log with the full stack trace instead of printStackTrace()
            LOGGER.error("Parquet round trip failed for {}", OUTPUT_PATH, e);
        }
    }
}

Схема Avro: schema.avsc

{
   "namespace": "example.avro",
   "type": "record",
   "name": "org.apache.avro.file.Header",
   "fields":
   [
      {"name": "Name", "type": "string"},
      {"name": "Id",  "type": ["int", "null"]},
      {"name": "PhoneNumber", "type": ["string", "null"]},
      {"name": "ZipCode", "type": ["int", "null"]},
      {"name": "isAlive", "type": "boolean"}
   ] 
 }

Файл POM: pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- NOTE(review): these coordinates reuse the org.apache.parquet group id; presumably
         placeholders for the sample - replace with your own project's coordinates. -->
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet</artifactId>
    <version>1.10.0</version>
    <name>Sample Name</name>
    <description>Sample Description</description>    

    <dependencies>
        <!-- Generic  -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-io</artifactId>
            <version>1.3.2</version>
        </dependency>

        <!-- Avro & Hadoop  -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.2</version>
        </dependency>

        <!-- NOTE(review): hadoop-common 3.0.0 and hadoop-core 1.2.1 (below) come from different
             Hadoop major lines; having both may put conflicting copies of the same classes on
             the classpath - confirm which one is actually required. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.2.1</version>
        </dependency>

        <!-- Parquet  -->
        <!-- All parquet-* modules below except parquet-format are declared at 1.10.0. -->
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.10.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-column</artifactId>
            <version>1.10.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-common</artifactId>
            <version>1.10.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-encoding</artifactId>
            <version>1.10.0</version>
        </dependency>

        <!-- NOTE(review): declared at 2.6.0 while the other parquet-* artifacts are at 1.10.0;
             confirm this version is compatible with them. -->
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-format</artifactId>
            <version>2.6.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>1.10.0</version>
        </dependency>

    </dependencies>
</project>

Конфигурация журнала: log4j.properties

# Root logger option
# Level INFO, sent to both the "file" and "console" appenders defined below.
log4j.rootLogger=INFO, file, console

# Direct log messages to a log file
# Rolling file appender: rolls over at 10MB and keeps up to 10 backup files.
log4j.appender.file=org.apache.log4j.RollingFileAppender
# NOTE(review): in the .properties format a backslash is an escape character, so the
# leading "\" here is likely dropped when the file is loaded - confirm the intended path.
log4j.appender.file.File=\systemlog.log
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

# Direct log messages to console
# Uses the same conversion pattern as the file appender, written to System.out.
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
...