PySpark: разбиение и хеширование нескольких фреймов данных, затем соединение - PullRequest
0 голосов
/ 22 ноября 2018

Справочная информация: Я работаю с клиническими данными с большим количеством различных .csv/.txt файлов.Все эти файлы PatientID , но с разными полями.Я импортирую эти файлы в DataFrames, что я буду join на более позднем этапе после первой обработки каждого из этих DataFrames в отдельности.Я показал примеры двух DataFrames ниже (df_A и df_B).Точно так же у меня есть несколько DataFrames - df_A, df_B, df_C .... df_J, и я сделаю join все они на более поздней стадии.

df_A = spark.read.schema(schema).format("csv").load(...)....            # Just an example
df_A.show(3)
#Example 1:
+----------+-----------------+
| patientID|   diagnosis_code|
+----------+-----------------+
|       A51|             XIII|
|       B22|               VI|
|       B13|               XV|
+----------+-----------------+
df_B.show(3)
#Example 2:
+-----------+----------+-------+-------------+--------+
|  patientID|  hospital|   city|  doctor_name|    Bill|
+-----------+----------+-------+-------------+--------+
|        A51|   Royal H| London|C.Braithwaite|  451.23|
|        B22|Surgery K.|  Leeds|      J.Small|   88.00|
|        B22|Surgery K.|  Leeds|      J.Small|  102.01|
+-----------+----------+-------+-------------+--------+
print("Number of partitions: {}".format(df_A.rdd.getNumPartitions()))# Num of partitions: 1
print("Partitioner: {}".format(df_A.rdd.partitioner))                # Partitioner: None

Number of partitions: 1 #With other DataFrames I get more partitions.
Partitioner: None

После прочтения всех этих .csv/.txt файлов в DataFrames я вижу, что для некоторых DataFrames данные распределяются только по 1 partition (как выше), но для других DataFrames может быть больше разделов,в зависимости от размера соответствующего файла .csv/.txt, который, в свою очередь, влияет на количество созданных блоков (размер по умолчанию 128 МБ в HDFS).У нас также нет partitioner на данный момент.

Вопрос: А разве не стоит перераспределять эти DataFrames на несколько partitions, hashed на основе PatientID , поэтомучто мы можем избежать как можно большего числа shuffling, когда мы join() кратны DataFrames?Если действительно, это то, что нужно, тогда я должен сделать перераспределение на основе PatientID и иметь одинаковые partitioner для всех DataFrames (не уверен, если это возможно)?Я также читал, что DataFrame делает все самостоятельно, но не следует ли нам указать hashing в соответствии с колонкой PatientID ?

Я буду очень признателен, если кто-то может предоставить некоторые полезные ссылкиили подсказки о том, какую стратегию оптимизации следует использовать при работе с этими несколькими DataFrames, основанными на PatientID .

...