最小处理单元:Tuple
以单一流stream的方式把Tuple推送过去。
如果是多个流推送,需要对每一个流进行设置streamid。
spout中最核心的方法是nextTuple,该方法会被storm线程不断调用、主动从数据源拉去数据,再通过emit方法将数据生成元组(Tuple)发送给之后的bolt计算。并且需要在declare方法声明定义的不同数据流

一、配置spout类
package storm; import java.util.List; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; public class wcspout extends BaseRichSpout{ Map map; TopologyContext context; SpoutOutputCollector collector; int i=0; /** * 配置初始化spout类 */ @Override public void open(Map map, TopologyContext context, SpoutOutputCollector collector) { // TODO Auto-generated method stub this.map=map; this.context=context; this.collector=collector; } /** * 采集并向后推送数据 */ @Override public void nextTuple() { // TODO Auto-generated method stub i++; List num=new Values(i); this.collector.emit(num); System.err.println("发:"+i); Utils.sleep(1000); } /* * 向接受数据的逻辑处理单元声明发送数据的字段名称 */ @Override public void declareOutputFields(OutputFieldsDeclarer arg0) { // TODO Auto-generated method stub arg0.declare(new Fields("num")); } }
二、配置bolt类
package storm; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; public class wsbolt extends BaseRichBolt{ Map stormConf; TopologyContext context; OutputCollector collector; int sum=0; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { // TODO Auto-generated method stub this.stormConf=stormConf; this.context=context; this.collector=collector; } /* * 获取数据(也可能会继续发送数据 */ @Override public void execute(Tuple input) { // TODO Auto-generated method stub int num=input.getIntegerByField("num");//声明的num sum+=num; System.err.println("接受:"+sum); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } }
三、主方法
package storm; import java.util.Map; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; public class test { public static void main(String[] args) { //建立拓扑结构 TopologyBuilder tb=new TopologyBuilder(); tb.setSpout("wcspout", new wcspout()); tb.setBolt("wsbolt", new wsbolt()).shuffleGrouping("wcspout"); //创建本地storm集群 LocalCluster lc=new LocalCluster(); //将任务布置到集群中运行 lc.submitTopology("wordsum", new Config(), tb.createTopology()); } }
评论 抢沙发