Использование Cowboy Websocket Client для тестирования с помощью Elixir - PullRequest
0 голосов
/ 21 июня 2019

Во-первых, на самом деле не хватает документации для Cowboy вообще и Websockets в частности, но в целом его отлично использовать, если он расшифрован.Тогда получение этой информации от Эрланга до Эликсира - еще один шаг.Благодаря этому сообщению от 7stud я смог заставить функционировать websocket работать в целях тестирования, но я не могу заставить его слушать и при желании отправлять сообщения одновременно.Я думаю, это потому, что получение блокирует поток, который необходим для отправки, и это неразрывно связано с соединением веб-сокета, поэтому он не может отправлять, пока ожидает получения.Может быть, это понимание неверно.Я хотел бы быть исправленным.Я пытался порождать безрезультатно, поэтому я думаю, что получение блокирует поток веб-сокетов.

def ws do
    localhost = 'localhost'
    path = '/ws/app/1'
    port = 5000

    {:ok, _} = :application.ensure_all_started(:gun)
    connect_opts = %{
      connect_timeout: :timer.minutes(1),
      retry: 10,
      retry_timeout: 100
    }

    {:ok, conn_pid} = :gun.open(localhost, port, connect_opts)
    IO.inspect(conn_pid, label: "conn_pid")
    {:ok, protocol} = :gun.await_up(conn_pid)
    IO.inspect(protocol, label: "protocol")
    # Set custom header with cookie for device id
    stream_ref = :gun.ws_upgrade(conn_pid, path, [{"cookie", "device_id=1235"}])
    IO.inspect(stream_ref, label: "stream_ref")
    receive do
      {:gun_upgrade, ^conn_pid, ^stream_ref, ["websocket"], headers} ->
              upgrade_success(conn_pid, headers, stream_ref)
      {:gun_response, ^conn_pid, _, _, status, headers} ->
              exit({:ws_upgrade_failed, status, headers})
      {:gun_error, _conn_pid, _stream_ref, reason} ->
              exit({:ws_upgrade_failed, reason})
      whatever ->
        IO.inspect(whatever, label: "Whatever")
      # More clauses here as needed.
    after 5000 ->
        IO.puts "Took too long!"
        :erlang.exit("barf!")
    end
    :ok
  end

  def upgrade_success(conn_pid, headers, stream_ref) do
    IO.puts("Upgraded #{inspect(conn_pid)}. Success!\nHeaders:\n#{inspect(headers)}\n")

    IO.inspect(self(), label: "upgrade self")
    # This one runs and message is received
    run_test(conn_pid)
    # This should spawn and therefore not block
    listen(conn_pid, stream_ref)
    # This never runs
    run_test(conn_pid)
  end

  def listen(conn_pid, stream_ref) do
    spawn receive_messages(conn_pid, stream_ref)
  end
  def receive_messages(conn_pid, stream_ref) do
    IO.inspect conn_pid, label: "conn_pid!"
    IO.inspect stream_ref, label: "stream_ref!"
    IO.inspect(self(), label: "self pid")
    receive do
      {:gun_ws, ^conn_pid, ^stream_ref, {:text, msg} } ->
          IO.inspect(msg, label: "Message from websocket server:")
      other_messages ->
        IO.inspect(other_messages, label: "Other messages")
    after 5000 ->
      IO.puts "Receive timed out"
    end
    receive_messages(conn_pid, stream_ref)
  end

  def send_message(message, conn_pid) do
    :gun.ws_send(conn_pid, {:text, message})
  end

  def run_test(conn_pid) do
    IO.puts "Running test"
    message = "{\"type\":\"init\",\"body\":{\"device_id\":1234}}"
    send_message(message, conn_pid)
  end

  def stop(conn_pid) do
    :gun.shutdown(conn_pid)
  end

Ответы [ 2 ]

1 голос
/ 22 июня 2019

Из документов для пистолета :

Получение данных

Пистолет отправляет сообщение Erlang на Процесс владельца для каждого полученного сообщения Websocket.

и :

Соединение

...

Соединения пистолета

...

Соединение пистолета - это процесс Эрланга, который управляет сокетом для удаленной конечной точки.Это соединение Gun принадлежит пользовательскому процессу, который называется владельцем соединения, и управляется деревом контроля приложения Gun.

Процесс владельца связывается с соединением Gun, вызывая функции из модуля.пистолет.Все функции выполняют свои соответствующие операции асинхронно.Соединение Gun будет отправлять сообщения Erlang владельцу процесса всякий раз, когда это необходимо.

Хотя это и не упоминается конкретно в документации, я почти уверен, что процесс владельца - это процесс, которыйзвонки gun:open().Мои попытки также показывают, что владелец процесса должен вызвать gun:ws_send().Другими словами, процесс-владелец должен и отправлять сообщения на сервер, и получать сообщения от сервера.

