php通过thrift操作hbase
环境配置
操作系统 centos 5.8 hadoop版本cloudera cdh4u3 hbase版本hbase-0.90.4-cdh4u3 php版本5.2
1. 下载并编译thrift
# wget http://ftp.tc.edu.tw/pub/Apache/thrift/0.8.0/thrift-0.8.0.tar.gz
安装所需的依赖包
# yum install automake libtool flex bison pkgconfig gcc-c++ boost-devel libevent-devel zlib-devel python-devel ruby-devel php php-devel
# tar zxvf thrift-0.8.0.tar.gz
# cd thrift-0.8.0
# ./configure --prefix=/home/thrift --with-php-config=/usr/bin/php-config
# make && make install
2 生成php和hbase的接口文件:
# cd /home/thrift/
# bin/thrift --gen php $HBASE_HOME/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift
# cd gen-php/Hbase
# ls
Hbase.php Hbase_types.php
3. 把PHP客户端需要的包及刚才生成的接口文件复制出来供php程序调用:
# mkdir -p /var/www/html/hbasethrift/libs (/var/www/html为apache的web主目录)
# cp -a /home/soft/thrift-0.8.0/lib/php/src /var/www/html/hbasethrift/libs
# mkdir -p /var/www/html/hbasethrift/libs/packages
# cp -a /home/thrift/gen-php/Hbase /var/www/html/hbasethrift/libs/packages
4. 启动hbase thrift server,测试php连接hbase
# ./bin/hbase-daemon.sh start thrift
hbase thrift 默认监听端口为9090
测试php连接与操作hbase代码
# vi hbasethrift.php
- php
- $GLOBALS['THRIFT_ROOT'] = '/home/www/html/hbasethrift/libs';
- require_once( $GLOBALS['THRIFT_ROOT'].'/Thrift.php' );
- require_once( $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php' );
- require_once( $GLOBALS['THRIFT_ROOT'].'/transport/TBufferedTransport.php' );
- require_once( $GLOBALS['THRIFT_ROOT'].'/protocol/TBinaryProtocol.php' );
- require_once( $GLOBALS['THRIFT_ROOT'].'/packages/Hbase/Hbase.php' );
- $socket = new TSocket( 'localhost', 9090 );
- $socket->setSendTimeout( 10000 ); // Ten seconds (too long for production, but this is just a demo ;)
- $socket->setRecvTimeout( 20000 ); // Twenty seconds
- $transport = new TBufferedTransport( $socket );
- $protocol = new TBinaryProtocol( $transport );
- $client = new HbaseClient( $protocol );
- $transport->open();
- echo nl2br( "listing tables...\n" );
- $tables = $client->getTableNames();
- sort( $tables );
- foreach ( $tables as $name ) {
- echo nl2br( " found: {$name}\n" );
- }
- $columns = array(
- new ColumnDescriptor( array(
- 'name' => 'entry:',
- 'maxVersions' => 10
- ) ),
- new ColumnDescriptor( array(
- 'name' => 'unused:'
- ) )
- );
- $t = "table1";
- echo( "creating table: {$t}\n" );
- try {
- $client->createTable( $t, $columns );
- } catch ( AlreadyExists $ae ) {
- echo( "WARN: {$ae->message}\n" );
- }
- $t = "test";
- echo( "column families in {$t}:\n" );
- $descriptors = $client->getColumnDescriptors( $t );
- asort( $descriptors );
- foreach ( $descriptors as $col ) {
- echo( " column: {$col->name}, maxVer: {$col->maxVersions}\n" );
- }
- $t = "table1";
- echo( "column families in {$t}:\n" );
- $descriptors = $client->getColumnDescriptors( $t );
- asort( $descriptors );
- foreach ( $descriptors as $col ) {
- echo( " column: {$col->name}, maxVer: {$col->maxVersions}\n" );
- }
- $t = "table1";
- $row = "row_name";
- $valid = "foobar-\xE7\x94\x9F\xE3\x83\x93";
- $mutations = array(
- new Mutation( array(
- 'column' => 'entry:foo',
- 'value' => $valid
- ) ),
- );
- // 多记录批量提交(200提交一次时测试小记录大概在5000/s左右): $rows = array('timestamp'=>$timestamp, 'columns'=>array('txt:col1'=>$col1, 'txt:col2'=>$col2, 'txt:col3'=>$col3)); $records = array(rowkey=>$rows,...); $batchrecord = array(); foreach ($records as $rowkey => $rows) { $timestamp = $rows['timestamp']; $columns = $rows['columns']; // 生成一条记录 $record = array(); foreach($columns as $column => $value) { $col = new Mutation(array('column'=>$column, 'value'=>$value)); array_push($record, $col); } // 加入记录数组 $batchTmp = new BatchMutation(array('row'=>$rowkey, 'mutations'=>$record)); array_push($batchrecord, $batchTmp); } $ret = $hbase->mutateRows('test', $batchrecord);
- $client->mutateRow( $t, $row, $mutations );
- $table_name = "table1";
- $row_name = 'row_name';
- $fam_col_name = 'entry:foo';
- $arr = $client->get($table_name, $row_name , $fam_col_name);
- // $arr = array
- foreach ( $arr as $k=>$v ) {
- // $k = TCell
- echo ("value = {$v->value} , <br> ");
- echo ("timestamp = {$v->timestamp} <br>");
- }
- $table_name = "table1";
- $row_name = "row_name";
- $arr = $client->getRow($table_name, $row_name);
- // $client->getRow return a array
- foreach ( $arr as $k=>$TRowResult ) {
- // $k = 0 ; non-use
- // $TRowResultTRowResult = TRowResult
- var_dump($TRowResult);
- }
- //scannerOpenWithStop($tableName, $startRow, $stopRow, $columns);
- $table_name = 'zTest';
- $startRow="9-9-20120627-";
- $stopRow="9-9-20120627_";
- $columns = Array ('info:');
- $result =$client->scannerOpenWithStop($table_name,$startRow,$stopRow,$columns);
- while (true) {
- $record = $client->scannerGet($result);
- if ($record == NULL) {
- break;
- }
- foreach($record as $TRowResult) {
- $row = $TRowResult->row;
- $column = $TRowResult->columns;
- foreach($column as $family_column=>$Tcell){
- echo("$family_column={$Tcell->value}
"); - echo("timestamp is $Tcell->timestamp");
- }
- }
- }
- $transport->close();
- ?>
通过浏览器访问http://localhost/hbasethrift/hbasethrift.php,如果显示hbase中的表名与新建表table1 ,说明连接成功。
hbase thrift api 参考http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/thrift/doc-files/index.html
参考http://www.banping.com/2011/07/08/hbase-thrift-php/