Я решил эту проблему, добавив вызов gc_collect_cycles() в функцию onClose.
Мой код теперь выглядит следующим образом:
<?php
//Pusher
namespace MyApp;
use Ratchet\ConnectionInterface;
use Ratchet\Wamp\WampServerInterface;
/**
 * WAMP handler that relays messages handed to onPush() to the clients
 * subscribed to the channel named in the message.
 *
 * Clients are only allowed to subscribe and unsubscribe; any call or publish
 * coming from a client gets its connection closed.
 */
class Pusher implements WampServerInterface
{
    /** Channels that have been subscribed to, keyed by channel id. */
    protected $subscribed_channels = array();

    /** Per-connection bookkeeping keyed by resource id: array('channel' => <channel object>). */
    protected $clients = array();

    /** Storage holding every currently open connection. */
    protected $connections;

    public function __construct()
    {
        $this->connections = new \SplObjectStorage;
    }

    /**
     * Registers the channel a client subscribed to and remembers it for that connection.
     *
     * Only the most recent subscription of a connection is remembered in $clients;
     * an earlier entry for the same connection is overwritten.
     *
     * @param ConnectionInterface $conn    The subscribing connection.
     * @param object              $channel The channel object (must offer getId()).
     */
    public function onSubscribe(ConnectionInterface $conn, $channel)
    {
        $channel_id = $channel->getId();
        $connection_id = $conn->resourceId;

        $this->subscribed_channels[$channel_id] = $channel;
        $this->clients[$connection_id] = array('channel' => $channel);
        //echo "Client {$connection_id} has subscribed to {$channel_id} channel. {$channel->count()} Clients Subscribed \n";
    }

    /**
     * Broadcasts application data to every subscriber of a channel.
     *
     * @param string $request JSON object with a 'channel' key (the channel id to push to)
     *                        and a 'data' key (the payload, re-encoded as JSON before broadcast).
     */
    public function onPush($request)
    {
        $request_array = json_decode($request, true);

        // Reject malformed JSON and channel values that cannot be used as an array key.
        if (
            !is_array($request_array)
            || !isset($request_array['channel'])
            || !is_scalar($request_array['channel'])
        ) {
            echo"Channel not sent by app \n";
            return;
        }

        // Check if data has been sent by app
        if (!isset($request_array['data'])) {
            echo"Data not sent by app \n";
            return;
        }

        // If the channel being pushed to has no subscribers, don't push
        if (!array_key_exists($request_array['channel'], $this->subscribed_channels)) {
            echo"{$request_array['channel']} channel has no subscribers \n";
            return;
        }

        // Forward the request to all the subscribers in the channel
        $channel = $this->subscribed_channels[$request_array['channel']];
        $channel->broadcast(json_encode($request_array['data']));
        //echo "Pushing data to {$request_array['channel']} channel at ".strftime('%Y-%m-%d %H:%M:%S')."\n";
    }

    /**
     * Forgets a channel once its last subscriber has left.
     *
     * @param ConnectionInterface $conn    The connection leaving the channel.
     * @param object              $channel The channel object being left.
     */
    public function onUnSubscribe(ConnectionInterface $conn, $channel)
    {
        $channel_id = $channel->getId();

        // This handler is invoked both by onClose() in this class and by the framework
        // on an explicit unsubscribe, and the connection is not necessarily still listed
        // as a subscriber in both cases, so it is only discounted when it is still counted.
        // NOTE(review): Ratchet's TopicManager appears to remove the connection from the
        // topic before calling this handler on an explicit unsubscribe — confirm.
        $remaining = $channel->count() - ($channel->has($conn) ? 1 : 0);

        if ($remaining <= 0) {
            unset($this->subscribed_channels[$channel_id]);
        }
        //echo "Client {$conn->resourceId} has disconnected from {$channel_id} channel. {$remaining} Clients subscribed \n";
    }

    /**
     * Tracks a newly opened connection.
     */
    public function onOpen(ConnectionInterface $conn)
    {
        $this->connections->attach($conn);
    }

