У меня есть таблица с 38 миллионами строк. Один столбец - это URL. Мне нужно перейти к этому URL для каждой строки, извлечь и обработать XML, чтобы сохранить его в новом столбце.
Я распараллелил весь процесс на куски с помощью пула и также использую кадры данных pandas. Я использую 100-процентную 8-ядерную емкость с 1000-мегапиксельным интернетом, и процесс рассчитан на 12 дней.
Любой совет, как улучшить это?
class Receptores():
def aux_tupla(self, df):
df['aux_tupla'] = df['uri'].str.replace('/v01/', '/depot/').apply(lambda x: self.uriToDicts3(x))
return df
def uriToDicts3(self, url):
regex = self.URL_REGEX.match(url)
path = "%s/%s/%s/%s.gz" % (regex.group(1), regex.group(2), regex.group(4), regex.group(5))
_file = self.bucket.get_key(path, validate=False)
compressed_file = BytesIO()
try:
_file.get_file(compressed_file)
compressed_file.seek(0)
decompressed_file = gzip.GzipFile(fileobj=compressed_file, mode='rb')
rq = decompressed_file.read()
except boto.exception.S3ResponseError as ex:
print("Error >>", ex.message)
return json.dumps({}), json.dumps({})
soup = bs(rq, 'xml')
detalle = soup.find('Detalle')
detalle = json.dumps(xmltodict.parse(str(detalle)))
dictionary = {}
for key in self.datos_adicionales:
try:
value = soup.find(key)
if value is None:
value = soup.find(text=re.compile(key)).parent.parent.find('ValorDA').get_text()
else:
value = value.get_text()
dictionary[key] = value
except Exception:
continue
dictionary = json.dumps(dictionary)
return dictionary, detalle
def pool_only(self, df):
df_split = np.array_split(df, 8)
pool = Pool(8)
df = pd.concat(pool.map(self.aux_tupla, df_split))
pool.close()
pool.join()
return df
def main(self, dia, choice='pool'):
t1 = time.time()
df = self.getUris(dia, limit=True)
print('FINISHED: {} get Uris in {}'.format(dia, time.time() - t1))
if choice == 'pool':
df = self.pool_only(df)
elif choice == 'combined':
self.pool(df)
df = pd.concat(self.dfs)
print([i.shape[0] for i in self.dfs])
elif choice == 'thread':
self.thread_only(df)
df = pd.concat(self.dfs)
print([i.shape[0] for i in self.dfs])
else:
df['aux_tupla'] = df['uri'].str.replace('/v01/', '/depot/').apply(lambda x: self.uriToDicts3(x))
print('FINISHED: {} , {} rows uriToDicts3 in {} hours'.format(dia, df.shape[0], (time.time() - t1) / 3600))
df[['data_adicional', 'detalle']] = df['aux_tupla'].apply(pd.Series)
df.drop('aux_tupla', axis=1, inplace=True)
# self.insert_table(df)
return df
def parallel(dia):
t1 = time.time()
a = Receptores().main(dia, choice='pool')
a.to_csv('{}.csv'.format(dia), index=False)
# print('LISTO {} - {}'.format(dia, time.time() - t1))
return a
if __name__ == '__main__':
t1 = time.time()
# df = pd.read_csv('dia_emision_batch.csv')
# dias = [str(i) for i in df.loc[:, 'dia_emision']]
dias = ['20180101', '20170910', '20170730']
for i in dias:
if os.path.exists('{}.csv'.format(i)):
print('Already exists:', i)
continue
try:
parallel(i)
except Exception:
print('Failed!', i)
print('TOTAL TIME: {}'.format((time.time() - t1) / 3600))