Как сгруппировать и сохранить порядок групп на отсортированном файле - PullRequest
2 голосов
/ 17 марта 2020

У меня есть большой CSV-файл, отсортированный по нескольким его столбцам, назовем эти столбцы sorted_columns.
Я хочу выполнить групповую обработку для этих sorted_columns и применить к каждой логике c из этих групп.

Файл не полностью помещается в память, поэтому я хочу прочитать его порциями и выполнить groupby для каждого порции.

Я заметил, что порядок групп не сохраняется, даже если файл уже отсортирован по этим столбцам.

В конце концов я пытаюсь сделать следующее:

import pandas as pd

def run_logic(key, group):
    # some logic
    pass

last_group = pd.DataFrame()
last_key = None

for chunk_df in df:
    grouped_by_df = chunk_df.groupby(sorted_columns, sort=True)

    for key, group in grouped_by_df:
        if last_key is None or last_key == key:
            last_key = key
            last_group = pd.concat([last_group, group])
        else:  # last_key != key
            run_logic(last_key, last_group)
            last_key = key
            last_group = group.copy()
run_logic(last_key, last_group)

Но это не работает, потому что groupby не обещает, что порядок групп сохраняется. Если один и тот же key существует в двух последовательных чанках, не обещается, что в первом чанке это будет последняя группа, а в следующем чанке это будет первая. Я попытался изменить groupby для использования sort=False, а также попытался изменить порядок столбцов, но это не помогло.

Кто-нибудь знает, как сохранить порядок групп если ключи уже отсортированы в исходном файле?

Есть ли другой способ прочитать всю группу сразу из файла?

Ответы [ 2 ]

7 голосов
/ 26 марта 2020

Я считаю, что суть вашей проблемы заключается в том, что вы пытаетесь объединить каждую группу с помощью только одной итерации в кадре данных. Существует компромисс между количеством групп в памяти и количеством раз, которое вам нужно для чтения кадра данных

ПРИМЕЧАНИЕ. Я специально показываю подробный код, чтобы передать идею о необходимости повторения ДФ много раз. Оба решения стали относительно сложными, но все же достигли того, что ожидали. Есть много аспектов кода, которые можно улучшить, любая помощь в рефакторинге кода приветствуется

Я буду использовать этот фиктивный файл «data.csv», чтобы проиллюстрировать мои решения. сохраняя data.csv в том же каталоге, что и скрипт, вы можете просто скопировать и вставить решения и запустить их.

sorted1,sorted2,sorted3,othe1,other2,other3,other4 
1, 1, 1, 'a', 'a', 'a', 'a'  
1, 1, 1, 'a', 'a', 'a', 'a'
1, 1, 1, 'a', 'a', 'a', 'a'
1, 1, 1, 'a', 'a', 'a', 'a'  
2, 1, 1, 'a', 'a', 'a', 'a'  
2, 1, 1, 'd', 'd', 'd', 'd'
2, 1, 1, 'd', 'd', 'd', 'a'   
3, 1, 1, 'e', 'e', 'e', 'e'  
3, 1, 1, 'b', 'b', 'b', 'b'  

Первоначальное решение в сценарии, в котором мы можем сохранить все ключ групп:

Сначала соберите все строки группы, а затем обработайте.

по сути, я бы сделал: для каждой итерации в df (чанки) выберите одну группу (или много, если позволяет память). Проверьте, не обработано ли оно еще, просмотрев словарь обработанных групповых ключей, затем накапливайте выбранные групповые строки в каждом чанке, повторяя каждый чанк. После завершения итерации всех блоков обработайте данные группы.

import pandas as pd
def run_logic(key, group):
    # some logic
    pass
