计算每月温度最高的两天。
一、客户端
package zhazhalong.mr.tq; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class tqmr { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(true); Job job = Job.getInstance(conf); job.setJarByClass(tqmr.class); //-----map start job.setMapperClass(TMapper.class); job.setMapOutputKeyClass(TQ.class); job.setMapOutputValueClass(IntWritable.class); //分区 job.setPartitionerClass(TPartitioner.class); //排序比较器 job.setSortComparatorClass(TSortComparator.class); //job.setCombinerClass(TCombiner.class); //-----map end //-----reduce start //分组比较器:相同的key为一组 job.setGroupingComparatorClass(TGroupingCompartor.class); job.setReducerClass(TReduce.class); //reduce end Path input =new Path("/root/tq.txt"); FileInputFormat.addInputPath(job, input ); Path output=new Path("/output/05"); FileOutputFormat.setOutputPath(job, output); job.waitForCompletion(true); } }
二、Mapper
package zhazhalong.mr.tq; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.util.StringUtils; public class TMapper extends Mapper<LongWritable, Text, TQ, IntWritable>{ //map工作就是做映射,对数据进行整理kv给reduce. TQ mkey=new TQ(); IntWritable mval=new IntWritable(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, TQ, IntWritable>.Context context) throws IOException, InterruptedException { /* * 1949-10-01 14:21:02 34c */ try { String[] str=StringUtils.split(value.toString(), '\t'); SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd"); Date date=sdf.parse(str[0]); Calendar cal=Calendar.getInstance(); cal.setTime(date); mkey.setYear(cal.get(Calendar.YEAR)); mkey.setMonth(cal.get(Calendar.MONTH)+1); mkey.setDay(cal.get(Calendar.DAY_OF_MONTH)); int wd=Integer.parseInt(str[1].substring(0, str[1].length()-1));//取出温度 mkey.setWd(wd); mval.set(wd); context.write(mkey, mval); } catch (ParseException e) { e.printStackTrace(); } } }
三、Reducer
package zhazhalong.mr.tq; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class TReduce extends Reducer<TQ, IntWritable, Text, IntWritable>{ Text rkey=new Text(); IntWritable rval=new IntWritable(); @Override protected void reduce(TQ key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { /* 相同的key为一组 * key value * 1970 01 01 88 88 //第一条最高 * 1970 01 11 78 78 * 1970 01 21 68 68 * 1970 01 01 58 58 */ int flg=0; int day=0; for (IntWritable v:values) { if(flg==0) {//第一高 rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay()); //1970-01-01:88 rval.set(key.getWd()); flg++; day=key.getDay(); context.write(rkey, rval); } if(flg!=0 && day!=key.getDay()) { //第二高 rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay()+":"+key.getWd()); //1970-01-01:88 rval.set(key.getWd()); context.write(rkey, rval); break; } } } }
四、天气类
package zhazhalong.mr.tq; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class TQ implements WritableComparable<TQ>{ private int year; private int month; private int day; private int wd; public int getYear() { return year; } public void setYear(int year) { this.year = year; } public int getMonth() { return month; } public void setMonth(int month) { this.month = month; } public int getDay() { return day; } public void setDay(int day) { this.day = day; } public int getWd() { return wd; } public void setWd(int wd) { this.wd = wd; } //序列化成字节数组 @Override public void write(DataOutput out) throws IOException { out.writeInt(year); out.writeInt(month); out.writeInt(day); out.writeInt(wd); } //反序列化 @Override public void readFields(DataInput in) throws IOException { this.year=in.readInt(); this.month=in.readInt(); this.day=in.readInt(); this.wd=in.readInt(); } //比较器 @Override public int compareTo(TQ that) { //根据自己的定义:日期正序 int c1=Integer.compare(this.year, that.getYear());//比较 if(c1==0)//year相同 { int c2=Integer.compare(this.month, that.getMonth()); if(c2==0)//month相同 { return Integer.compare(this.day, that.getDay()); } return c2;//按月排序 } return c1;//按年排序 } }
五、分区
package zhazhalong.mr.tq; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Partitioner; public class TPartitioner extends Partitioner<TQ, IntWritable> { @Override public int getPartition(TQ key, IntWritable value, int numPartitions) { //对key分组 //map输出的每一个kv都会调用这个方法 return key.getYear() % numPartitions; } }
六、排序比较器
package zhazhalong.mr.tq; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class TSortComparator extends WritableComparator{ public TSortComparator() { super(TQ.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { TQ t1=(TQ)a; TQ t2=(TQ)b; int c1=Integer.compare(t1.getYear(), t2.getYear()); if(c1==0) { int c2=Integer.compare(t1.getMonth(), t2.getMonth()); if(c2==0) { return -Integer.compare(t1.getWd(), t2.getWd());//倒序 } return c2; } return c1; } }
七、分组比较器
package zhazhalong.mr.tq;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator: keys comparing equal here are fed to a single
 * reduce() call. Equality is year + month, so each reduce invocation
 * sees one whole month of records.
 *
 * (Class name keeps the original "Compartor" spelling because the job
 * driver references it.)
 */
public class TGroupingCompartor extends WritableComparator {

    public TGroupingCompartor() {
        // true => instantiate TQ objects so compare() receives deserialized keys.
        super(TQ.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TQ left = (TQ) a;
        TQ right = (TQ) b;

        int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }
        return Integer.compare(left.getMonth(), right.getMonth());
    }
}