storm wordcount

首页 » Storm » storm wordcount

storm分发策略

随机分发(分组),随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。

按字段分发(分组),比如按hello这个字段来分组,那么具有同样”hello”的tuple就会被分到相同的bolt里面一个task任务中,而不同的”hello”则可能会被分配到不同的task任务中。

spout

package wordcount;

import java.util.List;
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class wcspout extends BaseRichSpout{

 SpoutOutputCollector collector;
 //测试数据
 String[]test= {
   "hello zha long",
   "zha nihao hello",
   "wang nihao long"
 };
 Random r = new Random();
 @Override
 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  // TODO Auto-generated method stub
  this.collector=collector;
 }

 /*
  * 随机向后发送每一行字符串
  */
 @Override
 public void nextTuple() {
  // TODO Auto-generated method stub
  List line=new Values(test[r.nextInt(test.length)]);
  this.collector.emit(line);
  System.err.println("spout emit---------------"+line);
  Utils.sleep(1000);
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
  // TODO Auto-generated method stub
  declarer.declare(new Fields("line"));
 }

}

切分splitbolt

package wordcount;

import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class wssplitbolt extends BaseRichBolt{

 OutputCollector collector;
 @Override
 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  // TODO Auto-generated method stub
  this.collector=collector;
 }

 /**
  * 获取每一行数据,并切分
  */
 @Override
 public void execute(Tuple input) {
  // TODO Auto-generated method stub
  String line=input.getString(0);
  //切割
  String[] words=line.split(" ");
  for(String word:words) {
   List w=new Values(word);
   this.collector.emit(w);
  }
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
  // TODO Auto-generated method stub
  declarer.declare(new Fields("w"));
 }
}

bolt

package wordcount;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class wcbolt extends BaseRichBolt{

 Map map=new HashMap<String,Integer>();//key 出现的单词,value 出现的次数
 OutputCollector collector;
 @Override
 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  // TODO Auto-generated method stub
 }

 /**
  * 获取tuple中的每一个单词,并且按照单词统计输出出现的次数
  */
 @Override
 public void execute(Tuple input) {
  // TODO Auto-generated method stub
  //获取单词
  String word=input.getStringByField("w");
  int count=1;
  //如果map中已经存在该单词,则该单词的数量+1,否则将该单词放入map中
  if(map.containsKey(word)) {
   count=(int)map.get(word)+1;
  }
  map.put(word, count);
  
  System.err.println(word+"--------------------"+count);
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
  // TODO Auto-generated method stub
  
 }

}

主方法

package wordcount;

import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class test {
public static void main(String[] args) {
 //建立拓扑结构
 TopologyBuilder tb=new TopologyBuilder();
 tb.setSpout("wcspout", new wcspout());//一个线程
 /*
  * 随机分发,随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同,
  * 轮询,平均分配
  */
 tb.setBolt("wssplitbolt", new wssplitbolt()).shuffleGrouping("wcspout");//第二个线程
 /*
  * 按字段分组,比如按hello这个字段来分组,那么具有同样"hello"的tuple就会被分到相同的bolt里面一个task任务中,
  * 而不同的"hello"则可能会被分配到不同的task任务中.
  */
 //再加上3个线程,3个bolt,一种单词只进入一个bolt中。
 tb.setBolt("wcbolt", new wcbolt(),3 ).fieldsGrouping("wssplitbolt", new Fields("w"));
 //当并行度不是3,而是1,可以直接这样
 //	tb.setBolt("wcbolt", new wcbolt()).shuffleGrouping("wssplitbolt");
 //创建本地storm集群
 LocalCluster lc=new LocalCluster();
 //将任务布置到集群中运行
 lc.submitTopology("wordcount", new Config(), tb.createTopology());
}
}

 

分享到:
赞(0) 打赏

评论 2

评论前必须登录!

 

  1. #1

    给你点赞

    沥青3个月前 (05-26)
  2. #2

    挺明白的

    hello3个月前 (05-26)

觉得文章有用就打赏一下弟弟吧

支付宝扫一扫打赏

微信扫一扫打赏

Vieu4.5主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。
正在播放:

作者想对您说:

累了就停下来听首歌吧

听完后会给您一个好心情

最后

等到您不容易

还希望您能多待一会儿

      00:00/00:00

      登陆仅提取账号昵称,并无其他功能。

      为了服务器更好的运行,请使用第三方账号登陆。

      授权名称使用的公安备案名称——菜鸟奋斗历程,请知悉。