Как наиболее эффективно прочитать список файлов как один поток и хеш-фрагменты из него? - PullRequest
0 голосов
/ 21 февраля 2011

У меня есть список файлов, которые должны быть прочитаны кусками в byte[], который затем передается в функцию хеширования. Сложность заключается в следующем: если я достигну конца файла, мне нужно продолжить чтение следующего файла, пока я не заполню буфер, вот так:

читать 16 бит в качестве примера:
Файл 1: 00101010
Файл 2: 01101010111111111
нужно читать как 0010101001101010

Дело в том, что эти файлы могут быть размером в несколько гигабайт, и я не хочу полностью загружать их в память. Загрузка кусочков в буфер размером около 30 МБ вполне подойдет.

Я хочу использовать многопоточность, но будет ли эффективнее потоковое чтение файла? Я не знаю, является ли дисковый ввод-вывод настолько большим узким местом, что это того стоит. Достаточно ли ускорилось бы хеширование, если бы я только заправил эту часть и зафиксировал чтение каждого куска? Важно, чтобы хэши сохранялись в правильном порядке.

Второе, что мне нужно сделать, это сгенерировать сумму MD5 из каждого файла. Есть ли способ сделать это более эффективно, чем отдельный шаг?

(Этот вопрос частично совпадает с Существует ли встроенный способ обработки нескольких файлов в виде одного потока? , но я подумал, что это достаточно отличается)

Я действительно озадачен, какой подход выбрать, так как я довольно плохо знаком с C #, а также с многопоточностью. Я уже попробовал подходы, перечисленные ниже, но мне их недостаточно.

Поскольку я новичок в C #, я ценю все виды ввода по любому аспекту моего кода.


Этот фрагмент кода был многопоточным, но не «добавляет» потоки и, как таковой, генерирует недопустимые хеши:

public void DoHashing()
{
    ParallelOptions options = new ParallelOptions();
    options.MaxDegreeOfParallelism = numThreads;
    options.CancellationToken = cancelToken.Token;
    Parallel.ForEach(files, options, (string f, ParallelLoopState loopState) =>
        {
            options.CancellationToken.ThrowIfCancellationRequested();

            using (BufferedStream fileStream = new BufferedStream(File.OpenRead(f), bufferSize))
            {

                // Get the MD5sum first:
                using (MD5CryptoServiceProvider md5 = new MD5CryptoServiceProvider())
                {
                    md5.Initialize();
                    md5Sums[f] = BitConverter.ToString(md5.ComputeHash(fileStream)).Replace("-", "");
                }

                //setup for reading:
                byte[] buffer = new byte[(int)pieceLength];
                //I don't know if the buffer will f*ck up the filelenghth
                long remaining = (new FileInfo(f)).Length;
                int done = 0;
                while (remaining > 0)
                {
                    while (done < pieceLength)
                    {
                        options.CancellationToken.ThrowIfCancellationRequested();
                        //either try to read the piecelength, or the remaining length of the file.
                        int toRead = (int)Math.Min(pieceLength - done, remaining);
                        int read = fileStream.Read(buffer, done, toRead);

                        //if read == 0, EOF reached
                        if (read == 0)
                        {
                            remaining = 0;
                            break;
                        }

                        //offsets
                        done += read;
                        remaining -= read;
                    }
                    // Hash the piece
                    using (SHA1CryptoServiceProvider sha1 = new SHA1CryptoServiceProvider())
                    {
                        sha1.Initialize();
                        byte[] hash = sha1.ComputeHash(buffer);
                        hashes[f].AddRange(hash);
                    }
                    done = 0;
                    buffer = new byte[(int)pieceLength];
                }
            }
        }
    );


}

Этот другой фрагмент кода не является многопоточным (и не вычисляет MD5):

