Как сделать синхронный запрос веб-сокета с помощью websocketsharp? - PullRequest
1 голос
/ 30 мая 2020

Я работаю над проектом c#. Я хотел бы отправить запрос API через библиотеку websocketsharp каким-то синхронным способом.

Я пытался сделать это следующим образом:

  1. Перед отправкой при любом запросе WS мы создаем новый объект SynchronousRequest () с уникальным идентификатором и добавляем вновь созданный объект в какой-то список ожидания

  2. Мы отправляем запрос WS, добавляя уникальный идентификатор к полезной нагрузке, на ответ - сервер вернет тот же идентификатор.

  3. Мы начинаем ждать сигнала события (сигнализация происходит после получения ответа)

В обработчике ответа:

Как только приходит ответ WS, я пытаюсь сопоставить контекст по уникальному идентификатору После совпадения мы сигнализируем событию, что ответ был получен, и добавляем полезную нагрузку ответа в объект synchronousRequest ()

Проблема - шаг 3, как только я использую WaitOne () в случае, когда весь клиент websocket зависает и больше не будет получено никаких ответов, что приведет к полной тупиковой ситуации.

Как я могу это сделать какой-то вызов WaitOne () в отдельном потоке или, возможно, существует полностью лучшее решение для моей проблемы, поэтому весь клиент не зависает, и мы сопоставляем контексты?

public class SynchronousRequest
{
    public long requestId;
    public ManualResetEvent resetEvent = new ManualResetEvent(false);
    public dynamic response;

    public SynchronousRequest()
    {
        var random = new Random();
        requestId = random.Next();
    }

}


public class APIWebSocket: BaseAPIWebSocket
{


    private List<SynchronousRequest> waitingSyncRequests = new List<SynchronousRequest>();


    public APIWebSocket()
    {
        ws = new WebSocket("wss://www.someserver.com");

        registerConnectionEvents(); //Registers onOpen(), onMessage() handlers and similar
    }



    public void SendSyncTest()
    {
        var sr = new SynchronousRequest();
        waitingSyncRequests.Add(sr);

        //some data to send
        var msg = new
        {
            jsonrpc = "2.0",
            method = "public/ticker",
            id = sr.requestId, //Response should contain the same ID
            @params = new
            {
                instrument_name = "ETH"
            }
        };

        ws.Send(JsonConvert.SerializeObject(msg));

        //Below WaitOne() causes the entire websocket connection/thread to block
        // No further messages will be received by HandleMessage() once we call WaitOne()

        sr.resetEvent.WaitOne(); //Wait until we receive notification that response has been received

        //do some processing on response here... 

       //Synchronous request completed, remove it from list
       waitingSyncRequests.Remove(sr);
    }




    protected override void OnReceivedMessage(System.Object sender, WebSocketSharp.MessageEventArgs e)
    {
        dynamic message = JsonConvert.DeserializeObject(e.Data);

        if (message.id != null )
        {
            //Find a resetEvent for given message.id
            var matchingSyncRequest = waitingSyncRequests.First(r => r.requestId == message.id);
            if (matchingSyncRequest != null)
            {
                matchingSyncRequest.response = message;
                matchingSyncRequest.resetEvent.Set(); //Notify that response has been received
            }
        }

    }

}

1 Ответ

0 голосов
/ 16 июля 2020

Насколько я понимаю, вам нужно await для какого-то события, которое будет назначено в будущем. Вы можете использовать TaskCompletionSource.

  • 1. Каждый раз, когда вы отправляете сообщение, в котором вам нужен его результат, вы можете создать TaskCompletionSource (tcs), добавить его к * 1008. * и отправьте сообщение через сокет.

  • 2.Вы await tcs.Task или tcs.Task.Result на этом tcs.

  • 3 . На другом Thread / Task вы можете обработать свой входящий responses (в вашем случае у вас есть обработчик полученного сообщения. Всякий раз, когда вы получаете ответ целевого type или id, вы можете просто получить tcs из словаря на основе идентификатора и Task.SetResult(response) it. В этот момент вызывающий абонент (тот, которого вы ждете, разблокируется).

     public class Message
     {
       public string ID{get;set;}
     }
     public class Response
     {
         public string Id{get;set;}
     }
    
     public void MyClass
     {
        private ConcurrentDictionary<string,TaskCompletionSource<Response>>map=new ConcurrentDictionary<string,TaskCompletionSource<Response>>();
        private Websocket socket;
        public void SomeEventTrigger()
        {
            var msg=new Message{ Id="somespecialID" };
            var tcs=new TaskCompletionSource<Response>();
            ws.Send(JsonConvert.SerializeObject(msg));
            if(!this.map.TryAdd(msg.Id,tcs))
            {
               return;
            }
            var result=tcs.Result; //this gets blocked until you use `tcs.SetResult`- >  that would happen in your OnReceivedMessage  
            this.map.TryRemove(msg.Id,out TaskCompletionSource<Response>resp);
    
        }
        protected override void OnReceivedMessage(System.Object sender, WebSocketSharp.MessageEventArgs e)
        {
             Response message = JsonConvert.DeserializeObject<Response>(e.Data);
    
             if (message.id != null )
             {
    
                 if(this.map.TryGetValue(message.id),out TaskCompletionSource<Response> tcs)
                 {
                    tcs.SetResult(message); //this unblocks the method that wrote the message to the socket   (above)
                 }
    
    
             }
          }
       }
    

PS Вам необходимо убедиться, что вы используете Task.SetResult на правильном tcs, чтобы правильный вызов метода SomeEventTrigger перестал ждать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...