Cassandra с PHP - при вызове cassandra-test.php я получаю «Вызов неопределенного метода CassandraClient :: batch_insert ()» - PullRequest
1 голос
/ 31 августа 2011

Я сейчас пытаюсь заставить Cassandra работать с PHP на Windows 7.Я установил cassandra и thrift ... Когда я вызываю cassandra-test.php, я получаю следующую ошибку:

( ! ) Fatal error: Call to undefined method

CassandraClient :: batch_insert () в C: \ xampp \ htdocs \YiiPlayground \ cassandra-test.php в строке 75 Стек вызовов # Время Функция памяти Расположение 1 0,0014 337552 {main} () .. \ cassandra-test.php: 0 2 0.0138 776232 CassandraDB-> InsertRecord () .. \ cassandra-test.php: 304

Файл cassandra-test.php выглядит следующим образом:

<?php
// CassandraDB version 0.1
// Software Projects Inc
// http://www.softwareprojects.com
//

// Includes
$GLOBALS['THRIFT_ROOT'] = 'C:/xampp/htdocs/Yii/kallaspriit-Cassandra-PHP-Client-Library/thrift';
//$GLOBALS['THRIFT_ROOT'] = realpath('E:/00-REGIESTART/Programme/Cassandra/thrift');
require_once $GLOBALS['THRIFT_ROOT'].'/packages/cassandra/Cassandra.php';
require_once $GLOBALS['THRIFT_ROOT'].'/packages/cassandra/cassandra_types.php';
require_once $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php';
require_once $GLOBALS['THRIFT_ROOT'].'/protocol/TBinaryProtocol.php';
require_once $GLOBALS['THRIFT_ROOT'].'/transport/TFramedTransport.php';
require_once $GLOBALS['THRIFT_ROOT'].'/transport/TBufferedTransport.php';

class CassandraDB
{
  // Internal variables
  protected $socket;
  protected $client;
  protected $keyspace;
  protected $transport;
  protected $protocol;
  protected $err_str = "";
  protected $display_errors = 0;
  protected $consistency = 1;
  protected $parse_columns = 1;

  // Functions

  // Constructor - Connect to Cassandra via Thrift
  function CassandraDB  ($keyspace, $host = "127.0.0.1", $port = 9160)
  {
    // Initialize
    $this->err_str = '';

    try
    {
    // Store passed 'keyspace' in object
    $this->keyspace = $keyspace;

    // Make a connection to the Thrift interface to Cassandra
    $this->socket = new TSocket($host, $port);
    $this->transport = new TFramedTransport($this->socket, 1024, 1024);
    $this->protocol = new TBinaryProtocolAccelerated($this->transport);
    $this->client = new CassandraClient($this->protocol);
    $this->transport->open();
    }
    catch (TException $tx)
    {
      // Error occured
      $this->err_str = $tx->why;
      $this->Debug($tx->why." ".$tx->getMessage());
    }
  }

  // Insert Column into ColumnFamily
  // (Equivalent to RDBMS Insert record to a table)
  function InsertRecord  ($table /* ColumnFamily */, $key /* ColumnFamily Key */, $record /* Columns */)
  {
    // Initialize
    $this->err_str = '';

    try
    {
      // Timestamp for update
      $timestamp = time();

      // Build batch mutation
      $cfmap = array();
      $cfmap[$table] = $this->array_to_supercolumns_or_columns($record, $timestamp);

      // Insert
      $this->client->batch_insert($this->keyspace, $key, $cfmap, $this->consistency);

      // If we're up to here, all is well
      $result = 1;
    }
    catch (TException $tx)
    {
      // Error occured
      $result = 0;
      $this->err_str = $tx->why;
      $this->Debug($tx->why." ".$tx->getMessage());
    }

    // Return result
    return $result;
  }

  // Insert SuperColumn into SuperColumnFamily
  // (Equivalent to RDMBS Insert record to a "nested table")
  function InsertRecordArray  ($table /* SuperColumnFamily */, $key_parent /* Super CF */,
                $record /* Columns */)
  {
    // Initialize
    $err_str = '';

    try
    {
      // Timestamp for update
      $timestamp = time();

      // Build batch mutation
      $cfmap = array();
      $cfmap[$table] = $this->array_to_supercolumns_or_columns($record, $timestamp);

      // Insert
      $this->client->batch_insert($this->keyspace, $key_parent, $cfmap, $this->consistency);

      // If we're up to here, all is well
      $result = 1;
    }
    catch (TException $tx)
    {
      // Error occured
      $result = 0;
      $this->err_str = $tx->why;
      $this->Debug($tx->why." ".$tx->getMessage());
    }

    // Return result
    return $result;
  }

  // Get record by key
  function GetRecordByKey  ($table /* ColumnFamily or SuperColumnFamily */, $key, $start_from="", $end_at="")
  {
    // Initialize
    $err_str = '';

    try
    {
      return $this->get($table, $key, NULL, $start_from, $end_at);
    }
    catch (TException $tx)
    {
      // Error occured
      $this->err_str = $tx->why;
      $this->Debug($tx->why." ".$tx->getMessage());
      return array();
    }
  }

