HBase Batch Operations


Batch Get Data

The Table object's get() method accepts not only a single Get object but also a collection of Get objects.

  • table.get(gets) returns a Result[] array holding all the data matched by the query; you can iterate over this array to process each row.
  • result.rawCells(): result is a single Result holding all the data of one row; the rawCells() method returns the collection of all cells (columns) in that row.
  • A Cell object is a single column. To extract its contents, use the CellUtil.cloneXXX() methods; for example, CellUtil.cloneValue(cell) returns the cell's value.
public List<String> getData(Table table, List<String> rows) throws Exception {
    // build one Get per row key
    List<Get> gets = new ArrayList<>();
    for (String str : rows) {
        Get get = new Get(Bytes.toBytes(str));
        gets.add(get);
    }
    List<String> values = new ArrayList<>();
    // a single call fetches all requested rows at once
    Result[] results = table.get(gets);
    for (Result result : results) {
        System.out.println("Row:" + Bytes.toString(result.getRow()));
        for (Cell kv : result.rawCells()) {
            String family = Bytes.toString(CellUtil.cloneFamily(kv));
            String qualifier = Bytes.toString(CellUtil.cloneQualifier(kv));
            String value = Bytes.toString(CellUtil.cloneValue(kv));
            values.add(value);
            System.out.println(family + ":" + qualifier + "\t" + value);
        }
    }
    return values;
}

Batch Delete Data

Delete a single row

Table table = conn.getTable(tableName); // get the table
byte[] row = Bytes.toBytes("row1");     // define the row key
Delete delete = new Delete(row);        // create the Delete object
table.delete(delete);                   // delete

Delete multiple rows

Table table = conn.getTable(tableName);
List<Delete> deletes = new ArrayList<>();
for (int i = 1; i < 5; i++) {
    byte[] row = Bytes.toBytes("row" + i);
    Delete delete = new Delete(row);
    deletes.add(delete);
}
table.delete(deletes);

Batch Import Data into HBase

Add four columns to a single row

Table tableStep3 = connection.getTable(tableStep3Name);
byte[] row = Bytes.toBytes("20001");
Put put = new Put(row);
// add four columns to the same Put in a loop
for (int i = 1; i <= 4; i++) {
    byte[] columnFamily = Bytes.toBytes("data");
    byte[] qualifier = Bytes.toBytes(String.valueOf(i));
    byte[] value = Bytes.toBytes("value" + i);
    put.addColumn(columnFamily, qualifier, value);
}
tableStep3.put(put);

Add data to multiple rows

List<Put> puts = new ArrayList<>();
// build one Put per row in a loop
for (int i = 1; i <= 4; i++) {
    byte[] row = Bytes.toBytes("row" + i);
    Put put = new Put(row);
    byte[] columnFamily = Bytes.toBytes("data");
    byte[] qualifier = Bytes.toBytes(String.valueOf(i));
    byte[] value = Bytes.toBytes("value" + i);
    put.addColumn(columnFamily, qualifier, value);
    puts.add(put);
}
Table table = connection.getTable(tableName);
table.put(puts);

Test

Table  Row Key   Column Family:Qualifier  Value
stu    20181122  basic_info:name          阿克蒙德
stu    20181122  basic_info:gender        male
stu    20181122  basic_info:birthday      1987-05-23
stu    20181122  basic_info:connect       tel:13974036666
stu    20181122  basic_info:address       HuNan-ChangSha
stu    20181122  school_info:college      ChengXing
stu    20181122  school_info:class        class 1 grade 2
stu    20181122  school_info:object       Software
stu    20181123  basic_info:name          萨格拉斯
stu    20181123  basic_info:gender        male
stu    20181123  basic_info:birthday      1986-05-23
stu    20181123  basic_info:connect       tel:18774036666
stu    20181123  basic_info:address       HuNan-ChangSha
stu    20181123  school_info:college      ChengXing
stu    20181123  school_info:class        class 2 grade 2
stu    20181123  school_info:object       Software
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class Data {

    public void tablePut() throws Exception {
        Configuration config = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(config);
        Admin admin = connection.getAdmin();
        TableName tableName = TableName.valueOf("stu"); // define the table name

        // a TableDescriptor is built through TableDescriptorBuilder
        TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
        // build the column family descriptors
        ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("basic_info")).build();
        ColumnFamilyDescriptor family2 = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("school_info")).build();
        tableDescriptor.setColumnFamily(family);
        tableDescriptor.setColumnFamily(family2); // attach the column families
        admin.createTable(tableDescriptor.build()); // create the table

        List<Put> list = new ArrayList<>();
        String[] rows = {"20181122", "20181123"};
        String[][] basicInfos = {
            {"阿克蒙德", "male", "1987-05-23", "tel:13974036666", "HuNan-ChangSha"},
            {"萨格拉斯", "male", "1986-05-23", "tel:18774036666", "HuNan-ChangSha"}
        };
        String[] basicColumns = {"name", "gender", "birthday", "connect", "address"};

        String[][] schoolInfos = {
            {"ChengXing", "class 1 grade 2", "Software"},
            {"ChengXing", "class 2 grade 2", "Software"}
        };
        String[] schoolColumns = {"college", "class", "object"};

        for (int i = 0; i < rows.length; i++) {
            Put put = new Put(Bytes.toBytes(rows[i]));
            // iterate over the columns (not the rows) so every column gets written
            for (int j = 0; j < basicColumns.length; j++) {
                put.addColumn(Bytes.toBytes("basic_info"), Bytes.toBytes(basicColumns[j]), Bytes.toBytes(basicInfos[i][j]));
            }
            for (int k = 0; k < schoolColumns.length; k++) {
                put.addColumn(Bytes.toBytes("school_info"), Bytes.toBytes(schoolColumns[k]), Bytes.toBytes(schoolInfos[i][k]));
            }
            list.add(put);
        }
        Table table = connection.getTable(tableName);
        table.put(list);
    }
}

Batch Operations

List<Row> rows = new ArrayList<>();
// put operation
Put put = new Put(Bytes.toBytes("row1"));
put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("2"), Bytes.toBytes("value1"));
rows.add(put);
// delete operation
Delete delete = new Delete(Bytes.toBytes("row1"));
delete.addColumn(Bytes.toBytes("data"), Bytes.toBytes("1"));
rows.add(delete);
// get operation
Get get = new Get(Bytes.toBytes("row1"));
get.addColumn(Bytes.toBytes("data"), Bytes.toBytes("2"));
rows.add(get);
// the results array must be as long as the list of actions
Object[] results = new Object[rows.size()];
table.batch(rows, results); // a synchronous call: the results of the batch are placed into results once it completes
for (Object object : results) {
    System.out.println(object.toString());
}

Do not put Put and Delete operations for the same row in one batch request: the batch is executed by multiple threads, and under that ordering and resource contention the result is unpredictable. You can, however, process the batch asynchronously with table.batchCallback(), as sketched below.
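A minimal sketch of that callback variant, reusing the rows list and table from the example above (Batch.Callback lives in org.apache.hadoop.hbase.client.coprocessor.Batch; the callback fires once per action as its result arrives):

import org.apache.hadoop.hbase.client.coprocessor.Batch;

Object[] results = new Object[rows.size()];
table.batchCallback(rows, results, new Batch.Callback<Object>() {
    @Override
    public void update(byte[] region, byte[] row, Object result) {
        // invoked for each action as soon as its result is available
        System.out.println("Row: " + Bytes.toString(row) + " -> " + result);
    }
});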


Scan All Data in a Table

// create the Scan object
Scan scan = new Scan();
// get a ResultScanner from the table
ResultScanner scanner = table.getScanner(scan);
// iterate over the data with the ResultScanner
List<String> values = new ArrayList<>();
for (Result result : scanner) {
    for (Cell kv : result.rawCells()) {
        String family = Bytes.toString(CellUtil.cloneFamily(kv));
        String qualifier = Bytes.toString(CellUtil.cloneQualifier(kv));
        String value = Bytes.toString(CellUtil.cloneValue(kv));
        values.add(value);
        System.out.println(family + ":" + qualifier + "\t" + value);
    }
}

There are other ways to create the Scan and ResultScanner objects; the different ways to obtain these two objects are listed below:

ResultScanner getScanner(Scan scan)
ResultScanner getScanner(byte[] family)
ResultScanner getScanner(byte[] family, byte[] qualifier)
// the latter two methods implicitly create a Scan instance
// Scan constructors
Scan()
Scan(byte[] startRow, Filter filter)
Scan(byte[] startRow)
Scan(byte[] startRow, byte[] stopRow)
// the row range is [startRow, stopRow)
// restrictions that limit what a Scan returns
Scan addFamily(byte[] family)
Scan addColumn(byte[] family, byte[] qualifier)
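For instance, the single-argument overload scans just one column family without constructing a Scan explicitly; a small sketch using the data family from the earlier examples:

// implicitly creates a Scan restricted to the "data" column family
ResultScanner scanner = table.getScanner(Bytes.toBytes("data"));
for (Result result : scanner) {
    System.out.println("Row: " + Bytes.toString(result.getRow()));
}
scanner.close();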

Test:

Scan the values of all columns in all rows of table tableName

package step2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.List;
import java.util.ArrayList;

public class Task {

    public void scanTable(String tablename) throws Exception {
        /********* Begin *********/
        Configuration config = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(config);
        Table table = connection.getTable(TableName.valueOf(tablename));
        Scan scan = new Scan();
        ResultScanner scanner = table.getScanner(scan);

        // collect the values once, outside the loops
        List<String> values = new ArrayList<>();
        for (Result result : scanner) {
            for (Cell kv : result.rawCells()) {
                String family = Bytes.toString(CellUtil.cloneFamily(kv));
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(kv));
                String value = Bytes.toString(CellUtil.cloneValue(kv));
                values.add(value);
                System.out.println(value);
            }
        }

        scanner.close();
        /********* End *********/
    }
}

Scan a Table with Caching and Batch Parameters

ResultScanner implements the Iterable interface, so it is an iterator: traversing a ResultScanner in a for-each loop actually calls its next() method, and every next() call issues a separate RPC request. You can, however, fetch several rows in one RPC request, which reduces the number of round trips.

void setCaching(int caching) // set the scanner cache size (rows fetched per RPC)
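A minimal sketch combining the two parameters named in this section's title (setBatch(int) caps the number of cells returned per Result, which matters for wide rows):

Scan scan = new Scan();
scan.setCaching(100); // fetch up to 100 rows per RPC round trip
scan.setBatch(5);     // return at most 5 cells per Result, splitting wide rows
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
    System.out.println(Bytes.toString(result.getRow()) + " cells: " + result.rawCells().length);
}
scanner.close();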

Another approach is to scan only part of the table rather than all of it:

Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("data"), Bytes.toBytes("2")); // scan only the data:2 column
scan.setStartRow(Bytes.toBytes("A")); // start scanning at "A"; rows are sorted in ascending ASCII order
scan.setStopRow(Bytes.toBytes("Z"));  // stop at "Z"; the range is "A" <= rowKey < "Z"

Tip:

The newer API uses the withStartRow() and withStopRow() methods instead.
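A sketch of the same range scan written with those fluent methods (available in HBase 2.x; both return the Scan, so they can be chained):

Scan scan = new Scan()
        .withStartRow(Bytes.toBytes("A"))  // inclusive by default
        .withStopRow(Bytes.toBytes("Z"));  // exclusive by default
scan.addColumn(Bytes.toBytes("data"), Bytes.toBytes("2"));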
