Проблемы с производительностью при чтении данных из улья с использованием Python - PullRequest
7 голосов
/ 08 июля 2019

У меня есть таблица в кусте с 351 837 (размер 110 МБ) записями, и я читаю эту таблицу, используя python и записываю на сервер sql.

В этом процессе при считывании данных из улья в информационный фрейм панд уходит много времени. Когда я загружаю целые записи (351 Кб), это занимает 90 минут.

Для улучшения я пошел следующим образом: чтение 10 тыс. Строк из куста и запись на сервер sql. Но чтение 10 тыс. Строк один раз из улья и назначение его в Dataframe занимает всего 4-5 минут времени.

def execute_hadoop_export():
       """
       This will run the steps required for a Hadoop Export.  
       Return Values is boolean for success fail
       """
       try:

           hql='select * from db.table '
           # Open Hive ODBC Connection
           src_conn = pyodbc.connect("DSN=****",autocommit=True)
           cursor=src_conn.cursor()
           #tgt_conn = pyodbc.connect(target_connection)

           # Using SQLAlchemy to dynamically generate query and leverage dataframe.to_sql to write to sql server...
           sql_conn_url = urllib.quote_plus('DRIVER={ODBC Driver 13 for SQL Server};SERVER=Xyz;DATABASE=Db2;UID=ee;PWD=*****')
           sql_conn_str = "mssql+pyodbc:///?odbc_connect={0}".format(sql_conn_url)
           engine = sqlalchemy.create_engine(sql_conn_str)
           # read source table.
           vstart=datetime.datetime.now()
           for df in pandas.read_sql(hql, src_conn,chunksize=10000):

               vfinish=datetime.datetime.now()

               print 'Finished 10k rows reading from hive and it took', (vfinish-vstart).seconds/60.0,' minutes'
           # Get connection string for target from Ctrl.Connnection

               df.to_sql(name='table', schema='dbo', con=engine, chunksize=10000, if_exists="append", index=False) 
               print 'Finished 10k rows writing into sql server and it took', (datetime.datetime.now()-vfinish).seconds/60.0, ' minutes'
               vstart=datetime.datetime.now()
           cursor.Close()


       except Exception, e:
           print str(e)

выход:

Result

Какой самый быстрый способ чтения данных таблицы улья в python?

Обновление структура таблицы улья

CREATE TABLE `table1`(
  `policynumber` varchar(15), 
  `unitidentifier` int, 
  `unitvin` varchar(150), 
  `unitdescription` varchar(100), 
  `unitmodelyear` varchar(4), 
  `unitpremium` decimal(18,2), 
  `garagelocation` varchar(150), 
  `garagestate` varchar(50), 
  `bodilyinjuryoccurrence` decimal(18,2), 
  `bodilyinjuryaggregate` decimal(18,2), 
  `bodilyinjurypremium` decimal(18,2), 
  `propertydamagelimits` decimal(18,2), 
  `propertydamagepremium` decimal(18,2), 
  `medicallimits` decimal(18,2), 
  `medicalpremium` decimal(18,2), 
  `uninsuredmotoristoccurrence` decimal(18,2), 
  `uninsuredmotoristaggregate` decimal(18,2), 
  `uninsuredmotoristpremium` decimal(18,2), 
  `underinsuredmotoristoccurrence` decimal(18,2), 
  `underinsuredmotoristaggregate` decimal(18,2), 
  `underinsuredmotoristpremium` decimal(18,2), 
  `umpdoccurrence` decimal(18,2), 
  `umpddeductible` decimal(18,2), 
  `umpdpremium` decimal(18,2), 
  `comprehensivedeductible` decimal(18,2), 
  `comprehensivepremium` decimal(18,2), 
  `collisiondeductible` decimal(18,2), 
  `collisionpremium` decimal(18,2), 
  `emergencyroadservicepremium` decimal(18,2), 
  `autohomecredit` tinyint, 
  `lossfreecredit` tinyint, 
  `multipleautopoliciescredit` tinyint, 
  `hybridcredit` tinyint, 
  `goodstudentcredit` tinyint, 
  `multipleautocredit` tinyint, 
  `fortyfivepluscredit` tinyint, 
  `passiverestraintcredit` tinyint, 
  `defensivedrivercredit` tinyint, 
  `antitheftcredit` tinyint, 
  `antilockbrakescredit` tinyint, 
  `perkcredit` tinyint, 
  `plantype` varchar(100), 
  `costnew` decimal(18,2), 
  `isnocontinuousinsurancesurcharge` tinyint)
