Я хочу привнести многопоточность в мой проект, который написан на PHP.Я обнаружил, что pthreads - это хорошая библиотека для удовлетворения моих требований.Я постараюсь описать рабочий процесс моего приложения абстрактно, чтобы решение можно было применить для любого другого проекта с такой же архитектурой.
Прежде всего, у меня есть очередь сообщений, которая предоставляетсяRabbitMQ.У меня есть процесс проверки этих сообщений.После того, как я возьму и проверим сообщение из очереди, мне нужно отправить его одному из моих внешних приложений.
У меня есть класс (назовем его EXT_APP), который отвечает за взаимодействие с внешним приложением через сокет.Также у меня есть класс (назовем его EXT_APP_DISPATCHER), который имеет пул EXT_APP, и когда я заканчиваю проверять сообщение, я отправляю его через EXT_APP_DISPATCHER в EXT_APP с наименьшим количеством занятых каналов (скажем, не беспокойтесь о каналах, скажем таку каждого EXT_APP есть функция, которая дает значение, которое описывает, насколько тяжело сейчас загружается это внешнее приложение)
Поэтому я хочу, чтобы все эти EXP_APP были в отдельных потоках, потому что есть функции, которые читают / записывают события из/ к сокету, и это может быть хорошим моментом для повышения производительности приложения.Но как я могу решить, в какой поток я отправляю проверенное сообщение в зависимости от его «загрузки».Я ясно понимаю, что не могу просто вызвать функцию объекта, который находится в другом потоке.Есть способ реализовать или изменить пул потоков, чтобы я отправлял сообщение в EXT_APP в зависимости от их загрузки.
Я написал классы, чтобы вы поняли, о чем я говорю:
class EXT_APP_DISPATCHER
{
private $connection_options = array(
array(
'host' => '127.0.0.1',
'scheme' => 'tcp://',
'port' => 5039,
'username' => 'asterisk_2',
'secret' => 'ajhu8dyvUWzV5JKa',
'connect_timeout' => 100,
'read_timeout' => 100
),
array(
'host' => '127.0.0.1',
'scheme' => 'tcp://',
'port' => 5038,
'username' => 'asterisk_1',
'secret' => 'DWSg33tCqwr7rXBk',
'connect_timeout' => 100,
'read_timeout' => 100
)
);
private $pool;
public function __construct()
{
$this->pool = new \Pool(2, EXT_APP::class, [$this->connection_options]);
}
public function processMessage($message): void
{
//todo Here I need to implement load balancing
//todo I need to get load from every worker and decide which one should receive task
$this->pool->submit(new TASK());
}
}
class EXT_APP extends \Worker
{
public static $counter = 0;
private $client;
public function __construct($options)
{
$this->client = new ClientImpl($options[self::$counter]);
self::$counter++;
$this->client->registerEventListener(function ($event)
{
echo 'Hey I received an event!';
});
$this->client->open();
}
public function run()
{
}
public function sendEvent($event)
{
$this->client->send($event);
}
public function getAmountOfBusyChannels()
{
$action = new CoreShowChannelsAction();
$actionResult = $this->client->send($action);
foreach ($actionResult->getEvents() as $eventMessage) {
if ($eventMessage instanceof CoreShowChannelsCompleteEvent) {
return $eventMessage->getKey('listitems');
}
}
return null;
}
}
class TASK extends \Threaded
{
public function run()
{
$this->worker->sendEvent(null);
}
}
/*
* EXT_APP_DISPATCHER initialization which is going to be located in main application class,
* so actually here before calling function processMessage
* I do receive message from RabbitMQ and validate it
*/
$ext_app_dispatcher = new EXT_APP_DISPATCHER();
$ext_app_dispatcher->processMessage(null);
$ext_app_dispatcher->processMessage(null);
Я ожидаю, что класс EXT_APP_DISPATCHER получит сообщение и передаст его работнику, который имеет наименьшую нагрузку.Все работники должны быть в разных темах.У вас есть идеи, как это можно сделать наилучшим образом?