    /**
     * Releases everything held for a closed connection.
     */
    public function onClose(ConnectionInterface $conn)
    {
        $connection_id = $conn->resourceId;

        // A connection that never subscribed has no entry in $clients; skip the
        // unsubscribe step for it instead of calling getId() on null.
        if (isset($this->clients[$connection_id])) {
            $channel = $this->clients[$connection_id]['channel'];
            unset($this->clients[$connection_id]);
            $this->onUnSubscribe($conn, $channel);
        }

        $this->connections->detach($conn);

        // Force a cycle collection now that the references to the connection are dropped.
        gc_collect_cycles();
    }

    /**
     * Rejects RPC calls: clients are not expected to send any.
     */
    public function onCall(ConnectionInterface $conn, $id, $channel, array $params)
    {
        // In this application if clients send data it's because the user hacked around in console
        $conn->callError($id, $channel, 'You are not allowed to make calls')->close();
    }

    /**
     * Rejects client-side publishes by closing the connection.
     */
    public function onPublish(ConnectionInterface $conn, $channel, $event, array $exclude, array $eligible)
    {
        // In this application if clients send data it's because the user hacked around in console
        $conn->close();
    }

    /**
     * Prints the message of an error raised for a connection.
     */
    public function onError(ConnectionInterface $conn, \Exception $e)
    {
        echo "An error has occurred: {$e->getMessage()}\n";
    }
}
//Web socket server code
<?php
require dirname(__DIR__) . '/vendor/autoload.php';

// Address the WebSocket server listens on.
// Placeholder values: replace them with your server's IP and port.
$my_ip = '0.0.0.0';
$my_port = 8080;

$loop = React\EventLoop\Factory::create();
$pusher = new MyApp\Pusher;

// Listen for the web server to make a ZeroMQ push after an ajax request
$context = new React\ZMQ\Context($loop);
$pull = $context->getSocket(ZMQ::SOCKET_PULL);
$pull->bind('tcp://127.0.0.1:5555'); // Binding to 127.0.0.1 means the only client that can connect is itself
$pull->on('message', array($pusher, 'onPush'));

// Double quotes are required here: a single-quoted string would pass the
// literal text '$my_ip:$my_port' instead of the configured address.
$webSock = new React\Socket\Server("{$my_ip}:{$my_port}", $loop);
$webServer = new Ratchet\Server\IoServer(
    new Ratchet\Http\HttpServer(
        new Ratchet\WebSocket\WsServer(
            new Ratchet\Wamp\WampServer(
                $pusher
            )
        )
    ),
    $webSock
);

$loop->run();
//My php app code
//You will need the zmq php module in your php for this to work
/**
 * Sends a channel/data pair as a JSON message over a ZeroMQ PUSH socket
 * connected to tcp://localhost:5555.
 *
 * @param mixed $channel Value stored under the 'channel' key of the message.
 * @param mixed $data    Value stored under the 'data' key of the message.
 */
function push_to_socket($channel,$data)
{
    $payload = json_encode(array('channel' => $channel, 'data' => $data));

    $zmq = new ZMQContext();
    $pusher = $zmq->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
    $pusher->connect("tcp://localhost:5555");
    $pusher->send($payload);
}
//My Javascript code
//You need AutobahnJS for this to work.
//You might also need jQuery or else modify the code to eliminate jQuery syntax
//Replace 'channel' with a channel/topic name you want to use.
<script type="text/javascript">
// Declared explicitly: assigning to an undeclared name creates an implicit
// global and throws a ReferenceError in strict mode.
var conn;

// Pause before reconnecting after the connection closes, in milliseconds,
// so an unreachable server is not hit with back-to-back attempts.
var RECONNECT_DELAY_MS = 1000;

$(document).ready(get_connected);

/**
 * Opens the session, subscribes to 'channel' once it is established, and
 * schedules a new connection attempt whenever the session is closed.
 * Does nothing when the browser exposes no WebSocket implementation.
 */
function get_connected()
{
    var hasWs = 'WebSocket' in window || 'MozWebSocket' in window;
    if (!hasWs)
    {
        return;
    }

    conn = new ab.Session('wss:{my_domain}/wss',
        function()
        {
            conn.subscribe('channel', function(topic, message) {
                //Process your message
            });
        },
        function()
        {
            //console.warn('Connection closed');
            setTimeout(get_connected, RECONNECT_DELAY_MS);
        },
        {'skipSubprotocolCheck': true}
    );
}
</script>