I have a fairly large (~1 TB) Parquet dataset partitioned by the database_id column. I want to copy this dataset to a new one, keeping only the rows whose index column is contained in a separate "special_indexes" table.
My current approach:
import pyspark.sql.functions as F

big_table = spark.read.parquet("path/to/partitioned/big_table.parquet")
unique_indexes_table = spark.read.parquet("path/to/unique_indexes.parquet")

(
    big_table
    # broadcast the small key table so the join itself needs no shuffle
    .join(F.broadcast(unique_indexes_table), on="index")
    .write
    .save(
        path="path/to/partitioned/big_table_2.parquet",
        format="parquet",
        mode="overwrite",
        partitionBy="database_id",
    )
)
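(For what it's worth, since unique_indexes_table contains only the key column, the inner join acts as a filter, but the same intent can also be expressed as a left-semi join, which guarantees no columns from the lookup table leak into the output. A minimal variant of the join line, everything else unchanged:)

filtered = big_table.join(
    F.broadcast(unique_indexes_table),
    on="index",
    how="leftsemi",  # keep matching big_table rows; add no columns
)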
However, this triggers a shuffle and fails with java.io.IOException: No space left on device on my 10-node cluster, where each node has ~900 MB of disk space available under SPARK_LOCAL_DIRS.
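(For completeness, the only knob I know of for the spill location is spark.local.dir; a sketch of setting it at session startup, with a hypothetical path, keeping in mind that per the Spark docs a SPARK_LOCAL_DIRS (standalone) or LOCAL_DIRS (YARN) environment variable set by the cluster manager overrides this setting:)

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # hypothetical scratch mount with more free space; ignored if the
    # cluster manager already sets SPARK_LOCAL_DIRS / LOCAL_DIRS
    .config("spark.local.dir", "/mnt/big_scratch/spark-tmp")
    .getOrCreate()
)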
I have been trying to get this to work for several days, without success. I am about to rewrite it in pyarrow, reading one partition at a time and doing the join per partition (a sketch of what I mean is below), but I cannot understand why pyspark is unable to do the same thing.
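Here is roughly the per-partition pyarrow rewrite I have in mind; a minimal sketch, assuming Hive-style database_id=<value> subdirectories, a join column literally named index, and the standard pyarrow.parquet / pyarrow.compute APIs:

from pathlib import Path

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq

src = Path("path/to/partitioned/big_table.parquet")    # paths as in the
dst = Path("path/to/partitioned/big_table_2.parquet")  # Spark example above

# The key table is small, so materialize it once as a plain Array.
special = pq.read_table("path/to/unique_indexes.parquet", columns=["index"])
value_set = pa.array(special.column("index").to_pylist())

# One Hive-style partition directory at a time, so only a single
# partition's worth of data is ever held in memory.
for part_dir in sorted(src.glob("database_id=*")):
    table = pq.read_table(part_dir)
    mask = pc.is_in(table.column("index"), value_set=value_set)
    filtered = table.filter(mask)
    out_dir = dst / part_dir.name  # preserves the database_id=<value> layout
    out_dir.mkdir(parents=True, exist_ok=True)
    pq.write_table(filtered, str(out_dir / "part-0.parquet"))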
I have posted the SQL diagram and the query execution plan below. I believe the Exchange step is what causes the problem, and I do not understand why it is needed in the first place.
== Parsed Logical Plan ==
'InsertIntoHadoopFsRelationCommand file:/scratch/username/datapkg_output_dir/uniparc-domain-wstructure/master/remove_duplicate_matches/adjacency_matrix.parquet, false, ['database_id], Parquet, Map(path -> /scratch/username/datapkg_output_dir/uniparc-domain-wstructure/master/remove_duplicate_matches/adjacency_matrix.parquet), Overwrite, [__index_level_0__#77L, uniparc_id#70, sequence#71, database#72, interpro_name#73, interpro_id#74, domain_start#75L, domain_end#76L, domain_length#78L, structure_id#79, model_id#80, chain_id#81, pc_identity#82, alignment_length#83, mismatches#84, gap_opens#85, q_start#86, q_end#87, s_start#88, s_end#89, evalue_log10#90, bitscore#91, qseq#92, sseq#93, ... 11 more fields]
+- AnalysisBarrier
+- RepartitionByExpression [database_id#104], 200
+- Project [__index_level_0__#77L, uniparc_id#70, sequence#71, database#72, interpro_name#73, interpro_id#74, domain_start#75L, domain_end#76L, domain_length#78L, structure_id#79, model_id#80, chain_id#81, pc_identity#82, alignment_length#83, mismatches#84, gap_opens#85, q_start#86, q_end#87, s_start#88, s_end#89, evalue_log10#90, bitscore#91, qseq#92, sseq#93, ... 11 more fields]
+- Join Inner, (__index_level_0__#77L = __index_level_0__#222L)
:- Relation[uniparc_id#70,sequence#71,database#72,interpro_name#73,interpro_id#74,domain_start#75L,domain_end#76L,__index_level_0__#77L,domain_length#78L,structure_id#79,model_id#80,chain_id#81,pc_identity#82,alignment_length#83,mismatches#84,gap_opens#85,q_start#86,q_end#87,s_start#88,s_end#89,evalue_log10#90,bitscore#91,qseq#92,sseq#93,... 11 more fields] parquet
+- ResolvedHint (broadcast)
+- Relation[__index_level_0__#222L] parquet
== Analyzed Logical Plan ==
InsertIntoHadoopFsRelationCommand file:/scratch/username/datapkg_output_dir/uniparc-domain-wstructure/master/remove_duplicate_matches/adjacency_matrix.parquet, false, [database_id#104], Parquet, Map(path -> /scratch/username/datapkg_output_dir/uniparc-domain-wstructure/master/remove_duplicate_matches/adjacency_matrix.parquet), Overwrite, [__index_level_0__#77L, uniparc_id#70, sequence#71, database#72, interpro_name#73, interpro_id#74, domain_start#75L, domain_end#76L, domain_length#78L, structure_id#79, model_id#80, chain_id#81, pc_identity#82, alignment_length#83, mismatches#84, gap_opens#85, q_start#86, q_end#87, s_start#88, s_end#89, evalue_log10#90, bitscore#91, qseq#92, sseq#93, ... 11 more fields]
+- RepartitionByExpression [database_id#104], 200
+- Project [__index_level_0__#77L, uniparc_id#70, sequence#71, database#72, interpro_name#73, interpro_id#74, domain_start#75L, domain_end#76L, domain_length#78L, structure_id#79, model_id#80, chain_id#81, pc_identity#82, alignment_length#83, mismatches#84, gap_opens#85, q_start#86, q_end#87, s_start#88, s_end#89, evalue_log10#90, bitscore#91, qseq#92, sseq#93, ... 11 more fields]
+- Join Inner, (__index_level_0__#77L = __index_level_0__#222L)
:- Relation[uniparc_id#70,sequence#71,database#72,interpro_name#73,interpro_id#74,domain_start#75L,domain_end#76L,__index_level_0__#77L,domain_length#78L,structure_id#79,model_id#80,chain_id#81,pc_identity#82,alignment_length#83,mismatches#84,gap_opens#85,q_start#86,q_end#87,s_start#88,s_end#89,evalue_log10#90,bitscore#91,qseq#92,sseq#93,... 11 more fields] parquet
+- ResolvedHint (broadcast)
+- Relation[__index_level_0__#222L] parquet
== Optimized Logical Plan ==
InsertIntoHadoopFsRelationCommand file:/scratch/username/datapkg_output_dir/uniparc-domain-wstructure/master/remove_duplicate_matches/adjacency_matrix.parquet, false, [database_id#104], Parquet, Map(path -> /scratch/username/datapkg_output_dir/uniparc-domain-wstructure/master/remove_duplicate_matches/adjacency_matrix.parquet), Overwrite, [__index_level_0__#77L, uniparc_id#70, sequence#71, database#72, interpro_name#73, interpro_id#74, domain_start#75L, domain_end#76L, domain_length#78L, structure_id#79, model_id#80, chain_id#81, pc_identity#82, alignment_length#83, mismatches#84, gap_opens#85, q_start#86, q_end#87, s_start#88, s_end#89, evalue_log10#90, bitscore#91, qseq#92, sseq#93, ... 11 more fields]
+- RepartitionByExpression [database_id#104], 200
+- Project [__index_level_0__#77L, uniparc_id#70, sequence#71, database#72, interpro_name#73, interpro_id#74, domain_start#75L, domain_end#76L, domain_length#78L, structure_id#79, model_id#80, chain_id#81, pc_identity#82, alignment_length#83, mismatches#84, gap_opens#85, q_start#86, q_end#87, s_start#88, s_end#89, evalue_log10#90, bitscore#91, qseq#92, sseq#93, ... 11 more fields]
+- Join Inner, (__index_level_0__#77L = __index_level_0__#222L)
:- Filter isnotnull(__index_level_0__#77L)
: +- Relation[uniparc_id#70,sequence#71,database#72,interpro_name#73,interpro_id#74,domain_start#75L,domain_end#76L,__index_level_0__#77L,domain_length#78L,structure_id#79,model_id#80,chain_id#81,pc_identity#82,alignment_length#83,mismatches#84,gap_opens#85,q_start#86,q_end#87,s_start#88,s_end#89,evalue_log10#90,bitscore#91,qseq#92,sseq#93,... 11 more fields] parquet
+- ResolvedHint (broadcast)
+- Filter isnotnull(__index_level_0__#222L)
+- Relation[__index_level_0__#222L] parquet
== Physical Plan ==
Execute InsertIntoHadoopFsRelationCommand InsertIntoHadoopFsRelationCommand file:/scratch/username/datapkg_output_dir/uniparc-domain-wstructure/master/remove_duplicate_matches/adjacency_matrix.parquet, false, [database_id#104], Parquet, Map(path -> /scratch/username/datapkg_output_dir/uniparc-domain-wstructure/master/remove_duplicate_matches/adjacency_matrix.parquet), Overwrite, [__index_level_0__#77L, uniparc_id#70, sequence#71, database#72, interpro_name#73, interpro_id#74, domain_start#75L, domain_end#76L, domain_length#78L, structure_id#79, model_id#80, chain_id#81, pc_identity#82, alignment_length#83, mismatches#84, gap_opens#85, q_start#86, q_end#87, s_start#88, s_end#89, evalue_log10#90, bitscore#91, qseq#92, sseq#93, ... 11 more fields]
+- Exchange hashpartitioning(database_id#104, 200)
+- *(2) Project [__index_level_0__#77L, uniparc_id#70, sequence#71, database#72, interpro_name#73, interpro_id#74, domain_start#75L, domain_end#76L, domain_length#78L, structure_id#79, model_id#80, chain_id#81, pc_identity#82, alignment_length#83, mismatches#84, gap_opens#85, q_start#86, q_end#87, s_start#88, s_end#89, evalue_log10#90, bitscore#91, qseq#92, sseq#93, ... 11 more fields]
+- *(2) BroadcastHashJoin [__index_level_0__#77L], [__index_level_0__#222L], Inner, BuildRight
:- *(2) Project [uniparc_id#70, sequence#71, database#72, interpro_name#73, interpro_id#74, domain_start#75L, domain_end#76L, __index_level_0__#77L, domain_length#78L, structure_id#79, model_id#80, chain_id#81, pc_identity#82, alignment_length#83, mismatches#84, gap_opens#85, q_start#86, q_end#87, s_start#88, s_end#89, evalue_log10#90, bitscore#91, qseq#92, sseq#93, ... 11 more fields]
: +- *(2) Filter isnotnull(__index_level_0__#77L)
: +- *(2) FileScan parquet [uniparc_id#70,sequence#71,database#72,interpro_name#73,interpro_id#74,domain_start#75L,domain_end#76L,__index_level_0__#77L,domain_length#78L,structure_id#79,model_id#80,chain_id#81,pc_identity#82,alignment_length#83,mismatches#84,gap_opens#85,q_start#86,q_end#87,s_start#88,s_end#89,evalue_log10#90,bitscore#91,qseq#92,sseq#93,... 11 more fields] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:/scratch/username/datapkg_output_dir/uniparc-domain-wstructure/v0.1/contru..., PartitionCount: 1373, PartitionFilters: [], PushedFilters: [IsNotNull(__index_level_0__)], ReadSchema: struct