Я обнаружил, что xdelta (реализация VCDIFF и, возможно, другие реализации diff с двоичной передачей), фактически упомянутые в некоторых ответах на упомянутые мною вопросы, имеют "стандартное" поведение "из коробки". Я был рад этому, упомянутому в RF. C 3284:
Обычный способ справиться с ограничением памяти - разбить входной файл на куски, называемые «windows», и обработать их отдельно. Здесь, за исключением неопубликованной работы Vo, мало что было сделано для разработки эффективных оконных схем. Текущие методы, включая Vdelta, просто используют исходный и целевой windows с соответствующими адресами в исходных и целевых файлах.
Таким образом, они действительно могут соответствовать моей цели, хотя их вывод не является текстовым.
Прежде, чем я это нашел, и поскольку я не мог найти готовую текстовую реализацию, я реализовал свой собственный "оконный diff", используя diffflib Python, и подумал, что поделюсь с миром .
Я добавил несколько базовых c параметров вывода и тестового кода, но обратите внимание, что это не было проверено сверх того, что вы читаете ниже, поэтому используйте его на свой страх и риск. Код не соответствует утверждению в моем случае из реальной жизни - надеюсь, у кого-то есть более изящное решение.
import difflib
import itertools
import unittest
# thanks to https://docs.python.org/3/library/itertools.html#itertools-recipes
def grouper(iterable, n, fillvalue=None):
"Collect data into fixed-length chunks or blocks"
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
return itertools.zip_longest(*args, fillvalue=fillvalue)
class Missing:
"""
Utility class that may be used to nicely represent a 'missing' value.
"""
def __repr__(self):
return '<missing>'
def __sizeof__(self):
return 0
def __hash__(self):
return 0
def rindex_predicate(seq, pred):
"""
Find the next-to right-most index, such that for items following this index the given predicate holds.
e.g. rindex_predicate([1, 2], lambda x: x>2) -> 1
rindex_predicate([1, 2], lambda x: x>1) -> 0
rindex_predicate([1, 2], lambda x: x>0) -> -1
"""
i = len(seq) - 1
while i >= 0 and pred(seq[i]):
i -= 1
return i
def rsplit_predicate(seq, pred):
"""
Split the sequence to two, so that the second part consists only of items for which the predicate holds,
and the predicate does not hold for the last item of the first part (if it exists).
e.g. rsplit_predicate([1,2,3], lambda x: x>2) -> ([1,2], [3])
"""
i = rindex_predicate(seq, pred)
if i == len(seq) - 1:
return seq, []
if i == -1:
return [], seq
return seq[:i+1], seq[i+1:]
def windowed_diff(seq1, seq2, window_size):
"""
Performs a diff of two sequences, similar to SequenceMatcher, using constant space (limited memory)
determined by window_size. As opposed to SequenceMatcher, the sequences may be generators.
This is done by breaking the sequences up to windows. Each pair of windows is compared almost independently.
By allowing tails of unmatched items in one window to be compared on the following window, synchronization
problems that result from the window size may be partially avoided. However, if the synchornization is lost
for a length beyond the given window_size, the diff algorithm might not be able to regain it as it would
if a full-memory diff was used.
Returns a generator which generates a tuple (a, b, opcodes) where a and b are the respective windows
being compared, and opcodes are what SequenceMatcher.get_opcodes() would return with respect to these
windows.
"""
# some utilities
missing = Missing()
missing_group = tuple()
is_missing = lambda x: x is missing
def isnt_equal(opcode):
tag, alo, ahi, blo, bhi = opcode
return tag != 'equal'
# get windows ready
groups1 = grouper(seq1, window_size, fillvalue=missing)
groups2 = grouper(seq2, window_size, fillvalue=missing)
tail1, tail2 = [], []
overflow1, overflow2 = [], []
tail = []
pos1, pos2 = 0, 0
for group1, group2 in itertools.zip_longest(groups1, groups2, fillvalue=missing_group):
assert len(tail1) <= (window_size*2) and len(tail2) <= (window_size*2)
# we prepend any previous unmatched items (see tail below)
window1, window2 = tuple(tail1) + group1, tuple(tail2) + group2
window1, _ = rsplit_predicate(window1, is_missing)
window2, _ = rsplit_predicate(window2, is_missing)
# don't let the window size overflow
# we save the overflow of the window and append it to the tail later
# the largest left-hand part of the window becomes resolved as we
# look for the right-most equal opcode
# this means that any added tail will not contain an equal opcode
# if the tail happens to exceed window_size and contains no equal opcode,
# since it is prepended to the subsequent window, in the next cycle that
# entire window will be resolved by out-of-sync test below, and no new
# items will be added to tail, keeping the size of tail in check
tail1, tail2 = [], []
overflow1, overflow2 = window1[window_size:], window2[window_size:]
window1, window2 = window1[:window_size], window2[:window_size]
cruncher = difflib.SequenceMatcher(isjunk=None, a=window1, b=window2)
opcodes = list(cruncher.get_opcodes())
# the tail is whatever didn't match at the end of the window
resolved, tail = rsplit_predicate(opcodes, isnt_equal)
if not resolved:
# no items are equal - we are out of sync
# resolve all opcodes and move on
resolved = tail
tail = []
# matches up to the last equal (synchronization) point are yielded to caller
yield pos1, pos2, window1, window2, resolved
# push any discrepencies at end of window to next block
for tag, alo, ahi, blo, bhi in tail:
if tag == 'delete':
# it was in block1 but not in block2
tail1 += window1[alo:ahi]
elif tag == 'insert':
tail2 += window2[blo:bhi]
elif tag == 'replace':
tail1 += window1[alo:ahi]
tail2 += window2[blo:bhi]
else:
raise ValueError(tag)
tail1 += list(overflow1)
tail2 += list(overflow2)
pos1 += len(window1) - len(tail1)
pos2 += len(window2) - len(tail2)
# flush out last tail
if overflow1:
window1 += overflow1
if overflow2:
window2 += overflow2
if tail or overflow1 or overflow2:
# re-evaluate last window, minus resolved
_, alo, ahi, blo, bhi = resolved[-1]
window1 = window1[ahi:]
window2 = window2[bhi:]
# pos1,2 were already updated at end of previous loop
cruncher = difflib.SequenceMatcher(isjunk=None, a=window1, b=window2)
opcodes = list(cruncher.get_opcodes())
yield pos1, pos2, window1, window2, opcodes
# based on difflib code (https://github.com/python/cpython/blob/3.7/Lib/difflib.py)
def windowed_text_diff(a, b, window_size):
"""
Same as difflib.Differ().compare(), only using windowed_diff instead of SequenceMatcher.
"""
differ = difflib.Differ()
for apos, bpos, a, b, opcodes in windowed_diff(a, b, window_size):
for tag, alo, ahi, blo, bhi in opcodes:
if tag == 'replace':
g = differ._fancy_replace(a, alo, ahi, b, blo, bhi)
elif tag == 'delete':
g = differ._dump('-', a, alo, ahi)
elif tag == 'insert':
g = differ._dump('+', b, blo, bhi)
elif tag == 'equal':
g = differ._dump(' ', a, alo, ahi)
else:
raise ValueError('unknown tag %r' % (tag,))
yield from g
def unified_diff_from_grouped_opcodes(a, b, apos, bpos, grouped_opcodes, lineterm='\n'):
"""
Same as difflib.unified_diff, only the grouped_opcodes are given, no file info is given and no type checks are done.
"""
for group in grouped_opcodes:
first, last = group[0], group[-1]
file1_range = difflib._format_range_unified(first[1]+apos, last[2]+apos)
file2_range = difflib._format_range_unified(first[3]+bpos, last[4]+bpos)
yield '@@ -{} +{} @@{}'.format(file1_range, file2_range, lineterm)
for tag, i1, i2, j1, j2 in group:
if tag == 'equal':
for line in a[i1:i2]:
yield ' ' + line
continue
if tag in {'replace', 'delete'}:
for line in a[i1:i2]:
yield '-' + line
if tag in {'replace', 'insert'}:
for line in b[j1:j2]:
yield '+' + line
def windowed_unified_diff(a, b, window_size, fromfile='', tofile='', fromfiledate='',
tofiledate='', n=3, lineterm='\n'):
started = False
for apos, bpos, a, b, opcodes in windowed_diff(a, b, window_size):
if not started:
started = True
fromdate = '\t{}'.format(fromfiledate) if fromfiledate else ''
todate = '\t{}'.format(tofiledate) if tofiledate else ''
yield '--- {}{}{}'.format(fromfile, fromdate, lineterm)
yield '+++ {}{}{}'.format(tofile, todate, lineterm)
matcher = difflib.SequenceMatcher(None, a, b)
matcher.opcodes = opcodes
grouped_opcodes = matcher.get_grouped_opcodes(n)
yield from unified_diff_from_grouped_opcodes(a, b, apos, bpos, grouped_opcodes, lineterm=lineterm)
class WindowedDiffTests(unittest.TestCase):
def test_windowed_diff(self):
import random
def generate_lines(n, prob_mutation):
if prob_mutation == 0:
return ('%d' % (i,) for i in range(n))
for i in range(n):
if random.random() < prob_mutation:
action = random.choice(['insert', 'delete', 'edit'])
if action == 'insert':
yield 'inserted at %d' % (i,)
elif action == 'delete':
continue
elif action == 'edit':
yield '%d+' % (i,)
continue
yield '%d' % (i,)
for subtest1 in itertools.product(
['a', 'b', 'neither'], # cut
[False, True], # append
[False, True], # prepend
[15, 16], # n
range(0, 100, 30), # pct_mutation
range(1, 10) # seed
):
cut, append, prepend, n, pct_mutation, seed = subtest1
prob_mutation = pct_mutation/100.0
random.seed(seed)
a = list(generate_lines(n, prob_mutation=0))
b = list(generate_lines(n, prob_mutation=prob_mutation))
if cut == 'a':
a = a[:int(n*0.9)]
elif cut == 'b':
b = b[:int(n*0.8)]
if append:
a += ['@' * (1+(i%int(n/2))) for i in range(n*2)]
b += ['!' * (2+(i%(int(n/2)+1))) for i in range(n*2)]
if prepend:
a = ['@' * (1+(i%int(n/2))) for i in range(n*2)] + a
b = ['!' * (2+(i%(int(n/2)+1))) for i in range(n*2)] + b
base_text_diff_result = list(difflib.ndiff(a, b))
base_unified_diff_result = list(difflib.unified_diff(a, b, n=0))
base_opcodes = difflib.SequenceMatcher(None, a, b).get_opcodes()
for window_size in set([1, 2, 3, 4, int(n/2)-1, int(n/2), n-1, n, n+1, n*2]):
subtest = subtest1 + (window_size,)
with self.subTest(subtest):
result = list(windowed_text_diff(a=(x for x in a), b=(x for x in b), window_size=window_size))
self.assertEqual(list(difflib.restore(result, 1)), a, result)
self.assertEqual(list(difflib.restore(result, 2)), b, result)
def main():
import argparse
import sys
parser = argparse.ArgumentParser()
parser.add_argument('--window-size', type=int, default=1024*64)
parser.add_argument('--unified-diff', action='store_true')
parser.add_argument('--context', type=int, default=3)
parser.add_argument('file1')
parser.add_argument('file2')
args = parser.parse_args()
with open(args.file1, 'rt') as f1, open(args.file2, 'rt') as f2:
if args.unified_diff:
result = windowed_unified_diff(f1, f2, args.window_size, n=args.context, fromfile=f1.name, tofile=f2.name)
else:
result = windowed_text_diff(f1, f2, args.window_size)
for line in result:
sys.stdout.write(line)
if __name__ == "__main__":
main()