storm数据累加

首页 » Storm » storm数据累加

最小处理单元:Tuple

以单一流stream的方式把Tuple推送过去。

如果是多个流推送,需要对每一个流进行设置streamid。

spout中最核心的方法是nextTuple,该方法会被storm线程不断调用、主动从数据源拉去数据,再通过emit方法将数据生成元组(Tuple)发送给之后的bolt计算。并且需要在declare方法声明定义的不同数据流

 

storm数据累加
storm计算模型

一、配置spout类

package storm;

import java.util.List;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class wcspout extends BaseRichSpout{

 Map map;
 TopologyContext context;
 SpoutOutputCollector collector;
 int i=0;
 /**
  * 配置初始化spout类
  */
 @Override
 public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
  // TODO Auto-generated method stub
  this.map=map;
  this.context=context;
  this.collector=collector;
 }
 /**
  * 采集并向后推送数据
  */
 @Override
 public void nextTuple() {
  // TODO Auto-generated method stub
  i++;
  List num=new Values(i);
  this.collector.emit(num);
  System.err.println("发:"+i);
  Utils.sleep(1000);
 }

 /*
  * 向接受数据的逻辑处理单元声明发送数据的字段名称
  */
 @Override
 public void declareOutputFields(OutputFieldsDeclarer arg0) {
  // TODO Auto-generated method stub
  arg0.declare(new Fields("num"));
 }

}

二、配置bolt类

package storm;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class wsbolt extends BaseRichBolt{

 Map stormConf;
 TopologyContext context;
 OutputCollector collector;
 int sum=0;
 @Override
 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  // TODO Auto-generated method stub
  this.stormConf=stormConf;
  this.context=context;
  this.collector=collector;
 }

 /*
  * 获取数据(也可能会继续发送数据
  */
 @Override
 public void execute(Tuple input) {
  // TODO Auto-generated method stub
  int num=input.getIntegerByField("num");//声明的num
  sum+=num;
  System.err.println("接受:"+sum);
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
  // TODO Auto-generated method stub
  
 }

}

三、主方法

package storm;

import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class test {
public static void main(String[] args) {
 //建立拓扑结构
 TopologyBuilder tb=new TopologyBuilder();
 tb.setSpout("wcspout", new wcspout());
 tb.setBolt("wsbolt", new wsbolt()).shuffleGrouping("wcspout");
 //创建本地storm集群
 LocalCluster lc=new LocalCluster();
 //将任务布置到集群中运行
 lc.submitTopology("wordsum", new Config(), tb.createTopology());
}
}

 

分享到:
赞(0) 打赏

评论 抢沙发

评论前必须登录!

 



觉得文章有用就打赏一下弟弟吧

支付宝扫一扫打赏

微信扫一扫打赏

Vieu4.5主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。
正在播放:

作者想对您说:

累了就停下来听首歌吧

听完后会给您一个好心情

最后

等到您不容易

还希望您能多待一会儿

      00:00/00:00