Многопоточный PHP-поток curl - Forex Auto Trading - PullRequest
1 голос
/ 19 марта 2019

Мне нужна помощь для извлечения данных из потокового потока.Извините за длину кода, я удалил как можно больше ненужных битов.Итак, у меня есть запрос на керлинг, который загружается с постоянного ценового ресурса Форекс.Вот код, который я использую.Очевидно, я удалил кредиты.

<?php

    class Oanda
    {

        public $socket = null;
        public $baseUrl = null;
        public $version = null;
        public $auth_token = null;
        public $accounts = array();

        public $fp = null;
        public $tickData = null;


        function __construct() {        

            $this->baseUrl = "https://api-fxpractice.oanda.com";
            $this->streamUrl = "https://stream-fxpractice.oanda.com/";
            $this->version = "v3";
            $this->auth_token = "xxxxxxxxxx-yyyyyyyyyyyyyy";
            $this->accounts = array('x-x-x-x-x-x-x-x');

        }   


        public function valid($jsonObject, $verbose=FALSE, $message=FALSE) {
            //Return boolean false if object has been corrupted or has error messages/codes included
            if (isset($jsonObject->code)) {
                if ($verbose && isset($jsonObject->message))
                    echo 'OandaWrap: Invalid object. ' . $jsonObject->message . ' ';
                return false;
            }
            if (isset($jsonObject) === false || $jsonObject === false || empty($jsonObject)) {
                if ($verbose && $message)
                    echo 'OandaWrap: Error. ' . $message . ' ';
                return false;
            }
            return $jsonObject;
        }


        protected function data_decode($data) {
            //Return decoded data
            if (! $this->valid($data)) {
                //Return a stdObj with failure codes and message
                $failure = new stdClass();
                $failure->code = -1;
                $failure->message = 'OandaWrap throws curl error: ' . curl_error($this->socket);

                return $failure;
            }

            return json_decode($data);
        }

        protected function authenticate($curl) {
            //Authenticate our curl object
            $headers = array('Content-Type: application/json',          
                             'Accept-Datetime-Format: UNIX',            
                             'Authorization: Bearer '. $this->auth_token);

            curl_setopt($curl, CURLOPT_HTTPHEADER, $headers);
        }


        protected function configure($curl) {
            //Configure default connection settings
            curl_setopt($curl, CURLOPT_RETURNTRANSFER, true);               //We want the data returned as a variable
            curl_setopt($curl, CURLOPT_TIMEOUT, 10);                        //Maximum wait before timeout
            $this->authenticate($curl);                                 //Authenticate our socket
            return $curl;
        }


        protected function socket_new() {
            return $this->configure($socket = curl_init());
        }


        protected function socket() {
            //Return our active socket for reuse
            if (! $this->valid($this->socket))
                $this->socket = $this->socket_new();
            return $this->socket;
        }


        protected function stream($index, $endPoints=false, $queryData=false) {
            //Send a GET request to Oanda
            $queryData = ($queryData ? $queryData : array());
            $endPoints = ($endPoints ? $endPoints : array());
            $curl = $this->socket();


            curl_setopt($curl, CURLOPT_HEADER, 0);
            curl_setopt($curl, CURLOPT_CONNECTTIMEOUT, 0);
            curl_setopt($curl, CURLOPT_TIMEOUT, 0);
            curl_setopt($curl, CURLOPT_BUFFERSIZE, 256);
            curl_setopt($curl, CURLOPT_RETURNTRANSFER, true);
            curl_setopt($curl, CURLOPT_WRITEFUNCTION, array($this, "writeFunction"));       
            //curl_setopt($curl, CURLOPT_FILE, $this->fp);    // Data will be sent to our stream ;-)
            curl_setopt($curl, CURLOPT_URL, $this->streamUrl .'/'. $this->version .'/'. $index . ($endPoints ? '/'. implode('/', $endPoints) : '') . ($queryData ? '?' . http_build_query($queryData) : ''));

            echo $this->baseUrl .'/'. $this->version .'/'. $index . ($endPoints ? '/'. implode('/', $endPoints) : '') . ($queryData ? '?' . http_build_query($queryData) : '') ."\n\n";

            return $this->data_decode(curl_exec($curl));         //Launch and store decrypted data
        }

        public function writeFunction($curl, $dataIn){

            $data = json_decode($dataIn);

            if(!empty($data)):
                if($data->type == "PRICE"):

                    $myfile = fopen('tickData.txt', "a");
                    fwrite($myfile, $dataIn);
                    fclose($myfile);

                elseif($data->type == "HEARTBEAT"):
                    echo "Heartbeat\n";
                endif;

            else:
                echo "No data!";
            endif;

            //ob_flush();
            flush();
            return strlen($dataIn);
        }

        public function GetPriceStreamByPair($account_id, $instruments)
        {
            return $this->stream("accounts",[$account_id,"pricing","stream"],["instruments" => $instruments]);
        }

    }

Я пытаюсь перевести данные в пригодный для использования формат.В идеале это класс, который манипулирует данными по мере их поступления.В настоящее время я пишу это в файл, который затем читается с использованием fgets в основной поток.Очевидно, что это не правильное решение.Казалось бы, тикданные часто теряются при удалении файла.

Я запускаю эти потоки данных с помощью php pthreads:

<?php

require("Oanda.class.php");
$o = new Oanda();

class Streamer extends Worker
{
    public function run()
    {
        echo 'Running '.$this->getStacked().' jobs'.PHP_EOL;
    }
}


class PricingMT extends Threaded {


    public $o = null;
    public $trading_pair = "";

    public function __construct($trading_pair)
    {
        $this->trading_pair = $trading_pair;
    }

    public function run() {
        $this->o->GetPriceStreamByPair($this->o->accounts[0], $this->trading_pair);
   }
}



$worker   = new Streamer();

$pairs = ['GBP_JPY'];

foreach ($pairs as $pair) :
    $priceData = new PricingMT($pair);
    $priceData->o = $o;
    $worker->stack($priceData);
endforeach;

// Start all jobs
$worker->start();



while(true):

    $myfile = fopen("tickData.txt", "rw");

    while(! feof ($myfile)) 
    { 
        echo fgets($myfile); 
    }

    fclose($myfile);
    file_put_contents("tickData.txt","");

    sleep(0.5);

endwhile;

?>

У меня нет понимания системы потоков на самом делеусугубляется тем, что я не очень хорошо разбираюсь в ООП, либо просто не получаю свои данные.Любое направление будет высоко ценится!

Спасибо

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...