gen_server закрывает сокет прослушивания - PullRequest
4 голосов
/ 03 сентября 2011

Я пытаюсь сделать так, чтобы процесс gen_server принял нового клиента и немедленно породил нового потомка для обработки следующего. Проблема, с которой я сталкиваюсь, заключается в том, что когда сокет завершается и последовательно завершает работу, он также закрывает сокет прослушивания, и я не могу понять, почему, даже если он больше не ссылается на него.

Есть идеи, что я делаю не так?

gen_server:

-module(simple_tcp).
-behaviour(gen_server).

%% API
-export([start_link/1, stop/0, start/0, start/1]).

%% gen-server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-define(SERVER, ?MODULE).
-define(DEFAULT_PORT, 1055).

-record(state, {port, lsock}).

start_link({port, Port}) ->
    gen_server:start_link(?MODULE, [{port, Port}], []);

start_link({socket, Socket}) ->
    gen_server:start_link(?MODULE, [{socket, Socket}], []).

start({port, Port}) ->
    simple_tcp_sup:start_child({port, Port});

start({socket, Socket}) ->
    simple_tcp_sup:start_child({socket, Socket}).

start() ->
    start({port, ?DEFAULT_PORT}).

stop() ->
    gen_server:cast(?SERVER, stop).

% Callback functions
init([{port, Port}]) ->
    {ok, LSock} = gen_tcp:listen(Port, [{active, true},{reuseaddr, true}]),
    init([{socket, LSock}]);

