Как перебирать pandas датафрейм построчно с 10M строками - PullRequest
0 голосов
/ 26 сентября 2019

Я работаю над веб-службами фляги Python и требование такое ... Приложение получит один файл в формате csv или xlsx.Я читаю этот файл и преобразую его в формат данных Pandas.Теперь мне нужно перебрать каждую строку данных и проверить определенное условие.если условие удовлетворяется, необходимо обновить несколько столбцов в одном и том же фрейме данных.

Я сделал это, используя приведенный ниже код, но меня не устраивает производительность ...

def ExecuteInParallel(convertContract,ratesDf,inputDf):
    for index, row in inputDf.iterrows():

               currencyFound = ratesDf.query('CCY1 =="{0}" and CCY2 == "{1}"'.format(row[convertContract.INPUT_CURRENCY]
                                                                                ,row[convertContract.RETURN_CURRENCY]))
               if(len(currencyFound.index) == 0):
                   raise BadRequest("Given Currency combination not found with provided date.")

               currentrate = currencyFound.Rate.values[0]
               if(convertContract.ROUNDING != None and convertContract.ROUNDING != ""):
                   rounding = int(convertContract.ROUNDING)
                   if(rounding > 0):
                       convertedamount = round(float(row[convertContract.INPUT_AMOUNT]) * currentrate,int(convertContract.ROUNDING))
                       inputDf.at[index,convertContract.RETURN_VALUE] = convertedamount
               else:
                   convertedamount = float(row[convertContract.INPUT_AMOUNT]) * currentrate
                   inputDf.at[index,convertContract.RETURN_VALUE] = convertedamount
               if(convertContract.RETURN_RATE == "True"):
                   inputDf.at[index,convertContract.RETURN_VALUE + "_FX Rate"] = currentrate

Я сделалнекоторые проанализировали производительность и пришли к выводу, что для итерации по 10 тыс. строк требуется около 470 секунд.

Я хочу выполнить это для 10 млн. строк.Поэтому я попробовал программирование потоков в Python, сохранив вызов функции выше, но с меньшими кадрами данных.Я создал блоки данных с 500 строками данных и передал вышеописанному методу собственный патрон, но не заметил ни одной разницы в секундах.

Может ли кто-нибудь помочь мне с этим.

def ConvertdataFramesValues(self,contract,ratesDf,inputDf):
    try:

            treadList = []
            size = 500
            list_of_dfs = list(inputDf.loc[i:i + size - 1,:] for i in range(0, len(inputDf),size))
            for frame in list_of_dfs:

                t1 = threading.Thread(target=ExecuteInParallel,args=(convertContract,ratesDf,frame))
                treadList.append(t1)
                t1.start()

            for t in treadList:
                t.join()

            inputDf = pd.concat(list_of_dfs)
            print(list_of_dfs[0].head())
        return inputDf

    except Exception as e:
        msg = "unable to convert data frame values. " + str(e)
        print(msg)
        raise BadRequest(msg)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...