Из документов для пистолета :
Получение данных
Пистолет отправляет сообщение Erlang на Процесс владельца для каждого полученного сообщения Websocket.
и :
Соединение
...
Соединения пистолета
...
Соединение пистолета - это процесс Эрланга, который управляет сокетом для удаленной конечной точки.Это соединение Gun принадлежит пользовательскому процессу, который называется владельцем соединения, и управляется деревом контроля приложения Gun.
Процесс владельца связывается с соединением Gun, вызывая функции из модуля.пистолет.Все функции выполняют свои соответствующие операции асинхронно.Соединение Gun будет отправлять сообщения Erlang владельцу процесса всякий раз, когда это необходимо.
Хотя это и не упоминается конкретно в документации, я почти уверен, что процесс владельца - это процесс, которыйзвонки gun:open()
.Мои попытки также показывают, что владелец процесса должен вызвать gun:ws_send()
.Другими словами, процесс-владелец должен и отправлять сообщения на сервер, и получать сообщения от сервера.
Следующий код управляет оружием с gen_server
таким образом, что gen_server отправляет сообщениясервер и получает сообщения от сервера.
Когда gun получает сообщение от http-сервера cowboy, gun отправляет сообщение, то есть Pid ! Msg
, процессу владельца.В следующем коде gen_server
создает соединение в обратном вызове init/1
, что означает, что пистолет будет бить (!) Сообщения, которые он получает от ковбоя в gen_server
.Gen_server обрабатывает сообщения, отправленные непосредственно в его почтовый ящик с handle_info()
.
В handle_cast()
, gen_server
использует оружие для отправки запросов ковбою.Поскольку handle_cast()
является асинхронным, это означает, что вы можете отправлять асинхронные сообщения ковбою.И когда gun получает сообщение от ковбоя, gun отправляет (!) Сообщение gen_server, а функция handle_info()
gen_server обрабатывает сообщение.Внутри handle_info()
, gen_server:reply/2
вызывается для передачи сообщения клиенту gen_server
.В результате, клиент gen_server
может перейти в условие получения, когда захочет проверить серверные сообщения, отправленные оружием.
-module(client).
-behavior(gen_server).
-export([start_server/0, send_sync/1, send_async/1, get_message/2, go/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([terminate/2, code_change/3]). %%% client functions
-export([sender/1]).
%%% client functions
%%%
start_server() ->
gen_server:start({local, ?MODULE}, ?MODULE, [], []).
send_sync(Requ) ->
gen_server:call(?MODULE, Requ).
send_async(Requ) ->
gen_server:cast(?MODULE, {websocket_request, Requ}).
get_message(WebSocketPid, ClientRef) ->
receive
{ClientRef, {gun_ws, WebSocketPid, {text, Msg} }} ->
io:format("Inside get_message(): Ref = ~w~n", [ClientRef]),
io:format("Client received gun message: ~s~n", [Msg]);
Other ->
io:format("Client received other message: ~w~n", [Other])
end.
receive_loop(WebSocketPid, ClientRef) ->
receive
{ClientRef, {gun_ws, WebSocketPid, {text, Msg} }} ->
io:format("Client received Gun message: ~s~n", [Msg]);
Other ->
io:format("Client received other message: ~w~n", [Other])
end,
receive_loop(WebSocketPid, ClientRef).
go() ->
{ok, GenServerPid} = start_server(),
io:format("[ME]: Inside go(): GenServerPid=~w~n", [GenServerPid]),
[{conn_pid, ConnPid}, {ref, ClientRef}] = send_sync(get_conn_pid),
io:format("[ME]: Inside go(): ConnPid=~w~n", [ConnPid]),
ok = send_async("ABCD"),
get_message(ConnPid, ClientRef),
spawn(?MODULE, sender, [1]),
ok = send_async("XYZ"),
get_message(ConnPid, ClientRef),
receive_loop(ConnPid, ClientRef).
sender(Count) -> %Send messages to handle_info() every 3 secs
send_async(lists:concat(["Hello", Count])),
timer:sleep(3000),
sender(Count+1).
%%%%%% gen_server callbacks
%%%
init(_Arg) ->
{ok, {no_client, ws()}}.
handle_call(get_conn_pid, From={_ClientPid, ClientRef}, _State={_Client, WebSocketPid}) ->
io:format("[ME]: Inside handle_call(): From = ~w~n", [From]),
{reply, [{conn_pid, WebSocketPid}, {ref, ClientRef}], _NewState={From, WebSocketPid} };
handle_call(stop, _From, State) ->
{stop, normal, shutdown_ok, State}; %Calls terminate()
handle_call(_Other, _From, State) ->
{ok, State}.
handle_cast({websocket_request, Msg}, State={_From, WebSocketPid}) ->
gun:ws_send(WebSocketPid, {text, Msg}), %{text, "It's raining!"}),
{noreply, State}.
handle_info(Msg, State={From, _WebSocketPid}) ->
io:format("[ME]: Inside handle_info(): Msg=~w~n", [Msg]),
gen_server:reply(From, Msg),
{noreply, State}.
terminate(_Reason, _State={_From, WebSocketPid}) ->
gun:shutdown(WebSocketPid).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%% private functions
%%%
ws() ->
{ok, _} = application:ensure_all_started(gun),
{ok, ConnPid} = gun:open("localhost", 8080),
{ok, _Protocol} = gun:await_up(ConnPid),
gun:ws_upgrade(ConnPid, "/please_upgrade_to_websocket"),
receive
{gun_ws_upgrade, ConnPid, ok, Headers} ->
io:format("[ME]: Inside gun_ws_upgrade receive clause: ~w~n",
[ConnPid]),
upgrade_success_handler(ConnPid, Headers);
{gun_response, ConnPid, _, _, Status, Headers} ->
exit({ws_upgrade_failed, Status, Headers});
{gun_error, _ConnPid, _StreamRef, Reason} ->
exit({ws_upgrade_failed, Reason})
after 1000 ->
exit(timeout)
end.
upgrade_success_handler(ConnPid, _Headers) ->
io:format("[ME]: Inside upgrade_success_handler(): ~w~n", [ConnPid]),
ConnPid.
====== *
Ой, ответ ниже показывает, как заставить сервер передавать данные клиенту.
Хорошо, я понял - в erlang.Этот пример немного замучен.Вам нужно сделать пару вещей:
1) Вам нужно получить pid процесса, выполняющего функции websocket_*
, который не совпадает с pid запроса:
Инициализация после обновления
У Cowboy есть отдельные процессы для обработки соединения и запросов.Поскольку Websocket берет на себя соединение, обработка протокола Websocket происходит не так, как обработка запросов.
Это отражено в различных обратных вызовах, которые имеют обработчики Websocket.Обратный вызов init / 2 вызывается из процесса временного запроса, а обратные вызовы websocket_ из процесса подключения.
Это означает, что некоторая инициализация не может быть выполнена из init / 2.Все, что потребует текущего pid или будет привязано к текущему pid, не будет работать должным образом.Необязательный websocket_init / 1 можно использовать [для получения pid процесса, выполняющего обратные вызовы websocket_]:
https://ninenines.eu/docs/en/cowboy/2.6/guide/ws_handlers/
Вот код, который я использовал:
init(Req, State) ->
{cowboy_websocket, Req, State}. %Perform websocket setup
websocket_init(State) ->
io:format("[ME]: Inside websocket_init"),
spawn(?MODULE, push, [self(), "Hi, there"]),
{ok, State}.
push(WebSocketHandleProcess, Greeting) ->
timer:sleep(4000),
WebSocketHandleProcess ! {text, Greeting}.
websocket_handle({text, Msg}, State) ->
timer:sleep(10000), %Don't respond to client request just yet.
{
reply,
{text, io_lib:format("Server received: ~s", [Msg]) },
State
};
websocket_handle(_Other, State) -> %Ignore
{ok, State}.
Это приведет к отправке сообщения клиенту, пока клиент ожидает ответа на запрос, который клиент ранее отправил на сервер.
2) Если вы отправляете сообщение процессу, которыйвыполняет функции websocket_*
:
Pid ! {text, Msg}
, тогда это сообщение будет обрабатываться функцией websocket_info()
, а не функцией websocket_handle()
:
websocket_info({text, Text}, State) ->
{reply, {text, Text}, State};
websocket_info(_Other, State) ->
{ok, State}.
Возвращаемое значение функции websocket_info()
работает так же, как и возвращаемое значение функции websocket_handle()
.
Поскольку ваш клиент оружия теперь получает несколько сообщений, клиент оружия должен получать в цикле:
upgrade_success_handler(ConnPid, Headers) ->
io:format("Upgraded ~w. Success!~nHeaders:~n~p~n",
[ConnPid, Headers]),
gun:ws_send(ConnPid, {text, "It's raining!"}),
get_messages(ConnPid). %Move the receive clause into a recursive function
get_messages(ConnPid) ->
receive
{gun_ws, ConnPid, {text, "Greeting: " ++ Greeting} } ->
io:format("~s~n", [Greeting]),
get_messages(ConnPid);
{gun_ws, ConnPid, {text, Msg} } ->
io:format("~s~n", [Msg]),
get_messages(ConnPid)
end.