ZeroMQ Broadcast с прокси-сообщениями о потере - PullRequest
0 голосов
/ 26 апреля 2020

Я делаю некоторые тесты производительности на ZeroMQ, чтобы сравнить его с другими, такими как RabbitMQ и ActiveMQ.

В моих широковещательных тестах и ​​во избежание «Проблемы обнаружения Dynami c», как указано в документации ZeroMQ, я использовал прокси. В моем сценарии я использую 50 параллельных издателей, каждое из которых отправляет 500 сообщений с задержкой 1 мс между отправками. Каждое сообщение затем читают 50 подписчиков. И как я сказал, я теряю сообщения, каждый из подписчиков должен получить в общей сложности 25000 сообщений, и каждый из них получает только от 5000 до 10000 сообщений.

Я использую Windows и C#. Net client clrzmq4 (4.1.0.31).

Я уже пробовал некоторые решения, которые я нашел в других сообщениях:

  • Я установил задержку на TimeSpan.MaxValue
  • Я установил значение ReceiveHighWatermark в 0 (поскольку оно представлено как бесконечное, но я также пробовал Int32.MaxValue)
  • Я установил флажок для приемников с медленным запуском, я заставил приемники запускаться за несколько секунд до того, как издатели
  • Я должен был убедиться, что сборка мусора для экземпляров сокетов не производится (задержка должна делать это, но чтобы убедиться)
  • У меня похожий сценарий (с похожими логиками c) с использованием NetMQ и работает нормально. Другой сценарий не использует безопасность, хотя и этот (и это также причина, почему я использую clrzmq в этом, потому что мне нужна аутентификация клиента с сертификатами, которые еще не возможны в NetMQ).

РЕДАКТИРОВАТЬ:

public class MCVEPublisher
{
    public void publish(int numberOfMessages)
    {
        string topic = "TopicA";

        ZContext ZContext = ZContext.Create();
        ZSocket publisher = new ZSocket(ZContext, ZSocketType.PUB);

        //Security
        // Create or load certificates
        ZCert serverCert = Main.GetOrCreateCert("publisher");
        var actor = new ZActor(ZContext, ZAuth.Action, null);
        actor.Start();
        // send CURVE settings to ZAuth
        actor.Frontend.Send(new ZFrame("VERBOSE"));
        actor.Frontend.Send(new ZMessage(new List<ZFrame>()
            { new ZFrame("ALLOW"), new ZFrame("127.0.0.1") }));
        actor.Frontend.Send(new ZMessage(new List<ZFrame>()
            { new ZFrame("CURVE"), new ZFrame(".curve") }));
        publisher.CurvePublicKey = serverCert.PublicKey;
        publisher.CurveSecretKey = serverCert.SecretKey;
        publisher.CurveServer = true;


        publisher.Linger = TimeSpan.MaxValue;
        publisher.ReceiveHighWatermark = Int32.MaxValue;
        publisher.Connect("tcp://127.0.0.1:5678");
        Thread.Sleep(3500);
        for (int i = 0; i < numberOfMessages; i++)
        {
            Thread.Sleep(1);
            var update = $"{topic} {"message"}";
            using (var updateFrame = new ZFrame(update))
            {
                publisher.Send(updateFrame);
            }

        }

        //just to make sure it does not end instantly
        Thread.Sleep(60000);
        //just to make sure publisher is not garbage collected
        ulong Affinity = publisher.Affinity;
    }
}

public class MCVESubscriber
{
    private ZSocket subscriber;
    private List<string> prints = new List<string>();

    public void read()
    {
        string topic = "TopicA";

        var context = new ZContext();
        subscriber = new ZSocket(context, ZSocketType.SUB);

        //Security
        ZCert serverCert = Main.GetOrCreateCert("xpub");
        ZCert clientCert = Main.GetOrCreateCert("subscriber");
        subscriber.CurvePublicKey = clientCert.PublicKey;
        subscriber.CurveSecretKey = clientCert.SecretKey;
        subscriber.CurveServer = true;
        subscriber.CurveServerKey = serverCert.PublicKey;

        subscriber.Linger = TimeSpan.MaxValue;
        subscriber.ReceiveHighWatermark = Int32.MaxValue;

        // Connect
        subscriber.Connect("tcp://127.0.0.1:1234");
        subscriber.Subscribe(topic);
        while (true)
        {
            using (var replyFrame = subscriber.ReceiveFrame())
            {
                string messageReceived = replyFrame.ReadString();
                messageReceived = Convert.ToString(messageReceived.Split(' ')[1]);
                prints.Add(messageReceived);
            }
        }
    }

