mr结果输出到hbase

2020-02-28 271次浏览 已收录 7个评论

主程序

package zhazhalong.hbase.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class wcrun {
public static void main(String[] args) throws Exception{
 Configuration conf = new Configuration();
 conf.set("fs.defaultFS","hdfs://node01:9000");
 conf.set("hbase.zookeeper.quorum", "node01,node02,node03");
 Job job=Job.getInstance(conf);
 job.setJarByClass(wcrun.class);
 
 job.setMapperClass(wcmap.class);
 job.setMapOutputKeyClass(Text.class);
 job.setMapOutputValueClass(IntWritable.class);
 TableMapReduceUtil.initTableReducerJob("wc", wcreducer.class, job, null, null, null, null, false);
 FileInputFormat.addInputPath(job, new Path("/user/hive_remote/warehouse/wc/"));
 
 job.waitForCompletion(true);

}
}

map程序

package zhazhalong.hbase.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class wcmap extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
  throws IOException, InterruptedException {
 String[]strs=value.toString().split(" ");
 for (String str : strs) {
  context.write(new Text(str), new IntWritable(1));
 }
}
}

reducer程序

package zhazhalong.hbase.wc;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;


public class wcreducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable>{
@Override
protected void reduce(Text arg0, Iterable<IntWritable> arg1,Context context)
  throws IOException, InterruptedException {
 int sum=0;
 for(IntWritable it: arg1) {
  sum+=it.get();
 }
 Put put=new Put(arg0.toString().getBytes());
 put.add("cf".getBytes(),"ct".getBytes(),(sum+"").getBytes());
 context.write(null,put);
}
}

 


渣渣龙, 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:mr结果输出到hbase
喜欢 (0)

您必须 登录 才能发表评论!

(7)个小伙伴在吐槽
  1. 我也是小白以后多多交流
    你好2020-03-27 12:48
  2. 最好再详细点
    你哥2020-03-27 13:38
  3. 我也是小白以后多多交流
    我也是渣渣2020-03-27 14:04
  4. 我给你点赞了
    靓妹2020-03-28 16:27
  5. 记住这个网站了
    hello2020-05-26 09:27
  6. 我加你了哦
    靓妹2020-05-26 09:53
  7. good厉害了
    沥青2020-05-26 10:43