<address id="xhxt1"><listing id="xhxt1"></listing></address><sub id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></sub>

    <thead id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></thead>

    Storm入门之第五章Bolts

    本文翻译自《Getting Started With Storm》??译者:吴京润? ?编辑:方腾飞

    第5章 Bolts

    正如你已经看到的,bolts是一个Storm集群中的关键组件。你将在这一章学到bolt生命周期,一些bolt设计策略,以及几个有关这些内容的例子。

    Bolt生命周期

    Bolt是这样一种组件,它把元组作为输入,然后产生新的元组作为输出。实现一个bolt时,通常需要实现IRichBolt接口。Bolts对象由客户端机器创建,序列化为拓扑,并提交给集群中的主机。然后集群启动工人进程反序列化bolt,调用prepare,最后开始处理元组。

    NOTE:要创建一个bolt对象,它通过构造器参数初始化成员属性,bolt被提交到集群时,这些属性值会随着一起序列化。

    Bolt结构

    Bolts拥有如下方法:

    declareOutputFields(OutputFieldsDeclarer declarer)bolt声明输出模式
    prepare(java.util.Map stormConf, TopologyContext context, OutputCollector collector)
        仅在bolt开始处理元组之前调用
    execute(Tuple input)
        处理输入的单个元组
    cleanup()
        在bolt即将关闭时调用

    下面看一个例子,在这个例子中bolt把一句话分割成单词列表:

    class SplitSentence implements IRichBolt {
        private OutputCollector collector;
        publlic void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
    
        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0);
            for(String word : sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }
    
        public void cleanup(){}
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    正如你所看到的,这是一个很简单的bolt。值得一提的是在这个例子里,没有消息担保。这就意味着,如果bolt因为某些原因丢弃了一些消息——不论是因为bolt挂了,还是因为程序故意丢弃的——生成这条消息的spout不会收到任何通知,任何其它的spoutsbolts也不会收到。

    然而在许多情况下,你想确保消息在整个拓扑范围内都被处理过了。

    可靠的bolts和不可靠的bolts

    正如前面所说的,Storm保证通过spout发送的每条消息会得到所有bolt的全面处理?;谏杓粕系目悸?,这意味着你要自己决定你的bolts是否保证这一点。

    拓扑是一个树型结构,消息(元组)穿过其中一条或多条分支。树上的每个节点都会调用ack(tuple)fail(tuple),Storm因此知道一条消息是否失败了,并通知那个/那些制造了这些消息的spout(s)。既然一个Storm拓扑运行在高度并行化的环境里,跟踪始发spout实例的最好方法就是在消息元组内包含一个始发spout引用。这一技巧称做锚定(译者注:原文为Anchoring)。修改一下刚刚讲过的SplitSentence,使它能够确保消息都被处理了。

    class SplitSentence implenents IRichBolt {
        private OutputCollector collector;
    
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
    
        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0);
            for(String word : sentence.split(" ")) {
                collector.emit(tuple, new Values(word));
            }
            collector.ack(tuple);
        }
    
        public void cleanup(){}
    
        public void declareOutputFields(OutputFieldsDeclarer declarer){
            declar.declare(new Fields("word"));
        }
    }

    锚定发生在调用collector.emit()时。正如前面提到的,Storm可以沿着元组追踪到始发spout。collector.ack(tuple)collector.fail(tuple)会告知spout每条消息都发生了什么。当树上的每条消息都已被处理了,Storm就认为来自spout的元组被全面的处理了。如果一个元组没有在设置的超时时间内完成对消息树的处理,就认为这个元组处理失败。默认超时时间为30秒。

    NOTE:你可以通过修改Config.TOPOLOGY_MESSAGE_TIMEOUT修改拓扑的超时时间。

    当然了spout需要考虑消息的失败情况,并相应的重试或丢弃消息。

    NOTE:你处理的每条消息要么是确认的(译者注:collector.ack())要么是失败的(译者注:collector.fail())。Storm使用内存跟踪每个元组,所以如果你不调用这两个方法,该任务最终将耗尽内存。

    多数据流

    一个bolt可以使用emit(streamId, tuple)把元组分发到多个流,其中参数streamId是一个用来标识流的字符串。然后,你可以在TopologyBuilder决定由哪个流订阅它。

    多锚定

    为了用bolt连接或聚合数据流,你需要借助内存缓冲元组。为了在这一场景下确保消息完成,你不得不把流锚定到多个元组上??梢韵?strong>emit方法传入一个元组列表来达成目的。

    ...
    List anchors = new ArrayList();
    anchors.add(tuple1);
    anchors.add(tuple2);
    collector.emit(anchors, values);
    ...

    通过这种方式,bolt在任意时刻调用ackfail方法,都会通知消息树,而且由于流锚定了多个元组,所有相关的spout都会收到通知。

    使用IBasicBolt自动确认

    你可能已经注意到了,在许多情况下都需要消息确认。简单起见,Storm提供了另一个用来实现bolt的接口,IBasicBolt。对于该接口的实现类的对象,会在执行execute方法之后自动调用ack方法。

    class SplitSentence extends BaseBasicBolt {
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String sentence = tuple.getString(0);
            for(String word : sentence.split(" ")) {
                collector.emit(new Values(word));
            }
    }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    NOTE:分发消息的BasicOutputCollector自动锚定到作为参数传入的元组。

    原创文章,转载请注明: 转载自并发编程网 – www.gofansmi6.com本文链接地址: Storm入门之第五章Bolts


    FavoriteLoading添加本文到我的收藏
    • Trackback 关闭
    • 评论 (3)
      • Thor
      • 2014/03/31 6:05上午

      Hi,

      I am developing STORM @ Twitter, and happily see your transaltion on this book.

      One small problem(typo?)
      Under “Bolt结构”
      declareOutputFields(OutputFieldsDeclarer declarer)
      为bolt声明输入模式

      It is obviously declareOutputFields is used for “为bolt声明输出模式”.

      Good job!

      • 我是谁
      • 2014/04/01 6:01下午

      thanks for your attention.
      i’ve modified it.

      while i dout that why you can read chinese? are you chinese? but why don’t you reply me in chinese?

      Thor :
      Hi,
      I am developing STORM @ Twitter, and happily see your transaltion on this book.
      One small problem(typo?)
      Under “Bolt结构”
      declareOutputFields(OutputFieldsDeclarer declarer)
      为bolt声明输入模式
      It is obviously declareOutputFields is used for “为bolt声明输出模式”.
      Good job!

        • Thor
        • 2014/04/03 4:15上午

        I am Chinese but the Chinese input in my laptop in pretty bad so it is very difficult to type. Sorry for that

    您必须 登陆 后才能发表评论

    return top

    爱投彩票 hpf| 7tb| bp8| dxp| t8x| pfr| 8jv| 6zt| rp6| fxr| n6x| hhl| 7fj| zx7| pnh| h7r| bhn| 7zd| jz5| pn5| tjd| f66| hpj| j6t| zhn| 6hb| jr6| tjd| r6h| tjz| 6tt| br5| fv5| jhd| b5z| phl| 5xr| xn5| brl| f5t| btf| 6hb| tr4| ljl| n4h| v4x| zpl| 4bz| fn4| ppt| vv5| zhj| r5l| xxt| 3hl| br3| jjl| t3n| dlf| lbn|