void Hash()
{
    //examples, these got handled by other methods
    List<string> files = new List<string>();
    files.Add("a.txt");
    files.Add("b.blob");
    //....
    long totalFileLength;
    int pieceLength = Math.Pow(2,20);
    foreach (string file in files)
    {
        totalFileLength += (new FileInfo(file)).Length;
    }

    //Reading the file:
    long remaining = totalFileLength;
    byte[] buffer = new byte[Math.min(remaining, pieceSize)];
    int index = 0;
    FileStream fin = File.OpenRead(files[index]);
    int done = 0;
    int offset = 0;
    while (remaining > 0)
    {
        while (done < pieceLength)
        {
            int toRead = (int)Math.Min(pieceLength - offset, remaining);
            int read = fin.Read(buffer, done, toRead);

            //if read == 0, EOF reached
            if (read == 0)
            {
                index++;
                //if last file:
                if (index > files.Count)
                {
                    remaining = 0;
                    break;
                }
                //get ready for next round:
                offset = 0;
                fin.OpenRead(files[index]);
            }
            done += read;
            offset += read;
            remaining -= read;
        }
        //Doing the piece hash:
        HashPiece(buffer);

        //reset for next piece:
        done = 0;
        byte[] buffer = new byte[Math.min(remaining, pieceSize)];
    }
}
void HashPiece(byte[] piece)
{
    using (SHA1CryptoServiceProvider sha1 = new SHA1CryptoServiceProvider())
    {
        sha1.Initialize();
        //hashes is a List
        hashes.Add(sha1.ComputeHash(piece));
    }
}

Большое спасибо за ваше время и усилия.

Я не ищу полностью закодированные решения, любой указатель и идея, куда идти с этим, были бы превосходны.



Вопросы и замечания к ответу yodaj007:


Почему if (currentChunk.Length >= Constants.CHUNK_SIZE_IN_BYTES)? Почему не ==? Если фрагмент больше размера блока, мой хэш SHA1 получает другое значение.


            currentChunk.Sources.Add(new ChunkSource()
            {
                Filename = fi.FullName,
                StartPosition = 0,
                Length = (int)Math.Min(fi.Length, (long)needed)
            });

Это действительно интересная идея. Отложите чтение, пока оно вам не понадобится. Nice!


    chunks.Add(currentChunk = new Chunk());

Почему это делается в блоке if (currentChunk != null) и в блоке for (int i = 0; i < (fi.Length - offset) / Constants.CHUNK_SIZE_IN_BYTES; i++)? Разве первое не несколько избыточно?


Ответы [ 3 ]

3 голосов
/ 24 февраля 2011

Вот мой полный ответ. Я проверил это на одной из моих папок аниме. Он обрабатывает 14 файлов общим объемом 3,64 ГБ примерно за 16 секунд. На мой взгляд, использование любого параллелизма - это больше проблем, чем здесь стоит. Вы ограничены дисковым вводом / выводом, поэтому многопоточность только поможет вам. Мое решение может быть легко распараллелено.

Он начинается с чтения «чанк» информации об источнике: исходный файл, смещение и длина. Все это собирается очень быстро. Отсюда вы можете обрабатывать «чанки», используя потоки, как вам угодно. Код следует:

public static class Constants
{
    public const int CHUNK_SIZE_IN_BYTES = 32 * 1024 * 1024;        // 32MiB
}

public class ChunkSource
{
    public string Filename { get; set; }
    public int StartPosition { get; set; }
    public int Length { get; set; }
}

public class Chunk
{
    private List<ChunkSource> _sources = new List<ChunkSource>();

    public IList<ChunkSource> Sources { get { return _sources; } }
    public byte[] Hash { get; set; }
    public int Length
    {
        get { return Sources.Select(s => s.Length).Sum(); }
    }
}

static class Program
{
    static void Main()
    {
        DirectoryInfo di = new DirectoryInfo(@"C:\Stuff\Anime\Shikabane Hime Aka");

        string[] filenames = di.GetFiles().Select(fi=> fi.FullName).OrderBy(n => n).ToArray();
        var chunks = ChunkFiles(filenames);
        ComputeHashes(chunks);
    }

