PHP & pthreads3: отправка конкретного задания работнику в зависимости от условий - PullRequest
0 голосов
/ 06 февраля 2019

Я хочу привнести многопоточность в мой проект, который написан на PHP.Я обнаружил, что pthreads - это хорошая библиотека для удовлетворения моих требований.Я постараюсь описать рабочий процесс моего приложения абстрактно, чтобы решение можно было применить для любого другого проекта с такой же архитектурой.

Прежде всего, у меня есть очередь сообщений, которая предоставляетсяRabbitMQ.У меня есть процесс проверки этих сообщений.После того, как я возьму и проверим сообщение из очереди, мне нужно отправить его одному из моих внешних приложений.

У меня есть класс (назовем его EXT_APP), который отвечает за взаимодействие с внешним приложением через сокет.Также у меня есть класс (назовем его EXT_APP_DISPATCHER), который имеет пул EXT_APP, и когда я заканчиваю проверять сообщение, я отправляю его через EXT_APP_DISPATCHER в EXT_APP с наименьшим количеством занятых каналов (скажем, не беспокойтесь о каналах, скажем таку каждого EXT_APP есть функция, которая дает значение, которое описывает, насколько тяжело сейчас загружается это внешнее приложение)

Поэтому я хочу, чтобы все эти EXP_APP были в отдельных потоках, потому что есть функции, которые читают / записывают события из/ к сокету, и это может быть хорошим моментом для повышения производительности приложения.Но как я могу решить, в какой поток я отправляю проверенное сообщение в зависимости от его «загрузки».Я ясно понимаю, что не могу просто вызвать функцию объекта, который находится в другом потоке.Есть способ реализовать или изменить пул потоков, чтобы я отправлял сообщение в EXT_APP в зависимости от их загрузки.

Я написал классы, чтобы вы поняли, о чем я говорю:

class EXT_APP_DISPATCHER
{
    private $connection_options = array(
        array(
            'host' => '127.0.0.1',
            'scheme' => 'tcp://',
            'port' => 5039,
            'username' => 'asterisk_2',
            'secret' => 'ajhu8dyvUWzV5JKa',
            'connect_timeout' => 100,
            'read_timeout' => 100
        ),
        array(
            'host' => '127.0.0.1',
            'scheme' => 'tcp://',
            'port' => 5038,
            'username' => 'asterisk_1',
            'secret' => 'DWSg33tCqwr7rXBk',
            'connect_timeout' => 100,
            'read_timeout' => 100
        )
    );

    private $pool;

    public function __construct()
    {
        $this->pool = new \Pool(2, EXT_APP::class, [$this->connection_options]);
    }

    public function processMessage($message): void
    {
        //todo Here I need to implement load balancing
        //todo I need to get load from every worker and decide which one should receive task
        $this->pool->submit(new TASK());
    }
}

class EXT_APP extends \Worker
{
    public static $counter = 0;

    private $client;

    public function __construct($options)
    {
        $this->client = new ClientImpl($options[self::$counter]);
        self::$counter++;

        $this->client->registerEventListener(function ($event)
        {
            echo 'Hey I received an event!';
        });

        $this->client->open();
    }

    public function run()
    {
    }

    public function sendEvent($event)
    {
        $this->client->send($event);
    }

    public function getAmountOfBusyChannels()
    {
        $action = new CoreShowChannelsAction();
        $actionResult = $this->client->send($action);

        foreach ($actionResult->getEvents() as $eventMessage) {
            if ($eventMessage instanceof CoreShowChannelsCompleteEvent) {
                return $eventMessage->getKey('listitems');
            }
        }

        return null;
    }
}

class TASK extends \Threaded
{
    public function run()
    {
        $this->worker->sendEvent(null);
    }
}

/*
 * EXT_APP_DISPATCHER initialization which is going to be located in main application class, 
 * so actually here before calling function processMessage 
 * I do receive message from RabbitMQ and validate it
 */
$ext_app_dispatcher = new EXT_APP_DISPATCHER();
$ext_app_dispatcher->processMessage(null);
$ext_app_dispatcher->processMessage(null);

Я ожидаю, что класс EXT_APP_DISPATCHER получит сообщение и передаст его работнику, который имеет наименьшую нагрузку.Все работники должны быть в разных темах.У вас есть идеи, как это можно сделать наилучшим образом?

...