Следующий код управляет оружием с gen_server таким образом, что gen_server отправляет сообщениясервер и получает сообщения от сервера.

Когда gun получает сообщение от http-сервера cowboy, gun отправляет сообщение, то есть Pid ! Msg, процессу владельца.В следующем коде gen_server создает соединение в обратном вызове init/1, что означает, что пистолет будет бить (!) Сообщения, которые он получает от ковбоя в gen_server.Gen_server обрабатывает сообщения, отправленные непосредственно в его почтовый ящик с handle_info().

В handle_cast(), gen_server использует оружие для отправки запросов ковбою.Поскольку handle_cast() является асинхронным, это означает, что вы можете отправлять асинхронные сообщения ковбою.И когда gun получает сообщение от ковбоя, gun отправляет (!) Сообщение gen_server, а функция handle_info() gen_server обрабатывает сообщение.Внутри handle_info(), gen_server:reply/2 вызывается для передачи сообщения клиенту gen_server.В результате, клиент gen_server может перейти в условие получения, когда захочет проверить серверные сообщения, отправленные оружием.

-module(client).
-behavior(gen_server).
-export([start_server/0, send_sync/1, send_async/1, get_message/2, go/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([terminate/2, code_change/3]).  %%% client functions
-export([sender/1]).

%%% client functions
%%%

start_server() ->
    gen_server:start({local, ?MODULE}, ?MODULE, [], []).

send_sync(Requ) ->
    gen_server:call(?MODULE, Requ).

send_async(Requ) -> 
    gen_server:cast(?MODULE, {websocket_request, Requ}).

get_message(WebSocketPid, ClientRef) ->
    receive
        {ClientRef, {gun_ws, WebSocketPid, {text, Msg} }} ->
            io:format("Inside get_message(): Ref = ~w~n", [ClientRef]),
            io:format("Client received gun message: ~s~n", [Msg]);
        Other ->
            io:format("Client received other message: ~w~n", [Other])
    end.

receive_loop(WebSocketPid, ClientRef) ->
    receive
        {ClientRef, {gun_ws, WebSocketPid, {text, Msg} }} ->
            io:format("Client received Gun message: ~s~n", [Msg]);
        Other ->
            io:format("Client received other message: ~w~n", [Other])
    end,
    receive_loop(WebSocketPid, ClientRef).

go() ->
    {ok, GenServerPid} = start_server(),
    io:format("[ME]: Inside go(): GenServerPid=~w~n", [GenServerPid]),

    [{conn_pid, ConnPid}, {ref, ClientRef}] = send_sync(get_conn_pid),
    io:format("[ME]: Inside go(): ConnPid=~w~n", [ConnPid]),

    ok = send_async("ABCD"),
    get_message(ConnPid, ClientRef),

    spawn(?MODULE, sender, [1]),

    ok = send_async("XYZ"),
    get_message(ConnPid, ClientRef),

    receive_loop(ConnPid, ClientRef).

sender(Count) -> %Send messages to handle_info() every 3 secs
    send_async(lists:concat(["Hello", Count])),
    timer:sleep(3000),
    sender(Count+1).

%%%%%% gen_server callbacks
%%%

init(_Arg) ->
    {ok, {no_client, ws()}}.

handle_call(get_conn_pid, From={_ClientPid, ClientRef}, _State={_Client, WebSocketPid}) ->
    io:format("[ME]: Inside handle_call(): From = ~w~n", [From]),
    {reply, [{conn_pid, WebSocketPid}, {ref, ClientRef}], _NewState={From, WebSocketPid} };
handle_call(stop, _From, State) ->
    {stop, normal, shutdown_ok, State}; %Calls terminate()
handle_call(_Other, _From, State) ->
    {ok, State}.

handle_cast({websocket_request, Msg}, State={_From, WebSocketPid}) ->
    gun:ws_send(WebSocketPid, {text, Msg}), %{text, "It's raining!"}),
    {noreply, State}.

handle_info(Msg, State={From, _WebSocketPid}) ->
    io:format("[ME]: Inside handle_info(): Msg=~w~n", [Msg]),
    gen_server:reply(From, Msg),
    {noreply, State}.

terminate(_Reason, _State={_From, WebSocketPid}) -> 
    gun:shutdown(WebSocketPid).


code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


%%%% private functions
%%%

ws() ->
    {ok, _} = application:ensure_all_started(gun),
    {ok, ConnPid} = gun:open("localhost", 8080),
    {ok, _Protocol} = gun:await_up(ConnPid),

    gun:ws_upgrade(ConnPid, "/please_upgrade_to_websocket"),

    receive
        {gun_ws_upgrade, ConnPid, ok, Headers} ->
            io:format("[ME]: Inside gun_ws_upgrade receive clause: ~w~n", 
                      [ConnPid]),
            upgrade_success_handler(ConnPid, Headers);
        {gun_response, ConnPid, _, _, Status, Headers} ->
            exit({ws_upgrade_failed, Status, Headers});
        {gun_error, _ConnPid, _StreamRef, Reason} ->
            exit({ws_upgrade_failed, Reason})
    after 1000 ->
        exit(timeout)
    end.


upgrade_success_handler(ConnPid, _Headers) ->
    io:format("[ME]: Inside upgrade_success_handler(): ~w~n", [ConnPid]),  
    ConnPid.

====== *

Ой, ответ ниже показывает, как заставить сервер передавать данные клиенту.

Хорошо, я понял - в erlang.Этот пример немного замучен.Вам нужно сделать пару вещей:

1) Вам нужно получить pid процесса, выполняющего функции websocket_*, который не совпадает с pid запроса:

