Gearman с несколькими серверами и PHP-работниками - PullRequest
9 голосов
/ 16 августа 2011

У меня проблема с работниками Gearman, работающими на нескольких серверах, которую я не могу решить.

Проблема возникает, когда рабочий сервер переводится в автономный режим, а не отменяется рабочий процесс, и приводит к ошибкам и сбоям всех других рабочих процессов.

Пример только с 1 клиентом и 2 работниками -

Клиент:

$client = new GearmanClient ();

$client->addServer ('192.168.1.200');
$client->addServer ('192.168.1.201');

$job = $client->do ('generate_tile', serialize ($arrData));

рабочий:

$worker = new GearmanWorker ();

$worker->addServer ('192.168.1.200');
$worker->addServer ('192.168.1.201');

$worker->addFunction ('generate_tile', 'generate_tile');

while (1)
{
    if (!$worker->work ())
    {

        switch ($worker->returnCode ())
        {

            default:
                echo "Error: " . $worker->returnCode () . ': ' . $worker->error () . "\n";
                break;

        }

    }
}

function generate_tile ($job) { ... }

Рабочий код запускается на 2 отдельных серверах. Когда каждый сервер включен и работает, оба работника выполняют задания, как и ожидалось. Когда один из рабочих процессов отменяется, другой рабочий выполняет все задания, как ожидалось.

Однако, когда сервер с отмененным рабочим процессом выключен и полностью отключен, запросы к клиентскому сценарию зависают, а оставшийся рабочий процесс не получает никаких заданий.

Я получаю следующий набор ошибок от оставшегося рабочего процесса:

Error: 46: gearman_con_wait:timeout reached
Error: 46: gearman_con_wait:timeout reached
Error: 4: gearman_con_flush:write:110
Error: 46: gearman_con_wait:timeout reached
Error: 4: gearman_con_flush:write:113
Error: 4: gearman_con_flush:write:113
Error: 4: gearman_con_flush:write:113
....

Когда я запускаю другой сервер, не запуская на нем рабочий процесс, оставшийся рабочий процесс немедленно переходит в жизнь и выполняет все оставшиеся задания.

Мне кажется, что мне нужен некоторый код в рабочем процессе, чтобы справиться с любыми серверами, которые могут быть отключены, однако я не могу понять, как это сделать.

Большое спасибо,

Andy

Ответы [ 4 ]

5 голосов
/ 16 августа 2011

Наши тесты с несколькими серверами Gearman показывают, что если последний сервер в списке (192.168.1.201 в вашем случае) будет отключен, рабочие перестанут работать так, как вы описываете.(Кроме того, рабочие получают задания с последнего сервера. Они обрабатывают задания на .200, только если на .201 нет заданий).

Похоже, что это ошибка со связанным списком на сервере gearman., который, как сообщается, исправлен несколько раз, но со всеми доступными версиями gearman ошибка сохраняется.Извините, я знаю, что это не решение, но у нас была та же проблема, и мы не нашли решение.(если кто-то может предоставить рабочее решение этой проблемы, я согласен дать большую награду)

4 голосов
/ 29 мая 2012

В дополнение к комментарию @Darhazer выше. Мы также нашли это и решили так: -

// Gearman workers show a strong preference for servers at the end of a list so randomize the order
$worker = new GearmanWorker();
$s2 = explode(",", Configure::read('workers.servers'));
shuffle($s2);
$servers = implode(",", $s2);
$worker->addServers($servers); 

У нас работает от 6 до 10 работников в любое время, и срок их действия истекает после выполнения x запросов.

2 голосов
/ 17 августа 2011

Я использую этот класс, который отслеживает, какие задания работают на каких серверах.Это не было тщательно проверено, просто написал это сейчас.Я вставил отредактированную версию, поэтому возможна опечатка или что-то в этом роде, но в противном случае проблема может быть решена.

<?
class MyGearmanClient {
        static $server = "server1,server2,server3";
        static $server_array = false;
        static $workingServers = false;
        static $gmclient = false;
        static $timeout = 5000;
        static $defaultTimeout = 5000;

