Вот мой полный ответ. Я проверил это на одной из моих папок аниме. Он обрабатывает 14 файлов общим объемом 3,64 ГБ примерно за 16 секунд. На мой взгляд, использование любого параллелизма - это больше проблем, чем здесь стоит. Вы ограничены дисковым вводом / выводом, поэтому многопоточность только поможет вам. Мое решение может быть легко распараллелено.
Он начинается с чтения «чанк» информации об источнике: исходный файл, смещение и длина. Все это собирается очень быстро. Отсюда вы можете обрабатывать «чанки», используя потоки, как вам угодно. Код следует:
public static class Constants
{
public const int CHUNK_SIZE_IN_BYTES = 32 * 1024 * 1024; // 32MiB
}
public class ChunkSource
{
public string Filename { get; set; }
public int StartPosition { get; set; }
public int Length { get; set; }
}
public class Chunk
{
private List<ChunkSource> _sources = new List<ChunkSource>();
public IList<ChunkSource> Sources { get { return _sources; } }
public byte[] Hash { get; set; }
public int Length
{
get { return Sources.Select(s => s.Length).Sum(); }
}
}
static class Program
{
static void Main()
{
DirectoryInfo di = new DirectoryInfo(@"C:\Stuff\Anime\Shikabane Hime Aka");
string[] filenames = di.GetFiles().Select(fi=> fi.FullName).OrderBy(n => n).ToArray();
var chunks = ChunkFiles(filenames);
ComputeHashes(chunks);
}
private static List<Chunk> ChunkFiles(string[] filenames)
{
List<Chunk> chunks = new List<Chunk>();
Chunk currentChunk = null;
int offset = 0;
foreach (string filename in filenames)
{
FileInfo fi = new FileInfo(filename);
if (!fi.Exists)
throw new FileNotFoundException(filename);
Debug.WriteLine(String.Format("File: {0}", filename));
//
// First, start off by either starting a new chunk or
// by finishing a leftover chunk from a previous file.
//
if (currentChunk != null)
{
//
// We get here if the previous file had leftover bytes that left us with an incomplete chunk
//
int needed = Constants.CHUNK_SIZE_IN_BYTES - currentChunk.Length;
if (needed == 0)
throw new InvalidOperationException("Something went wonky, shouldn't be here");
offset = needed;
currentChunk.Sources.Add(new ChunkSource()
{
Filename = fi.FullName,
StartPosition = 0,
Length = (int)Math.Min(fi.Length, (long)needed)
});
if (currentChunk.Length >= Constants.CHUNK_SIZE_IN_BYTES)
{
chunks.Add(currentChunk = new Chunk());
}
}
else
{
offset = 0;
}
//
// Note: Using integer division here
//
for (int i = 0; i < (fi.Length - offset) / Constants.CHUNK_SIZE_IN_BYTES; i++)
{
chunks.Add(currentChunk = new Chunk());
currentChunk.Sources.Add(new ChunkSource()
{
Filename = fi.FullName,
StartPosition = i * Constants.CHUNK_SIZE_IN_BYTES + offset,
Length = Constants.CHUNK_SIZE_IN_BYTES
});
Debug.WriteLine(String.Format("Chunk source created: Offset = {0,10}, Length = {1,10}", currentChunk.Sources[0].StartPosition, currentChunk.Sources[0].Length));
}
int leftover = (int)(fi.Length - offset) % Constants.CHUNK_SIZE_IN_BYTES;
if (leftover > 0)
{
chunks.Add(currentChunk = new Chunk());
currentChunk.Sources.Add(new ChunkSource()
{
Filename = fi.FullName,
StartPosition = (int)(fi.Length - leftover),
Length = leftover
});
}
else
{
currentChunk = null;
}
}
return chunks;
}
private static void ComputeHashes(IList<Chunk> chunks)
{
if (chunks == null || chunks.Count == 0)
return;
Dictionary<string, MemoryMappedFile> files = new Dictionary<string, MemoryMappedFile>();
foreach (var chunk in chunks)
{
MemoryMappedFile mms = null;
byte[] buffer = new byte[Constants.CHUNK_SIZE_IN_BYTES];
Stopwatch sw = Stopwatch.StartNew();
foreach (var source in chunk.Sources)
{
lock (files)
{
if (!files.TryGetValue(source.Filename, out mms))
{
Debug.WriteLine(String.Format("Opening {0}", source.Filename));
files.Add(source.Filename, mms = MemoryMappedFile.CreateFromFile(source.Filename, FileMode.Open));
}
}
var view = mms.CreateViewStream(source.StartPosition, source.Length);
view.Read(buffer, 0, source.Length);
}
Debug.WriteLine("Done reading sources in {0}ms", sw.Elapsed.TotalMilliseconds);
sw.Restart();
MD5 md5 = MD5.Create();
chunk.Hash = md5.ComputeHash(buffer);
sw.Stop();
Debug.WriteLine(String.Format("Computed hash: {0} in {1}ms", String.Join("-", chunk.Hash.Select(h=> h.ToString("X2")).ToArray()), sw.Elapsed.TotalMilliseconds));
}
foreach (var x in files.Values)
{
x.Dispose();
}
}
}
Я не гарантирую, что все безупречно свободно от ошибок. Но мне было весело работать над этим. Посмотрите на окно вывода в Visual Studio для получения отладочной информации. Это выглядит так:
Файл: C: \ Stuff \ Anime \ Shikabane Hime Aka \ Эпизод 02.mkv
Источник чанка создан: смещение = 26966010, длина = 33554432
Источник чанка создан: смещение = 60520442, длина = 33554432
Источник чанка создан: смещение = 94074874, длина = 33554432
Источник чанка создан: смещение = 127629306, длина = 33554432
Источник чанка создан: смещение = 161183738, длина = 33554432
Источник чанка создан: смещение = 194738170, длина = 33554432
Источник чанка создан: смещение = 228292602, длина = 33554432
...
Открытие C: \ Stuff \ Anime \ Shikabane Hime Aka \ Эпизод 02.mkv
Закончено чтение источников за 42,9362мс
Поток '' (0xc10) завершился с кодом 0 (0x0).
Вычисляемый хэш: 3C-81-A5-2C-90-02-24-23-42-5B-19-A2-15-56-AB-3F в 94.2481 мс
Закончено чтение источников за 0,0053 мс
Вычисленный хэш: 58-F0-6D-D5-88-D8-FF-B3-BE-B4-6A-DA-63-09-43-6B в 98,9263 мс
Закончено чтение источников за 29.4805мс
Вычисленный хэш: F7-19-8D-A8-FE-9C-07-6E-DB-D5-74-A6-E1-E7-A6-26 в 85.0061мс
Закончено чтение источников за 28,4971мс
Вычисленный хэш: 49-F2-CB-75-89-9A-BC-FA-94-A7-DF-E0-DB-02-8A-99 в 84.2799 мс
Закончено чтение источников за 31.106мс
Вычисленный хэш: 29-7B-18-BD-ED-E9-0C-68-4B-47-C6-5F-D0-16-8A-44 в 84,1444 мс
Закончено чтение источников за 31.2204мс
Вычисленный хэш: F8-91-F1-90-CF-9C-37-4E-82-68-C2-44-0D-A7-6E-F8 за 84,2592 мс
Закончено чтение источников за 31.0031мс
Вычисленный хэш: 65-97-ED-95-07-31-BF-C8-3A-BA-2B-DA-03-37-FD-00 в 95,331 мс
Закончено чтение источников за 33.0072мс
Вычисленный хэш: 9B-F2-83-E6-A8-DF-FD-8D-6C-5C-9E-F4-20-0A-38-4B в 85,9561 мс
Закончено чтение источников за 31.6232мс
Вычисленный хэш: B6-7C-6B-95-69-BC-9C-B2-1A-07-B3-13-28-A8-10-BC в 84,1866 мс
Вот параллельная версия. Это в основном то же самое на самом деле. Использование параллелизма = 3 сокращает время обработки до 9 секунд.
private static void ComputeHashes(IList<Chunk> chunks)
{
if (chunks == null || chunks.Count == 0)
return;
Dictionary<string, MemoryMappedFile> files = new Dictionary<string, MemoryMappedFile>();
Parallel.ForEach(chunks, new ParallelOptions() { MaxDegreeOfParallelism = 2 }, (chunk, state, index) =>
{
MemoryMappedFile mms = null;
byte[] buffer = new byte[Constants.CHUNK_SIZE_IN_BYTES];
Stopwatch sw = Stopwatch.StartNew();
foreach (var source in chunk.Sources)
{
lock (files)
{
if (!files.TryGetValue(source.Filename, out mms))
{
Debug.WriteLine(String.Format("Opening {0}", source.Filename));
files.Add(source.Filename, mms = MemoryMappedFile.CreateFromFile(source.Filename, FileMode.Open));
}
}
var view = mms.CreateViewStream(source.StartPosition, source.Length);
view.Read(buffer, 0, source.Length);
}
Debug.WriteLine("Done reading sources in {0}ms", sw.Elapsed.TotalMilliseconds);
sw.Restart();
MD5 md5 = MD5.Create();
chunk.Hash = md5.ComputeHash(buffer);
sw.Stop();
Debug.WriteLine(String.Format("Computed hash: {0} in {1}ms", String.Join("-", chunk.Hash.Select(h => h.ToString("X2")).ToArray()), sw.Elapsed.TotalMilliseconds));
});
foreach (var x in files.Values)
{
x.Dispose();
}
}
EDIT
Я нашел ошибку, или то, что я считаю ошибкой. Необходимо установить смещение чтения равным 0, если мы начинаем новый файл.
РЕДАКТИРОВАТЬ 2 на основе обратной связи
Это обрабатывает хэши в отдельном потоке. Необходимо ограничить ввод / вывод. Я столкнулся с OutOfMemoryException
без этого. Хотя на самом деле это не так уж и лучше. Помимо этого ... Я не уверен, как это может быть улучшено дальше. Возможно, повторно используя буферы, может быть.
public class QueueItem
{
public Chunk Chunk { get; set; }
public byte[] buffer { get; set; }
}
private static void ComputeHashes(IList<Chunk> chunks)
{
if (chunks == null || chunks.Count == 0)
return;
Dictionary<string, MemoryMappedFile> files = new Dictionary<string, MemoryMappedFile>();
foreach (var filename in chunks.SelectMany(c => c.Sources).Select(c => c.Filename).Distinct())
{
files.Add(filename, MemoryMappedFile.CreateFromFile(filename, FileMode.Open));
}
AutoResetEvent monitor = new AutoResetEvent(false);
ConcurrentQueue<QueueItem> hashQueue = new ConcurrentQueue<QueueItem>();
CancellationToken token = new CancellationToken();
Task.Factory.StartNew(() =>
{
int processCount = 0;
QueueItem item = null;
while (!token.IsCancellationRequested)
{
if (hashQueue.TryDequeue(out item))
{
MD5 md5 = MD5.Create();
item.Chunk.Hash = md5.ComputeHash(item.buffer);
if (processCount++ > 1000)
{
processCount = 0;
monitor.Set();
}
}
}
}, token);
foreach (var chunk in chunks)
{
if (hashQueue.Count > 10000)
{
monitor.WaitOne();
}
QueueItem item = new QueueItem()
{
buffer = new byte[Constants.CHUNK_SIZE_IN_BYTES],
Chunk = chunk
};
Stopwatch sw = Stopwatch.StartNew();
foreach (var source in chunk.Sources)
{
MemoryMappedFile mms = files[source.Filename];
var view = mms.CreateViewStream(source.StartPosition, source.Length);
view.Read(item.buffer, 0, source.Length);
}
sw.Restart();
sw.Stop();
hashQueue.Enqueue(item);
}
foreach (var x in files.Values)
{
x.Dispose();
}
}