Как сохранить сокеты в хэше и зациклить их из другого потока? - PullRequest
8 голосов
/ 04 февраля 2010

Я работаю на многопоточном TCP-сервере. В основном потоке я слушаю сокет и создаю новый поток для новых входящих соединений. Я хочу сохранить все входящие соединения в хэше, чтобы получить доступ к ним из другого потока.

Из ветки монитора я не могу прочитать какие-либо недавно добавленные соединения. Похоже, при создании потока монитора создается новый хэш клиента.

Как мне сохранить список всех сокетов и зациклить их из потока моего монитора?

Текущий код:

#!/usr/bin/perl
use strict;
use IO::Socket;
use threads;
use Thread::Queue;

# init
my $clients = {};
my $queue = Thread::Queue->new;

# thread that monitors
threads->create("monitor");

# create the listen socket
my $listenSocket = IO::Socket::INET->new(LocalPort  => 12345,
                                      Listen   => 10,
                                      Proto   => 'tcp',
                                      Reuse   => 1);

# make sure we are bound to the port
die "Cant't create a listening socket: $@" unless $listenSocket;

print "Server ready. Waiting for connections on 34567 ... \n";

# wait for connections at the accept call
while (my $connection = $listenSocket->accept) {
    # set client socket to non blocking
    my $nonblocking = 1;
    ioctl($connection, 0x8004667e, \\$nonblocking);

    # autoflush
    $connection->autoflush(1);

    # debug
    print "Accepted new connection\n";

    # add to list
    $clients->{time()} = $connection;

    # start new thread and listen on the socket
    threads->create("readData", $connection);
}

sub readData {
     # socket parameter
     my ($client) = @_;

     # read client
     while (<$client>) {
      # remove newline
      chomp $_;

  # add to queue
      $queue->enqueue($_);
     }

     close $client;
}

sub monitor {
    # endless loop
    while (1) {

        # loop while there is something in the queue
        while ($queue->pending) {

            # get data from a queue
            my $data = $queue->dequeue;

            # loop all sockets
            while ( my ($key, $value) = each(%$clients) ) {

               # send to socket
               print $value "$data\n";

            }
        }

        # wait 0,25 seconds
        select(undef, undef, undef, 0.25);
    }
}

close $listenSocket;

Ответы [ 2 ]

8 голосов
/ 05 февраля 2010

Вам необходимо отправить $clients через share из threads::shared:

my $clients = &share({});

Старомодный синтаксис произошел из-за задокументированной проблемы с прототипами Perl. Если у вас хотя бы Perl 5.8.9 , используйте более приятный

my $clients = shared_clone({});

вместо.

Вы также хотите защитить $clients с помощью замка, например, ,

my $clients_lock : shared;
{
  lock $clients_lock;
  $clients->{time()} = fileno $connection;
}

Наконец, поскольку IO::Socket::INET экземпляры являются типоглобами Perl, вы не можете делиться ими, поэтому вместо этого добавьте их дескрипторы сокетов (от fileno) к $clients и затем fdopen сокет при необходимости с

open my $fh, ">&=", $sockdesc or warn ...

Программа ниже повторяет входящие данные для других подключенных сокетов:

#!/usr/bin/perl

use strict;
use IO::Socket;
use threads;
use threads::shared;
use Thread::Queue;

# init
my $clients = &share({});
my $clients_lock : shared;

my $queue = Thread::Queue->new;

# thread that monitors
threads->create("monitor");

# create the listen socket
my $port = 12345;
my $listenSocket = IO::Socket::INET->new(
  LocalPort  => $port,
  Listen     => 10,
  Proto      => 'tcp',
  Reuse      => 1
);

# make sure we are bound to the port
die "Can't create a listening socket: $@" unless $listenSocket;

print "Server ready. Waiting for connections on $port ... \n";

# wait for connections at the accept call
while (my $connection = $listenSocket->accept) {
  # set client socket to non blocking
  my $nonblocking = 1;
  ioctl($connection, 0x8004667e, \\$nonblocking);

  # autoflush
  $connection->autoflush(1);

  # debug
  print "Accepted new connection\n";

  # add to list
  {
    lock $clients_lock;
    $clients->{time()} = fileno $connection;
  }

  # start new thread and listen on the socket
  threads->create("readData", $connection);
}

sub readData {
  # socket parameter
  my ($client) = @_;

  # read client
  while (<$client>) {
    chomp;
    $queue->enqueue($_);
  }

  close $client;
}

sub monitor {
  # endless loop
  while (1) {
    # loop while there is something in the queue
    while ($queue->pending) {
      # get data from a queue
      my $data = $queue->dequeue;

      # loop all sockets
      {
        lock $clients_lock;
        while ( my ($key, $value) = each(%$clients) ) {
          # send to socket
          if (open my $fh, ">&=", $value) {
            print $fh "$data\n";
          }
          else {
            warn "$0: fdopen $value: $!";
          }
        }
      }
    }

    # wait 0,25 seconds
    select(undef, undef, undef, 0.25);
  }
}

close $listenSocket;
1 голос
/ 04 февраля 2010

Не слишком много опыта использования потоков в Perl, но я думаю, что вы просто хотите поделиться своим списком клиентов:

    use <A HREF="http://search.cpan.org/perldoc/threads::shared" rel="nofollow noreferrer">threads::shared</A>;
    my $clients : shared = {};


Обновление:

Perl жалуется на:

my $hash : shared = {};

но, похоже, все в порядке с:

my $hash = {};
share($hash);

Также этот код:

my $hash = { key1 => "value1" };
share($hash);

, кажется, очищает хеш-таблицу, но

my $hash = {};
share($hash);
$hash->{key1} = "value1";

работает так, как я ожидал.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...