Merging Parquet files with different columns in PySpark
0 votes
/ 18 February 2019

I am trying to merge several Parquet files located in HDFS using PySpark.
The files have different columns and different column types.

from pyspark.sql import SparkSession
from pyspark.sql import Row
spark = (
    SparkSession.builder.appName("test")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.shuffle.service.enabled", "true")
    .config("spark.executor.cores", "10")
    .config("spark.executor.memory", "48G")
    .config("spark.driver.memory", "86G")
    .config("spark.dynamicAllocation.maxExecutors", "30")
    .enableHiveSupport()
    .getOrCreate()
)

import os
import calendar
import time
import string

sc = spark.sparkContext
df = spark.read.parquet("hdfs_path/*.parquet").coalesce(1)
df.write.parquet("hdfs_destination_path")

I got the following error message:

Py4JJavaError: An error occurred while calling o83.parquet.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:215)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:438)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:474)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:509)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, pwccdhus-slave12.cip.com, executor 1): org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
at parquet.column.Dictionary.decodeToBinary(Dictionary.java:44)
at org.apache.spark.sql.execution.vectorized.ColumnVector.getUTF8String(ColumnVector.java:631)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:315)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:258)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
... 8 more

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:188)
... 45 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.lang.UnsupportedOperationException: parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
at parquet.column.Dictionary.decodeToBinary(Dictionary.java:44)
at org.apache.spark.sql.execution.vectorized.ColumnVector.getUTF8String(ColumnVector.java:631)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:315)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:258)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
... 8 more

So I tried setting the option that enables schema merging, but that did not work either.

from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = (
    SparkSession.builder.appName("test")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.shuffle.service.enabled", "true")
    .config("spark.executor.cores", "10")
    .config("spark.executor.memory", "48G")
    .config("spark.driver.memory", "86G")
    .config("spark.dynamicAllocation.maxExecutors", "30")
    .enableHiveSupport()
    .getOrCreate()
)

import os
import calendar
import time
import string

sc = spark.sparkContext
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
df = spark.read.parquet("hdfs_path/*.parquet").coalesce(1)
df.write.parquet("hdfs_destination_path")

This resulted in the error below:

Py4JJavaError: An error occurred while calling o152.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID 9, pwccdhus-slave22.cip.com, executor 1): org.apache.spark.SparkException: Failed merging schema of file hdfs://pwccdhus-master1.cip.com:8020/hdfs_path/xyz.parquet/part-00000-a6b8e35f-ce2f-416f-8cce-3e5a1e252380-c000.snappy.parquet:
root
 |-- CONTRACTING_FIRM_CLIENT_ID: string (nullable = true)
 |-- COMPANY_CODE: string (nullable = true)
 |-- PROFIT_CENTER: string (nullable = true)
 |-- FISCAL_MONTH: integer (nullable = true)
 |-- CHARGED_HOURS: double (nullable = true)
 |-- FEE_REV_EXTERNAL_CLIENTS: double (nullable = true)
 |-- ENGAGEMENT_MARGIN: double (nullable = true)
 |-- PRODUCT_CODE: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- WBS_ELEMENT_ID: double (nullable = true)

I want the end result to be a single merged file in the specified location. What approach should I take?
PySpark is the only option I can use.

I also tried the following approach:

import os
import calendar
import time
import string
from pyspark.sql.functions import monotonically_increasing_id


sc = spark.sparkContext 
path = input("Enter Path: ")

fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(path))
result = [file.getPath().getName() for file in list_status]
gzList = [ fi for fi in result if fi.endswith(".gz") ]
parquetList = [ fi for fi in result if fi.endswith(".parquet") ]


column_names = "ColA|ColB|ColC"
temp = spark.createDataFrame(
    [tuple('' for i in column_names.split("|"))],
    column_names.split("|")
).where("1=0")

temp = temp.withColumn("id", monotonically_increasing_id())
if (len(gzList) == 0):
    for i in range(len(parquetList)):
        df = spark.read.parquet(path + parquetList[i])
        df.withColumn("id", monotonically_increasing_id())
        temp = df.join(df, "id", "outer").drop("id")

I get the error message below:

AnalysisException: u'USING column `id` cannot be resolved on the left side of the join. The left-side columns: [WBS_ELEMENT_ID, WBS_ELEMENT_NAME, PROJECT_TYPE_ID, PROJECT_TYPE_NAME, CONTRACT_ID, CONTRACT_LINE_NUMBER, CONTRACT_LINE_NAME, WBS_FUNC, WBS_FUNC_DESCR, WBS_ELMT_STAT, WBS_ELMT_STAT_DESCR, ENG_CREATION_DATE, END_DATE, PROFIT_CENTER, COMPANY_CODE, CONTRACTING_FIRM_CLIENT_ID, PRODUCT_CODE];'

What am I doing wrong? I am trying to run a loop that reads all the files and joins them one at a time.

1 Answer

0 votes
/ 18 February 2019

Since the file structure differs from file to file in the hdfs_path directory, you will have to read each file individually and create a DataFrame for it. You can use this Scala code in spark-shell:

// spark-shell
import org.apache.hadoop.fs.{FileSystem, Path}

val hadoopConf = spark.sparkContext.hadoopConfiguration
val fs = FileSystem.get(hadoopConf)

val baseDir = "hdfs_path"
val fileUris = fs.listStatus(new Path(baseDir)).filter(_.isFile).map(_.getPath.toUri.toString).filter(_.endsWith("parquet"))
fileUris.foreach(println) // print the file names

val dfs = fileUris.map(uri => spark.read.parquet(uri))
val primaryKeyCols = Seq("col1", "col2")

// If you want to left join
val joined_df = dfs.reduce((x, y) => x.join(y, primaryKeyCols, "left"))

This gives you an array of DataFrames, which you then need to combine. Use the join method on DataFrame if you want to merge them horizontally, or the union method if you want to merge them vertically (append rows). Note that to union DataFrames they must have the same schema.
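For the union route, a minimal PySpark sketch of the schema alignment could look like the snippet below. align_to_schema is a hypothetical helper (not part of Spark), and it assumes that casting every column to string is acceptable for your data; the UnsupportedOperationException above suggests that at least one column (e.g. WBS_ELEMENT_ID) is double in some files and string in others. The dfs list is the per-file DataFrames, built as in the pyspark snippet further down.

from functools import reduce
from pyspark.sql.functions import col, lit

def align_to_schema(df, all_columns):
    # add any missing columns as nulls, then cast everything to string,
    # so files with different columns/types can be unioned by position
    for c in all_columns:
        if c not in df.columns:
            df = df.withColumn(c, lit(None))
    return df.select([col(c).cast("string").alias(c) for c in all_columns])

# dfs is the list of per-file DataFrames
all_columns = sorted(set(c for df in dfs for c in df.columns))
aligned = [align_to_schema(df, all_columns) for df in dfs]
merged = reduce(lambda a, b: a.union(b), aligned)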

AFAIK, there is no filesystem API integration in pyspark. However, you can print the file URIs in spark-shell and then hard-code that list in a separate pyspark console.

# pyspark
from functools import reduce  # needed for reduce in Python 3

file_uris = [
    "hdfs://namenode:8020/hdfs_path/file1.parquet",
    "hdfs://namenode:8020/hdfs_path/file2.parquet",

    "hdfs://namenode:8020/hdfs_path/fileN.parquet"
]

dfs = [spark.read.parquet(x) for x in file_uris]
primary_key_cols = ["col1", "col2"]
df = reduce(lambda x, y: x.join(y, primary_key_cols, "left"), dfs)  # for left join
...
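
To end up with the single merged file the question asks for, the combined DataFrame (the joined df above, or merged from the union sketch earlier) can then be written out with a single partition, for example:

# write the combined result as one Parquet file, as in the original attempt;
# mode("overwrite") is optional and replaces any existing output
merged.coalesce(1).write.mode("overwrite").parquet("hdfs_destination_path")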