Count the number of occurrences of each word in a dataset of roughly 200,000 lines.

Map phase
package hellozhazhalong;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Plain Java primitives cannot be used for keys/values here, because the types must support serialization/deserialization.
public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1); // value = 1
    private Text word = new Text();                            // key

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Example input line: "hello zhazhalong 102"
        // key:   the byte offset of the line within the file
        // value: the content of the line
        StringTokenizer itr = new StringTokenizer(value.toString()); // splits on " \t\n\r\f"
        while (itr.hasMoreTokens()) {
            // Split the line and loop over the tokens
            word.set(itr.nextToken());
            context.write(word, one); // the Writable objects are passed by reference
            // emits: (hello, 1), (zhazhalong, 1), (102, 1)
        }
    }
}
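To see what the map phase emits, here is a minimal standalone sketch (plain Java, no Hadoop required) that applies the same StringTokenizer logic to one sample line; the class name MapLogicDemo and the sample line are illustrative only, not part of the original code or dataset.

import java.util.StringTokenizer;

public class MapLogicDemo {
    public static void main(String[] args) {
        // The "value" that map() would receive; the "key" would be this line's byte offset in the file.
        String line = "hello zhazhalong 102";
        StringTokenizer itr = new StringTokenizer(line); // default delimiters: space, \t, \n, \r, \f
        while (itr.hasMoreTokens()) {
            // Each token is emitted as (word, 1), mirroring context.write(word, one)
            System.out.println(itr.nextToken() + "\t1");
        }
        // Prints:
        // hello      1
        // zhazhalong 1
        // 102        1
    }
}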
Reduce phase
package hellozhazhalong;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // The reducer's input types come from the mapper's output types.
    // All records with the same key form one group; reduce() is called once per group,
    // and inside the method you iterate over the group to compute sum, count, max, ...

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        /*
         * key:    hello
         * values: (1, 1, 1, 1, 1, 1, ...)
         */
        // Equivalent explicit iteration:
        // Iterator<IntWritable> it = values.iterator();
        // while (it.hasNext()) {
        //     IntWritable val = it.next(); // advances to the next value within this key group
        // }
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
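The shuffle phase guarantees that all values belonging to one key arrive together at a single reduce call. The standalone sketch below (plain Java, no Hadoop; the class name WordCountLocalDemo and the sample tokens are illustrative only) simulates that group-then-sum behavior locally:

import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountLocalDemo {
    public static void main(String[] args) {
        // Simulated map output: each token carries a count of 1
        String[] mapOutputKeys = {"hello", "zhazhalong", "102", "hello", "hello"};

        // Grouping by key (what shuffle/sort does) and summing (what reduce does)
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String key : mapOutputKeys) {
            counts.merge(key, 1, Integer::sum);
        }

        // Same result as one reduce call per key: hello=3, zhazhalong=1, 102=1
        counts.forEach((k, v) -> System.out.println(k + "\t" + v));
    }
}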
Client (driver)
package hellozhazhalong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class hello_zzl_count {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(true);

        // Create the Job
        Job job = Job.getInstance(conf);
        job.setJarByClass(hello_zzl_count.class);
        job.setJobName("myjob");

        // The input/output setters below are limited, so FileInputFormat/FileOutputFormat are used instead.
        // job.setInputPath(new Path("in"));
        // job.setOutputPath(new Path("out"));
        Path input = new Path("/root/hello.txt");
        FileInputFormat.addInputPath(job, input);
        // FileInputFormat.addInputPath(job, input2); // multiple input sources

        Path output = new Path("/output/04"); // the output directory must not already exist
        // The following code deletes an existing output directory, but deleting it is not recommended:
        // if (output.getFileSystem(conf).exists(output)) {
        //     output.getFileSystem(conf).delete(output, true);
        // }
        FileOutputFormat.setOutputPath(job, output);

        // DBInputFormat.setInput(job, inputClass, inputQuery, inputCountQuery); // use a database as the input source

        // Raise the minimum split size to get larger splits:
        // FileInputFormat.setMinInputSplitSize(job, size);
        // Lower the maximum split size to get smaller splits:
        // FileInputFormat.setMaxInputSplitSize(job, size);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // job.setNumReduceTasks(99); // set the number of reduce tasks sensibly, based on the key distribution

        job.setReducerClass(MyReducer.class);

        job.waitForCompletion(true);
    }
}
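Because the reduce logic here is a plain sum (commutative and associative), the same MyReducer class can usually also be registered as a combiner to pre-aggregate map output and cut shuffle traffic. The lines below are an optional sketch, not part of the original driver; they also declare the final output types explicitly:

// Optional additions to the driver above (a sketch, not in the original code):
job.setCombinerClass(MyReducer.class);      // map-side local pre-aggregation; safe because summation is associative
job.setOutputKeyClass(Text.class);          // declare the reducer's output key type explicitly
job.setOutputValueClass(IntWritable.class); // declare the reducer's output value type explicitly

After packaging the three classes into a jar, the job can be submitted with the hadoop jar command, using hello_zzl_count as the main class.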