CLUSTERED BY ( 
  policynumber, 
  unitidentifier) 
INTO 50 BUCKETS

Примечание. Я также пытался использовать опцию sqoop export, но моя таблица кустов уже имеет формат корзины.

Ответы [ 2 ]

3 голосов
/ 16 июля 2019

Каков наилучший способ прочитать вывод с диска с помощью Pandas после использования cmd.get_results?(например, из команды Hive).Например, рассмотрим следующее:

out_file = 'results.csv'
delimiter = chr(1)
....

Qubole.configure(qubole_key)
hc_params = ['--query', query]
hive_args = HiveCommand.parse(hc_params)
cmd = HiveCommand.run(**hive_args)
if (HiveCommand.is_success(cmd.status)):
    with open(out_file, 'wt') as writer:
        cmd.get_results(writer, delim=delimiter, inline=False)

Если после успешного выполнения запроса я проверяю первые несколько байтов файла results.csv, я вижу следующее: $ head -c 300 results.csv b'flight_uid\twinning_price\tbid_price\timpressions_source_timestamp\n'b'0FY6ZsrnMy\x012000\x012270.0\x011427243278000\n0FamrXG9AW\x01710\x01747.0\x011427243733000\n0FY6ZsrnMy\x012000\x012270.0\x011427245266000\n0FY6ZsrnMy\x012000\x012270.0\x011427245088000\n0FamrXG9AW\x01330\x01747.0\x011427243407000\n0FamrXG9AW\x01710\x01747.0\x011427243981000\n0FamrXG9AW\x01490\x01747.0\x011427245289000\n Когда я пытаюсь открыть этов Pandas:

df = pd.read_csv('results.csv')

это, очевидно, не работает (я получаю пустой DataFrame), так как он не отформатирован должным образом как файл CSV.Хотя я мог бы попытаться открыть results.csv и постобработать его (удалить b 'и т. Д.), Прежде чем открыть его в Pandas, это был бы довольно хакерский способ его загрузки.Я правильно использую интерфейс?Это использует самую последнюю версию qds_sdk: 1.4.2 от трех часов назад.

3 голосов
/ 10 июля 2019

Я пробовал с мульти-обработкой, и я могу уменьшить это 8-10 минут с 2 часов.Пожалуйста, найдите ниже сценарии.

from multiprocessing import Pool
import pandas as pd
import datetime
from query import hivetable
from write_tosql import write_to_sql
p = Pool(37)
lst=[]
#we have 351k rows so generating series to use in hivetable method
for i in range(1,360000,10000):
    lst.append(i)
print 'started reading ',datetime.datetime.now()
#we have 40 cores in  cluster 
p = Pool(37)
s=p.map(hivetable, [i for i in lst])
s_df=pd.concat(s)
print 'finished reading ',datetime.datetime.now()
print 'Started writing to sql server ',datetime.datetime.now()
write_to_sql(s_df)
print 'Finished writing to sql server ',datetime.datetime.now()

--------- файл query.py -------

import pyodbc
from multiprocessing import Pool
from functools import partial
import pandas as pd

conn = pyodbc.connect("DSN=******",autocommit=True)

def hivetable(row):
    query = 'select * from (select row_number() OVER (order by policynumber) as rownum, * from dbg.tble ) tbl1 where rownum between '+str(row) +' and '+str(row+9999)+';'
    result = pd.read_sql(query,conn)
    return result

--------- Write_tosql.pyфайл ---------

import sqlalchemy
import urllib
import pyodbc
def write_to_sql(s_df):
    sql_conn_url = urllib.quote_plus('DRIVER={ODBC Driver 13 for SQL Server};SERVER=ser;DATABASE=db;UID=sqoop;PWD=#####;')
    sql_conn_str = "mssql+pyodbc:///?odbc_connect={0}".format(sql_conn_url)
    engine = sqlalchemy.create_engine(sql_conn_str)
    s_df.rename(columns=lambda x: remove_table_alias(x), inplace=True)
    s_df.to_sql(name='tbl2', schema='dbo', con=engine, chunksize=10000, if_exists="append", index=False)
def remove_table_alias(columnName):
    try:
        if(columnName.find(".") != -1):
            return columnName.split(".")[1]
        return columnName
    except Exception, e:
        print "ERROR in _remove_table_alias ",str(e)

Любые другие решения помогут мне сократить время.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...