Моя проблема заключается в следующем:
У меня есть большой кадр данных с именем customer_data_pk, содержащий 230 миллионов строк, а другой содержит 200 миллионов строк с именем customer_data_pk_isb.
Оба имеют столбец callID, для которого я хотел бы выполнить левое соединение, левый информационный кадр - customer_data_pk.
Каков наилучший способ выполнения операции соединения?
Что я пробовал?
Простое соединение, т. Е.
customer_data_pk.join(customer_data_pk_isb, customer_data_pk.btn ==
customer_data_pk_isb.btn, 'left')
выдает нехватку памяти или (просто тайм-аут с ошибкой: удаление драйвера executor без последних тактов: 468990 мс превышает время ожидания 120000 мс).
После всего этого соединение по-прежнему не работает. Я все еще изучаю PySpark, поэтому, возможно, я неправильно понял основы. Если бы кто-то мог пролить свет на это, было бы здорово.
Я тоже попробовал это, но не сработало, и код завис:
customer_data_pk.persist (StorageLevel.DISK_ONLY)
Далее, от завершения конфигурации я использую: --conf spark.sql.shuffle.partitions = 5000
Мой полный код:
from pyspark import SparkContext
from pyspark import SQLContext
import time
import pyspark
sc = SparkContext("local", "Example")
sqlContext = SQLContext(sc);
customer_data_pk = sqlContext.read.format('jdbc').options(
url='jdbc:mysql://localhost/matchingqueryautomation',
driver='com.mysql.jdbc.Driver',
dbtable='customer_pk',
user='XXXX',
password='XXXX').load()
customer_data_pk.persist(pyspark.StorageLevel.DISK_ONLY)
customer_data_pk_isb = sqlContext.read.format('jdbc').options(
url='jdbc:mysql://localhost/lookupdb',
driver='com.mysql.jdbc.Driver',
dbtable='customer_pk_isb',
user='XXXX',
password='XXXX').load()
print('###########################', customer_data_pk.join(customer_data_pk_isb, customer_data_pk.btn == customer_data_pk_isb.btn, 'left').count(),
'###########################')