        static function randomServer() {
                return self::$server_array[rand(0, count(self::$server_array) -1)];
        }

        static function getServer($job = false) {
                if (self::$server_array == false) {
                        self::$server_array = explode(",", self::$server);
                        self::$workingServers = array();
                }

                $serverList = array();
                if ($job) {
                        if (array_key_exists($job, self::$workingServers)) {
                                foreach (self::$server_array as $server) {
                                        if (array_key_exists($server, self::$workingServers[$job])) {
                                                if (self::$workingServers[$job][$server]) {
                                                        $serverList[] = $server;
                                                }
                                        } else {
                                                $serverList[] = $server;
                                        }
                                }
                                if (count($serverList) == 0) {
                                        # All servers have failed, need to insert all the servers again and retry.
                                        $serverList = self::$workingServers[$job] = self::$server_array;
                                }
                                return $serverList[rand(0, count($serverList) - 1)];
                        } else {
                                return self::randomServer();
                        }
                } else {
                        return self::randomServer();
                }
        }

        static function serverWorked($server, $job) {
                self::$workingServers[$job][$server] = $server;
        }

        static function serverFailed($server, $job) {
                self::$workingServers[$job][$server] = false;
        }

        static function Connect($server = false, $job = false) {
                if ($server) {
                        self::$server = self::getServer();
                }

                self::$gmclient= new GearmanClient();
                self::$gmclient->setTimeout(self::$timeout);

                # add the default job server
                self::$gmclient->addServer($server = self::getServer($job));

                return $server;
        }

        static function Destroy() {
                self::$gmclient = false;
        }

        static function Client($name, $vars, $timeout = false) {
                if (is_int($timeout)) {
                        self::$timeout = $timeout;
                } else {
                        self::$timeout = self::$defaultTimeout;
                }


                do {
                        $server = self::Connect(false, $name);
                        $value = self::$gmclient->do($name, $vars);
                        $return_code = self::$gmclient->returnCode();
                        if (!$value) {
                                $error_message = self::$gmclient->error();
                                if ($return_code == 47) {
                                        self::serverFailed($server, $name);
                                        if (count(self::$server_array) > 1) {
                                             // ADDED SINGLE SERVER LOOP AVOIDANCE // echo "Timeout on server $server, trying another server...\n";
                                             continue;
                                        } else {
                                             return false;
                                        }
                                }
                                echo "ERR: $error_message ($return_code)\n";
                        }
                        # printf("Worker has returned\n");
                        $short_value = substr($value, 0, 80);
                        switch ($return_code)
                        {
                        case GEARMAN_WORK_DATA:
                                echo "DATA: $short_value\n";
                                break;
                        case GEARMAN_SUCCESS:
                                self::serverWorked($server, $name);
                                break;
                        case GEARMAN_WORK_STATUS:
                                list($numerator, $denominator)= self::$gmclient->doStatus();
                                echo "Status: $numerator/$denominator\n";
                                break;
                        case GEARMAN_TIMEOUT:
                                // self::Connect();
                                // Fall through
                        default:
                                echo "ERR: $error_message " . self::$gmclient->error() . " ($return_code)\n";
                                break;
                        }
                }
                while($return_code != GEARMAN_SUCCESS);

                $rv = unserialize($value);
                return $rv["rv"];
        }
}

# Example usage:
#    $rv = MyGearmanClient::Client("Function", $args);

?>
0 голосов
/ 19 мая 2017

, так как 'addServer' из клиента gearman не работает должным образом, этот код может выбрать сервер заданий случайным образом, а в случае сбоя попробуйте следующий, таким образом вы можете сбалансировать нагрузку.

        // job servers
        $jobservers = array('192.168.1.1','192.168.1.2');
        // prepare gearman client
        $gmclient = new GearmanClient();
        // shuffle job servers (deliver jobs equally by server)
        shuffle($jobservers);
        // add job servers
        foreach($jobservers as $jobserver) {
            // add random jobserver
            $gmclient->addServer($jobserver);
            // check server state if ok end foreach
            if (@$gmclient->ping('ping')) break;
            // if connections fails reset client
            $gmclient = new GearmanClient();
        }
...