    private static List<Chunk> ChunkFiles(string[] filenames)
    {
        List<Chunk> chunks = new List<Chunk>();
        Chunk currentChunk = null;
        int offset = 0;

        foreach (string filename in filenames)
        {
            FileInfo fi = new FileInfo(filename);
            if (!fi.Exists)
                throw new FileNotFoundException(filename);

            Debug.WriteLine(String.Format("File: {0}", filename));
            //
            // First, start off by either starting a new chunk or 
            // by finishing a leftover chunk from a previous file.
            //
            if (currentChunk != null)
            {
                //
                // We get here if the previous file had leftover bytes that left us with an incomplete chunk
                //
                int needed = Constants.CHUNK_SIZE_IN_BYTES - currentChunk.Length;
                if (needed == 0)
                    throw new InvalidOperationException("Something went wonky, shouldn't be here");

                offset = needed;
                currentChunk.Sources.Add(new ChunkSource()
                {
                    Filename = fi.FullName,
                    StartPosition = 0,
                    Length = (int)Math.Min(fi.Length, (long)needed)
                });

                if (currentChunk.Length >= Constants.CHUNK_SIZE_IN_BYTES)
                {
                    chunks.Add(currentChunk = new Chunk());
                }
            }
            else
            {
                offset = 0;
            }

            //
            // Note: Using integer division here
            //
            for (int i = 0; i < (fi.Length - offset) / Constants.CHUNK_SIZE_IN_BYTES; i++)
            {
                chunks.Add(currentChunk = new Chunk());
                currentChunk.Sources.Add(new ChunkSource()
                {
                    Filename = fi.FullName,
                    StartPosition = i * Constants.CHUNK_SIZE_IN_BYTES + offset,
                    Length = Constants.CHUNK_SIZE_IN_BYTES
                });

                Debug.WriteLine(String.Format("Chunk source created: Offset = {0,10}, Length = {1,10}", currentChunk.Sources[0].StartPosition, currentChunk.Sources[0].Length));
            }

            int leftover = (int)(fi.Length - offset) % Constants.CHUNK_SIZE_IN_BYTES;
            if (leftover > 0)
            {
                chunks.Add(currentChunk = new Chunk());
                currentChunk.Sources.Add(new ChunkSource()
                {
                    Filename = fi.FullName,
                    StartPosition = (int)(fi.Length - leftover),
                    Length = leftover
                });
            }
            else
            {
                currentChunk = null;
            }

        }

        return chunks;
    }

    private static void ComputeHashes(IList<Chunk> chunks)
    {
        if (chunks == null || chunks.Count == 0)
            return;

        Dictionary<string, MemoryMappedFile> files = new Dictionary<string, MemoryMappedFile>();

        foreach (var chunk in chunks)
        {
            MemoryMappedFile mms = null;
            byte[] buffer = new byte[Constants.CHUNK_SIZE_IN_BYTES];

            Stopwatch sw = Stopwatch.StartNew();
            foreach (var source in chunk.Sources)
            {
                lock (files)
                {
                    if (!files.TryGetValue(source.Filename, out mms))
                    {
                        Debug.WriteLine(String.Format("Opening {0}", source.Filename));
                        files.Add(source.Filename, mms = MemoryMappedFile.CreateFromFile(source.Filename, FileMode.Open));
                    }
                }

                var view = mms.CreateViewStream(source.StartPosition, source.Length);
                view.Read(buffer, 0, source.Length);
            }

            Debug.WriteLine("Done reading sources in {0}ms", sw.Elapsed.TotalMilliseconds);
            sw.Restart();
            MD5 md5 = MD5.Create();
            chunk.Hash = md5.ComputeHash(buffer);
            sw.Stop();

            Debug.WriteLine(String.Format("Computed hash: {0} in {1}ms", String.Join("-", chunk.Hash.Select(h=> h.ToString("X2")).ToArray()), sw.Elapsed.TotalMilliseconds));
        }

        foreach (var x in files.Values)
        {
            x.Dispose();
        }
    }
}

Я не гарантирую, что все безупречно свободно от ошибок. Но мне было весело работать над этим. Посмотрите на окно вывода в Visual Studio для получения отладочной информации. Это выглядит так:

