Я пытаюсь запустить параллельные потоки в задании на искру. Это работает без проблем, когда я запускаю скрипт python из Cli, но, насколько я понимаю, на самом деле не стоит использовать преимущества параллельной обработки кластера EMR. На самом деле он не сохраняет данные, когда я запускаю как искровую работу. Я даже не уверен, что он создает фрейм данных искры, когда я запускаю его как работу спарка.
Я также пытался использовать map вместо параллельных потоков, но не смог заставить это работать.
Если я не могу заставить параллелизм работать как искровое задание, похоже, что я мог бы просто запустить его на одном экземпляре ec2 с параллельными потоками.
Итак, основная логика такова -
- Создание контекста искры в верхней части скрипта
- Внутри класса - извлечение списка файлов для обработки из очереди SQS
Зацикливание списка файлов следующим методом
# this is run for 10 blocks of 10 files each across the EMR cluster in parallel
def parquet_driver(self):
max_threads = 20
futures=[]
pool = ThreadPoolExecutor(max_threads)
i = 0
total_files_processed = 0
while total_files_processed <= len(self.master_file_list):
while i < max_threads:
print('Processing %s' % self.master_file_list[i])
futures.append(pool.submit(self.convert_to_parquet,
self.master_file_list[i]))
i += 1
for x in as_completed(futures):
pass
# add in i number of files to the total
total_files_processed += i
Обратите внимание, что файл передается в метод с именем "convert_to_parquet".
def convert_to_parquet(self, file):
log_file_name = file.split(':')[2].replace('.dat', '.log')
logger = Logger(log_file_name).get()
try:
bucket = s3.Bucket(file.split(':')[0])
file_name = file.split(':')[2]
file_obj = bucket.Object(file.split(':')[1] + '/' + file.split(':')[2])
partition_key = file.split(':')[2].split('.')[2]
target_table = file.split(':')[2].split('.')[1]
receipt_handle = file.split(':')[3]
file_contents = file_obj.get()["Body"].read()
if 'al1' not in file.split(':')[2]:
logger.debug('Record type = %s, deleting from queue and returning ..' % target_table)
else:
logger.debug('Working on %s..' % target_table)
app_name = file
#sc = SparkContext(appName=app_name)
print('Reading the following file from s3: %s' % file_name)
print('Found the following file contents on s3: %s' % file_contents)
rdd = sc.parallelize(file_contents.split('\n')).map(lambda line: line.split(','))
# rdd = sc.textFile(csv_file).map(lambda line: line.split(','))
# pd.read_csv(csv_file)
sqlContext = sql.SQLContext(sc)
if hasattr(rdd, "toDF"):
df = rdd.toDF()
else:
spark = SparkSession
df = rdd.toDF()
logger.debug("Partitioning data to: {0}".format(partition_key))
# Go to redshift and get the data definition
metadata = self.build_df_definition('al1')
if 'cycle_date' in metadata['columns']:
metadata['columns'].remove('cycle_date')
if 'log_timestamp' in metadata['columns']:
metadata['columns'].remove('log_timestamp')
cols = metadata['columns']
data_types = metadata['data_types']
for idx in range(0,len(cols)):
col_str = '_' + str(int(idx) + 1)
df_field_value = regexp_replace(df[col_str], '"', '')
df = df.withColumn(cols[idx],df_field_value.cast(data_types[idx]))
df = df.withColumn("cycle_date",lit(partition_key))
# this field will be pushed to the sqs queue
df = df.withColumn("log_timestamp",lit(self.log_timestamp))
full_cols = cols
full_cols.append('cycle_date')
full_cols.append('log_timestamp')
print(full_cols)
ref_df = df.select(full_cols)
ref_df.show()
partitionby=['year','month','day']
output='/opt/data/' + '/' + target_table
s3_loc = 's3://<bucket>/<prefix>/' + target_table
codec='snappy'
ref_df.write.partitionBy(['cycle_date']).format("parquet").save(s3_loc, mode="append")
#sc.stop()
except Exception as e:
logger.debug(e)
traceback.print_exc()
open("/opt/logs/dump.log","w").write(traceback.print_exc())
exit()
else:
# Delete received message from queue
sqs.delete_message(
QueueUrl=self.queue_url,
ReceiptHandle=receipt_handle
)
logger.debug('Received and deleted file: %s' % file)