MR天气案例

首页 » MapReduce » MR天气案例

需求:找出每个月中气温最高的两天(两条结果必须是不同的日期)。

一、客户端

package zhazhalong.mr.tq;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class tqmr {
public static void main(String[] args) throws Exception {
 
 Configuration conf = new Configuration(true);
  Job job = Job.getInstance(conf); 
 job.setJarByClass(tqmr.class);
 
 //-----map start
 job.setMapperClass(TMapper.class);
 job.setMapOutputKeyClass(TQ.class);
 job.setMapOutputValueClass(IntWritable.class);
 //分区
 job.setPartitionerClass(TPartitioner.class);
 //排序比较器
 job.setSortComparatorClass(TSortComparator.class);
 //job.setCombinerClass(TCombiner.class);
 //-----map end
 
 //-----reduce start
 //分组比较器:相同的key为一组
 job.setGroupingComparatorClass(TGroupingCompartor.class);
 job.setReducerClass(TReduce.class);
 //reduce end
 Path input =new Path("/root/tq.txt");
 FileInputFormat.addInputPath(job, input );
 Path output=new Path("/output/05");
 FileOutputFormat.setOutputPath(job, output);
 
 job.waitForCompletion(true);
}
}

二、Mapper

package zhazhalong.mr.tq;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

/**
 * Mapper for the weather job. Turns each input line of the form
 * {@code 1949-10-01 14:21:02<TAB>34c} into a ({@link TQ}, temperature) pair.
 *
 * <p>Lines that cannot be parsed (missing tab-separated field, unparseable
 * date, non-numeric temperature) are skipped and counted in the
 * {@code TMapper / MALFORMED_RECORDS} counter instead of failing the task.
 */
public class TMapper extends Mapper<LongWritable, Text, TQ, IntWritable> {

    private static final String COUNTER_GROUP = "TMapper";
    private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

    // Output key/value objects, reused for every record.
    TQ mkey = new TQ();
    IntWritable mval = new IntWritable();

    // Created once per mapper instance rather than once per record.
    // SimpleDateFormat is not thread-safe, so these stay instance fields.
    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
    private final Calendar cal = Calendar.getInstance();

    /**
     * Parses one input line and emits its date and temperature.
     *
     * @param key     input key supplied by the input format (not used)
     * @param value   one line of input text
     * @param context used to emit the output pair and to update counters
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, TQ, IntWritable>.Context context)
            throws IOException, InterruptedException {
        String[] str = StringUtils.split(value.toString(), '\t');
        // Need both the timestamp field and a non-empty temperature field.
        if (str == null || str.length < 2 || str[1].isEmpty()) {
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }
        try {
            Date date = sdf.parse(str[0]);
            // Drop the trailing unit character (the 'c' in "34c") and parse the rest.
            int wd = Integer.parseInt(str[1].substring(0, str[1].length() - 1));

            cal.setTime(date);
            mkey.setYear(cal.get(Calendar.YEAR));
            mkey.setMonth(cal.get(Calendar.MONTH) + 1); // Calendar.MONTH is zero-based
            mkey.setDay(cal.get(Calendar.DAY_OF_MONTH));
            mkey.setWd(wd);
            mval.set(wd);
            context.write(mkey, mval);
        } catch (ParseException | NumberFormatException e) {
            // Bad date or bad temperature: skip the record but keep it visible
            // in the job counters.
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
        }
    }
}

三、Reducer

package zhazhalong.mr.tq;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer for the weather job. For each reduce group it emits at most two
 * records: the first record of the group, and the first later record whose
 * day differs from it.
 *
 * <p>Assumes the records of a group arrive hottest-first, e.g.
 * <pre>
 * key              value
 * 1970 01 01 88    88     first record is the highest
 * 1970 01 11 78    78
 * 1970 01 21 68    68
 * 1970 01 01 58    58
 * </pre>
 * The loop reads the day and temperature from {@code key} on every iteration,
 * so it relies on {@code key} reflecting the current record as {@code values}
 * is iterated.
 */
public class TReduce extends Reducer<TQ, IntWritable, Text, IntWritable> {

    // Output key/value objects, reused for every emitted record.
    Text rkey = new Text();
    IntWritable rval = new IntWritable();

    /**
     * Emits the hottest day of the group and the next-hottest record that
     * falls on a different day.
     *
     * @param key     current key of the group
     * @param values  temperatures of the group; only iterated to advance the group
     * @param context used to emit output records
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(TQ key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        boolean firstWritten = false;
        int firstDay = 0;
        for (IntWritable ignored : values) {
            if (!firstWritten) {
                // Hottest record of the group.
                emit(key, context);
                firstDay = key.getDay();
                firstWritten = true;
            } else if (key.getDay() != firstDay) {
                // Second hottest, on a different day; nothing more to emit.
                emit(key, context);
                break;
            }
        }
    }

    /**
     * Writes one output record keyed as {@code year-month-day:temperature}
     * with the temperature as value. Both emitted records share this format.
     */
    private void emit(TQ key, Context context) throws IOException, InterruptedException {
        rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay() + ":" + key.getWd());
        rval.set(key.getWd());
        context.write(rkey, rval);
    }
}




 