init([{socket, Socket}]) ->
    io:fwrite("Starting server with socket: ~p~n", [self()]),
    {ok, Port} = inet:port(Socket),
    {ok, #state{port=Port, lsock=Socket}, 0}. 

handle_call(_Msg, _From, State) ->
    {noreply, State}.

handle_cast(stop, State) ->
    {stop, ok, State}.

handle_info({tcp, Socket, RawData}, State) ->
    gen_tcp:send(Socket, io_lib:fwrite("Received raw data: ~p~n", [RawData])),
    {noreply, State};

handle_info({tcp_error, _Socket, Reason}, State) ->
    io:fwrite("Error: ~p~n", [Reason]),
    {stop, normal, State};

handle_info(timeout, #state{lsock = LSock} = State) ->
    case gen_tcp:accept(LSock) of
        {ok, Sock} ->
            io:fwrite("Accepting connection...~p~n", [self()]),
            start({socket, LSock}),
            {noreply, #state{lsock=Sock}};

        {error, Reason} ->
            io:fwrite("Error: ~p, ~p~n", [Reason, self()]),
            {stop, normal, State}
    end;

handle_info({tcp_closed, _Port}, State) ->
    io:fwrite("Socket closed: ~p~n", [self()]),
    simple_tcp_sup:kill_child(self()),
    {stop, normal, State}.

terminate(_Reason, _State) ->
    io:fwrite("Shutting down server: ~p~n", [self()]),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

руководитель:

-module(simple_tcp_sup).

-behaviour(supervisor).

-export([start_link/0,
         start_child/1
        ]). 

-export([init/1]).

-define(SERVER, ?MODULE).

start_link() ->
    supervisor:start_link({local, ?SERVER}, ?MODULE, []).

start_child({socket, Socket}) ->
    io:fwrite("Spawning child with socket...~n"),
    supervisor:start_child(?SERVER, [{socket, Socket}]);

start_child({port, Port}) ->
    io:fwrite("Spawning child with port...~n"),
    supervisor:start_child(?SERVER, [{port, Port}]).

init([]) ->
    Element = {simple_tcp, {simple_tcp, start_link, []},
               temporary, brutal_kill, worker, [simple_tcp]},
    Children = [Element],
    RestartStrategy = {simple_one_for_one, 0, 1}, 
    {ok, {RestartStrategy, Children}}.

Ответы [ 2 ]

5 голосов
/ 03 сентября 2011

Ваш третий handle_info меняет роли Sock и LSock. Он должен передать Sock дочернему процессу и оставить свое состояние без изменений.

Кстати: плохая карма для восстановления State с нуля (#state{lsock=Sock}), вы всегда должны получать новый State из текущего State (State#state{lsock=Sock}), на случай, если позже вы добавите больше переменных состояния. На самом деле, в этом есть ошибка (хотя и незначительная), поскольку вы выбрасываете номер порта.

1 голос
/ 03 сентября 2011

Хорошо, я предлагаю вам разрешить обработку Socket отдельными процессами, которые асинхронно связываются с gen_server и linked с ним.У меня есть пример кода, который покажет вам, как это можно сделать.Gen_server запускается и порождает прослушиватель TCP, который после успешного получения сокета прослушивания сообщает нашему gen_server об изменении его внутреннего состояния.Я расположил код сверху вниз.Все соответствующие функции были показаны.Сосредоточьтесь на процессах обработки сокетов и на том, как они взаимодействуют с gen_server

-define(PEER_CLIENT_TIMEOUT,timer:seconds(20)).
-define(PORT_RANGE,{10245,10265}).
-define(DEBUG(X,Y),error_logger:info_msg(X,Y)).
-define(ERROR(L),error_logger:error_report(L)).
-define(SOCKET_OPTS(IP),[inet,binary,{backlog,100},{packet,0},
                            {reuseaddr,true},{active,true},
                            {ip,IP}]).

%%----------------------------------------------------
%% gen_server starts here....

start(PeerName)-> 
    gen_server:start_link({local,?MODULE},?MODULE,PeerName,[]).

%%%-------------------------------------------
%% Gen_server init/1 function

init(PeerName)->
    process_flag(trap_exit,true),
    %% starting the whole Socket chain below..
    start_link_listener(),
    %% Socket stuff started, gen_server can now wait for async
    %% messages
    {ok,[]}.

%%% ---- Socket handling functions ---------

%% Function: start_link_listener/0
%% Purpose: Starts the whole chain of listening
%%          and waiting for connections. Executed
%%          directly by the gen_server process, But
%%          spawns a separate process to do the rest

start_link_listener()-> 
    Ip_address = get_myaddr(),  
    spawn_link(fun() -> listener(?SOCKET_OPTS(Ip_address)) end).

%%%----------------------------------------------   
%% Function: get_myaddr/0
%% Purpose: To pick the active IP address on my machine to
%%          listen on

get_myaddr()-> 
    ?DEBUG("Server> Trying to extract My Local Ip Address....",[]),
    {ok,Name} = inet:gethostname(),
    {ok,IP} = inet:getaddr(Name,inet),
    ?DEBUG("Server> Found Alive Local IP address: ~p.....~n",[IP]),
    IP.

%%%--------------------------------------------------
%% Function: listener/1, executed in a separate process
%% Purpose: Tries a given ?PORT_RANGE, with the given Socket Options
%%          Once it acquires a ListenSocket, it will cast the gen_server!

listener(SocketOpts)->
    process_flag(trap_exit,true),
    Ports = lists:seq(element(1,?PORT_RANGE),element(2,?PORT_RANGE)),
    case try_listening(SocketOpts,Ports) of
        {ok,Port,LSocket}->              
                PP = proplists:get_value(ip,SocketOpts),
                ?MODULE:started_listener(Port,PP,LSocket),              
                accept_connection(LSocket);
        {error,failed} -> {error,failed,SocketOpts}
    end.

try_listening(_Opts,[])-> {error,failed};
try_listening(Opts,[Port|Rest])->
    case gen_tcp:listen(Port,Opts) of
        {ok,Listen_Socket} -> {ok,Port,Listen_Socket};
        {error,_} -> try_listening(Opts,Rest)
    end.
%%%---------------------------------------------------------
%% Helper Functions for Converting IP Address from tuple
%% to string and vice versa

str(X) when is_integer(X)-> integer_to_list(X).

formalise_ipaddress({A,B,C,D})-> 
    str(A) ++ "." ++ str(B) ++ "." ++ str(C) ++ "." ++ str(D).

unformalise_address(String)-> 
    [A,B,C,D] = string:tokens(String,"."),
    {list_to_integer(A),list_to_integer(B),list_to_integer(C),list_to_integer(D)}.

%%%--------------------------------------------------
%% Function: get_source_connection/1
%% Purpose: Retrieving the IP and Port at the other
%%          end of the connection

get_source_connection(Socket)->
    try inet:peername(Socket) of
        {ok,{IP_Address, Port}} -> 
            [{ipAddress,formalise_ipaddress(IP_Address)},{port,Port}];
        _ -> failed_to_retrieve_address
    catch
        _:_ -> failed_to_retrieve_address
    end.

%%%-----------------------------------------------------
%% Function: accept_connection/1
%% Purpose: waits for a connection and re-uses the 
%%          ListenSocket by spawning another thread
%%          to take it and listen too. It casts the gen_server
%%          at each connection and provides details about it.

accept_connection(ListenSocket)->    
    case gen_tcp:accept(ListenSocket,infinity) of
        {ok, Socket}-> 
            %% re-use the ListenSocket below.....
            spawn_link(fun() -> accept_connection(ListenSocket) end),            
            OtherEnd = get_source_connection(Socket),
            ?MODULE:accepted_connection(OtherEnd),          
            loop(Socket,OtherEnd);
        {error,_} = Reason -> 
            ?ERROR(["Listener has failed to accept a connection",
                    {listener,self()},{reason,Reason}])
    end.

%%%-------------------------------------------------------------------------
%% Function: loop/2
%% Purpose: TCP reception loop, it casts the gen_server
%%          as soon as it receives something. gen_server
%%          is responsible for generating reponse
%% OtherEnd ::= [{ipAddress,StringIPAddress},{Port,Port}] or 'failed_to_retrieve_address'

loop(Socket,OtherEnd)-> 
    receive
        {tcp, Socket, Data}-> 
            ?DEBUG("Acceptor: ~p has received a binary message from: ~p~n",[self(),OtherEnd]),
            Reply = ?MODULE:incoming_binary_message(Data,OtherEnd),
            gen_tcp:send(Socket,Reply),         
            gen_tcp:close(Socket),
            exit(normal);
        {tcp_closed, Socket} -> 
            ?DEBUG("Acceptor: ~p. Socket closed by other end: ~p~n",[self(),OtherEnd]),
            ?MODULE:socket_closed(OtherEnd),
            exit(normal);
        Any -> ?DEBUG("Acceptor: ~p has received a message: ~p~n",[self(),Any])
    end.

%%%----------------------------------------------
%% Gen_server Asynchronous APIs

accepted_connection(failed_to_retrieve_address)-> ok;
accepted_connection([{ipAddress,StringIPAddress},{Port,Port}])->     
    gen_server:cast(?MODULE,{connected,StringIPAddress,Port}).

socket_closed(failed_to_retrieve_address)-> ok;
socket_closed([{ipAddress,StringIPAddress},{Port,Port}])->
    gen_server:cast(?MODULE,{socket_closed,StringIPAddress,Port}).

incoming_binary_message(Data,_OtherEnd)->  %% expecting a binary reply
    case analyse_protocol(Data) of
        wrong -> term_to_binary("protocol violation!");
        Val -> gen_server:call(?MODULE,{request,Val},infinity)
    end.

%%% -------------------- handle cast ------------------------------------------

handle_cast({listener_starts,_Port,_MyTupleIP,_LSocket} = Object,State)->
    NewState = do_something_with_the_listen_report(Object),
    {noreply,NewState};
handle_cast({connected,_StringIPAddress,_Port} = Object,State)->
    NewState = do_something_with_the_connection_report(Object),
    {noreply,NewState};
handle_cast({socket_closed,_StringIPAddress,_Port} = Object,State)->
    NewState = do_something_with_the_closed_connection_report(Object),
    {noreply,NewState};
handle_cast(Any,State)->
    ?DEBUG("Server> I have been casted some unknown message: ~p~n",[Any]),
    {noreply,State}.


%%%% ---------------------- handle call --------------
handle_call({request,Val},_,State)->
    {NewState,Reply} = req(Val,State),
    {reply,Reply,NewState};
handle_call(_,_,State)-> {reply,[],State}.

req(Val,State)->
    %% modify gen_server state and 
    %% build reply
    {NewState,Reply} = modify_state_and_get_reply(State,Val),
    {NewState,Reply}.

%%------------------- terminate/2 --------------------

terminate(_Reason,_State)-> ok.  

%%----------------- code_change/3   ------------------

code_change(_,State,_)-> {ok,State}.

Благодаря асинхронной возможности gen_server мы можем обрабатывать детали сокетов из отдельных связанных процессов.Затем эти процессы будут связываться с gen_server через cast и не блокируя gen_server от его параллельной природы.

...