Справочная информация: Я работаю с клиническими данными с большим количеством различных .csv/.txt
файлов.Все эти файлы PatientID , но с разными полями.Я импортирую эти файлы в DataFrames
, что я буду join
на более позднем этапе после первой обработки каждого из этих DataFrames
в отдельности.Я показал примеры двух DataFrames
ниже (df_A
и df_B
).Точно так же у меня есть несколько DataFrames
- df_A
, df_B
, df_C
.... df_J
, и я сделаю join
все они на более поздней стадии.
df_A = spark.read.schema(schema).format("csv").load(...).... # Just an example
df_A.show(3)
#Example 1:
+----------+-----------------+
| patientID| diagnosis_code|
+----------+-----------------+
| A51| XIII|
| B22| VI|
| B13| XV|
+----------+-----------------+
df_B.show(3)
#Example 2:
+-----------+----------+-------+-------------+--------+
| patientID| hospital| city| doctor_name| Bill|
+-----------+----------+-------+-------------+--------+
| A51| Royal H| London|C.Braithwaite| 451.23|
| B22|Surgery K.| Leeds| J.Small| 88.00|
| B22|Surgery K.| Leeds| J.Small| 102.01|
+-----------+----------+-------+-------------+--------+
print("Number of partitions: {}".format(df_A.rdd.getNumPartitions()))# Num of partitions: 1
print("Partitioner: {}".format(df_A.rdd.partitioner)) # Partitioner: None
Number of partitions: 1 #With other DataFrames I get more partitions.
Partitioner: None
После прочтения всех этих .csv/.txt
файлов в DataFrames
я вижу, что для некоторых DataFrames
данные распределяются только по 1 partition
(как выше), но для других DataFrames
может быть больше разделов,в зависимости от размера соответствующего файла .csv/.txt
, который, в свою очередь, влияет на количество созданных блоков (размер по умолчанию 128 МБ в HDFS
).У нас также нет partitioner
на данный момент.
Вопрос: А разве не стоит перераспределять эти DataFrames
на несколько partitions
, hashed
на основе PatientID , поэтомучто мы можем избежать как можно большего числа shuffling
, когда мы join()
кратны DataFrames
?Если действительно, это то, что нужно, тогда я должен сделать перераспределение на основе PatientID и иметь одинаковые partitioner
для всех DataFrames
(не уверен, если это возможно)?Я также читал, что DataFrame
делает все самостоятельно, но не следует ли нам указать hashing
в соответствии с колонкой PatientID ?
Я буду очень признателен, если кто-то может предоставить некоторые полезные ссылкиили подсказки о том, какую стратегию оптимизации следует использовать при работе с этими несколькими DataFrames
, основанными на PatientID .