многопоточность бэкэнда в PHP 7 (Symfony4) - PullRequest
0 голосов
/ 10 января 2019

(я читаю другие вопросы, но они относятся к более старым версиям PHP или многопоточности внешнего интерфейса)

У меня есть приложение PHP / PostgreSQL, которое имеет сложную часть обработки бэкэнда. По сути, существует очень большой цикл (несколько тысяч итераций), повторяющий одни и те же данные снова и снова (с перестановками). В каждом цикле считываются одни и те же данные, применяются операции, результат записывается обратно в базу данных. Циклы полностью независимы друг от друга, результаты не сохраняются между циклами. Фактически, чтобы очистить кэш-память объекта (используя Doctrine), я очищаю кэш каждые 100 или около того циклов.

Итак, по сути, у меня есть:

for ($i=0; $i<5000; $i++) {
   // fetch data
   // manipulate data
   // write results to a different table
}

Во время этих циклов исходные данные никогда не затрагиваются, заполняются только несколько таблиц результатов.

В настоящее время это занимает несколько минут. Мне кажется, как учебник пример параллельной обработки.

Каков наилучший способ поместить это в несколько угроз? Меня не волнует порядок выполнения или даже если рабочая нагрузка распределяется равномерно (по характеру операций с данными, если все потоки запускают одинаковое количество циклов, они должны иметь более или менее одинаковую рабочую нагрузку). Все, что я хочу, это использовать больше процессорных ядер.

Я сделал многопоточность в PHP 5, и это было ... ну ... не идеально. Работоспособно, но сложно. Улучшилось ли это в PHP 7? Есть ли относительно простой способ сказать "for (...) и запустить его в n потоков"?

Если это имеет значение, приложение написано на Symfony4, и этот внутренний процесс вызывается через консольную команду.

1 Ответ

0 голосов
/ 10 января 2019

Существует расширение pthreads , которое было переписано для упрощения использования в v3. Он поддерживается в PHP 7.2+ и позволяет создавать многопоточные приложения на PHP.

В качестве альтернативы, поскольку вы используете Symfony - вы можете написать простую консольную команду, которая может использовать компонент Process для запуска подпроцессов как отдельных процессов ОС. Вот пример такого бегуна из реального проекта:

<?php

namespace App\Command;

use App\Command\Exception\StopCommandException;
use Symfony\Component\Console\Command\LockableTrait;
use Symfony\Component\Console\Exception\InvalidArgumentException;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\HttpKernel\KernelInterface;
use Symfony\Component\Process\Exception\RuntimeException;
use Symfony\Component\Process\PhpExecutableFinder;
use Symfony\Component\Process\Process;
use Webmozart\PathUtil\Path;

class ProcessingRunner extends AbstractCommand
{
    use LockableTrait;
    /**
     * @var Process[]
     */
    private $processes = [];
    /**
     * @var string[]
     */
    private $cmd;
    /**
     * @var KernelInterface
     */
    private $kernel;

    /**
     * @param KernelInterface $kernel
     */
    public function __construct(KernelInterface $kernel)
    {
        parent::__construct();
        $this->kernel = $kernel;
    }

    /**
     * {@inheritdoc}
     * @throws InvalidArgumentException
     */
    protected function configure(): void
    {
        $this
            ->setName('app:processing:runner')
            ->setDescription('Run processing into multiple threads')
            ->addOption('threads', 't', InputOption::VALUE_REQUIRED, 'Number of threads to run at once', 1)
            ->addOption('at-once', 'm', InputOption::VALUE_REQUIRED, 'Amount of items to process at once', 10);
    }

    /**
     * {@inheritdoc}
     * @throws \Symfony\Component\Process\Exception\LogicException
     * @throws InvalidArgumentException
     * @throws RuntimeException
     * @throws \Symfony\Component\DependencyInjection\Exception\InvalidArgumentException
     * @throws \InvalidArgumentException
     * @throws \LogicException
     */
    protected function execute(InputInterface $input, OutputInterface $output): ?int
    {
        if (!$this->lock()) {
            $output->writeln('The command is already running in another process.');
            return 0;
        }
        if (extension_loaded('pcntl')) {
            $stop = function () {
                StopCommandException::throw();
            };
            pcntl_signal(SIGTERM, $stop);
            pcntl_signal(SIGINT, $stop);
            pcntl_async_signals(true);
        }
        do {
            try {
                while (\count($this->processes) < $this->getInput()->getOption('threads')) {
                    $process = $this->createProcess();
                    $process->start();
                    $this->processes[] = $process;
                }
                $this->processes = array_filter($this->processes, function (Process $p) {
                    return $p->isRunning();
                });
                usleep(1000);
            } catch (StopCommandException $e) {
                try {
                    defined('SIGKILL') || define('SIGKILL', 9);
                    array_map(function (Process $p) {
                        $p->signal(SIGKILL);
                    }, $this->processes);
                } catch (\Throwable $e) {

                }
                break;
            }
        } while (true);
        $this->release();
        return 0;
    }

    /**
     * @return Process
     * @throws RuntimeException
     * @throws \Symfony\Component\DependencyInjection\Exception\InvalidArgumentException
     * @throws \InvalidArgumentException
     * @throws \LogicException
     * @throws InvalidArgumentException
     */
    private function createProcess(): Process
    {
        if (!$this->cmd) {
            $phpBinaryPath = (new PhpExecutableFinder())->find();
            $this->cmd = [
                $phpBinaryPath,
                '-f',
                Path::makeAbsolute('bin/console', $this->kernel->getProjectDir()),
                '--',
                'app:processing:worker',
                '-e',
                $this->kernel->getEnvironment(),
                '-m',
                $this->getInput()->getOption('at-once'),
            ];
        }
        return new Process($this->cmd);
    }
}
...