Использование одной и той же модели для параллельной десериализации (эффективно) - PullRequest
3 голосов
/ 25 октября 2011

Я пытаюсь прочитать несколько файлов, которые были сериализованы с ProtoBuf.NET, используя .NET Задачи, подобные этой:

public static ResultsDump Amalgamate(RuntimeTypeModel model, IEnumerable<string> files)
{      
  var readDumpTasks = 
    files.Select(fn =>
      Task<ResultsDump>.Factory.StartNew(() => {
        try {
          using (var dumpFile = new FileStream(fn, FileMode.Open))
          {
            var miniDump = (ResultsDump)model.Deserialize(dumpFile, null, typeof(ResultsDump));
            if (miniDump == null) {
              throw new Exception(string.Format("Failed to deserialize dump file {0}", fn));
            }
            //readDumps.Add(miniDump);
            return miniDump;
          }
        }
        catch (Exception e) {
          throw new Exception(string.Format("cannot read dump file {0}: {1}", fn, e.Message), e);
        }
      })).ToArray();

  Task.WaitAll(readDumpTasks);

  var allDumps = readDumpTasks.Select(t => t.Result).ToList();

  // Goes on.. irrelevant to the question
}

По какой-то причине загрузка ЦП на самом деле не идет выше одного ядра . Есть ли в Protobuf.NET какая-то внутренняя блокировка, которая не любит одновременную десериализацию нескольких файлов?

Я пробовал это с несколькими экземплярами RuntimeTypeModel, а также с одним, и он, кажется, всегда достигает пика при очень "низком" уровне загрузки процессора.

Я даже не прав, обвиняя ProtoBuf.NET? Это распределитель памяти .NET / TPL?

1 Ответ

6 голосов
/ 25 октября 2011

Преднамеренно очень ограниченная блокировка в protobuf-net;он действительно блокируется только при проверке типов (первый запуск), чтобы увидеть, что нужно.Как только модель понятна, она не блокируется и разрабатывается так, чтобы быть тривиально параллельной.

Как отмечается (комментарии), весьма вероятно, что IO - это ваше узкое место.Действительно, распараллеливание доступа к одному и тому же физическому диску / шпинделю обычно значительно уменьшает пропускную способность , поскольку буфер более загружен и должен выполнять больше поиска, чем непрерывного чтения.

Это должно бытьлегко тестировать / проверять: для выполнения теста вместо чтения с диска загрузите их все в память first ;

var ms = new MemoryStream(
    File.ReadAllBytes(path));

После загрузки всех файлов выполните один и тот же кодно передавая MemoryStream s в качестве входных данных.Если он все еще не масштабируется, это может быть ошибкой.Я строго подозреваю , однако, что вы обнаружите, что он хорошо распараллеливается в этой точке.


Вот рабочий пример, который для меня насыщает все мои ядра одновременной десериализацией:

using System.Collections.Generic;
using ProtoBuf;
using System;
using System.IO;
using System.Threading.Tasks;


internal class Program
{
    private static void Main()
    {
        var foo = new Foo { Bars = new List<Bar>() };
        var rand = new Random(1234);
        for (int i = 0; i < 1000; i++)
        {
            var bar = new Bar
            {
                A = rand.Next(),
                B = rand.Next(),
                C = rand.Next(),
                D = rand.Next(),
                E = rand.Next(),
                F = rand.Next(),
                G = rand.Next(),
                H = rand.Next()
            };
            foo.Bars.Add(bar);
        }
        var ms = new MemoryStream();
        Serializer.Serialize(ms, foo);
        var bytes = ms.ToArray();
        const int count = 100000;
        Parallel.For(0, count, x =>
        {
            Serializer.Deserialize<Foo>(new MemoryStream(bytes));
        });
    }
}
[ProtoContract]
internal class Foo
{
    [ProtoMember(1)]
    public List<Bar> Bars { get; set; }
}
[ProtoContract]
internal class Bar
{
    [ProtoMember(1)]
    public int A { get; set; }
    [ProtoMember(2)]
    public int B { get; set; }
    [ProtoMember(3)]
    public int C { get; set; }
    [ProtoMember(4)]
    public int D { get; set; }
    [ProtoMember(5)]
    public int E { get; set; }
    [ProtoMember(6)]
    public int F { get; set; }
    [ProtoMember(7)]
    public int G { get; set; }
    [ProtoMember(8)]
    public int H { get; set; }
}
...