    public void PrintMessages()
    {
        Console.WriteLine("printing " + prints.Count);
    }

}

public class Main
{
    static void Main(string[] args)
    {
        broadcast(500, 50, 50, 30000);
    }

    public static void broadcast(int numberOfMessages, int numberOfPublishers, int numberOfSubscribers, int timeOfRun)
    {
        new Thread(() =>
        {
            using (var context = new ZContext())
            using (var xsubSocket = new ZSocket(context, ZSocketType.XSUB))
            using (var xpubSocket = new ZSocket(context, ZSocketType.XPUB))
            {
                //Security
                ZCert serverCert = GetOrCreateCert("publisher");
                ZCert clientCert = GetOrCreateCert("xsub");
                xsubSocket.CurvePublicKey = clientCert.PublicKey;
                xsubSocket.CurveSecretKey = clientCert.SecretKey;
                xsubSocket.CurveServer = true;
                xsubSocket.CurveServerKey = serverCert.PublicKey;

                xsubSocket.Linger = TimeSpan.MaxValue;
                xsubSocket.ReceiveHighWatermark = Int32.MaxValue;
                xsubSocket.Bind("tcp://*:5678");


                //Security
                serverCert = GetOrCreateCert("xpub");
                var actor = new ZActor(ZAuth.Action0, null);
                actor.Start();
                // send CURVE settings to ZAuth
                actor.Frontend.Send(new ZFrame("VERBOSE"));
                actor.Frontend.Send(new ZMessage(new List<ZFrame>()
                    { new ZFrame("ALLOW"), new ZFrame("127.0.0.1") }));
                actor.Frontend.Send(new ZMessage(new List<ZFrame>()
                    { new ZFrame("CURVE"), new ZFrame(".curve") }));
                xpubSocket.CurvePublicKey = serverCert.PublicKey;
                xpubSocket.CurveSecretKey = serverCert.SecretKey;
                xpubSocket.CurveServer = true;

                xpubSocket.Linger = TimeSpan.MaxValue;
                xpubSocket.ReceiveHighWatermark = Int32.MaxValue;
                xpubSocket.Bind("tcp://*:1234");


                using (var subscription = ZFrame.Create(1))
                {
                    subscription.Write(new byte[] { 0x1 }, 0, 1);
                    xpubSocket.Send(subscription);
                }


                Console.WriteLine("Intermediary started, and waiting for messages");
                // proxy messages between frontend / backend
                ZContext.Proxy(xsubSocket, xpubSocket);
                Console.WriteLine("end of proxy");


                //just to make sure it does not end instantly
                Thread.Sleep(60000);
                //just to make sure xpubSocket and xsubSocket are not garbage collected
                ulong Affinity = xpubSocket.Affinity;
                int ReceiveHighWatermark = xsubSocket.ReceiveHighWatermark;

            }
        }).Start();

        Thread.Sleep(5000); //to make sure proxy started
        List<MCVESubscriber> Subscribers = new List<MCVESubscriber>();
        for (int i = 0; i < numberOfSubscribers; i++)
        {
            MCVESubscriber ZeroMqSubscriber = new MCVESubscriber();
            new Thread(() =>
            {
                ZeroMqSubscriber.read();
            }).Start();
            Subscribers.Add(ZeroMqSubscriber);
        }


        Thread.Sleep(10000);//to make sure all subscribers started
        for (int i = 0; i < numberOfPublishers; i++)
        {
            MCVEPublisher ZeroMqPublisherBroadcast = new MCVEPublisher();
            new Thread(() =>
            {
                ZeroMqPublisherBroadcast.publish(numberOfMessages);
            }).Start();
        }

        Thread.Sleep(timeOfRun);
        foreach (MCVESubscriber Subscriber in Subscribers)
        {
            Subscriber.PrintMessages();
        }
    }

    public static ZCert GetOrCreateCert(string name, string curvpath = ".curve")
    {
        ZCert cert;
        string keyfile = Path.Combine(curvpath, name + ".pub");
        if (!File.Exists(keyfile))
        {
            cert = new ZCert();
            Directory.CreateDirectory(curvpath);
            cert.SetMeta("name", name);
            cert.Save(keyfile);
        }
        else
        {
            cert = ZCert.Load(keyfile);
        }

        return cert;
    }
}

Этот код также выдает ожидаемое количество сообщений, когда защита отключена, но при включении его нет.

кто-нибудь знает что-то еще проверить? Или это было с кем-то раньше?

Спасибо

...