Задания Gearman передаются нескольким работникам (PHP) - PullRequest
3 голосов
/ 08 марта 2012

У меня проблема в том, что в приложении PHP задания Gearman иногда передаются нескольким работникам. Я мог бы уменьшить код, чтобы воспроизвести его в один файл. Теперь я не уверен, является ли это ошибкой в ​​Gearman или ошибкой в ​​библиотеке pecl или, возможно, в моем коде.

Вот код для воспроизведения ошибки:

#!/usr/bin/php
<?php

// Try 'standard', 'exception' or 'exception-sleep'.
$sWorker = 'exception';



// Detect run mode "client" or "worker".
if (!isset($argv[1]))
    $sMode = 'client';
else
    $sMode = 'worker-' . $sWorker;

$sLogFilePath = __DIR__ . '/log.txt';

switch ($sMode) {

    case 'client':

        // Remove all queued test jobs and quit if there are test workers running.
        prepare();

        // Init the greaman client.
        $Client= new GearmanClient;
        $Client->addServer();

        // Empty the log file.
        file_put_contents($sLogFilePath, '');

        // Start some worker processes.
        $aPids = array();       
        for ($i = 0; $i < 100; $i++)
            $aPids[] = exec('php ' . __FILE__ . ' worker > /dev/null 2>&1 & echo $!');

        // Start some jobs. Also try doHigh(), doBackground() and
        // doBackgroundHigh();
        for ($i = 0; $i < 50; $i++)
            $Client->doNormal('test', $i);

        // Wait a second (when running jobs in background).
        // sleep(1);

        // Prepare the log file entries.
        $aJobs = array();
        $aLines = file($sLogFilePath);
        foreach ($aLines as $sLine) {
            list($sTime, $sPid, $sHandle, $sWorkload) = $aAttributes = explode("\t", $sLine);
            $sWorkload = trim($sWorkload);
            if (!isset($aJobs[$sWorkload]))
                $aJobs[$sWorkload] = array();
            $aJobs[$sWorkload][] = $aAttributes;
        }

        // Remove all jobs that have been passed to only one worker as expected.
        foreach ($aJobs as $sWorkload => $aJob) {
            if (count($aJob) === 1)
                unset($aJobs[$sWorkload]);
        }

        echo "\n\n";

        if (empty($aJobs))
            echo "No job has been passed to more than one worker.";
        else {
            echo "Those jobs has been passed more than one times to a worker:\n";
            foreach ($aJobs as $sWorload => $aJob) {

                echo "\nJob #" . $sWorload . ":\n";
                foreach ($aJob as $aAttributes)
                    echo "  $aAttributes[2] (Worker PID: $aAttributes[1])\n";
            }
        }

        echo "\n\n";

        // Kill all started workers.
        foreach ($aPids as $sPid)
            exec('kill ' . $sPid . ' > /dev/null 2>&1');

    break;

    case 'worker-standard':
        $Worker = new GearmanWorker;
        $Worker->addServer();
        $Worker->addFunction('test', 'logJob');
                    $bShutdown = false;
        while ($Worker->work())
            if ($bShutdown)
                continue;
        break;

    case 'worker-exception':
        try {
            $Worker = new GearmanWorker;
            $Worker->addServer();
            $Worker->addFunction('test', 'logJob');
            $bShutdown = false;
            while ($Worker->work())
                if ($bShutdown)
                    throw new \Exception;

        } catch (\Exception $E) {
        }
    break;

    case 'worker-exception-sleep':
        try {
            $Worker = new GearmanWorker;
            $Worker->addServer();
            $Worker->addFunction('test', 'logJob');
            $bShutdown = false;
            while ($Worker->work())
            {
                if ($bShutdown) {
                    sleep(1);
                    throw new \Exception;
                }
            }
        } catch (\Exception $E) {
        }
    break;
}

function logJob(\GearmanJob $Job)
{
    global $bShutdown, $sLogFilePath;
    $sLine = microtime(true) . "\t" . getmypid() . "\t" . $Job->handle() . "\t" . $Job->workload() . "\n";
    file_put_contents($sLogFilePath, $sLine, FILE_APPEND);
    $bShutdown = true;
}


function prepare()
{
    $rGearman = fsockopen('127.0.0.1', '4730', $iErrno, $sErrstr, 3);
    $aBuffer = array();
    fputs ($rGearman, 'STATUS' . PHP_EOL);
    stream_set_timeout($rGearman, 1);
    while (!feof($rGearman))
        if ('.' . PHP_EOL !== $sLine = fgets($rGearman, 128))
            $aBuffer[] = $sLine;
        else
            break;
    fclose($rGearman);

    $bJobsInQueue = false;
    $bWorkersRunning = false;
    foreach ($aBuffer as $sFunctionLine) {
        list($sFunctionName, $iQueuedJobs, $iRunningJobs, $iWorkers) = explode("\t", $sFunctionLine);
        if ('test' === $sFunctionName) {
            if (0 != $iQueuedJobs)
                $bJobsInQueue = true;
            if (0 != $iWorkers)
                $bWorkersRunning = true;;
        }
    }

    // Exit if there are workers running.
    if ($bWorkersRunning)
        die("There are some Gearman workers running that have registered a 'test' function. Please stop these workers and run again.\n\n");

    // If there are test jobs in the queue start a worker that eat up the jobs.
    if ($bJobsInQueue) {
        $sPid = exec('gearman -n -w -f test > /dev/null 2>&1 & echo $!');
        sleep(1);
        exec ("kill $sPid > /dev/null 2>&1");
        // Repeat this method to make sure all jobs are removed.
        prepare();
    }
}

Когда вы запускаете этот код в командной строке, он должен вывести "No job has been passed to more than one worker.", но всегда его выводит список некоторых заданий, которые были переданы нескольким работникам. Ошибка не появляется, если вы установили $sWorker = 'standard'; или 'exception-sleep'.

Мне бы очень помогло, если бы вы могли запустить код и сказать мне, если вы в состоянии воспроизвести ошибку, если у меня есть ошибка в коде.

1 Ответ

3 голосов
/ 21 мая 2012

У меня точно такая же проблема с Gearman 0.24, PECL lib 1.0.2.Был в состоянии воспроизвести ошибку с вашим скриптом каждый раз.

Старая версия Gearman (я думаю, 0.14) работала нормально.

Обновление Gearman до 0.33 исправило проблему.

...