Storm ack机制代码实现

首页 » Storm » Storm ack机制代码实现

spout

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class MySpout implements IRichSpout{

 private static final long serialVersionUID = 1L;

 int index = 0;
 
 FileInputStream fis;
 InputStreamReader isr;
 BufferedReader br;			
 SpoutOutputCollector collector = null;
 String str = null;

 @Override
 public void nextTuple() {
  try {
   if ((str = this.br.readLine()) != null) {
    //标示
    index++;
    //当前tuple的唯一标识,必须有
    collector.emit(new Values(str), index);
//				collector.emit(new Values(str));
   }
  } catch (Exception e) {
  }
  
  
 }
 @Override
 public void close() {
  try {
   br.close();
   isr.close();
   fis.close();
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
 @Override
 public void open(Map conf, TopologyContext context,
   SpoutOutputCollector collector) {
  try {
   this.collector = collector;
   this.fis = new FileInputStream("track.log");//输入流
   this.isr = new InputStreamReader(fis, "UTF-8");//转换流
   this.br = new BufferedReader(isr);//缓冲区流
  } catch (Exception e) {
   e.printStackTrace();
  }
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declare(new Fields("log"));
 }

 @Override
 public Map<String, Object> getComponentConfiguration() {
  return null;
 }
 /**
  * 信息完整传递所执行的方法
  */
 @Override
 public void ack(Object msgId) {
  System.err.println(" [" + Thread.currentThread().getName() + "] "+ " spout ack:"+msgId.toString());
 }

 @Override
 public void activate() {
  
 }

 @Override
 public void deactivate() {
  
 }

 /**
  * 信息没有完整传递
  */
 @Override
 public void fail(Object msgId) {
  System.err.println(" [" + Thread.currentThread().getName() + "] "+ " spout fail:"+msgId.toString());
 }

}

bolt

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MyBolt implements IRichBolt {

 private static final long serialVersionUID = 1L;

 OutputCollector collector = null;
 @Override
 public void cleanup() {

 }
 int num = 0;
 String valueString = null;
 @Override
 public void execute(Tuple input) {
  try {
   valueString = input.getStringByField("log") ;
   
   if(valueString != null) {
    num ++ ;
    System.err.println(Thread.currentThread().getName()+"   lines  :"+num +"   session_id:"+valueString.split("\t")[1]);
   }
   collector.emit(input, new Values(valueString));
//			collector.emit(new Values(valueString));
   collector.ack(input);//成功,回调MySpout的ack方法
   Thread.sleep(2000);
  } catch (Exception e) {
   collector.fail(input);//失败,
   e.printStackTrace();
  }
  
 }

 @Override
 public void prepare(Map stormConf, TopologyContext context,
   OutputCollector collector) {
  this.collector = collector ;
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declare(new Fields("session_id")) ;
 }

 @Override
 public Map<String, Object> getComponentConfiguration() {
  return null;
 }

}

Main

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;

public class Main {

 public static void main(String[] args) {

  TopologyBuilder builder = new TopologyBuilder();

  builder.setSpout("spout", new MySpout(), 1);
  builder.setBolt("bolt", new MyBolt(), 2).shuffleGrouping("spout");
  
//		Map conf = new HashMap();
//		conf.put(Config.TOPOLOGY_WORKERS, 4);
  
  Config conf = new Config() ;
  conf.setDebug(true);
  conf.setMessageTimeoutSecs(conf, 100);
  conf.setNumAckers(4);
  
  if (args.length > 0) {
   try {
    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
   } catch (AlreadyAliveException e) {
    e.printStackTrace();
   } catch (InvalidTopologyException e) {
    e.printStackTrace();
   }
  }else {
   LocalCluster localCluster = new LocalCluster();
   localCluster.submitTopology("mytopology", conf, builder.createTopology());
  }
  
 }

}
分享到:
赞(0) 打赏

评论 2

评论前必须登录!

 

  1. #1

    我也是小白以后多多交流

    笔记本3个月前 (05-26)
  2. #2

    给你点赞

    靓仔3个月前 (05-26)

觉得文章有用就打赏一下弟弟吧

支付宝扫一扫打赏

微信扫一扫打赏

Vieu4.5主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。
正在播放:

作者想对您说:

累了就停下来听首歌吧

听完后会给您一个好心情

最后

等到您不容易

还希望您能多待一会儿

      00:00/00:00

      登陆仅提取账号昵称,并无其他功能。

      为了服务器更好的运行,请使用第三方账号登陆。

      授权名称使用的公安备案名称——菜鸟奋斗历程,请知悉。