Инициализация после обновления

У Cowboy есть отдельные процессы для обработки соединения и запросов.Поскольку Websocket берет на себя соединение, обработка протокола Websocket происходит не так, как обработка запросов.

Это отражено в различных обратных вызовах, которые имеют обработчики Websocket.Обратный вызов init / 2 вызывается из процесса временного запроса, а обратные вызовы websocket_ из процесса подключения.

Это означает, что некоторая инициализация не может быть выполнена из init / 2.Все, что потребует текущего pid или будет привязано к текущему pid, не будет работать должным образом.Необязательный websocket_init / 1 можно использовать [для получения pid процесса, выполняющего обратные вызовы websocket_]:

https://ninenines.eu/docs/en/cowboy/2.6/guide/ws_handlers/

Вот код, который я использовал:

init(Req, State) ->
    {cowboy_websocket, Req, State}.  %Perform websocket setup

websocket_init(State) ->
    io:format("[ME]: Inside websocket_init"),
    spawn(?MODULE, push, [self(), "Hi, there"]),
    {ok, State}.

push(WebSocketHandleProcess, Greeting) ->
    timer:sleep(4000),
    WebSocketHandleProcess ! {text, Greeting}.

websocket_handle({text, Msg}, State) ->
    timer:sleep(10000), %Don't respond to client request just yet.
    {
     reply, 
     {text, io_lib:format("Server received: ~s", [Msg]) },
     State
    };
websocket_handle(_Other, State) ->  %Ignore
    {ok, State}.

Это приведет к отправке сообщения клиенту, пока клиент ожидает ответа на запрос, который клиент ранее отправил на сервер.

2) Если вы отправляете сообщение процессу, которыйвыполняет функции websocket_*:

Pid ! {text, Msg}

, тогда это сообщение будет обрабатываться функцией websocket_info(), а не функцией websocket_handle():

websocket_info({text, Text}, State) ->
    {reply, {text, Text}, State};
websocket_info(_Other, State) ->
    {ok, State}.

Возвращаемое значение функции websocket_info() работает так же, как и возвращаемое значение функции websocket_handle().

Поскольку ваш клиент оружия теперь получает несколько сообщений, клиент оружия должен получать в цикле:

upgrade_success_handler(ConnPid, Headers) ->
    io:format("Upgraded ~w. Success!~nHeaders:~n~p~n", 
              [ConnPid, Headers]),

    gun:ws_send(ConnPid, {text, "It's raining!"}),

    get_messages(ConnPid).  %Move the receive clause into a recursive function

get_messages(ConnPid) ->
    receive
        {gun_ws, ConnPid, {text, "Greeting: " ++ Greeting} } ->
            io:format("~s~n", [Greeting]),
            get_messages(ConnPid);

        {gun_ws, ConnPid, {text, Msg} } ->
            io:format("~s~n", [Msg]),
            get_messages(ConnPid)
    end.
0 голосов
/ 25 июня 2019

Спасибо 7stud за пример кода и правки, которые отражены ниже:

Вот моя интерпретация Elixir, чтобы дать базовый клиент WebSocket для оружия:

defmodule WebsocketTester.Application do

  use Application

  def start(_type, _args) do

    path = '/ws/app/1'

    port = 5000

    host = 'localhost'

    args = %{path: path, port: port, host: host}

    children = [
      { WebSocket.Client, args }
    ]
    Supervisor.start_link(children, strategy: :one_for_one, name: WebsocketTester.Supervisor)
  end
end

