spout
import java.io.BufferedReader; import java.io.FileInputStream; import java.io.InputStreamReader; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; public class MySpout implements IRichSpout{ private static final long serialVersionUID = 1L; int index = 0; FileInputStream fis; InputStreamReader isr; BufferedReader br; SpoutOutputCollector collector = null; String str = null; @Override public void nextTuple() { try { if ((str = this.br.readLine()) != null) { //标示 index++; //当前tuple的唯一标识,必须有 collector.emit(new Values(str), index); // collector.emit(new Values(str)); } } catch (Exception e) { } } @Override public void close() { try { br.close(); isr.close(); fis.close(); } catch (Exception e) { e.printStackTrace(); } } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.collector = collector; this.fis = new FileInputStream("track.log");//输入流 this.isr = new InputStreamReader(fis, "UTF-8");//转换流 this.br = new BufferedReader(isr);//缓冲区流 } catch (Exception e) { e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("log")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } /** * 信息完整传递所执行的方法 */ @Override public void ack(Object msgId) { System.err.println(" [" + Thread.currentThread().getName() + "] "+ " spout ack:"+msgId.toString()); } @Override public void activate() { } @Override public void deactivate() { } /** * 信息没有完整传递 */ @Override public void fail(Object msgId) { System.err.println(" [" + Thread.currentThread().getName() + "] "+ " spout fail:"+msgId.toString()); } }
bolt
import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class MyBolt implements IRichBolt { private static final long serialVersionUID = 1L; OutputCollector collector = null; @Override public void cleanup() { } int num = 0; String valueString = null; @Override public void execute(Tuple input) { try { valueString = input.getStringByField("log") ; if(valueString != null) { num ++ ; System.err.println(Thread.currentThread().getName()+" lines :"+num +" session_id:"+valueString.split("\t")[1]); } collector.emit(input, new Values(valueString)); // collector.emit(new Values(valueString)); collector.ack(input);//成功,回调MySpout的ack方法 Thread.sleep(2000); } catch (Exception e) { collector.fail(input);//失败, e.printStackTrace(); } } @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector ; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("session_id")) ; } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Main
import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.topology.TopologyBuilder; public class Main { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new MySpout(), 1); builder.setBolt("bolt", new MyBolt(), 2).shuffleGrouping("spout"); // Map conf = new HashMap(); // conf.put(Config.TOPOLOGY_WORKERS, 4); Config conf = new Config() ; conf.setDebug(true); conf.setMessageTimeoutSecs(conf, 100); conf.setNumAckers(4); if (args.length > 0) { try { StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } }else { LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.createTopology()); } } }
我也是小白以后多多交流
给你点赞