Я пытаюсь использовать многопроцессорную обработку для чтения 100 CSV-файлов параллельно (и впоследствии обрабатывать их отдельно параллельно).Вот мой код, работающий на Jupyter, размещенный на моем главном узле EMR в AWS.(В конечном итоге это будут 100k CSV-файлов, следовательно, потребуется распределенное чтение).
import findspark
import boto3
from multiprocessing.pool import ThreadPool
import logging
import sys
findspark.init()
from pyspark import SparkContext, SparkConf, sql
conf = SparkConf().setMaster("local[*]")
conf.set('spark.scheduler.mode', 'FAIR')
sc = SparkContext.getOrCreate(conf)
spark = sql.SparkSession.builder.master("local[*]").appName("ETL").getOrCreate()
s3 = boto3.resource(...)
bucket = ''
bucketObj = s3.Bucket(bucket)
numNodes = 64
def processTest(key):
logger.info(key + ' ---- Start\n')
fLog = spark.read.option("header", "true") \
.option("inferSchema", "true") \
.csv(buildS3Path(bucket) + key)
logger.info(key + ' ---- Finish Read\n')
fLog = renameColumns(NAME_MAP, fLog)
logger.info(key + ' ---- Finish Rename\n')
(landLog, flags) = validate(fLog)
logger.info(key + ' ---- Finish Validation\n')
files = list(bucketObj.objects.filter(Prefix=subfolder))
keys = list(map(lambda obj: obj.key, files))
keys = keys
# files = s3C.list_objects(Bucket=bucket, Prefix=subfolder)['Contents']
p = ThreadPool(numNodes)
p.map(process, keys)
Работает нормально, за исключением того, что используется только главный узел.![The blue line is my master node.](https://i.stack.imgur.com/VNFO3.png)
Синяя линия - загрузка ЦП на моем главном узле.Все журналы показывают, что я работаю на одной машине:
INFO:pyspark:172.31.29.33
Как заставить spark распределять пул среди рабочих?