Python: Как ускорить функцию запросов на большом фрейме данных панд - PullRequest
0 голосов
/ 30 апреля 2018

У меня есть таблица с 38 миллионами строк. Один столбец - это URL. Мне нужно перейти к этому URL для каждой строки, извлечь и обработать XML, чтобы сохранить его в новом столбце. Я распараллелил весь процесс на куски с помощью пула и также использую кадры данных pandas. Я использую 100-процентную 8-ядерную емкость с 1000-мегапиксельным интернетом, и процесс рассчитан на 12 дней. Любой совет, как улучшить это?

class Receptores():
    def aux_tupla(self, df):
        df['aux_tupla'] = df['uri'].str.replace('/v01/', '/depot/').apply(lambda x: self.uriToDicts3(x))
        return df

    def uriToDicts3(self, url):
        regex = self.URL_REGEX.match(url)
        path = "%s/%s/%s/%s.gz" % (regex.group(1), regex.group(2), regex.group(4), regex.group(5))
        _file = self.bucket.get_key(path, validate=False)
        compressed_file = BytesIO()
        try:
            _file.get_file(compressed_file)
            compressed_file.seek(0)
            decompressed_file = gzip.GzipFile(fileobj=compressed_file, mode='rb')
            rq = decompressed_file.read()

        except boto.exception.S3ResponseError as ex:
            print("Error >>", ex.message)
            return json.dumps({}), json.dumps({})

        soup = bs(rq, 'xml')
        detalle = soup.find('Detalle')
        detalle = json.dumps(xmltodict.parse(str(detalle)))
        dictionary = {}
        for key in self.datos_adicionales:
            try:
                value = soup.find(key)
                if value is None:
                    value = soup.find(text=re.compile(key)).parent.parent.find('ValorDA').get_text()
                else:
                    value = value.get_text()
                dictionary[key] = value
            except Exception:
                continue
        dictionary = json.dumps(dictionary)
        return dictionary, detalle


    def pool_only(self, df):
        df_split = np.array_split(df, 8)
        pool = Pool(8)
        df = pd.concat(pool.map(self.aux_tupla, df_split))
        pool.close()
        pool.join()
        return df

    def main(self, dia, choice='pool'):
        t1 = time.time()
        df = self.getUris(dia, limit=True)
        print('FINISHED: {} get Uris in {}'.format(dia, time.time() - t1))
        if choice == 'pool':
        df = self.pool_only(df)
        elif choice == 'combined':
            self.pool(df)
            df = pd.concat(self.dfs)
            print([i.shape[0] for i in self.dfs])
        elif choice == 'thread':
            self.thread_only(df)
            df = pd.concat(self.dfs)
            print([i.shape[0] for i in self.dfs])
        else:
            df['aux_tupla'] = df['uri'].str.replace('/v01/', '/depot/').apply(lambda x: self.uriToDicts3(x))
        print('FINISHED: {} , {} rows uriToDicts3 in {} hours'.format(dia, df.shape[0], (time.time() - t1) / 3600))
        df[['data_adicional', 'detalle']] = df['aux_tupla'].apply(pd.Series)
        df.drop('aux_tupla', axis=1, inplace=True)
        # self.insert_table(df)
        return df

def parallel(dia):
    t1 = time.time()
    a = Receptores().main(dia, choice='pool')
    a.to_csv('{}.csv'.format(dia), index=False)
    # print('LISTO {} - {}'.format(dia, time.time() - t1))
    return a

if __name__ == '__main__':
    t1 = time.time()
    # df = pd.read_csv('dia_emision_batch.csv')
    # dias = [str(i) for i in df.loc[:, 'dia_emision']]
    dias = ['20180101', '20170910', '20170730']
    for i in dias:
        if os.path.exists('{}.csv'.format(i)):
            print('Already exists:', i)
            continue
        try:
            parallel(i)
        except Exception:
            print('Failed!', i)
    print('TOTAL TIME: {}'.format((time.time() - t1) / 3600))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...