MR Word Count


Count how many times each word appears in a dataset of about 200,000 lines.

Map phase

package hellozhazhalong;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hadoop Writable types are used instead of Java primitives because
// keys and values must support serialization/deserialization.
public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1); // the value emitted for every word
    private Text word = new Text();                            // the key (reused across calls)

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Example input line: "hello zhazhalong 102"
        // key   = byte offset of the line within the file
        // value = one line of the file
        // StringTokenizer splits on whitespace by default: space, \t, \n, \r, \f
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            // iterate over the tokens of this line
            word.set(itr.nextToken());
            context.write(word, one); // word is passed by reference and serialized on write
            // emits: (hello, 1), (zhazhalong, 1), (102, 1)
        }
    }
}
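The whitespace-splitting behavior is easy to verify outside Hadoop. A minimal standalone sketch (the class name TokenizeDemo is made up for illustration); StringTokenizer's default delimiter set really is the space, tab, newline, carriage-return, and form-feed characters:

import java.util.StringTokenizer;

public class TokenizeDemo {
    public static void main(String[] args) {
        // Same call the mapper makes: no explicit delimiters, so the defaults apply
        StringTokenizer itr = new StringTokenizer("hello zhazhalong\t102");
        while (itr.hasMoreTokens()) {
            System.out.println(itr.nextToken()); // prints hello, zhazhalong, 102 on separate lines
        }
    }
}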

Reduce phase

package hellozhazhalong;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// The reducer's input types match the mapper's output types.
// All values with the same key form one group; reduce() is called once per
// group, and the method iterates over the group to compute an aggregate:
// sum, count, max, ...
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        /*
         * key:    hello
         * values: (1, 1, 1, 1, 1, 1, ...)
         */

        // Equivalent explicit-iterator form:
        // Iterator<IntWritable> it = values.iterator();
        // while (it.hasNext()) {
        //     IntWritable val = it.next(); // advances to the next value in this key's group
        // }

        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
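One detail behind that commented-out iterator is worth knowing: Hadoop reuses the same IntWritable instance across the whole values iteration. This job never stores the values, but a variant that did would need to copy them first. A minimal sketch (the caching itself is hypothetical, not part of this job):

// Hypothetical variant of MyReducer.reduce() that caches values before summing.
// Hadoop hands back one reused IntWritable for the whole iteration, so copy the
// primitive out with get(); storing the IntWritable objects themselves would
// leave every cached entry pointing at the same reused object.
public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    java.util.List<Integer> cache = new java.util.ArrayList<>();
    for (IntWritable val : values) {
        cache.add(val.get()); // safe: copies the int value out of the reused object
    }
    int sum = 0;
    for (int v : cache) {
        sum += v;
    }
    context.write(key, new IntWritable(sum));
}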


Driver (client)

package hellozhazhalong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class hello_zzl_count {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(true);

        // create the Job
        Job job = Job.getInstance(conf);
        job.setJarByClass(hello_zzl_count.class);
        job.setJobName("myjob");

        // The older setInputPath/setOutputPath style is limited:
        // job.setInputPath(new Path("in"));
        // job.setOutputPath(new Path("out"));
        Path input = new Path("/root/hello.txt");
        FileInputFormat.addInputPath(job, input);
        // FileInputFormat.addInputPath(job, input2); // multiple input sources are supported

        Path output = new Path("/output/04"); // the output directory must not already exist
        // The following would delete it, but doing so is not recommended:
        // if (output.getFileSystem(conf).exists(output)) {
        //     output.getFileSystem(conf).delete(output, true);
        // }
        FileOutputFormat.setOutputPath(job, output);

        // DBInputFormat.setInput(job, inputClass, inputQuery, inputCountQuery); // database as input source
        // Raise the minimum split size for larger splits:
        // FileInputFormat.setMinInputSplitSize(job, size);
        // Lower the maximum split size for smaller splits:
        // FileInputFormat.setMaxInputSplitSize(job, size);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // job.setNumReduceTasks(99); // set the reducer count sensibly based on the key distribution
        job.setReducerClass(MyReducer.class);

        job.waitForCompletion(true);
    }
}
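To run the job, package the three classes into a jar and submit it with the hadoop launcher, e.g. hadoop jar wordcount.jar hellozhazhalong.hello_zzl_count (the jar name here is illustrative). No arguments are needed because the input path /root/hello.txt and the output directory /output/04 are hardcoded in main().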


Result
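With the default TextOutputFormat, each reducer writes its counts to a part-r-XXXXX file under /output/04, one word and its count per line separated by a tab; the output can be inspected with hdfs dfs -cat /output/04/part-r-00000.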

