У меня есть кластер Amazon EMR - 30 узлов
Мой код Python выглядит так -
spark = SparkSession \
.builder \
.appName("App") \
.config(conf=sparkConf) \
.getOrCreate()
def fetchCatData(cat, tableName):
df_gl = spark.sql("select * from {} where category = {}".format(tableName, cat))
df_pandas = df_gl.select("*").toPandas()
df_pandas.to_csv("/tmp/split/{}_{}.csv".format(tableName, cat))
catList = [14, 15, 63, 65, 74, 21, 23, 60, 79, 86, 107, 147, 196, 199, 200, 201, 229, 263, 265, 267, 328, 421, 468, 469,504]
tableList = ["Table1","Table2"
,"Table3",
"Table4", "Table5", "Table6",
"Table7"
]
def main(args):
log4jLogger = spark._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
for table in tableList:
LOGGER.info("Starting Split for {}".format(table))
dataLocation = "s3://test/APP/{}".format( table)
df = spark.read.parquet(dataLocation)
df = df.repartition("CATEGORY").cache()
df.createOrReplaceTempView(table)
for cat in catList:
fetchGLData(cat, table)
Я хочу решить следующую проблему -
- По сути, я хочу прочитать мои данные о паркете, разделить их по категориям и сохранить их в формате данных pandas в csv.
- В настоящее время я запускаю это последовательно, я хочу запустить это параллельно с каждой категорией, запущенной на узле в EMR
- Я пытался использовать многопроцессорность, но я не доволен результатами.
Как лучше всего решить эту проблему за наименьшее количество времени?