def accumulate_nextGroup(alreadyProcessed_groups):
    past_accumulated_group = pd.DataFrame()
    pastChunk_groupKey = None
    for chunk_index, chunk_df in enumerate(pd.read_csv("data.csv",iterator=True, chunksize=3)):
            groupby_data = chunk_df.groupby(sorted_columns, sort=True) 
            for currentChunk_groupKey, currentChunk_group in groupby_data:
                if (pastChunk_groupKey is None or pastChunk_groupKey == currentChunk_groupKey)\
                        and currentChunk_groupKey not in alreadyProcessed_groups.keys():
                    pastChunk_groupKey = currentChunk_groupKey
                    past_accumulated_group = pd.concat(
                            [past_accumulated_group, currentChunk_group]
                                                      )
                    print(f'I am the choosen group({currentChunk_groupKey}) of the moment in the chunk {chunk_index+1}')
                else: 
                    if currentChunk_groupKey in alreadyProcessed_groups:
                        print(f'group({currentChunk_groupKey}) is  not the choosen group because it was already processed')
                    else:
                        print(f'group({currentChunk_groupKey}) is  not the choosen group({pastChunk_groupKey}) yet :(')
    return pastChunk_groupKey, past_accumulated_group

alreadyProcessed_groups = {}
sorted_columns = ["sorted1","sorted2","sorted3"]
number_of_unique_groups = 3 # 
for iteration_in_df in range(number_of_unique_groups):  
    groupKey, groupData = accumulate_nextGroup(alreadyProcessed_groups)
    run_logic(groupKey, groupData)
    alreadyProcessed_groups[groupKey] = "Already Processed"
    print(alreadyProcessed_groups)
    print(f"end of {iteration_in_df+1} iterations in df")
    print("*"*50)

РЕШЕНИЕ ВЫХОДА 1:

