在基于HBase数据库的开发中,对于Java语言来说,可以直接使用HBase的原生API来操作HBase表数据;当然,如果不嫌麻烦,也可以使用Thrift客户端Java API,这里有我曾经使用过的HBase Thrift客户端Java API实践,可以参考。对于具有其他编程语言背景的开发人员,如果也想利用HBase带来的好处,可以选择使用由Thrift为对应编程语言生成的HBase客户端API,来实现与HBase的交互。
这里,我们使用C#客户端来操作HBase。HBase的Thrift接口定义可以通过链接http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift?view=markup查看。我们需要安装Thrift编译器,才能生成HBase的跨语言API,这里使用的Thrift版本是0.9.0。需要注意的是,所安装的Thrift编译器的版本一定要与项目中导入的对应语言的Thrift库版本保持一致,否则会出现各种各样的问题,因为不同版本的Thrift,其各编程语言库的API可能有变化。
首先,下载上面链接的内容,保存为Hbase.thrift。
然后,执行如下命令,生成C#编程语言的HBase Thrift客户端API:
1 |
[hadoop@master hbase]$ thrift --gen csharp Hbase.thrift |
2 |
[hadoop@master hbase]$ ls
|
这里,我们基于C#语言,使用HBase的Thrift客户端API访问HBase表。事实上,如果使用Java来操作HBase表,最好使用HBase的原生API,无论从性能还是便利性方面,都会提供更好的体验。Thrift API实际上是在HBase API之上又做了一层封装,初次使用时可能会感觉很别扭,有时候还需要参考Thrift服务端的实现代码。
准备工作如下:
-
- 下载Thrift软件包(版本需与Thrift编译器一致,这里是0.9.0),解压缩后,将thrift-0.9.0/lib/csharp/src下面的C#库代码拷贝到工作区(开发工具中)
- 将上面生成的gen-csharp目录中代码拷贝到工作区
- 保证HBase集群正常运行,接着启动HBase的Thrift服务,执行如下命令:
1 |
bin/hbase thrift -b master -p 9090 start |
上面启动的HBase Thrift服务监听9090端口,后面通过Thrift API访问HBase时需要连接的正是这个端口,而不是HBase的服务端口(默认60000)。
接着,我们实现一个简单的例子来访问HBase表。
首先,我们通过HBase Shell创建一个表:
1 |
create 'test_info' , 'info'
|
表名为test_info,列族名称为info。
然后,我们开始基于上面生成的Thrift代码来实现对HBase表的操作。
这里,我们实际上是把HBase Thrift客户端Java API实践中的Java代码翻译成了C#。为了更方便地传递各种参数,我们在客户端做了一层抽象,抽象类为AbstractHBaseThriftService,所在的命名空间为HbaseThrift.HBase.Thrift,该类的实现代码如下所示:
02 |
using System.Collections.Generic;
|
05 |
using System.Threading.Tasks;
|
07 |
using Thrift.Transport;
|
08 |
using Thrift.Protocol;
|
10 |
namespace HbaseThrift.HBase.Thrift
|
12 |
public abstract class AbstractHBaseThriftService
|
14 |
protected static readonly string CHARSET = "UTF-8" ;
|
15 |
private string host = "localhost" ;
|
16 |
private int port = 9090;
|
17 |
private readonly TTransport transport;
|
18 |
protected readonly Hbase.Client client;
|
20 |
public AbstractHBaseThriftService() : this ( "localhost" , 9090)
|
25 |
public AbstractHBaseThriftService( string host, int port)
|
29 |
transport = new TSocket(host, port);
|
30 |
TProtocol protocol = new TBinaryProtocol(transport, true , true );
|
31 |
client = new Hbase.Client(protocol);
|
35 |
if (transport != null )
|
43 |
if (transport != null )
|
49 |
public abstract List< string > GetTables();
|
51 |
public abstract void Update( string table, string rowKey, bool writeToWal,
|
52 |
string fieldName, string fieldValue, Dictionary< string , string > attributes);
|
53 |
public abstract void Update( string table, string rowKey, bool writeToWal,
|
54 |
Dictionary< string , string > fieldNameValues, Dictionary< string , string > attributes);
|
56 |
public abstract void DeleteCell( string table, string rowKey, bool writeToWal,
|
57 |
string column, Dictionary< string , string > attributes);
|
58 |
public abstract void DeleteCells( string table, string rowKey, bool writeToWal,
|
59 |
List< string > columns, Dictionary< string , string > attributes);
|
61 |
public abstract void DeleteRow( string table, string rowKey,
|
62 |
Dictionary< string , string > attributes);
|
64 |
public abstract int ScannerOpen( string table, string startRow, List< string > columns,
|
65 |
Dictionary< string , string > attributes);
|
66 |
public abstract int ScannerOpen( string table, string startRow, string stopRow, List< string > columns,
|
67 |
Dictionary< string , string > attributes);
|
68 |
public abstract int ScannerOpenWithPrefix( string table, string startAndPrefix,
|
69 |
List< string > columns, Dictionary< string , string > attributes);
|
70 |
public abstract int ScannerOpenTs( string table, string startRow,
|
71 |
List< string > columns, long timestamp, Dictionary< string , string > attributes);
|
72 |
public abstract int ScannerOpenTs( string table, string startRow, string stopRow,
|
73 |
List< string > columns, long timestamp, Dictionary< string , string > attributes);
|
75 |
public abstract List<TRowResult> ScannerGetList( int id, int nbRows);
|
76 |
public abstract List<TRowResult> ScannerGet( int id);
|
78 |
public abstract List<TRowResult> GetRow( string table, string row,
|
79 |
Dictionary< string , string > attributes);
|
80 |
public abstract List<TRowResult> GetRows( string table,
|
81 |
List< string > rows, Dictionary< string , string > attributes);
|
82 |
public abstract List<TRowResult> GetRowsWithColumns( string table,
|
83 |
List< string > rows, List< string > columns, Dictionary< string , string > attributes);
|
85 |
public abstract void ScannerClose( int id);
|
88 |
* Iterate result rows(just for test purpose)
|
91 |
public abstract void IterateResults(TRowResult result);
|
这里,简单叙述一下,我们提供的客户端API的基本功能:
- 建立到Thrift服务的连接:Open()
- 获取到HBase中的所有表名:GetTables()
- 更新HBase表记录:Update()
- 删除HBase表中某一行记录的数据(cell):DeleteCell()和DeleteCells()
- 删除HBase表中一行记录:DeleteRow()
- 打开一个Scanner,返回id:ScannerOpen()、ScannerOpenWithPrefix()和ScannerOpenTs();然后用返回的id迭代记录:ScannerGetList()和ScannerGet()
- 获取一行记录结果:GetRow()、GetRows()和GetRowsWithColumns()
- 关闭一个Scanner:ScannerClose()
- 迭代结果,用于调试:IterateResults()
比如,如果我们想要实现分页逻辑,做法可能和传统的关系型数据库有些不同。基于HBase表的实现方式是:首先打开一个Scanner(例如调用ScannerOpen()),得到一个id;然后使用该id调用ScannerGetList()方法(通过参数nbRows指定每次返回的记录条数),得到一个记录列表;反复调用ScannerGetList(),直到返回空列表(即没有更多结果)为止;最后调用ScannerClose()关闭该Scanner。后面会通过测试用例来实际体会。
现在,我们基于上面抽象出来的客户端操作接口,给出一个基本的实现,代码如下所示:
002 |
using System.Collections.Generic;
|
005 |
using System.Threading.Tasks;
|
007 |
namespace HbaseThrift.HBase.Thrift
|
009 |
class HBaseThriftClient : AbstractHBaseThriftService
|
011 |
public HBaseThriftClient() : this ( "localhost" , 9090)
|
016 |
public HBaseThriftClient( string host, int port) : base (host, port)
|
021 |
public override List< string > GetTables()
|
023 |
List< byte []> tables = client.getTableNames();
|
024 |
List<String> list = new List<String>();
|
025 |
foreach ( byte [] table in tables)
|
027 |
list.Add(Decode(table));
|
032 |
public override void Update( string table, string rowKey, bool writeToWal, string fieldName, string fieldValue, Dictionary< string , string > attributes)
|
034 |
byte [] tableName = Encode(table);
|
035 |
byte [] row = Encode(rowKey);
|
036 |
Dictionary< byte [], byte []> encodedAttributes = EncodeAttributes(attributes);
|
037 |
List<Mutation> mutations = new List<Mutation>();
|
038 |
Mutation mutation = new Mutation();
|
039 |
mutation.IsDelete = false ;
|
040 |
mutation.WriteToWAL = writeToWal;
|
041 |
mutation.Column = Encode(fieldName);
|
042 |
mutation.Value = Encode(fieldValue);
|
043 |
mutations.Add(mutation);
|
044 |
client.mutateRow(tableName, row, mutations, encodedAttributes);
|
047 |
public override void Update( string table, string rowKey, bool writeToWal, Dictionary< string , string > fieldNameValues, Dictionary< string , string > attributes)
|
049 |
byte [] tableName = Encode(table);
|
050 |
byte [] row = Encode(rowKey);
|
051 |
Dictionary< byte [], byte []> encodedAttributes = EncodeAttributes(attributes);
|
052 |
List<Mutation> mutations = new List<Mutation>();
|
053 |
foreach (KeyValuePair<String, String> pair in fieldNameValues)
|
055 |
Mutation mutation = new Mutation();
|
056 |
mutation.IsDelete = false ;
|
057 |
mutation.WriteToWAL = writeToWal;
|
058 |
mutation.Column = Encode(pair.Key);
|
059 |
mutation.Value = Encode(pair.Value);
|
060 |
mutations.Add(mutation);
|
062 |
client.mutateRow(tableName, row, mutations, encodedAttributes);
|
065 |
public override void DeleteCell( string table, string rowKey, bool writeToWal, string column, Dictionary< string , string > attributes)
|
067 |
byte [] tableName = Encode(table);
|
068 |
byte [] row = Encode(rowKey);
|
069 |
Dictionary< byte [], byte []> encodedAttributes = EncodeAttributes(attributes);
|
070 |
List<Mutation> mutations = new List<Mutation>();
|
071 |
Mutation mutation = new Mutation();
|
072 |
mutation.IsDelete = true ;
|
073 |
mutation.WriteToWAL = writeToWal;
|
074 |
mutation.Column = Encode(column);
|
075 |
mutations.Add(mutation);
|
076 |
client.mutateRow(tableName, row, mutations, encodedAttributes);
|
079 |
public override void DeleteCells( string table, string rowKey, bool writeToWal, List< string > columns, Dictionary< string , string > attributes)
|
081 |
byte [] tableName = Encode(table);
|
082 |
byte [] row = Encode(rowKey);
|
083 |
Dictionary< byte [], byte []> encodedAttributes = EncodeAttributes(attributes);
|
084 |
List<Mutation> mutations = new List<Mutation>();
|
085 |
foreach ( string column in columns)
|
087 |
Mutation mutation = new Mutation();
|
088 |
mutation.IsDelete = true ;
|
089 |
mutation.WriteToWAL = writeToWal;
|
090 |
mutation.Column = Encode(column);
|
091 |
mutations.Add(mutation);
|
093 |
client.mutateRow(tableName, row, mutations, encodedAttributes);
|
096 |
public override void DeleteRow( string table, string rowKey, Dictionary< string , string > attributes)
|
098 |
byte [] tableName = Encode(table);
|
099 |
byte [] row = Encode(rowKey);
|
100 |
Dictionary< byte [], byte []> encodedAttributes = EncodeAttributes(attributes);
|
101 |
client.deleteAllRow(tableName, row, encodedAttributes);
|
104 |
public override int ScannerOpen( string table, string startRow, List< string > columns, Dictionary< string , string > attributes)
|
106 |
byte [] tableName = Encode(table);
|
107 |
byte [] start = Encode(startRow);
|
108 |
List< byte []> encodedColumns = EncodeStringList(columns);
|
109 |
Dictionary< byte [], byte []> encodedAttributes = EncodeAttributes(attributes);
|
110 |
return client.scannerOpen(tableName, start, encodedColumns, encodedAttributes);
|
113 |
public override int ScannerOpen( string table, string startRow, string stopRow, List< string > columns, Dictionary< string , string > attributes)
|
115 |
byte [] tableName = Encode(table);
|
116 |
byte [] start = Encode(startRow);
|
117 |
byte [] stop = Encode(stopRow);
|
118 |
List< byte []> encodedColumns = EncodeStringList(columns);
|
119 |
Dictionary< byte [], byte []> encodedAttributes = EncodeAttributes(attributes);
|
120 |
return client.scannerOpenWithStop(tableName, start, stop, encodedColumns, encodedAttributes);
|
123 |
public override int ScannerOpenWithPrefix( string table, string startAndPrefix, List< string > columns, Dictionary< string , string > attributes)
|
125 |
byte [] tableName = Encode(table);
|
126 |
byte [] prefix = Encode(startAndPrefix);
|
127 |
List< byte []> encodedColumns = EncodeStringList(columns);
|
128 |
Dictionary< byte [], byte []> encodedAttributes = EncodeAttributes(attributes);
|
129 |
return client.scannerOpenWithPrefix(tableName, prefix, encodedColumns, encodedAttributes);
|
132 |
public override int ScannerOpenTs( string table, string startRow, List< string > columns, long timestamp, Dictionary< string , string > attributes)
|
134 |
byte [] tableName = Encode(table);
|
135 |
byte [] start = Encode(startRow);
|
136 |
List< byte []> encodedColumns = EncodeStringList(columns);
|
137 |
Dictionary< byte [], byte []> encodedAttributes = EncodeAttributes(attributes);
|
138 |
return client.scannerOpenTs(tableName, start, encodedColumns, timestamp, encodedAttributes);
|
141 |
public override int ScannerOpenTs( string table, string startRow, string stopRow, List< string > columns, long timestamp, Dictionary< string , string > attributes)
|
143 |
byte [] tableName = Encode(table);
|
144 |
byte [] start = Encode(startRow);
|
145 |
byte [] stop = Encode(stopRow);
|
146 |
List< byte []> encodedColumns = EncodeStringList(columns);
|
147 |
Dictionary< byte [], byte []> encodedAttributes = EncodeAttributes(attributes);
|
148 |
return client.scannerOpenWithStopTs(tableName, start, stop, encodedColumns, timestamp, encodedAttributes);
|
151 |
public override List<TRowResult> ScannerGetList( int id, int nbRows)
|
153 |
return client.scannerGetList(id, nbRows);
|
156 |
public override List<TRowResult> ScannerGet( int id)
|
158 |
return client.scannerGet(id);
|
161 |
public override List<TRowResult> GetRow( string table, string row, Dictionary< string , string > attributes)
|
163 |
byte [] tableName = Encode(table);
|
164 |
byte [] startRow = Encode(row);
|
165 |
Dictionary< byte [], byte []> encodedAttributes = EncodeAttributes(attributes);
|
166 |
return client.getRow(tableName, startRow, encodedAttributes);
|
169 |
public override List<TRowResult> GetRows( string table, List< string > rows, Dictionary< string , string > attributes)
|
171 |
byte [] tableName = Encode(table);
|
172 |
List< byte []> encodedRows = EncodeStringList(rows);
|
173 |
Dictionary< byte [], byte []> encodedAttributes = EncodeAttributes(attributes);
|
174 |
return client.getRows(tableName, encodedRows, encodedAttributes);
|
177 |
public override List<TRowResult> GetRowsWithColumns( string table, List< string > rows, List< string > columns, Dictionary< string , string > attributes)
|
179 |
byte [] tableName = Encode(table);
|
180 |
List< byte []> encodedRows = EncodeStringList(rows);
|
181 |
List< byte []> encodedColumns = EncodeStringList(columns);
|
182 |
Dictionary< byte [], byte []> encodedAttributes = EncodeAttributes(attributes);
|
183 |
return client.getRowsWithColumns(tableName, encodedRows, encodedColumns, encodedAttributes);
|
186 |
public override void ScannerClose( int id)
|
188 |
client.scannerClose(id);
|
191 |
public override void IterateResults(TRowResult result)
|
193 |
foreach (KeyValuePair< byte [], TCell> pair in result.Columns)
|
195 |
Console.WriteLine( "\tCol=" + Decode(pair.Key) + ", Value=" + Decode(pair.Value.Value));
|
199 |
private String Decode( byte [] bs)
|
201 |
return UTF8Encoding.Default.GetString(bs);
|
204 |
private byte [] Encode(String str)
|
206 |
return UTF8Encoding.Default.GetBytes(str);
|
209 |
private Dictionary< byte [], byte []> EncodeAttributes(Dictionary<String, String> attributes)
|
211 |
Dictionary< byte [], byte []> encodedAttributes = new Dictionary< byte [], byte []>();
|
212 |
foreach (KeyValuePair<String, String> pair in attributes)
|
214 |
encodedAttributes.Add(Encode(pair.Key), Encode(pair.Value));
|
216 |
return encodedAttributes;
|
219 |
private List< byte []> EncodeStringList(List<String> strings)
|
221 |
List< byte []> list = new List< byte []>();
|
224 |
foreach (String str in strings)
|
226 |
list.Add(Encode(str));
|
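上面的代码给出了基本的实现。需要注意的是,代码中的UTF8Encoding.Default实际上是基类Encoding的静态属性Encoding.Default,并不一定是UTF-8(在.NET Framework上它是系统当前的ANSI代码页)。如果要与CHARSET = "UTF-8"保持一致,Encode()和Decode()中应改用Encoding.UTF8。接着我们给出测试用例,调用我们实现的客户端操作,与HBase表进行交互。实现的测试用例类如下所示: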
002 |
using System.Collections.Generic;
|
005 |
using System.Threading.Tasks;
|
007 |
namespace HbaseThrift.HBase.Thrift
|
011 |
private readonly AbstractHBaseThriftService client;
|
013 |
public Test(String host, int port)
|
015 |
client = new HBaseThriftClient(host, port);
|
018 |
public Test() : this ( "master" , 9090)
|
023 |
static String RandomlyBirthday()
|
025 |
Random r = new Random();
|
026 |
int year = 1900 + r.Next(100);
|
027 |
int month = 1 + r.Next(12);
|
028 |
int date = 1 + r.Next(30);
|
029 |
return year + "-" + month.ToString().PadLeft(2, '0' ) + "-" + date.ToString().PadLeft(2, '0' );
|
032 |
static String RandomlyGender()
|
034 |
Random r = new Random();
|
035 |
int flag = r.Next(2);
|
036 |
return flag == 0 ? "M" : "F" ;
|
039 |
static String RandomlyUserType()
|
041 |
Random r = new Random();
|
042 |
int flag = 1 + r.Next(10);
|
043 |
return flag.ToString();
|
051 |
public void CaseForUpdate() {
|
052 |
bool writeToWal = false ;
|
053 |
Dictionary<String, String> attributes = new Dictionary<String, String>(0);
|
054 |
string table = SetTable();
|
056 |
for ( int i = 0; i < 10000000; i++) {
|
057 |
string rowKey = i.ToString().PadLeft(4, '0' );
|
058 |
Dictionary<String, String> fieldNameValues = new Dictionary<String, String>();
|
059 |
fieldNameValues.Add( "info:birthday" , RandomlyBirthday());
|
060 |
fieldNameValues.Add( "info:user_type" , RandomlyUserType());
|
061 |
fieldNameValues.Add( "info:gender" , RandomlyGender());
|
062 |
client.Update(table, rowKey, writeToWal, fieldNameValues, attributes);
|
066 |
public void CaseForDeleteCells() {
|
067 |
bool writeToWal = false ;
|
068 |
Dictionary<String, String> attributes = new Dictionary<String, String>(0);
|
069 |
String table = SetTable();
|
071 |
for ( long i = 5; i < 10; i++) {
|
072 |
String rowKey = i.ToString().PadLeft(4, '0' );
|
073 |
List<String> columns = new List<String>(0);
|
074 |
columns.Add( "info:birthday" );
|
075 |
client.DeleteCells(table, rowKey, writeToWal, columns, attributes);
|
079 |
public void CaseForDeleteRow() {
|
080 |
Dictionary<String, String> attributes = new Dictionary<String, String>(0);
|
081 |
String table = SetTable();
|
083 |
for ( long i = 5; i < 10; i++) {
|
084 |
String rowKey = i.ToString().PadLeft(4, '0' );
|
085 |
client.DeleteRow(table, rowKey, attributes);
|
089 |
public void CaseForScan() {
|
090 |
Dictionary<String, String> attributes = new Dictionary<String, String>(0);
|
091 |
String table = SetTable();
|
092 |
String startRow = "0005" ;
|
093 |
String stopRow = "0015" ;
|
094 |
List<String> columns = new List<String>(0);
|
095 |
columns.Add( "info:birthday" );
|
096 |
int id = client.ScannerOpen(table, startRow, stopRow, columns, attributes);
|
098 |
List<TRowResult> results = client.ScannerGetList(id, nbRows);
|
099 |
while (results != null ) {
|
100 |
foreach (TRowResult result in results) {
|
101 |
client.IterateResults(result);
|
103 |
results = client.ScannerGetList(id, nbRows);
|
105 |
client.ScannerClose(id);
|
108 |
public void CaseForGet() {
|
109 |
Dictionary<String, String> attributes = new Dictionary<String, String>(0);
|
110 |
String table = SetTable();
|
111 |
List<String> rows = new List<String>(0);
|
115 |
List<String> columns = new List<String>(0);
|
116 |
columns.Add( "info:birthday" );
|
117 |
columns.Add( "info:gender" );
|
118 |
List<TRowResult> results = client.GetRowsWithColumns(table, rows, columns, attributes);
|
119 |
foreach (TRowResult result in results) {
|
120 |
client.IterateResults(result);
|
124 |
private string SetTable()
|
126 |
string table = "test_info" ;
|
130 |
static void Main( string [] args)
|
132 |
Test test = new Test();
|
133 |
//test.CaseForUpdate(); // insert or update rows/cells
|
134 |
//test.CaseForDeleteCells(); // delete cells
|
135 |
//test.CaseForDeleteRow(); // delete rows
|
136 |
test.CaseForScan(); // scan rows
|
137 |
//test.CaseForGet(); // get rows
|
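上面的测试用例可以实现对HBase表数据的操作。需要注意的是,Scanner遍历结束时,scannerGetList()返回的是空列表而不是null。因此CaseForScan()中的循环条件应写成results != null && results.Count > 0,否则循环不会结束。另外,在生成的Thrift客户端代码中,Iface接口给出了全部的服务接口,可以根据需要选择使用;客户端类Client实现了与Thrift服务交互的处理逻辑,通过该类的对象就可以调用HBase提供的Thrift服务。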
时间: 2024-10-29 20:21:18