Мне нужна помощь для извлечения данных из потокового потока.Извините за длину кода, я удалил как можно больше ненужных битов.Итак, у меня есть запрос на керлинг, который загружается с постоянного ценового ресурса Форекс.Вот код, который я использую.Очевидно, я удалил кредиты.
<?php
class Oanda
{
public $socket = null;
public $baseUrl = null;
public $version = null;
public $auth_token = null;
public $accounts = array();
public $fp = null;
public $tickData = null;
function __construct() {
$this->baseUrl = "https://api-fxpractice.oanda.com";
$this->streamUrl = "https://stream-fxpractice.oanda.com/";
$this->version = "v3";
$this->auth_token = "xxxxxxxxxx-yyyyyyyyyyyyyy";
$this->accounts = array('x-x-x-x-x-x-x-x');
}
public function valid($jsonObject, $verbose=FALSE, $message=FALSE) {
//Return boolean false if object has been corrupted or has error messages/codes included
if (isset($jsonObject->code)) {
if ($verbose && isset($jsonObject->message))
echo 'OandaWrap: Invalid object. ' . $jsonObject->message . ' ';
return false;
}
if (isset($jsonObject) === false || $jsonObject === false || empty($jsonObject)) {
if ($verbose && $message)
echo 'OandaWrap: Error. ' . $message . ' ';
return false;
}
return $jsonObject;
}
protected function data_decode($data) {
//Return decoded data
if (! $this->valid($data)) {
//Return a stdObj with failure codes and message
$failure = new stdClass();
$failure->code = -1;
$failure->message = 'OandaWrap throws curl error: ' . curl_error($this->socket);
return $failure;
}
return json_decode($data);
}
protected function authenticate($curl) {
//Authenticate our curl object
$headers = array('Content-Type: application/json',
'Accept-Datetime-Format: UNIX',
'Authorization: Bearer '. $this->auth_token);
curl_setopt($curl, CURLOPT_HTTPHEADER, $headers);
}
protected function configure($curl) {
//Configure default connection settings
curl_setopt($curl, CURLOPT_RETURNTRANSFER, true); //We want the data returned as a variable
curl_setopt($curl, CURLOPT_TIMEOUT, 10); //Maximum wait before timeout
$this->authenticate($curl); //Authenticate our socket
return $curl;
}
protected function socket_new() {
return $this->configure($socket = curl_init());
}
protected function socket() {
//Return our active socket for reuse
if (! $this->valid($this->socket))
$this->socket = $this->socket_new();
return $this->socket;
}
protected function stream($index, $endPoints=false, $queryData=false) {
//Send a GET request to Oanda
$queryData = ($queryData ? $queryData : array());
$endPoints = ($endPoints ? $endPoints : array());
$curl = $this->socket();
curl_setopt($curl, CURLOPT_HEADER, 0);
curl_setopt($curl, CURLOPT_CONNECTTIMEOUT, 0);
curl_setopt($curl, CURLOPT_TIMEOUT, 0);
curl_setopt($curl, CURLOPT_BUFFERSIZE, 256);
curl_setopt($curl, CURLOPT_RETURNTRANSFER, true);
curl_setopt($curl, CURLOPT_WRITEFUNCTION, array($this, "writeFunction"));
//curl_setopt($curl, CURLOPT_FILE, $this->fp); // Data will be sent to our stream ;-)
curl_setopt($curl, CURLOPT_URL, $this->streamUrl .'/'. $this->version .'/'. $index . ($endPoints ? '/'. implode('/', $endPoints) : '') . ($queryData ? '?' . http_build_query($queryData) : ''));
echo $this->baseUrl .'/'. $this->version .'/'. $index . ($endPoints ? '/'. implode('/', $endPoints) : '') . ($queryData ? '?' . http_build_query($queryData) : '') ."\n\n";
return $this->data_decode(curl_exec($curl)); //Launch and store decrypted data
}
public function writeFunction($curl, $dataIn){
$data = json_decode($dataIn);
if(!empty($data)):
if($data->type == "PRICE"):
$myfile = fopen('tickData.txt', "a");
fwrite($myfile, $dataIn);
fclose($myfile);
elseif($data->type == "HEARTBEAT"):
echo "Heartbeat\n";
endif;
else:
echo "No data!";
endif;
//ob_flush();
flush();
return strlen($dataIn);
}
public function GetPriceStreamByPair($account_id, $instruments)
{
return $this->stream("accounts",[$account_id,"pricing","stream"],["instruments" => $instruments]);
}
}
Я пытаюсь перевести данные в пригодный для использования формат.В идеале это класс, который манипулирует данными по мере их поступления.В настоящее время я пишу это в файл, который затем читается с использованием fgets в основной поток.Очевидно, что это не правильное решение.Казалось бы, тикданные часто теряются при удалении файла.
Я запускаю эти потоки данных с помощью php pthreads:
<?php
require("Oanda.class.php");
$o = new Oanda();
class Streamer extends Worker
{
public function run()
{
echo 'Running '.$this->getStacked().' jobs'.PHP_EOL;
}
}
class PricingMT extends Threaded {
public $o = null;
public $trading_pair = "";
public function __construct($trading_pair)
{
$this->trading_pair = $trading_pair;
}
public function run() {
$this->o->GetPriceStreamByPair($this->o->accounts[0], $this->trading_pair);
}
}
$worker = new Streamer();
$pairs = ['GBP_JPY'];
foreach ($pairs as $pair) :
$priceData = new PricingMT($pair);
$priceData->o = $o;
$worker->stack($priceData);
endforeach;
// Start all jobs
$worker->start();
while(true):
$myfile = fopen("tickData.txt", "rw");
while(! feof ($myfile))
{
echo fgets($myfile);
}
fclose($myfile);
file_put_contents("tickData.txt","");
sleep(0.5);
endwhile;
?>
У меня нет понимания системы потоков на самом делеусугубляется тем, что я не очень хорошо разбираюсь в ООП, либо просто не получаю свои данные.Любое направление будет высоко ценится!
Спасибо