四、天气类(自定义 Key)

package zhazhalong.mr.tq;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Weather record used as the map output key: a date (year, month, day) plus
 * the temperature {@code wd} measured on that date.
 */
public class TQ implements WritableComparable<TQ> {

    private int year;
    private int month;
    private int day;
    private int wd;

    /** @return the year */
    public int getYear() {
        return this.year;
    }

    /** @param year the year to store */
    public void setYear(int year) {
        this.year = year;
    }

    /** @return the month */
    public int getMonth() {
        return this.month;
    }

    /** @param month the month to store */
    public void setMonth(int month) {
        this.month = month;
    }

    /** @return the day of the month */
    public int getDay() {
        return this.day;
    }

    /** @param day the day of the month to store */
    public void setDay(int day) {
        this.day = day;
    }

    /** @return the temperature */
    public int getWd() {
        return this.wd;
    }

    /** @param wd the temperature to store */
    public void setWd(int wd) {
        this.wd = wd;
    }

    /**
     * Serializes the record as four ints in the order year, month, day, wd.
     *
     * @param out sink to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.year);
        out.writeInt(this.month);
        out.writeInt(this.day);
        out.writeInt(this.wd);
    }

    /**
     * Deserializes the record; reads the four ints in the same order
     * {@link #write(DataOutput)} produced them.
     *
     * @param in source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.month = in.readInt();
        this.day = in.readInt();
        this.wd = in.readInt();
    }

    /**
     * Natural ordering: ascending by date (year, then month, then day).
     * The temperature does not take part in this comparison.
     *
     * @param that the record to compare against
     * @return negative, zero or positive as this date is before, equal to or
     *         after {@code that}'s date
     */
    @Override
    public int compareTo(TQ that) {
        int byYear = Integer.compare(this.year, that.year);
        if (byYear != 0) {
            return byYear;
        }
        int byMonth = Integer.compare(this.month, that.month);
        if (byMonth != 0) {
            return byMonth;
        }
        return Integer.compare(this.day, that.day);
    }
}

五、分区器

package zhazhalong.mr.tq;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitions map output by year, so all records of one year go to the same
 * partition.
 */
public class TPartitioner extends Partitioner<TQ, IntWritable> {

    /**
     * Chooses the partition for one map output record. Called for every
     * key/value pair the mapper emits.
     *
     * @param key           map output key; only its year is used
     * @param value         map output value (not used)
     * @param numPartitions total number of partitions
     * @return a partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(TQ key, IntWritable value, int numPartitions) {
        // Clear the sign bit before taking the remainder so a negative year can
        // never yield a negative (invalid) partition index. For non-negative
        // years the result is the same as a plain year % numPartitions.
        return (key.getYear() & Integer.MAX_VALUE) % numPartitions;
    }

}

六、排序比较器

package zhazhalong.mr.tq;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Sort comparator for {@link TQ} keys: year ascending, then month ascending,
 * then temperature descending. The day does not take part in the ordering.
 */
public class TSortComparator extends WritableComparator {

    /** Registers {@link TQ} as the key class, passing {@code true} as the second super-constructor argument. */
    public TSortComparator() {
        super(TQ.class, true);
    }

    /**
     * Compares two keys, both of which must be {@link TQ} instances.
     *
     * @param a first key
     * @param b second key
     * @return negative, zero or positive according to the ordering described
     *         on the class
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TQ left = (TQ) a;
        TQ right = (TQ) b;

        int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }
        int byMonth = Integer.compare(left.getMonth(), right.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }
        // Operands swapped: higher temperatures sort first (descending).
        return Integer.compare(right.getWd(), left.getWd());
    }
}

七、分组比较器

package zhazhalong.mr.tq;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator for {@link TQ} keys: two keys compare equal when they
 * share the same year and month, regardless of day and temperature.
 */
public class TGroupingCompartor extends WritableComparator {

    /** Registers {@link TQ} as the key class, passing {@code true} as the second super-constructor argument. */
    public TGroupingCompartor() {
        super(TQ.class, true);
    }

    /**
     * Compares two keys by year and then by month; both must be {@link TQ}
     * instances.
     *
     * @param a first key
     * @param b second key
     * @return zero when year and month match, otherwise the sign of the first
     *         differing field (year before month)
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TQ left = (TQ) a;
        TQ right = (TQ) b;

        int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }
        return Integer.compare(left.getMonth(), right.getMonth());
    }
}

 

 

分享到:
赞(0) 打赏

评论 4

评论前必须登录!

 

  1. #1

    good厉害了

    hello7个月前 (03-27)
  2. #2

    看了那么多博客,就你的能看懂

    你好7个月前 (03-28)
  3. #3

    挺明白的

    奋斗7个月前 (03-28)
  4. #4

    靓仔5个月前 (05-26)

觉得文章有用就打赏一下弟弟吧

支付宝扫一扫打赏

微信扫一扫打赏

Vieu4.5主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。
正在播放:

作者想对您说:

累了就停下来听首歌吧

听完后会给您一个好心情

最后

等到您不容易

还希望您能多待一会儿

      00:00/00:00