配置并行度
works jvm:在一个节点可以运行多个jvm进程,一个topology可以包含一个或者多个worker并行的泡在不同的machine,所以一个work progress就是执行一个topology的子集并且一个worker只能对应一个toplogyexectors在一个worker可以包含一个或者多个tasks,但默认每个excutor只执行一个task,一个worker包含多个exectors,每个component(spout和bolt)至少对应一个executortasks(bolt/spout instance) task就是具体的处理对象,每一个spout和bolt会被当做很多task在集群里面执行,每一个task对应一个线程,而stream grouping则是定义怎么从一堆task发射tuple到另一堆task,可以调用ToplogyBuilder.setSpout和TopBuilder.setBolt来设置并行度,也就是多个task配置并行度对于并行度的配置,在storm可以在多个地方进行配置, 优先级为defaults.yaml<storm.yaml<topology-specific configuration<internal component-specific configuration<external componnet -specific configurationwork process 的数目,可以通过配置文件和代码中的配置,work就是执行进程,所以考虑并发的效果,数目至少应该大于machines数目executor数目 component的并发线程数, 只能在代码中配置通过setbolt和setspout的参数,列如 setbolt("green-bolt",new GreenBolt(),2)tasks数目,可以不配置,默认和executor1:1,也可以通过设置setNumTask()配置配置并行度Topology 的worker数通过config设置,也就是执行该toplogy 的work进程数,他可以通过strom rebalance 命令任意调整Config conf=new Config();stream Grouping ,告诉topology如何在两个组件之间发送tuple定义一个topology的其中一个定义每个bolt接收什么样的流作为输入。stream grouping 就是用来stream应该stream应该如果分配数据给bolts上面的多个tasks列如:当:boltA 的一个task要发送一个tuple给bolt B , 他应该发送storm里面有7种 stream grouping1.shuffle grouping 2.fields grouping 3.all grouping 4.global grouping 5.none grouping 6.direct grouping 7local or shuffle grouping 通过cuomstreamgrouping 接口来实现分流1.shuffle grouping 随机分组, 随机派发stream里面得tuple,保证每个bolt接收到的tuple数目大致相同splitsentence对于句子里面得每个单词发射一个新的tuple,wordconut里面维护一个单词-》次数的mapping,wordcount每接收一个单词就跟新他的统计状态RandomSentenceSpout和splitSentence之间就是用的shuffle grouping shuffle grouping则是定义怎么从一堆对每个task的tuple分配比较均匀 TopologyBuilder buider=new TopologyBuilder(); buider.setSpout(1, new RandomSentenceSpout(),5); buider.setBolt(2, new SplitSentence(),8).shuffleGrouping(); buider.setBolt(3, new WordCount(),12).FieldsGrouping(2,new Fields("words") );2.fields grouping 按字段分组,比如按user-id这个字段来分组,那么具有同样userid的tuple会被分到相同的bolts里面得一个task而不同的userid会被分到不同的tasksplitsentence 和wordcount之间使用fields grouping ,这种机制保证相同field值得tuple会同一个task,这对于wordcount很关键如果同一个单词不去同一个task,那么统计出来的次数就不对了 TopologyBuilder buider=new TopologyBuilder(); buider.setSpout(1, new TweetSpout()); buider.setSpout("signal",new SignalSpout()); buider.setBolt(2, new TweetCounter()).FieldsGrouping(1,new Fields("username")).allGrouping("sigals"); 3.all grouping 广播发送,对于每一个tuple,所有的bolts都会受到all grouping的一个常见用途就是发送信号给bolts,比如你对stream进行了一些过滤, 你必须把filter的参数传给所有的bolts,发送这些参数就可以通过all grouping 来实现这里我们为tweetcounter的所有task订阅signal,所以我们可以使用sigalspout发送不同的信号到tweetcounter这个bolt TopologyBuilder buider=new TopologyBuilder(); buider.setSpout("a", new SpoutA()); buider.setSpout("b", new SpoutB()); buider.setSpout("c", new SpoutC()); buider.setSpout("d", new SpoutD()); buider.setSpout("e", new SpoutE()).globalGrouping ("c").shuffleGrouping(); 4.global grouping 全局分组,整个stream被分配到storm中的一个bolt的其中一个task,再具体一点就是分配给id值最低的那个task比较适合并发汇总5.none grouping 不分组,这个分组的意思就是说stream不关心怎么样分组,目前这种分组和shuffle grouping是一种效果,有点不一样的是storm会使用none grouping的这一个bolt放到这个bolt的订阅者同一个线程里面去执行6直接分组消息发送者指定消息接收者的哪个task处理这个消息,只有被表名direct stream的消息流可以处理这个方法消息处理者可以通过topologyContext来获取处理它消息的task idpublic void declareOutputFieleds(OutputFieldsDeclarer declare){ declare.declareStream("directStream",true,new Fields("fields"));}public void prepare(Map stormConf,TopologyContext context,OutputCollector collector){ this.numOfTasks=context.getComponentTasks("my-stream"); this.collector=collector;}public void execute(Tuple input ){ collector.emitDirect(new Random().nextInt(3),process(input));}7local or shuffle groupingcustom grouping实现customStreamGrouping接口来自定义分组public class CategoryGrouping implements CustomStreamGrouping,Serializable { // Mapping of category to integer values for groupingprivate static final Map<String, Integer> categories =ImmutableMap.of("Financial", 0,"Medical", 1,"FMCG", 2,"Electronics", 3);// number of tasks, this is initialized in prepare methodprivate int tasks = 0;public void prepare(WorkerTopologyContext context,GlobalStreamId stream, List<Integer> targetTasks){ // initialize the number of taskstasks = targetTasks.size();}public List<Integer> chooseTasks(int taskId, List<Object>values) { // return the taskId for a given categoryString category = (String) values.get(0);return ImmutableList.of(categories.get(category) % tasks);}}builder.setSpout("a", new SpoutA());builder.setBolt("b", (IRichBolt)new BoltB()).customGrouping("a", new CategoryGrouping()); 消息可靠处理机制 在什么情况下人为一个spout发送出来的消息被完全处理答案是下面的条件同时被满足 1.tuple tree不再增长 2.树中的任何消息被标识为已处理 如果在指定时间内,一个消息被衍生出来的tuple tree 未被完全处理成功,则人为该消息未被完成这个超时值可以通过任务级参数Config.Toplogy_message_timeout_sec进行配置,默认30秒消息的生命周期spout应该实现的接口Ispout extends serialble假设我们从kestrel读取消息,spout会将这个消息设置ID作为消息的message id向spoutOutputCollector接下来,这些消息会被发送到后续处理的bolts,并且storm会跟踪由此消息衍生出来的新消息,当检测到一个消息产生出来的tuple tree被完整处理后,并将此消息的messageid作为参数传入,同理消息如果超时就调用fail可靠性相关api当在tuple tree创建新节点, 要通知storm当处理完一个单独消息,需要告诉storm这个消息树变化情况为tuple tree增加一个节点,我们称之为峁定package spouts;import java.util.Map;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;public class SplitSentence extends BaseRichBolt { private OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector=collector; } public void execute(Tuple tuple) { String sentence=tuple.getString(0); for(String word:sentence.split(" ")){ collector.emit(tuple, new Values(word)); } collector.ack(tuple); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); }}多重铆钉 List<Tuple> archor=new LinkedList<Tuple>(); archor.add(Tuple1); archor.add(Tuple2); collector.emit(archor, new Values(1,2,3));或者以下功能一样,但是BasicOutputCollector会被自动锚定到输入消息,当exexute执行完消息会自动的应答输入消息package spouts;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseBasicBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;public class SplitSentence2 extends BaseBasicBolt { public void execute(Tuple input, BasicOutputCollector collector) { String sentence=input.getString(0); for(String word:sentence.split(" ")){ collector.emit( new Values(word)); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); }}调整可靠性将参数config.Topology_ackers设置为0,通过此方法,当spout发送此消息的时候,他的ack会被立即调用2.发送消息时,可以不指定消息messageid,当需要关闭可靠性的时候,可以使用此 方法3.如果你不介意衍生出来的子孙消息可靠性,则派生出来出来的消息在发送时候不要铆钉,及在emit中不指定消息,因为这些消息没有锚定到任何tuple tree中,他们的失败不会引起任何spout重新发送消息