Файл: C: \ Stuff \ Anime \ Shikabane Hime Aka \ Эпизод 02.mkv
Источник чанка создан: смещение = 26966010, длина = 33554432
Источник чанка создан: смещение = 60520442, длина = 33554432
Источник чанка создан: смещение = 94074874, длина = 33554432
Источник чанка создан: смещение = 127629306, длина = 33554432
Источник чанка создан: смещение = 161183738, длина = 33554432
Источник чанка создан: смещение = 194738170, длина = 33554432
Источник чанка создан: смещение = 228292602, длина = 33554432

...

Открытие C: \ Stuff \ Anime \ Shikabane Hime Aka \ Эпизод 02.mkv
Закончено чтение источников за 42,9362мс
Поток '' (0xc10) завершился с кодом 0 (0x0).
Вычисляемый хэш: 3C-81-A5-2C-90-02-24-23-42-5B-19-A2-15-56-AB-3F в 94.2481 мс
Закончено чтение источников за 0,0053 мс
Вычисленный хэш: 58-F0-6D-D5-88-D8-FF-B3-BE-B4-6A-DA-63-09-43-6B в 98,9263 мс
Закончено чтение источников за 29.4805мс
Вычисленный хэш: F7-19-8D-A8-FE-9C-07-6E-DB-D5-74-A6-E1-E7-A6-26 в 85.0061мс
Закончено чтение источников за 28,4971мс
Вычисленный хэш: 49-F2-CB-75-89-9A-BC-FA-94-A7-DF-E0-DB-02-8A-99 в 84.2799 мс
Закончено чтение источников за 31.106мс
Вычисленный хэш: 29-7B-18-BD-ED-E9-0C-68-4B-47-C6-5F-D0-16-8A-44 в 84,1444 мс
Закончено чтение источников за 31.2204мс
Вычисленный хэш: F8-91-F1-90-CF-9C-37-4E-82-68-C2-44-0D-A7-6E-F8 за 84,2592 мс
Закончено чтение источников за 31.0031мс
Вычисленный хэш: 65-97-ED-95-07-31-BF-C8-3A-BA-2B-DA-03-37-FD-00 в 95,331 мс
Закончено чтение источников за 33.0072мс
Вычисленный хэш: 9B-F2-83-E6-A8-DF-FD-8D-6C-5C-9E-F4-20-0A-38-4B в 85,9561 мс
Закончено чтение источников за 31.6232мс
Вычисленный хэш: B6-7C-6B-95-69-BC-9C-B2-1A-07-B3-13-28-A8-10-BC в 84,1866 мс

Вот параллельная версия. Это в основном то же самое на самом деле. Использование параллелизма = 3 сокращает время обработки до 9 секунд.

    private static void ComputeHashes(IList<Chunk> chunks)
    {
        if (chunks == null || chunks.Count == 0)
            return;

        Dictionary<string, MemoryMappedFile> files = new Dictionary<string, MemoryMappedFile>();
        Parallel.ForEach(chunks, new ParallelOptions() { MaxDegreeOfParallelism = 2 }, (chunk, state, index) =>
            {
                MemoryMappedFile mms = null;
                byte[] buffer = new byte[Constants.CHUNK_SIZE_IN_BYTES];

                Stopwatch sw = Stopwatch.StartNew();
                foreach (var source in chunk.Sources)
                {
                    lock (files)
                    {
                        if (!files.TryGetValue(source.Filename, out mms))
                        {
                            Debug.WriteLine(String.Format("Opening {0}", source.Filename));
                            files.Add(source.Filename, mms = MemoryMappedFile.CreateFromFile(source.Filename, FileMode.Open));
                        }
                    }

                    var view = mms.CreateViewStream(source.StartPosition, source.Length);
                    view.Read(buffer, 0, source.Length);
                }

                Debug.WriteLine("Done reading sources in {0}ms", sw.Elapsed.TotalMilliseconds);
                sw.Restart();
                MD5 md5 = MD5.Create();
                chunk.Hash = md5.ComputeHash(buffer);
                sw.Stop();

                Debug.WriteLine(String.Format("Computed hash: {0} in {1}ms", String.Join("-", chunk.Hash.Select(h => h.ToString("X2")).ToArray()), sw.Elapsed.TotalMilliseconds));
            });

        foreach (var x in files.Values)
        {
            x.Dispose();
        }
    }