I am the choosen group((1, 1, 1)) of the moment in the chunk 1
I am the choosen group((1, 1, 1)) of the moment in the chunk 2
group((2, 1, 1)) is  not the choosen group((1, 1, 1)) yet :(
group((2, 1, 1)) is  not the choosen group((1, 1, 1)) yet :(
group((3, 1, 1)) is  not the choosen group((1, 1, 1)) yet :(
{(1, 1, 1): 'Already Processed'}
end of 1 iterations in df
**************************************************
group((1, 1, 1)) is  not the choosen group because it was already processed
group((1, 1, 1)) is  not the choosen group because it was already processed
I am the choosen group((2, 1, 1)) of the moment in the chunk 2
I am the choosen group((2, 1, 1)) of the moment in the chunk 3
group((3, 1, 1)) is  not the choosen group((2, 1, 1)) yet :(
{(1, 1, 1): 'Already Processed', (2, 1, 1): 'Already Processed'}
end of 2 iterations in df
**************************************************
group((1, 1, 1)) is  not the choosen group because it was already processed
group((1, 1, 1)) is  not the choosen group because it was already processed
group((2, 1, 1)) is  not the choosen group because it was already processed
group((2, 1, 1)) is  not the choosen group because it was already processed
I am the choosen group((3, 1, 1)) of the moment in the chunk 3
{(1, 1, 1): 'Already Processed', (2, 1, 1): 'Already Processed', (3, 1, 1): 'Already Processed'}
end of 3 iterations in df
**************************************************

ОБНОВЛЕНИЕ РЕШЕНИЕ 2: в случае, когда мы не можем сохранить все ключи группы в словаре:

В случае, когда мы не можем сохранить все ключи группы в словаре нам нужно использовать относительный индекс каждой группы, созданный в каждом чанке, чтобы создать глобальный ссылочный индекс для каждой группы. (обратите внимание, что это решение гораздо более плотное, чем предыдущее)

главное в этом решении - нам не нужно значение ключей группы для идентификации групп. Более подробно, вы можете представить каждый чанк как узел в обращенном связном списке, где первый чанк указывает на ноль, второй чанк указывает на первый чанк и т. Д. Одна итерация на фрейме данных соответствует одному обходу в этом связанном списке , Для каждого шага (обработки чанка) единственная информация, которую вам нужно хранить каждый раз, - это предыдущая головка, хвост и размер чанка, и только с этой информацией вы можете назначить групповым ключам в любом чанке уникальный идентификатор индекса.

Другая важная информация состоит в том, что, поскольку файл отсортирован, индекс ссылки первого элемента чанка будет либо последним элементом предыдущего чанка последнего элемента +1. Это позволяет вывести глобальный ссылочный индекс из индекс чанка.

import pandas as pd
import pysnooper
def run_logic(key, group):
    # some logic
    pass

def generate_currentChunkGroups_globalReferenceIdx(groupby_data,
        currentChunk_index, previousChunk_link):
    if currentChunk_index == 0:
        groupsIn_firstChunk=len(groupby_data.groups.keys())
        currentGroups_globalReferenceIdx = [(i,groupKey) 
                for i,(groupKey,_) in enumerate(groupby_data)]
    else:
        lastChunk_firstGroup, lastChunk_lastGroup, lastChunk_nGroups \
                = previousChunk_link 
        currentChunk_firstGroupKey = list(groupby_data.groups.keys())[0] 
        currentChunk_nGroups = len(groupby_data.groups.keys())

        lastChunk_lastGroupGlobalIdx, lastChunk_lastGroupKey \
                = lastChunk_lastGroup
        if currentChunk_firstGroupKey == lastChunk_lastGroupKey:
            currentChunk_firstGroupGlobalReferenceIdx =  lastChunk_lastGroupGlobalIdx
        else:
            currentChunk_firstGroupGlobalReferenceIdx =  lastChunk_lastGroupGlobalIdx + 1

        currentGroups_globalReferenceIdx = [
                (currentChunk_firstGroupGlobalReferenceIdx+i, groupKey)
                    for (i,groupKey) in enumerate(groupby_data.groups.keys())
                    ]

    next_previousChunk_link = (currentGroups_globalReferenceIdx[0],
            currentGroups_globalReferenceIdx[-1],
            len(currentGroups_globalReferenceIdx)
    )
    return currentGroups_globalReferenceIdx, next_previousChunk_link   

def accumulate_nextGroup(countOf_alreadyProcessedGroups, lastChunk_index, dataframe_accumulator):
    previousChunk_link = None
    currentIdx_beingProcessed = countOf_alreadyProcessedGroups
    for chunk_index, chunk_df in enumerate(pd.read_csv("data.csv",iterator=True, chunksize=3)):
        print(f'ITER:{iteration_in_df} CHUNK:{chunk_index} InfoPrevChunk:{previousChunk_link} lastProcessed_chunk:{lastChunk_index}')
        if (lastChunk_index !=  None) and (chunk_index < lastChunk_index):
            groupby_data = chunk_df.groupby(sorted_columns, sort=True) 
            currentChunkGroups_globalReferenceIdx, next_previousChunk_link \
                    = generate_currentChunkGroups_globalReferenceIdx(
                            groupby_data, chunk_index, previousChunk_link
                            )
        elif((lastChunk_index == None) or (chunk_index >= lastChunk_index)):
            if (chunk_index == lastChunk_index):
                groupby_data = chunk_df.groupby(sorted_columns, sort=True) 
                currentChunkGroups_globalReferenceIdx, next_previousChunk_link \
                        = generate_currentChunkGroups_globalReferenceIdx(
                                groupby_data, chunk_index, previousChunk_link
                                )
                currentChunkGroupGlobalIndexes = [GlobalIndex \
                        for (GlobalIndex,_) in currentChunkGroups_globalReferenceIdx]
                if((lastChunk_index is None) or (lastChunk_index <= chunk_index)):
                    lastChunk_index = chunk_index
                if currentIdx_beingProcessed in currentChunkGroupGlobalIndexes:
                    currentGroupKey_beingProcessed = [tup 
                            for tup in currentChunkGroups_globalReferenceIdx
                            if tup[0] == currentIdx_beingProcessed][0][1]
                    currentChunk_group = groupby_data.get_group(currentGroupKey_beingProcessed)
                    dataframe_accumulator = pd.concat(
                            [dataframe_accumulator, currentChunk_group]
                                                     )
            else: 
                groupby_data = chunk_df.groupby(sorted_columns, sort=True) 
                currentChunkGroups_globalReferenceIdx, next_previousChunk_link \
                        = generate_currentChunkGroups_globalReferenceIdx(
                                groupby_data, chunk_index, previousChunk_link
                                )
                currentChunkGroupGlobalIndexes = [GlobalIndex \
                        for (GlobalIndex,_) in currentChunkGroups_globalReferenceIdx]
                if((lastChunk_index is None) or (lastChunk_index <= chunk_index)):
                    lastChunk_index = chunk_index
                if currentIdx_beingProcessed in currentChunkGroupGlobalIndexes:
                    currentGroupKey_beingProcessed = [tup 
                            for tup in currentChunkGroups_globalReferenceIdx
                            if tup[0] == currentIdx_beingProcessed][0][1]
                    currentChunk_group = groupby_data.get_group(currentGroupKey_beingProcessed)
                    dataframe_accumulator = pd.concat(
                            [dataframe_accumulator, currentChunk_group]
                                                     )
                else:
                    countOf_alreadyProcessedGroups+=1
                    lastChunk_index = chunk_index-1
                    break
        previousChunk_link = next_previousChunk_link
    print(f'Done with chunks for group of global index:{currentIdx_beingProcessed} corresponding to groupKey:{currentGroupKey_beingProcessed}')
    return countOf_alreadyProcessedGroups, lastChunk_index, dataframe_accumulator, currentGroupKey_beingProcessed

sorted_columns = ["sorted1","sorted2","sorted3"]
number_of_unique_groups = 3 # 
lastChunk_index = None 
for iteration_in_df in range(number_of_unique_groups):  
    dataframe_accumulator = pd.DataFrame()
    countOf_alreadyProcessedGroups,lastChunk_index, group_data, currentGroupKey_Processed=\
            accumulate_nextGroup(
                    iteration_in_df, lastChunk_index, dataframe_accumulator
                                )
    run_logic(currentGroupKey_Processed, dataframe_accumulator)
    print(f"end of iteration number {iteration_in_df+1} in the df and processed {currentGroupKey_Processed}")
    print(group_data)
    print("*"*50)

ВЫХОДНОЕ РЕШЕНИЕ 2:

ITER:0 CHUNK:0 InfoPrevChunk:None lastProcessed_chunk:None
ITER:0 CHUNK:1 InfoPrevChunk:((0, (1, 1, 1)), (0, (1, 1, 1)), 1) lastProcessed_chunk:0
ITER:0 CHUNK:2 InfoPrevChunk:((0, (1, 1, 1)), (1, (2, 1, 1)), 2) lastProcessed_chunk:1
Done with chunks for group of global index:0 corresponding to groupKey:(1, 1, 1)
end of iteration number 1 in the df and processed (1, 1, 1)
   sorted1  sorted2  sorted3 othe1 other2 other3 other4 
0        1        1        1   'a'    'a'    'a'   'a'  
1        1        1        1   'a'    'a'    'a'     'a'
2        1        1        1   'a'    'a'    'a'     'a'
3        1        1        1   'a'    'a'    'a'   'a'  
**************************************************
ITER:1 CHUNK:0 InfoPrevChunk:None lastProcessed_chunk:1
ITER:1 CHUNK:1 InfoPrevChunk:((0, (1, 1, 1)), (0, (1, 1, 1)), 1) lastProcessed_chunk:1
ITER:1 CHUNK:2 InfoPrevChunk:((0, (1, 1, 1)), (1, (2, 1, 1)), 2) lastProcessed_chunk:1
Done with chunks for group of global index:1 corresponding to groupKey:(2, 1, 1)
end of iteration number 2 in the df and processed (2, 1, 1)
   sorted1  sorted2  sorted3 othe1 other2 other3  other4 
4        2        1        1   'a'    'a'    'a'    'a'  
5        2        1        1   'd'    'd'    'd'   'd'   
6        2        1        1   'd'    'd'    'd'   'a'   
**************************************************
ITER:2 CHUNK:0 InfoPrevChunk:None lastProcessed_chunk:2
ITER:2 CHUNK:1 InfoPrevChunk:((0, (1, 1, 1)), (0, (1, 1, 1)), 1) lastProcessed_chunk:2
ITER:2 CHUNK:2 InfoPrevChunk:((0, (1, 1, 1)), (1, (2, 1, 1)), 2) lastProcessed_chunk:2
Done with chunks for group of global index:2 corresponding to groupKey:(3, 1, 1)
end of iteration number 3 in the df and processed (3, 1, 1)
   sorted1  sorted2  sorted3 othe1 other2 other3 other4 
7        3        1        1   'e'    'e'    'e'   'e'  
8        3        1        1   'b'    'b'    'b'    'b' 
**************************************************
3 голосов
/ 22 марта 2020

itertools.groupby

Возвращает ключ и итератор для всех значений, сгруппированных по этому ключу. Если ваш файл уже отсортирован по желаемому ключу, вам подходит go. функция groupby будет обрабатывать почти все для вас.

Из документации :

Операция groupby() аналогична фильтру uniq в Unix. Он генерирует разрыв или новую группу каждый раз, когда изменяется значение ключевой функции (поэтому обычно необходимо отсортировать данные с использованием той же ключевой функции). Это поведение отличается от GROUP BY SQL, который агрегирует общие элементы независимо от их порядка ввода.

run_logi c - это любой бизнес-лог c, который вы хотите применить к группе. записей. Этот пример просто подсчитывает количество наблюдений в итераторе.

data_iter просто испускает 1 строку на CSV. Пока ваш файл отсортирован по требуемым полям, вам не нужно читать весь файл в память.

chunks использует groupby для группировки входного итератора с использованием первых 3 полей входной строки. Выдает ключ и соответствующий итератор значений, связанных с этим ключом.

#!/usr/bin/env python3

import csv
from itertools import groupby

def run_logic(key, group):
    cntr = 0
    for rec in group:
        cntr = cntr + 1
    return (key, cntr)


def data_iter(filename):
    with open(filename, "r") as fin:
        csvin = csv.reader(fin)
        for row in csvin:
            yield row


def chunks(diter):
    for chunk, iter_ in groupby(diter, key=lambda x: x[0:3]):
        yield (chunk, iter_)


if __name__ == "__main__":
    csviter = data_iter("test.csv")
    chunk_iter = chunks(csviter)
    for chunk, iter_ in chunk_iter:
        print(run_logic(chunk, iter_))

Входные данные

['1', '1', '1', 'a', 'a', 'a', 'a']  
['1', '1', '1', 'b', 'b', 'b', 'b']  
['1', '1', '1', 'c', 'c', 'c', 'c']  
['1', '1', '1', 'd', 'd', 'd', 'd']  
['1', '1', '1', 'e', 'e', 'e', 'e']  
['2', '1', '1', 'a', 'a', 'a', 'a']  
['2', '1', '1', 'd', 'd', 'd', 'd']  
['2', '1', '1', 'e', 'e', 'e', 'e']  
['2', '1', '1', 'b', 'b', 'b', 'b']  
['2', '1', '1', 'c', 'c', 'c', 'c']  
['3', '1', '1', 'e', 'e', 'e', 'e']  
['3', '1', '1', 'b', 'b', 'b', 'b']  
['3', '1', '1', 'c', 'c', 'c', 'c']  
['3', '1', '1', 'a', 'a', 'a', 'a']  
['3', '1', '1', 'd', 'd', 'd', 'd']

групповые данные

Группа : ['1', '1', '1']

['1', '1', '1', 'a', 'a', 'a', 'a']
['1', '1', '1', 'b', 'b', 'b', 'b']
['1', '1', '1', 'c', 'c', 'c', 'c']
['1', '1', '1', 'd', 'd', 'd', 'd']
['1', '1', '1', 'e', 'e', 'e', 'e']

Группа : ['2', '1', '1']

['2', '1', '1', 'a', 'a', 'a', 'a']
['2', '1', '1', 'd', 'd', 'd', 'd']
['2', '1', '1', 'e', 'e', 'e', 'e']
['2', '1', '1', 'b', 'b', 'b', 'b']
['2', '1', '1', 'c', 'c', 'c', 'c']

Группа : ['3', '1', '1']

['3', '1', '1', 'e', 'e', 'e', 'e']
['3', '1', '1', 'b', 'b', 'b', 'b']
['3', '1', '1', 'c', 'c', 'c', 'c']
['3', '1', '1', 'a', 'a', 'a', 'a']
['3', '1', '1', 'd', 'd', 'd', 'd']

Применить бизнес-логи c

Группа : ['1', '1', '1']

(['1', '1', '1'], 5)

Группа : ['2', '1', '1']

(['2', '1', '1'], 5)

Группа : ['3', '1', '1']

(['3', '1', '1'], 5)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...