Я пытаюсь преобразовать текстовый файл в файл паркета. Я могу найти только «как конвертировать в паркет» из другого формата файла или кода, написанного на Scala / Python.
Вот что я придумал
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
private static final StructField[] fields = new StructField[]{
new StructField("timeCreate", DataTypes.StringType, false, Metadata.empty()),
new StructField("cookieCreate", DataTypes.StringType, false,Metadata.empty())
};//simplified
private static final StructType schema = new StructType(fields);
public static void main(String[] args) throws IOException {
SparkSession spark = SparkSession
.builder().master("spark://levanhuong:7077")
.appName("Convert text file to Parquet")
.getOrCreate();
spark.conf().set("spark.executor.memory", "1G");
WriteParquet(spark, args);
}
public static void WriteParquet(SparkSession spark, String[] args){
JavaRDD<String> data = spark.read().textFile(args[0]).toJavaRDD();
JavaRDD<Row> output = data.map((Function<String, Row>) s -> {
DataModel model = new DataModel(s);
return RowFactory.create(model);
});
Dataset<Row> df = spark.createDataFrame(output.rdd(),schema);
df.printSchema();
df.show(2);
df.write().parquet(args[1]);
}
args[0]
- это путь к входному файлу, args[1]
- это путь к выходному файлу. вот упрощенная модель данных. DateTime
поля правильно отформатированы в функции set ()
public class DataModel implements Serializable {
DateTime timeCreate;
DateTime cookieCreate;
public DataModel(String data){
String model[] = data.split("\t");
setTimeCreate(model[0]);
setCookieCreate(model[1]);
}
А вот и ошибка. Журнал ошибок указывает на df.show(2)
, но я думаю, что ошибка была вызвана map()
. Я не уверен, почему, так как я не вижу кастинга в коде
>java.lang.ClassCastException: cannot assign instance of
java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.fun$1
of type org.apache.spark.api.java.function.Function in instance
of org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1
Я думаю, этого достаточно, чтобы воссоздать ошибку, пожалуйста, скажите мне, если мне нужно предоставить дополнительную информацию.