EDIT

Я нашел ошибку, или то, что я считаю ошибкой. Необходимо установить смещение чтения равным 0, если мы начинаем новый файл.

РЕДАКТИРОВАТЬ 2 на основе обратной связи

Это обрабатывает хэши в отдельном потоке. Необходимо ограничить ввод / вывод. Я столкнулся с OutOfMemoryException без этого. Хотя на самом деле это не так уж и лучше. Помимо этого ... Я не уверен, как это может быть улучшено дальше. Возможно, повторно используя буферы, может быть.

public class QueueItem
{
    public Chunk Chunk { get; set; }
    public byte[] buffer { get; set; }
}

private static void ComputeHashes(IList<Chunk> chunks)
{
    if (chunks == null || chunks.Count == 0)
        return;

    Dictionary<string, MemoryMappedFile> files = new Dictionary<string, MemoryMappedFile>();
    foreach (var filename in chunks.SelectMany(c => c.Sources).Select(c => c.Filename).Distinct())
    {
        files.Add(filename, MemoryMappedFile.CreateFromFile(filename, FileMode.Open));
    }

    AutoResetEvent monitor = new AutoResetEvent(false);
    ConcurrentQueue<QueueItem> hashQueue = new ConcurrentQueue<QueueItem>();
    CancellationToken token = new CancellationToken();

    Task.Factory.StartNew(() =>
        {
            int processCount = 0;
            QueueItem item = null;
            while (!token.IsCancellationRequested)
            {
                if (hashQueue.TryDequeue(out item))
                {
                    MD5 md5 = MD5.Create();
                    item.Chunk.Hash = md5.ComputeHash(item.buffer);

                    if (processCount++ > 1000)
                    {
                        processCount = 0;
                        monitor.Set();
                    }
                }
            }
        }, token);

    foreach (var chunk in chunks)
    {
        if (hashQueue.Count > 10000)
        {
            monitor.WaitOne();
        }

        QueueItem item = new QueueItem()
        {
            buffer = new byte[Constants.CHUNK_SIZE_IN_BYTES],
            Chunk = chunk
        };

        Stopwatch sw = Stopwatch.StartNew();
        foreach (var source in chunk.Sources)
        {
            MemoryMappedFile mms = files[source.Filename];
            var view = mms.CreateViewStream(source.StartPosition, source.Length);
            view.Read(item.buffer, 0, source.Length);
        }

        sw.Restart();
        sw.Stop();
        hashQueue.Enqueue(item);
    }

    foreach (var x in files.Values)
    {
        x.Dispose();
    }
}
0 голосов
/ 24 февраля 2011
В 1000буфер функции из текущего ViewAccessor, когда вы исчерпали файл, начните разбивать следующий файл, используя текущее смещение хеш-патрона в качестве смещения ViewAccessor
0 голосов
/ 23 февраля 2011

Я тоже новичок в C #, но я думаю, что вы ищете System.IO.MemoryMappedFiles пространство имен начиная с C # 4.0

Используя эти функции API, операционная система сама заботится о том, как управлять текущей областью файлов в памяти.

Вместо кода копирования и вставки здесь, продолжайте читать эту статью: http://www.developer.com/net/article.php/3828586/Using-Memory-Mapped-Files-in-NET-40.htm

Что касается MD5, используйте класс System.Security.Cryptography.MD5CryptoServiceProvider. Может быть, это быстрее.

В вашем случае, когда вам нужно преодолеть «границы» одного файла, сделайте это. Пусть операционная система обрабатывает, как отображенные в памяти файлы представлены в памяти. Работайте так же, как если бы вы работали с буферами небольшого размера.

...