Если это ваше собственное (самостоятельно написанное) приложение, возможно, вам следует добавить некоторые функции, которые позволяют приостанавливать или останавливать выполнение.
Один пример будет на каждой итерации X, сценарий проверяет ресурс на наличиекоманды.Если в очереди ресурсов есть команды, он выполняет их по порядку, удаляет их и продолжает (если применимо).
Например, для плоского файла или базы данных можно добавить команду STOP-SUSPEND_EXECUTION
.Когда ваш скрипт читает эту строку или строку, он приостанавливает нормальное выполнение, но продолжает периодически проверять ресурс.После чего, если прочитана команда RESUME
, выполнение возобновляется с того места, где оно было прервано, поскольку оно не вышло из итерационного цикла.
Теперь вы можете с помощью интерфейса командной строки или другого интерфейса добавлять команды в очередьи приложение ответит соответствующим образом.
Вы можете даже прихотить, добавив временные метки для отсрочки выполнения команды.
PS: Если вы выполняете такие задачи, как массовая рассылка и т. д., возможно, вырассмотреть возможность перемещения этих сценариев в интерфейс командной строки.Я упоминаю об этом только на основании вашего комментария о «закрытии браузера».
Может использовать некоторую работу, но она делает свое дело.run()
принимает в качестве аргумента функцию обратного вызова $job
.Эта функция представляет собой одну итерацию любой пакетной работы, которую вы выполняете (массовая рассылка и т. Д.) И $data
в виде массива данных.На каждой итерации $job
дается следующий элемент массива $data
в качестве набора аргументов.
$data = array(
array('name' => 'bob', 'email' => 'bob@site.com'),
array('name' => 'jim', 'email' => 'jim@site.com'),
array('name' => 'ann', 'email' => 'ann@site.com'),
);
$job = function($name, $email){
// do something with $name
// and $email
};
$batch->run($job, $data);
Вам нужны несколько таблиц (в стиле MySQL Workbench):
CREATE SCHEMA IF NOT EXISTS `batchtest` DEFAULT CHARACTER SET latin1 COLLATE latin1_swedish_ci ;
USE `batchtest` ;
CREATE TABLE IF NOT EXISTS `batchtest`.`job` (
`id` CHAR(24) NOT NULL ,
`alias` VARCHAR(255) NOT NULL ,
`status` INT NOT NULL DEFAULT 0 ,
`timestamp` TIMESTAMP NOT NULL ,
PRIMARY KEY (`id`) )
ENGINE = InnoDB;
CREATE TABLE IF NOT EXISTS `batchtest`.`queue` (
`id` INT UNSIGNED NOT NULL AUTO_INCREMENT ,
`job_id` CHAR(24) NOT NULL ,
`action` VARCHAR(255) NOT NULL ,
`params` TEXT NULL ,
`timestamp` TIMESTAMP NOT NULL ,
PRIMARY KEY (`id`) )
ENGINE = InnoDB;
Когда вы хотите приостановить / возобновить / прервать работу, добавьте строку в таблицу queue
с job_id
и action
(pause
, resume
или abort
) иработа ответит.Задание автоматически удалит выполненные команды из таблицы queue
.
В этом суть.
class BatchJob{
const STATUS_STARTING = 0;
const STATUS_RUNNING = 1;
const STATUS_PAUSED = 2;
const STATUS_ABORTED = 4;
const STATUS_COMPLETED = 5;
protected $_id = null;
protected $_alias = null;
protected $_pdo = null;
protected $_pauseSleep = null;
protected $_status = self::STATUS_STARTING;
protected $_jobTable = 'job';
protected $_queueTable = 'queue';
public function __construct($pdo, $alias){
$this->_pdo = $pdo;
$this->_alias = $alias;
$this->_id = vsprintf('%04x%04x%04x%04x%04x%04x', array(
mt_rand(0, 0xffff),
mt_rand(0, 0xffff),
mt_rand(0, 0xffff),
mt_rand(0, 0xffff),
mt_rand(0, 0xffff),
mt_rand(0, 0xffff),
));
$this->output("Initializing job");
$this->_pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$statement = $this->_pdo->prepare("INSERT INTO {$this->_jobTable} (id, alias, status) VALUES (:id, :alias, :status)");
$statement->execute(array(
':id' => $this->_id,
':alias' => $this->_alias,
':status' => $this->_status,
));
}
public function run($job, Array $data, $pauseSleep = 10){
$this->_pauseSleep = $pauseSleep;
$iteration = 0;
$this->updateStatus(self::STATUS_RUNNING);
while($this->_status != self::STATUS_ABORTED
&&$this->_status != self::STATUS_COMPLETED){
$statement = $this->_pdo->prepare("SELECT id, action, params FROM {$this->_queueTable} WHERE job_id = :job_id");
$statement->execute(array(
':job_id' => $this->_id,
));
foreach($statement->fetchAll() as $command){
switch($command['action']){
case 'resume':
$this->updateStatus(self::STATUS_RUNNING);
break;
case 'pause':
$this->updateStatus(self::STATUS_PAUSED);
break;
case 'abort':
$this->updateStatus(self::STATUS_ABORTED, true, false);
exit;
break;
}
$statement = $this->_pdo->prepare("DELETE FROM {$this->_queueTable} WHERE id = :id");
$statement->execute(array(
':id' => $command['id'],
));
}
if($this->_status == self::STATUS_PAUSED){
sleep($this->_pauseSleep);
continue;
}
call_user_func_array($job, (Array) current($data));
if(!next($data)){
$this->updateStatus(self::STATUS_COMPLETED, true, false);
}
}
}
protected function output($string){
echo ">>> [{$this->_alias}:{$this->_id}] [" . date('Y-m-d H:i:s') . "] {$string}" . PHP_EOL;
}
protected function updateStatus($status = null, $updateDatabase = true, $updateOutput = true){
if(!is_null($status)){
$this->_status = $status;
}
if($updateDatabase){
$statement = $this->_pdo->prepare("UPDATE {$this->_jobTable} SET status = :status WHERE id = :id");
$statement->execute(array(
':id' => $this->_id,
':status' => $this->_status,
));
}
if($updateOutput){
$reflection = new ReflectionClass(__CLASS__);
$statusCodes = array_flip($reflection->getConstants());
$this->output("Job status change [{$statusCodes[$this->_status]}]");
}
}
public function __destruct(){
$this->updateStatus();
}
}