В NetMQ очень важна модель потоков.Действительно важно то, что вы не должны использовать сокет в нескольких потоках.Так что, если Thread # 1 создал сокет, он должен использовать его.Если вы хотите отправить сообщение из другого потока (скажем, из потока № 2), используя тот же сокет, просто забудьте об этом.Вы должны каким-то образом послать сообщения из потока № 2 в поток № 1, а затем отправить его через сокет клиенту.
Так что в принципе CreatePullAndPushSocket не так, и могут произойти странные вещи, чем.Вы создаете сокеты в одном потоке, а используете в другом.Это просто неправильно.
Другая вещь - это твоя тема. Спи.Вы не должны использовать Thread.Sleep, потому что ваш поток спит 1 с, а затем проверяет сокет один раз, затем спит и проверяет один раз.NetMQ имеет функцию TryReceive с таймаутом.Таким образом, он может проверить сокет на 1 секунду и выйти, чтобы проверить, звоните ли вы отменить / остановить или что-то еще.Или, что еще лучше, есть модуль опроса, который будет постоянно прослушивать сокет и позволять нам вызывать stop из другого потока.
Давайте посмотрим на этот код:
private Poller _poller;
public void Start()
{
Task.Factory.StartNew(()=> {
using(var pullSocket = new PullSocket("tcp://ip1:port1"))
using(_poller = new Poller())
{
pullSocket.ReceiveReady += (object sender, NetMQSocketEventArgsnetMqSocketEventArgs) =>
{
var message = netMqSocketEventArgs.Socket.ReceiveMultipartMessage();
}
_poller.Add(pullSocket);
_poller.Run();
}
});
}
public void Stop()
{
_poller?.Stop();
}
или, если вы хотите использовать код без программы-поллера с циклом while:
private readonly CancellationTokenSource _cts;
public void Start()
{
_cts = new CancellationTokenSource();
var token = _cts.Token;
Task.Factory.StartNew(()=> {
using(var pullSocket = new PullSocket("tcp://ip1:port1"))
{
while (cancellationToken.IsCancellationRequested == false)
{
NetMQMessage message = new NetMQMessage();
bool success = workerSocket.TryReceiveMultipartMessage(TimeSpan.FromSeconds(5), ref message);
if (success == false)
{
continue;
}
//if You reach this line, than You have a message
}
}
},
token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
public void Stop()
{
_cts.Cancel();
_cts.Token.WaitHandle.WaitOne();//if You want wait until service will stop
}
Итак, возвращаясь к Вашему вопросу, вы должны использовать сокет только внутри его потока, в котором вы его создали.Хорошая вещь - всегда использовать сокет в операторе using, чтобы всегда освобождать его в конце.
Я не вижу использования метода SendMessageToClient, но я предположил, что Вы вызываете его с какой-то кнопки или чего-то еще.Вы можете сделать это, если конструктор сокета был вызван из этого потока.Если бы вы могли показать мне, где Вы вызываете этот метод, я мог бы кое-что сказать по этому поводу.