Я считаю, что суть вашей проблемы заключается в том, что вы пытаетесь объединить каждую группу с помощью только одной итерации в кадре данных. Существует компромисс между количеством групп в памяти и количеством раз, которое вам нужно для чтения кадра данных
ПРИМЕЧАНИЕ. Я специально показываю подробный код, чтобы передать идею о необходимости повторения ДФ много раз. Оба решения стали относительно сложными, но все же достигли того, что ожидали. Есть много аспектов кода, которые можно улучшить, любая помощь в рефакторинге кода приветствуется
Я буду использовать этот фиктивный файл «data.csv», чтобы проиллюстрировать мои решения. сохраняя data.csv в том же каталоге, что и скрипт, вы можете просто скопировать и вставить решения и запустить их.
sorted1,sorted2,sorted3,othe1,other2,other3,other4
1, 1, 1, 'a', 'a', 'a', 'a'
1, 1, 1, 'a', 'a', 'a', 'a'
1, 1, 1, 'a', 'a', 'a', 'a'
1, 1, 1, 'a', 'a', 'a', 'a'
2, 1, 1, 'a', 'a', 'a', 'a'
2, 1, 1, 'd', 'd', 'd', 'd'
2, 1, 1, 'd', 'd', 'd', 'a'
3, 1, 1, 'e', 'e', 'e', 'e'
3, 1, 1, 'b', 'b', 'b', 'b'
Первоначальное решение в сценарии, в котором мы можем сохранить все ключ групп:
Сначала соберите все строки группы, а затем обработайте.
по сути, я бы сделал: для каждой итерации в df (чанки) выберите одну группу (или много, если позволяет память). Проверьте, не обработано ли оно еще, просмотрев словарь обработанных групповых ключей, затем накапливайте выбранные групповые строки в каждом чанке, повторяя каждый чанк. После завершения итерации всех блоков обработайте данные группы.
import pandas as pd
def run_logic(key, group):
# some logic
pass
def accumulate_nextGroup(alreadyProcessed_groups):
past_accumulated_group = pd.DataFrame()
pastChunk_groupKey = None
for chunk_index, chunk_df in enumerate(pd.read_csv("data.csv",iterator=True, chunksize=3)):
groupby_data = chunk_df.groupby(sorted_columns, sort=True)
for currentChunk_groupKey, currentChunk_group in groupby_data:
if (pastChunk_groupKey is None or pastChunk_groupKey == currentChunk_groupKey)\
and currentChunk_groupKey not in alreadyProcessed_groups.keys():
pastChunk_groupKey = currentChunk_groupKey
past_accumulated_group = pd.concat(
[past_accumulated_group, currentChunk_group]
)
print(f'I am the choosen group({currentChunk_groupKey}) of the moment in the chunk {chunk_index+1}')
else:
if currentChunk_groupKey in alreadyProcessed_groups:
print(f'group({currentChunk_groupKey}) is not the choosen group because it was already processed')
else:
print(f'group({currentChunk_groupKey}) is not the choosen group({pastChunk_groupKey}) yet :(')
return pastChunk_groupKey, past_accumulated_group
alreadyProcessed_groups = {}
sorted_columns = ["sorted1","sorted2","sorted3"]
number_of_unique_groups = 3 #
for iteration_in_df in range(number_of_unique_groups):
groupKey, groupData = accumulate_nextGroup(alreadyProcessed_groups)
run_logic(groupKey, groupData)
alreadyProcessed_groups[groupKey] = "Already Processed"
print(alreadyProcessed_groups)
print(f"end of {iteration_in_df+1} iterations in df")
print("*"*50)
РЕШЕНИЕ ВЫХОДА 1:
I am the choosen group((1, 1, 1)) of the moment in the chunk 1
I am the choosen group((1, 1, 1)) of the moment in the chunk 2
group((2, 1, 1)) is not the choosen group((1, 1, 1)) yet :(
group((2, 1, 1)) is not the choosen group((1, 1, 1)) yet :(
group((3, 1, 1)) is not the choosen group((1, 1, 1)) yet :(
{(1, 1, 1): 'Already Processed'}
end of 1 iterations in df
**************************************************
group((1, 1, 1)) is not the choosen group because it was already processed
group((1, 1, 1)) is not the choosen group because it was already processed
I am the choosen group((2, 1, 1)) of the moment in the chunk 2
I am the choosen group((2, 1, 1)) of the moment in the chunk 3
group((3, 1, 1)) is not the choosen group((2, 1, 1)) yet :(
{(1, 1, 1): 'Already Processed', (2, 1, 1): 'Already Processed'}
end of 2 iterations in df
**************************************************
group((1, 1, 1)) is not the choosen group because it was already processed
group((1, 1, 1)) is not the choosen group because it was already processed
group((2, 1, 1)) is not the choosen group because it was already processed
group((2, 1, 1)) is not the choosen group because it was already processed
I am the choosen group((3, 1, 1)) of the moment in the chunk 3
{(1, 1, 1): 'Already Processed', (2, 1, 1): 'Already Processed', (3, 1, 1): 'Already Processed'}
end of 3 iterations in df
**************************************************
ОБНОВЛЕНИЕ РЕШЕНИЕ 2: в случае, когда мы не можем сохранить все ключи группы в словаре:
В случае, когда мы не можем сохранить все ключи группы в словаре нам нужно использовать относительный индекс каждой группы, созданный в каждом чанке, чтобы создать глобальный ссылочный индекс для каждой группы. (обратите внимание, что это решение гораздо более плотное, чем предыдущее)
главное в этом решении - нам не нужно значение ключей группы для идентификации групп. Более подробно, вы можете представить каждый чанк как узел в обращенном связном списке, где первый чанк указывает на ноль, второй чанк указывает на первый чанк и т. Д. Одна итерация на фрейме данных соответствует одному обходу в этом связанном списке , Для каждого шага (обработки чанка) единственная информация, которую вам нужно хранить каждый раз, - это предыдущая головка, хвост и размер чанка, и только с этой информацией вы можете назначить групповым ключам в любом чанке уникальный идентификатор индекса.
Другая важная информация состоит в том, что, поскольку файл отсортирован, индекс ссылки первого элемента чанка будет либо последним элементом предыдущего чанка последнего элемента +1. Это позволяет вывести глобальный ссылочный индекс из индекс чанка.
import pandas as pd
import pysnooper
def run_logic(key, group):
# some logic
pass
def generate_currentChunkGroups_globalReferenceIdx(groupby_data,
currentChunk_index, previousChunk_link):
if currentChunk_index == 0:
groupsIn_firstChunk=len(groupby_data.groups.keys())
currentGroups_globalReferenceIdx = [(i,groupKey)
for i,(groupKey,_) in enumerate(groupby_data)]
else:
lastChunk_firstGroup, lastChunk_lastGroup, lastChunk_nGroups \
= previousChunk_link
currentChunk_firstGroupKey = list(groupby_data.groups.keys())[0]
currentChunk_nGroups = len(groupby_data.groups.keys())
lastChunk_lastGroupGlobalIdx, lastChunk_lastGroupKey \
= lastChunk_lastGroup
if currentChunk_firstGroupKey == lastChunk_lastGroupKey:
currentChunk_firstGroupGlobalReferenceIdx = lastChunk_lastGroupGlobalIdx
else:
currentChunk_firstGroupGlobalReferenceIdx = lastChunk_lastGroupGlobalIdx + 1
currentGroups_globalReferenceIdx = [
(currentChunk_firstGroupGlobalReferenceIdx+i, groupKey)
for (i,groupKey) in enumerate(groupby_data.groups.keys())
]
next_previousChunk_link = (currentGroups_globalReferenceIdx[0],
currentGroups_globalReferenceIdx[-1],
len(currentGroups_globalReferenceIdx)
)
return currentGroups_globalReferenceIdx, next_previousChunk_link
def accumulate_nextGroup(countOf_alreadyProcessedGroups, lastChunk_index, dataframe_accumulator):
previousChunk_link = None
currentIdx_beingProcessed = countOf_alreadyProcessedGroups
for chunk_index, chunk_df in enumerate(pd.read_csv("data.csv",iterator=True, chunksize=3)):
print(f'ITER:{iteration_in_df} CHUNK:{chunk_index} InfoPrevChunk:{previousChunk_link} lastProcessed_chunk:{lastChunk_index}')
if (lastChunk_index != None) and (chunk_index < lastChunk_index):
groupby_data = chunk_df.groupby(sorted_columns, sort=True)
currentChunkGroups_globalReferenceIdx, next_previousChunk_link \
= generate_currentChunkGroups_globalReferenceIdx(
groupby_data, chunk_index, previousChunk_link
)
elif((lastChunk_index == None) or (chunk_index >= lastChunk_index)):
if (chunk_index == lastChunk_index):
groupby_data = chunk_df.groupby(sorted_columns, sort=True)
currentChunkGroups_globalReferenceIdx, next_previousChunk_link \
= generate_currentChunkGroups_globalReferenceIdx(
groupby_data, chunk_index, previousChunk_link
)
currentChunkGroupGlobalIndexes = [GlobalIndex \
for (GlobalIndex,_) in currentChunkGroups_globalReferenceIdx]
if((lastChunk_index is None) or (lastChunk_index <= chunk_index)):
lastChunk_index = chunk_index
if currentIdx_beingProcessed in currentChunkGroupGlobalIndexes:
currentGroupKey_beingProcessed = [tup
for tup in currentChunkGroups_globalReferenceIdx
if tup[0] == currentIdx_beingProcessed][0][1]
currentChunk_group = groupby_data.get_group(currentGroupKey_beingProcessed)
dataframe_accumulator = pd.concat(
[dataframe_accumulator, currentChunk_group]
)
else:
groupby_data = chunk_df.groupby(sorted_columns, sort=True)
currentChunkGroups_globalReferenceIdx, next_previousChunk_link \
= generate_currentChunkGroups_globalReferenceIdx(
groupby_data, chunk_index, previousChunk_link
)
currentChunkGroupGlobalIndexes = [GlobalIndex \
for (GlobalIndex,_) in currentChunkGroups_globalReferenceIdx]
if((lastChunk_index is None) or (lastChunk_index <= chunk_index)):
lastChunk_index = chunk_index
if currentIdx_beingProcessed in currentChunkGroupGlobalIndexes:
currentGroupKey_beingProcessed = [tup
for tup in currentChunkGroups_globalReferenceIdx
if tup[0] == currentIdx_beingProcessed][0][1]
currentChunk_group = groupby_data.get_group(currentGroupKey_beingProcessed)
dataframe_accumulator = pd.concat(
[dataframe_accumulator, currentChunk_group]
)
else:
countOf_alreadyProcessedGroups+=1
lastChunk_index = chunk_index-1
break
previousChunk_link = next_previousChunk_link
print(f'Done with chunks for group of global index:{currentIdx_beingProcessed} corresponding to groupKey:{currentGroupKey_beingProcessed}')
return countOf_alreadyProcessedGroups, lastChunk_index, dataframe_accumulator, currentGroupKey_beingProcessed
sorted_columns = ["sorted1","sorted2","sorted3"]
number_of_unique_groups = 3 #
lastChunk_index = None
for iteration_in_df in range(number_of_unique_groups):
dataframe_accumulator = pd.DataFrame()
countOf_alreadyProcessedGroups,lastChunk_index, group_data, currentGroupKey_Processed=\
accumulate_nextGroup(
iteration_in_df, lastChunk_index, dataframe_accumulator
)
run_logic(currentGroupKey_Processed, dataframe_accumulator)
print(f"end of iteration number {iteration_in_df+1} in the df and processed {currentGroupKey_Processed}")
print(group_data)
print("*"*50)
ВЫХОДНОЕ РЕШЕНИЕ 2:
ITER:0 CHUNK:0 InfoPrevChunk:None lastProcessed_chunk:None
ITER:0 CHUNK:1 InfoPrevChunk:((0, (1, 1, 1)), (0, (1, 1, 1)), 1) lastProcessed_chunk:0
ITER:0 CHUNK:2 InfoPrevChunk:((0, (1, 1, 1)), (1, (2, 1, 1)), 2) lastProcessed_chunk:1
Done with chunks for group of global index:0 corresponding to groupKey:(1, 1, 1)
end of iteration number 1 in the df and processed (1, 1, 1)
sorted1 sorted2 sorted3 othe1 other2 other3 other4
0 1 1 1 'a' 'a' 'a' 'a'
1 1 1 1 'a' 'a' 'a' 'a'
2 1 1 1 'a' 'a' 'a' 'a'
3 1 1 1 'a' 'a' 'a' 'a'
**************************************************
ITER:1 CHUNK:0 InfoPrevChunk:None lastProcessed_chunk:1
ITER:1 CHUNK:1 InfoPrevChunk:((0, (1, 1, 1)), (0, (1, 1, 1)), 1) lastProcessed_chunk:1
ITER:1 CHUNK:2 InfoPrevChunk:((0, (1, 1, 1)), (1, (2, 1, 1)), 2) lastProcessed_chunk:1
Done with chunks for group of global index:1 corresponding to groupKey:(2, 1, 1)
end of iteration number 2 in the df and processed (2, 1, 1)
sorted1 sorted2 sorted3 othe1 other2 other3 other4
4 2 1 1 'a' 'a' 'a' 'a'
5 2 1 1 'd' 'd' 'd' 'd'
6 2 1 1 'd' 'd' 'd' 'a'
**************************************************
ITER:2 CHUNK:0 InfoPrevChunk:None lastProcessed_chunk:2
ITER:2 CHUNK:1 InfoPrevChunk:((0, (1, 1, 1)), (0, (1, 1, 1)), 1) lastProcessed_chunk:2
ITER:2 CHUNK:2 InfoPrevChunk:((0, (1, 1, 1)), (1, (2, 1, 1)), 2) lastProcessed_chunk:2
Done with chunks for group of global index:2 corresponding to groupKey:(3, 1, 1)
end of iteration number 3 in the df and processed (3, 1, 1)
sorted1 sorted2 sorted3 othe1 other2 other3 other4
7 3 1 1 'e' 'e' 'e' 'e'
8 3 1 1 'b' 'b' 'b' 'b'
**************************************************