mr结果输出到hbase

首页 » HBase » mr结果输出到hbase

主程序

package zhazhalong.hbase.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class wcrun {
public static void main(String[] args) throws Exception{
 Configuration conf = new Configuration();
 conf.set("fs.defaultFS","hdfs://node01:9000");
 conf.set("hbase.zookeeper.quorum", "node01,node02,node03");
 Job job=Job.getInstance(conf);
 job.setJarByClass(wcrun.class);
 
 job.setMapperClass(wcmap.class);
 job.setMapOutputKeyClass(Text.class);
 job.setMapOutputValueClass(IntWritable.class);
 TableMapReduceUtil.initTableReducerJob("wc", wcreducer.class, job, null, null, null, null, false);
 FileInputFormat.addInputPath(job, new Path("/user/hive_remote/warehouse/wc/"));
 
 job.waitForCompletion(true);

}
}

map程序

package zhazhalong.hbase.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class wcmap extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
  throws IOException, InterruptedException {
 String[]strs=value.toString().split(" ");
 for (String str : strs) {
  context.write(new Text(str), new IntWritable(1));
 }
}
}

reducer程序

package zhazhalong.hbase.wc;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;


public class wcreducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable>{
@Override
protected void reduce(Text arg0, Iterable<IntWritable> arg1,Context context)
  throws IOException, InterruptedException {
 int sum=0;
 for(IntWritable it: arg1) {
  sum+=it.get();
 }
 Put put=new Put(arg0.toString().getBytes());
 put.add("cf".getBytes(),"ct".getBytes(),(sum+"").getBytes());
 context.write(null,put);
}
}

 

分享到:
赞(0) 打赏

评论 7

评论前必须登录!

 

  1. #1

    我也是小白以后多多交流

    你好6个月前 (03-27)
  2. #2

    最好再详细点

    你哥6个月前 (03-27)
  3. #3

    我也是小白以后多多交流

    我也是渣渣6个月前 (03-27)
  4. #4

    我给你点赞了

    靓妹6个月前 (03-28)
  5. #5

    记住这个网站了

    hello4个月前 (05-26)
  6. #6

    我加你了哦

    靓妹4个月前 (05-26)
  7. #7

    good厉害了

    沥青4个月前 (05-26)

觉得文章有用就打赏一下弟弟吧

支付宝扫一扫打赏

微信扫一扫打赏

Vieu4.5主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。
正在播放:

作者想对您说:

累了就停下来听首歌吧

听完后会给您一个好心情

最后

等到您不容易

还希望您能多待一会儿

      00:00/00:00