Быстрый поиск файла / поиск текста
Оптимизация
- Обработка файла в двоичном формате (устраняет накладные расходы на преобразование двоичного кода в текст)
- Поиск в буфере время, а не построчно
- Преобразование найденной строки из двоичного в текст
Код
На основе
def find_line(fname, goal):
" Generator of lines containing goal string "
def str_to_bytes(s):
' String to byte array '
result = bytearray()
result.extend(map(ord, s))
return result
def find_buffer(fname, goal, start=0, bsize=2**17):
"""
Processing file in binary to find goal string
- removes overhead of binary to text conversions
Uses find method on a buffer of data at a time
- faster than check each line by line
"""
bgoal = str_to_bytes(goal) # convert goal string to binary
bnew_line = str_to_bytes(os.linesep) # line separator as byte array
if bsize < len(bgoal):
raise ValueError("The buffer size must be larger than the string being searched for.")
with open(fname, 'rb') as f:
if start > 0:
f.seek(start)
overlap = len(bgoal) - 1
while True:
buffer = f.read(bsize)
pos = buffer.find(bgoal)
if pos >= 0:
start_line = buffer.rfind(bnew_line, 0, pos)
end_line = buffer.find(bnew_line, pos)
yield buffer[start_line+len(bnew_line):end_line] #f.tell() - len(buffer) + pos
if not buffer:
return None
f.seek(f.tell() - overlap)
# Find line in file
os_encoding = locale.getpreferredencoding()
delimiter = str_to_bytes(',') # column delimiter in bytes
bgoal = str_to_bytes(goal)
for line in find_buffer(fname, goal):
# convert to text
# Insure it's in 2nd column
columns = line.split(str_to_bytes(','))
if columns[1] == bgoal:
return line.decode(os_encoding)
else:
return None
Использование
line = find_buffer_method(filename, search_string)
Тестирование производительности
Создание текстового файла размером 1M, аналогичного формату плакатов
Код создания файла
На основе
Создание файла CSV с 1 миллионом строк
import csv
import random
records= 100000 # number of rows to generate
print("Making %d records\n" % records)
fieldnames = ['item_no.','item_Name','item_price']
with open("data.csv", "w", newline='') as my_file:
writer = csv.DictWriter(my_file, fieldnames=fieldnames)
writer.writeheader()
for i in range(records):
writer.writerow(
{
'item_no.': i,
'item_Name': "X{}".format(i),
'item_price': random.randint(1, 100)
})
Построчный метод (для определения базовой скорости)
def line_by_line(fname, goal):
" Line by line search for goal string "
with open(fname, 'r') as f:
for line in f:
if goal in line:
# Insure found in 2nd column
columns = line.split(',')
if columns[1] == goal:
return line
return None
Время с использованием Timeit
# Rows find_line_method find_buffer_method Speed Up
10,000 2.99 ms (7 runs/100 loops) 295 µs (7 runs/1000 loops) 10X
100,000 30.40 ms (7 runs/10 loops) 1.78 ms (7 runs/1000 loops) 17X
1M 321.00 ms (7 runs/1 loop) 19.7 ms (7 runs/10 loops) 17X
Обратите внимание, что размер буфера ~ В 17 раз быстрее для файлов большего размера.
Примечание. Строка в приведенной выше таблице зависит от того, в какой строке находится строка цели в файле строк размером 1M для теста.