Запуск параллельных потоков в задании PySpark - PullRequest
0 голосов
/ 09 мая 2018

Я пытаюсь запустить параллельные потоки в задании на искру. Это работает без проблем, когда я запускаю скрипт python из Cli, но, насколько я понимаю, на самом деле не стоит использовать преимущества параллельной обработки кластера EMR. На самом деле он не сохраняет данные, когда я запускаю как искровую работу. Я даже не уверен, что он создает фрейм данных искры, когда я запускаю его как работу спарка.

Я также пытался использовать map вместо параллельных потоков, но не смог заставить это работать.

Если я не могу заставить параллелизм работать как искровое задание, похоже, что я мог бы просто запустить его на одном экземпляре ec2 с параллельными потоками.

Итак, основная логика такова -

  1. Создание контекста искры в верхней части скрипта
  2. Внутри класса - извлечение списка файлов для обработки из очереди SQS
  3. Зацикливание списка файлов следующим методом

    # this is run for 10 blocks of 10 files each across the EMR cluster in parallel
    
    def parquet_driver(self):
    
        max_threads = 20
        futures=[]
        pool = ThreadPoolExecutor(max_threads)
        i = 0
        total_files_processed = 0
        while total_files_processed <= len(self.master_file_list):
            while i < max_threads:
                print('Processing %s' % self.master_file_list[i])
    
                futures.append(pool.submit(self.convert_to_parquet,
                  self.master_file_list[i]))
                i += 1
    
            for x in as_completed(futures):
                pass
    
            # add in i number of files to the total
            total_files_processed += i
    

Обратите внимание, что файл передается в метод с именем "convert_to_parquet".

def convert_to_parquet(self, file):

log_file_name = file.split(':')[2].replace('.dat', '.log')
logger = Logger(log_file_name).get()

try:
    bucket = s3.Bucket(file.split(':')[0])
    file_name = file.split(':')[2]
    file_obj = bucket.Object(file.split(':')[1] + '/' + file.split(':')[2])
    partition_key = file.split(':')[2].split('.')[2]
    target_table = file.split(':')[2].split('.')[1]
    receipt_handle = file.split(':')[3]
    file_contents = file_obj.get()["Body"].read()
    if 'al1' not in file.split(':')[2]:
        logger.debug('Record type = %s, deleting from queue and returning ..' % target_table)
    else:
        logger.debug('Working on %s..' % target_table)
        app_name = file
        #sc = SparkContext(appName=app_name)
        print('Reading the following file from s3: %s' % file_name)
        print('Found the following file contents on s3: %s' % file_contents)
        rdd = sc.parallelize(file_contents.split('\n')).map(lambda line: line.split(','))

        # rdd = sc.textFile(csv_file).map(lambda line: line.split(','))
        # pd.read_csv(csv_file)
        sqlContext = sql.SQLContext(sc)

        if hasattr(rdd, "toDF"):
            df = rdd.toDF()
        else:
            spark = SparkSession
            df = rdd.toDF()

        logger.debug("Partitioning data to: {0}".format(partition_key))

        # Go to redshift and get the data definition
        metadata = self.build_df_definition('al1')

        if 'cycle_date' in metadata['columns']:
            metadata['columns'].remove('cycle_date')
        if 'log_timestamp' in metadata['columns']:
            metadata['columns'].remove('log_timestamp')

        cols = metadata['columns']
        data_types = metadata['data_types']

        for idx in range(0,len(cols)):
            col_str = '_' + str(int(idx) + 1)
            df_field_value = regexp_replace(df[col_str], '"', '')
            df = df.withColumn(cols[idx],df_field_value.cast(data_types[idx]))

        df = df.withColumn("cycle_date",lit(partition_key))
        # this field will be pushed to the sqs queue
        df = df.withColumn("log_timestamp",lit(self.log_timestamp))

        full_cols = cols
        full_cols.append('cycle_date')
        full_cols.append('log_timestamp')
        print(full_cols)
        ref_df = df.select(full_cols)
        ref_df.show()

        partitionby=['year','month','day']
        output='/opt/data/' + '/' + target_table
        s3_loc = 's3://<bucket>/<prefix>/' + target_table
        codec='snappy'

        ref_df.write.partitionBy(['cycle_date']).format("parquet").save(s3_loc, mode="append")

        #sc.stop()
except Exception as e:
    logger.debug(e)
    traceback.print_exc()
    open("/opt/logs/dump.log","w").write(traceback.print_exc())
    exit()
else:
    # Delete received message from queue
    sqs.delete_message(
        QueueUrl=self.queue_url,
        ReceiptHandle=receipt_handle
    )
    logger.debug('Received and deleted file: %s' % file)
...