  // Print debug message
  function Debug      ($str)
  {
    // If verbose is off, we're done
    if (!$this->display_errors) return;

    // Print
    echo date("Y-m-d h:i:s")." CassandraDB ERROR: $str\r\n";
  }

  // Turn verbose debug on/off (Default is off)
  function SetDisplayErrors($flag)
  {
    $this->display_errors = $flag;
  }

  // Set Consistency level (Default is 1)
  function SetConsistency  ($consistency)
  {
    $this->consistency = $consistency;
  }

  // Build cf array
  function array_to_supercolumns_or_columns($array, $timestamp=null)
  {
    if(empty($timestamp)) $timestamp = time();

    $ret = null;
    foreach($array as $name => $value) {
      $c_or_sc = new cassandra_ColumnOrSuperColumn();
      if(is_array($value)) {
        $c_or_sc->super_column = new cassandra_SuperColumn();
        $c_or_sc->super_column->name = $this->unparse_column_name($name, true);
        $c_or_sc->super_column->columns = $this->array_to_columns($value, $timestamp);
        $c_or_sc->super_column->timestamp = $timestamp;
      }
      else
      {
        $c_or_sc = new cassandra_ColumnOrSuperColumn();
        $c_or_sc->column = new cassandra_Column();
        $c_or_sc->column->name = $this->unparse_column_name($name, true);
        $c_or_sc->column->value = $value;
        $c_or_sc->column->timestamp = $timestamp;
      }
      $ret[] = $c_or_sc;
    }

    return $ret;
  }


  // Parse column names for Cassandra
  function parse_column_name($column_name, $is_column=true)
  {
    if(!$column_name) return NULL;

    return $column_name;
  }

  // Unparse column names for Cassandra
  function unparse_column_name($column_name, $is_column=true)
  {
    if(!$column_name) return NULL;

    return $column_name;
  }

  // Convert supercolumns or columns into an array
  function supercolumns_or_columns_to_array($array)
  {
    $ret = null;
    for ($i=0; $i<count($array); $i++)
    foreach ($array[$i] as $object)
    {
      if ($object)
      {
        // If supercolumn
        if (isset($object->columns))
        {
          $record = array();
          for ($j=0; $j<count($object->columns); $j++)
          {
            $column = $object->columns[$j];
            $record[$column->name] = $column->value;
          }
          $ret[$object->name] = $record;
        }
        // (Otherwise - not supercolumn)
        else
        {
          $ret[$object->name] = $object->value;
        }
      }
    }

    return $ret;
  }

  // Get record from Cassandra
  function get($table, $key, $super_column=NULL, $slice_start="", $slice_finish="")
  {
    try
    {
    $column_parent = new cassandra_ColumnParent();
    $column_parent->column_family = $table;
    $column_parent->super_column = $this->unparse_column_name($super_column, false);

    $slice_range = new cassandra_SliceRange();
    $slice_range->start = $slice_start;
    $slice_range->finish = $slice_finish;
    $predicate = new cassandra_SlicePredicate();
    $predicate->slice_range = $slice_range;

    $resp = $this->client->get_slice($this->keyspace, $key, $column_parent, $predicate, $this->consistency);

    return $this->supercolumns_or_columns_to_array($resp);
    }
    catch (TException $tx)
    {
    $this->Debug($tx->why." ".$tx->getMessage());
    return array();
    }
  }

  // Convert array to columns
  function array_to_columns($array, $timestamp=null) {
    if(empty($timestamp)) $timestamp = time();

    $ret = null;
    foreach($array as $name => $value) {
      $column = new cassandra_Column();
      $column->name = $this->unparse_column_name($name, false);
      $column->value = $value;
      $column->timestamp = $timestamp;

      $ret[] = $column;
    }
    return $ret;
  }

  // Get error string
  function ErrorStr()
  {
    return $this->err_str;
  }
} 


// Initialize Cassandra
$cassandra = new CassandraDB("SPI");

// Debug on
$cassandra->SetDisplayErrors(true);

// Insert record ("Columns" in Cassandra)
$record = array();
$record["name"] = "Mike Peters";
$record["email"] = "mike at softwareprojects.com";
if ($cassandra->InsertRecord('mytable', "Mike Peters", $record)) {
    echo "Record (Columns) inserted successfully.\r\n";
}

// Print record
$record = $cassandra->GetRecordByKey('mytable', "Mike Peters");
print_r($record);
?>

Есть идеи по этому поводу, как это исправить?Большое спасибо!

1 Ответ

1 голос
/ 02 сентября 2011

Вы действительно не хотите делать Thrift вручную, если вы можете избежать этого.Взгляните на библиотеку phpcassa: https://github.com/thobbs/phpcassa

О, и выше, похоже, что вы хотите «batch_mutate», а не «batch_insert» на ln.75. Этот метод изменил имена в версиях cassandra> 0.6.x

...