Почему NetMQ не работает в среде NUnit - PullRequest
4 голосов
/ 10 июля 2019

Я создал Gist с моей реализацией NetMQ, так как я чувствую, что немного вставил сюда: https://gist.github.com/gthvidsten/e626d7e6c51012b1ba152d22e034d93d

Если я сделаю следующее в консольном приложении .Net Core, все будет работать нормально иЯ получаю событие MessageReceived:

static void Main(string[] args)
{
    _transportWithHost = new NetMqTransport(
        "tcp://localhost:9990",
        "tcp://localhost:9991",
        true);
    _transportWithHost.Start();

    Console.WriteLine("Press [Enter] to publish");
    Console.ReadLine();

    _transportWithHost.MessageReceived += (sender, e) =>
    {
        ; // Breakpoints here are hit
    };
    _transportWithHost.Publish(new byte[] { 1, 2, 3, 4 });

    Console.WriteLine("Press [Enter] to exit");
    Console.ReadLine();
}

Однако, если я пытаюсь сделать то же самое в тестовой среде NUnit, событие MessageReceived никогда не запускается!

class NetMqTransportTests
{
    private NetMqTransport _transportWithHost;

    [OneTimeSetUp]
    public void Setup()
    {
        _transportWithHost = new NetMqTransport(
            "tcp://localhost:9990",
            "tcp://localhost:9991",
            true);
        _transportWithHost.Start();
    }

    [Test]
    public void PublishTest()
    {
        ManualResetEvent mre = new ManualResetEvent(false);

        _transportWithHost.MessageReceived += (sender, e) =>
        {
            mre.Set();
            // Breakpoints here are never hit as MessageReceived is never called
        };

        _transportWithHost.Publish(new byte[] { 1, 2, 3, 4 });

        bool eventFired = mre.WaitOne(new TimeSpan(0, 0, 5));
        Assert.True(eventFired);
    }
}

Почему практически идентичный код работает в консольном приложении, но не в среде NUnit?

1 Ответ

3 голосов
/ 17 июля 2019

Я смог воспроизвести его и обнаружил этот поток https://github.com/zeromq/netmq/issues/482, который указывает на проблему с синхронизацией между запуском издателя и временем получения сообщения подписчиком.

using NetMQ;
using NetMQ.Sockets;
using NUnit.Framework;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Tests
{
    class NetMqTransportTests
    {
        [Test]
        public void TestMulticastNetMQ()
        {
            bool wasCalled = false;

            var coreEventbus = "tcp://localhost:12345";

            Task.Run(() =>
            {
                using (var subSocket = new SubscriberSocket())
                {
                    subSocket.Connect(coreEventbus);
                    subSocket.Subscribe("account");

                    while (true)
                    {
                        string messageTopicReceived = subSocket.ReceiveFrameString();
                        string messageReceived = subSocket.ReceiveFrameString();

                        Assert.IsTrue(messageReceived == "testing");
                        wasCalled = true;
                        break;
                    }
                }

            });

            Thread.Sleep(TimeSpan.FromSeconds(1));

            using (var pubSocket = new PublisherSocket())
            {
                pubSocket.Bind(coreEventbus);

                Thread.Sleep(500);
                pubSocket.SendMoreFrame("account").SendFrame("testing");
            }

            Thread.Sleep(TimeSpan.FromSeconds(5));

            Assert.IsTrue(wasCalled);
        }
    }
}

Обновление:

Вот модульные тесты, поставляемые с библиотекой NetMQ: https://github.com/zeromq/netmq/blob/master/src/NetMQ.Tests/XPubSubTests.cs

Посмотрите, как они разбивают создание экземпляра NetMqTransport на использование как XPublisherSocket, так и XPublisherSocket..

Также обратите внимание, что согласно проблеме 482 они делают задержку в 500 мс, чтобы позволить абоненту подключиться до получения сообщения, как они говорили в проблеме:

[Fact]
public void TopicPubSub()
{
    using (var pub = new XPublisherSocket())
    using (var sub = new XPublisherSocket())
    {
        var port = pub.BindRandomPort("tcp://127.0.0.1");
        sub.Connect("tcp://127.0.0.1:" + port);
        sub.SendFrame(new byte[] { 1, (byte)'A' });

        // let the subscriber connect to the publisher before sending a message
        Thread.Sleep(500);

        var msg = pub.ReceiveFrameBytes();
...