Getting Data in Batches
The get() method of the Table object accepts not only a single Get object but also a list of Get objects.
- table.get(gets) returns a Result[] array holding all rows matched by the query; iterate over this array to read the data.
- result.rawCells(): each Result holds all the data of one row, and the rawCells() method returns the collection of all cells (columns) in that row.
- A Cell object represents a single column. To read a cell's contents, use the CellUtil.cloneXXX() methods; for example, CellUtil.cloneValue(cell) returns the cell's value.
```java
public List<String> getData(Table table, List<String> rows) throws Exception {
    List<Get> gets = new ArrayList<>();
    for (String str : rows) {
        Get get = new Get(Bytes.toBytes(str));
        gets.add(get);
    }
    List<String> values = new ArrayList<>();
    Result[] results = table.get(gets);
    for (Result result : results) {
        System.out.println("Row:" + Bytes.toString(result.getRow()));
        for (Cell kv : result.rawCells()) {
            String family = Bytes.toString(CellUtil.cloneFamily(kv));
            String qualifier = Bytes.toString(CellUtil.cloneQualifier(kv));
            String value = Bytes.toString(CellUtil.cloneValue(kv));
            values.add(value);
            System.out.println(family + ":" + qualifier + "\t" + value);
        }
    }
    return values;
}
```
Deleting Data in Batches
Deleting a single row
```java
Table table = conn.getTable(tableName);  // get the table
byte[] row = Bytes.toBytes("row1");      // define the row key
Delete delete = new Delete(row);         // create the Delete object
table.delete(delete);                    // delete the row
```
Deleting multiple rows
```java
Table table = conn.getTable(tableName);
List<Delete> deletes = new ArrayList<>();
for (int i = 1; i < 5; i++) {
    byte[] row = Bytes.toBytes("row" + i);
    Delete delete = new Delete(row);
    deletes.add(delete);
}
table.delete(deletes);
```
Importing Data into HBase in Batches
Adding four columns to a single row
```java
Table tableStep3 = connection.getTable(tableStep3Name);
// add four columns to one row in a loop
byte[] row = Bytes.toBytes("20001");
Put put = new Put(row);
for (int i = 1; i <= 4; i++) {
    byte[] columnFamily = Bytes.toBytes("data");
    byte[] qualifier = Bytes.toBytes(String.valueOf(i));
    byte[] value = Bytes.toBytes("value" + i);
    put.addColumn(columnFamily, qualifier, value);
}
tableStep3.put(put);
```
Adding data to multiple rows
```java
List<Put> puts = new ArrayList<>();
// build one Put per row in a loop
for (int i = 1; i <= 4; i++) {
    byte[] row = Bytes.toBytes("row" + i);
    Put put = new Put(row);
    byte[] columnFamily = Bytes.toBytes("data");
    byte[] qualifier = Bytes.toBytes(String.valueOf(i));
    byte[] value = Bytes.toBytes("value" + i);
    put.addColumn(columnFamily, qualifier, value);
    puts.add(put);
}
Table table = connection.getTable(tableName);
table.put(puts);
```
Test
Table Name | Row Key | Column Family:Column | Value |
---|---|---|---|
stu | 20181122 | basic_info:name | 阿克蒙德 |
stu | 20181122 | basic_info:gender | male |
stu | 20181122 | basic_info:birthday | 1987-05-23 |
stu | 20181122 | basic_info:connect | tel:13974036666 |
stu | 20181122 | basic_info:address | HuNan-ChangSha |
stu | 20181122 | school_info:college | ChengXing |
stu | 20181122 | school_info:class | class 1 grade 2 |
stu | 20181122 | school_info:object | Software |
stu | 20181123 | basic_info:name | 萨格拉斯 |
stu | 20181123 | basic_info:gender | male |
stu | 20181123 | basic_info:birthday | 1986-05-23 |
stu | 20181123 | basic_info:connect | tel:18774036666 |
stu | 20181123 | basic_info:address | HuNan-ChangSha |
stu | 20181123 | school_info:college | ChengXing |
stu | 20181123 | school_info:class | class 2 grade 2 |
stu | 20181123 | school_info:object | Software |
```java
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class data {
    public void tablepput() throws Exception {
        Configuration config = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(config);
        Admin admin = connection.getAdmin();
        TableName tableName = TableName.valueOf("stu");  // define the table name
        // a TableDescriptor is built through TableDescriptorBuilder
        TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
        ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("basic_info")).build();   // build column family
        ColumnFamilyDescriptor family2 = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("school_info")).build(); // build column family
        tableDescriptor.setColumnFamily(family);
        tableDescriptor.setColumnFamily(family2);   // register the column families
        admin.createTable(tableDescriptor.build()); // create the table

        List<Put> list = new ArrayList<Put>();
        String[] rows = {"20181122", "20181123"};
        String[][] basic_infos = {
            {"阿克蒙德", "male", "1987-05-23", "tel:13974036666", "HuNan-ChangSha"},
            {"萨格拉斯", "male", "1986-05-23", "tel:18774036666", "HuNan-ChangSha"}
        };
        String[] basic_colums = {"name", "gender", "birthday", "connect", "address"};
        String[][] school_infos = {
            {"ChengXing", "class 1 grade 2", "Software"},
            {"ChengXing", "class 2 grade 2", "Software"}
        };
        String[] school_colums = {"college", "class", "object"};
        for (int i = 0; i < rows.length; i++) {
            Put put = new Put(Bytes.toBytes(rows[i]));
            // iterate over every basic_info column of the current row
            for (int j = 0; j < basic_colums.length; j++) {
                byte[] columnFamily = Bytes.toBytes("basic_info");
                byte[] qualifier = Bytes.toBytes(basic_colums[j]);
                byte[] value = Bytes.toBytes(basic_infos[i][j]);
                put.addColumn(columnFamily, qualifier, value);
            }
            // iterate over every school_info column of the current row
            for (int k = 0; k < school_colums.length; k++) {
                byte[] columnFamily2 = Bytes.toBytes("school_info");
                byte[] qualifier2 = Bytes.toBytes(school_colums[k]);
                byte[] value2 = Bytes.toBytes(school_infos[i][k]);
                put.addColumn(columnFamily2, qualifier2, value2);
            }
            list.add(put);
        }
        Table table = connection.getTable(tableName);
        table.put(list);
    }
}
```
Batch Operations
```java
List<Row> rows = new ArrayList<>();
// put operation
Put put = new Put(Bytes.toBytes("row1"));
put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("2"), Bytes.toBytes("value1"));
rows.add(put);
// delete operation
Delete delete = new Delete(Bytes.toBytes("row1"));
delete.addColumn(Bytes.toBytes("data"), Bytes.toBytes("1"));
rows.add(delete);
// get operation
Get get = new Get(Bytes.toBytes("row1"));
get.addColumn(Bytes.toBytes("data"), Bytes.toBytes("2"));
rows.add(get);
// result array sized to the number of operations
Object[] results = new Object[rows.size()];
// batch() is synchronous: the results are placed into the results array when it returns
table.batch(rows, results);
for (Object object : results) {
    System.out.println(object.toString());
}
```
Do not mix Put and Delete operations for the same row in one batch request: because of server-side ordering and resource contention across threads, the result can be unpredictable. Alternatively, table.batchCallback() can be used to process each operation's result through a callback, as sketched below.
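For reference, here is a minimal sketch of the callback variant. It reuses the row1/data names from the example above and assumes the same Table object is already open; the callback is only meant to illustrate the shape of the API.

```java
// requires: import org.apache.hadoop.hbase.client.coprocessor.Batch;
// batchCallback() invokes the callback once per completed operation,
// passing the region, the row key, and that operation's result.
List<Row> actions = new ArrayList<>();
actions.add(new Put(Bytes.toBytes("row1"))
        .addColumn(Bytes.toBytes("data"), Bytes.toBytes("2"), Bytes.toBytes("value1")));
actions.add(new Get(Bytes.toBytes("row1")));
Object[] results = new Object[actions.size()];
table.batchCallback(actions, results, new Batch.Callback<Object>() {
    @Override
    public void update(byte[] region, byte[] row, Object result) {
        System.out.println("row " + Bytes.toString(row) + " -> " + result);
    }
});
```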
Scanning All Data in a Table
```java
// create a Scan object
Scan scan = new Scan();
// obtain a ResultScanner from the table
ResultScanner scanner = table.getScanner(scan);
// iterate over the data with the ResultScanner
List<String> values = new ArrayList<>();
for (Result result : scanner) {
    for (Cell kv : result.rawCells()) {
        String family = Bytes.toString(CellUtil.cloneFamily(kv));
        String qualifier = Bytes.toString(CellUtil.cloneQualifier(kv));
        String value = Bytes.toString(CellUtil.cloneValue(kv));
        values.add(value);
        System.out.println(family + ":" + qualifier + "\t" + value);
    }
}
```
There are other ways to create Scan and ResultScanner objects; the different ways to obtain these two objects are listed below:
```java
ResultScanner getScanner(Scan scan)
ResultScanner getScanner(byte[] family)
ResultScanner getScanner(byte[] family, byte[] qualifier)
// the last two methods implicitly create a Scan instance

// Scan constructors
Scan()
Scan(byte[] startRow, Filter filter)
Scan(byte[] startRow)
Scan(byte[] startRow, byte[] stopRow)  // [startRow, stopRow)

// restrict what a Scan returns
Scan addFamily(byte[] family)
Scan addColumn(byte[] family, byte[] qualifier)
```
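As a quick illustration of the implicit Scan, a family-level scan can be obtained without building a Scan object yourself. This is only a minimal sketch that assumes the data column family from the earlier examples:

```java
// getScanner(family) builds the Scan internally and scans only the "data" family
ResultScanner scanner = table.getScanner(Bytes.toBytes("data"));
try {
    for (Result result : scanner) {
        System.out.println(Bytes.toString(result.getRow()));
    }
} finally {
    scanner.close();  // always release the scanner
}
```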
Test:
Scan the values of all columns in all rows of the given table
```java
package step2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.ArrayList;
import java.util.List;

public class Task {
    public void scanTable(String tablename) throws Exception {
        /********* Begin *********/
        Configuration config = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(config);
        Table table = connection.getTable(TableName.valueOf(tablename));
        Scan scan = new Scan();
        ResultScanner scanner = table.getScanner(scan);
        List<String> values = new ArrayList<String>();
        for (Result result : scanner) {
            for (Cell kv : result.rawCells()) {
                String family = Bytes.toString(CellUtil.cloneFamily(kv));
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(kv));
                String value = Bytes.toString(CellUtil.cloneValue(kv));
                values.add(value);
                System.out.println(value);
            }
        }
        scanner.close();
        /********* End *********/
    }
}
```
Scanning a Table with Caching and Batch Parameters
ResultScanner implements the Iterable interface, so it is an iterator: iterating over a ResultScanner in a for-each loop actually calls its next() method, and every next() call issues a separate RPC request. You can reduce the number of requests by letting a single RPC fetch multiple rows at once.
```java
void setCaching(int caching)  // set the scanner caching, i.e. how many rows are fetched per RPC
```
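A minimal sketch of how this might be applied. The caching and batch values below are arbitrary, and Scan.setBatch() (the maximum number of cells returned per next() call) is included here only because the section title also mentions a batch parameter:

```java
// sketch: reduce RPC round trips with scanner caching, and cap cells per call with batch
Scan scan = new Scan();
scan.setCaching(100);  // fetch up to 100 rows per RPC (value chosen arbitrarily)
scan.setBatch(5);      // return at most 5 cells per next() call (arbitrary value)
ResultScanner scanner = table.getScanner(scan);
try {
    for (Result result : scanner) {
        System.out.println(Bytes.toString(result.getRow()));
    }
} finally {
    scanner.close();
}
```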
Another approach is to avoid scanning the entire table and restrict the scan range instead:
```java
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("data"), Bytes.toBytes("2"));  // scan only the data:2 column
scan.setStartRow(Bytes.toBytes("A"));  // start scanning from row "A"; row keys sort in ascending byte (ASCII) order
scan.setStopRow(Bytes.toBytes("Z"));   // stop at row "Z"; the range is "A" <= rowKey < "Z"
```
Tip: in the newer API, setStartRow() and setStopRow() are deprecated; use withStartRow() and withStopRow() instead.
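A minimal sketch of the same range scan written against the newer API, assuming the same data:2 column as above:

```java
// equivalent range scan with the non-deprecated methods
Scan scan = new Scan()
        .addColumn(Bytes.toBytes("data"), Bytes.toBytes("2"))
        .withStartRow(Bytes.toBytes("A"))   // inclusive start row
        .withStopRow(Bytes.toBytes("Z"));   // exclusive stop row
ResultScanner scanner = table.getScanner(scan);
```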