Как запустить код Python через узлы в кластере EMR - PullRequest
0 голосов
/ 09 сентября 2018

У меня есть кластер Amazon EMR - 30 узлов Мой код Python выглядит так -

spark = SparkSession \
        .builder \
        .appName("App") \
        .config(conf=sparkConf) \
        .getOrCreate()

def fetchCatData(cat, tableName):
    df_gl = spark.sql("select * from {} where category = {}".format(tableName, cat))
    df_pandas = df_gl.select("*").toPandas()
    df_pandas.to_csv("/tmp/split/{}_{}.csv".format(tableName, cat))

catList = [14, 15, 63, 65, 74, 21, 23, 60, 79, 86, 107, 147, 196, 199, 200, 201, 229, 263, 265, 267, 328, 421, 468, 469,504]
tableList = ["Table1","Table2"
             ,"Table3",
             "Table4", "Table5", "Table6",
             "Table7"
             ]

def main(args):
    log4jLogger = spark._jvm.org.apache.log4j
    LOGGER = log4jLogger.LogManager.getLogger(__name__)

    for table in tableList:
        LOGGER.info("Starting Split for {}".format(table))
        dataLocation = "s3://test/APP/{}".format( table)
        df = spark.read.parquet(dataLocation)
        df = df.repartition("CATEGORY").cache()
        df.createOrReplaceTempView(table)
        for cat in catList:
            fetchGLData(cat, table)

Я хочу решить следующую проблему -

  1. По сути, я хочу прочитать мои данные о паркете, разделить их по категориям и сохранить их в формате данных pandas в csv.
  2. В настоящее время я запускаю это последовательно, я хочу запустить это параллельно с каждой категорией, запущенной на узле в EMR
  3. Я пытался использовать многопроцессорность, но я не доволен результатами.

Как лучше всего решить эту проблему за наименьшее количество времени?

1 Ответ

0 голосов
/ 09 сентября 2018

Не уверен, почему вы хотите преобразовать в pandas dataframe, но используя spark dataframe, созданный из вашего spark sql, вы можете напрямую писать в csv.

Однако, если вы хотите, чтобы csv представлял собой один файл, вам нужно будет переразбить на 1, который не будет использовать все узлы. Если вы не беспокоитесь о том, сколько файлов он генерирует, вы можете перераспределить фрейм данных, чтобы включить больше разделов. Каждый раздел будет затем обрабатываться узлами и выводиться до тех пор, пока все разделы не будут завершены.

Один файл не использует все узлы (обратите внимание, что CSV будет папка, содержащая фактический CSV)

df_gl = spark.sql("select * from {} where category = {}".format(tableName, cat)) df_gl.repartition(1).write.mode("overwrite").csv("/tmp/split/{}_{}.csv".format(tableName, cat))

Параллельная обработка с использованием нескольких узлов и вывод в виде нескольких разделенных файлов (примечание. CSV будет папкой с действительными CSV-файлами)

df_gl = spark.sql("select * from {} where category = {}".format(tableName, cat)).repartition(10) df_gl.write.mode("overwrite").csv("/tmp/split/{}_{}.csv".format(tableName, cat))

...