defmodule WebSocket.Client do

  use GenServer

  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :worker,
      restart: :permanent,
      shutdown: 500
    }
  end

  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: __MODULE__)
  end

  # GenServer callbacks

  def init(args) do
    # Set up the websocket connection
    # get > upgrade
    # Initial state with gun_pid and stream_ref
    # %{gun_pid: gun_pid, stream_ref: stream_ref} = ws(args)
    {:ok, init_ws(args)}
  end

  # Give back gun_pid from state
  def handle_call(:get_conn, from, %{gun_pid: gun_pid, stream_ref: stream_ref}) do
    IO.inspect(gun_pid, label: "handle call gun pid")
    {:reply, %{gun_pid: gun_pid, stream_ref: stream_ref}, %{from: from, gun_pid: gun_pid} }
  end
  # Everything else
  def handle_call(other, from, state) do
    IO.inspect(other, label: "other call")
    IO.inspect(from, label: "from")
    {:ok, state}
  end
  # Client sends message to server.
  def handle_cast({:websocket_request, message}, %{gun_pid: gun_pid} = state) do
    IO.puts message
    IO.inspect(gun_pid, label: "gun_pid")
    :gun.ws_send(gun_pid, {:text, message})
    {:noreply, state}
  end

  def handle_info(message, %{from: from} = state) do
    IO.inspect(message, label: "Inside handle_info(): ")
    GenServer.reply(from, message)
    {:noreply, state}
  end

  def terminate(reason, _state) do
    IO.puts "Terminated due to #{reason}."
    :ok
  end


  def code_change(_old_version, state, _extra) do
    {:ok, state}
  end

  ## Client functions
  # Used for getting gun_pid from state
  def send_sync(request) do
    GenServer.call(__MODULE__, request)
  end

  # Send a message async
  def send_async(request) do
    GenServer.cast(__MODULE__, {:websocket_request, request})
  end

  # Receive a single message
  def get_message(stream_ref, gun_pid) do
      receive do
          {^stream_ref, {:gun_ws, ^gun_pid, {:text, message} }} ->
              IO.puts("Client received gun message: #{message}")
          other ->
            IO.inspect(other, label: "Client received other message")
      end
  end

  # Receive all messages recursively
  def receive_loop(stream_ref, gun_pid) do
    IO.puts "Listening"
      get_message(stream_ref, gun_pid)
      receive_loop(stream_ref, gun_pid)
  end

  def go() do
    # Get the gun_pid from state
    %{gun_pid: gun_pid, stream_ref: stream_ref} = send_sync(:get_gun_pid)
    IO.inspect(gun_pid, label: "Inside go(): gun_pid=")
    # Send messages manually
    :ok = send_async(Jason.encode!(%{type: "info", greet: "yo"}))
    # Or to send just text
    # :ok = send_async("yo")

    # Receive messages manually
    get_message(stream_ref, gun_pid)

    # Start sending loop
    spawn sender 1

    # Start listening
    receive_loop(stream_ref, gun_pid)
  end

  # Send messages to handle_info() every 3 secs
  def sender(count) do
      send_async("count is #{count}")
      :timer.sleep(3000)
      sender(count+1)
  end

  ## End of client functions

  # Initialize the websocket connection
  def init_ws(args) do

    %{ path: path, port: port, host: host} = args

    {:ok, _} = :application.ensure_all_started(:gun)
    connect_opts = %{
      connect_timeout: :timer.minutes(1),
      retry: 10,
      retry_timeout: 100
    }

    {:ok, gun_pid} = :gun.open(host, port, connect_opts)
    {:ok, _protocol} = :gun.await_up(gun_pid)
    # Set custom header with cookie for device id - set_headers can be left out if you don't want custom headers
    stream_ref = :gun.ws_upgrade(gun_pid, path, set_headers("I like cookies"))
    receive do
      {:gun_upgrade, ^gun_pid, ^stream_ref, ["websocket"], headers} ->
            upgrade_success(gun_pid, headers, stream_ref)
      {:gun_response, ^gun_pid, _, _, status, headers} ->
              exit({:ws_upgrade_failed, status, headers})
      {:gun_error, _gun_pid, _stream_ref, reason} ->
              exit({:ws_upgrade_failed, reason})
      whatever ->
        IO.inspect(whatever, label: "Whatever")
      # More clauses here as needed.
    after 5000 ->
        IO.puts "Took too long!"
        :erlang.exit("barf!")
    end
    # stop(gun_pid)
  end


  def set_headers(cookie_value) do
    [{"cookie", "my_cookie=#{cookie_value}"}]
  end

  # This just returns the gun_pid for further reference which gets stored in the GenServer state.
  def upgrade_success(gun_pid, headers, stream_ref) do
    IO.puts("Upgraded #{inspect(gun_pid)}. Success!\nHeaders:\n#{inspect(headers)}\n")
    %{stream_ref: stream_ref, gun_pid: gun_pid}
  end

  # To stop gun
  def stop(gun_pid) do
    :gun.shutdown(gun_pid)
  end

end

Чтобы использовать это:

iex -S mix
iex> WebSocket.Client.go
...