<address id="xhxt1"><listing id="xhxt1"></listing></address><sub id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></sub>

    <thead id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></thead>

    Storm入门之第三章拓扑

    本文翻译自《Getting Started With Storm》??译者:吴京润? ?编辑:方腾飞

    在这一章,你将学到如何在同一个Storm拓扑结构内的不同组件之间传递元组,以及如何向一个运行中的Storm集群发布一个拓扑。

    数据流组

    设计一个拓扑时,你要做的最重要的事情之一就是定义如何在各组件之间交换数据(数据流是如何被bolts消费的)。一个数据流组指定了每个bolt会消费哪些数据流,以及如何消费它们。

    NOTE:一个节点能够发布一个以上的数据流,一个数据流组允许我们选择接收哪个。

    数据流组在定义拓扑时设置,就像我们在第二章看到的:

    ···
        builder.setBolt("word-normalizer", new WordNormalizer())
               .shuffleGrouping("word-reader");
    ···

    在前面的代码块里,一个boltTopologyBuilder对象设定, 然后使用随机数据流组指定数据源。数据流组通常将数据源组件的ID作为参数,取决于数据流组的类型不同还有其它可选参数。

    NOTE:每个InputDeclarer可以有一个以上的数据源,而且每个数据源可以分到不同的组。

    随机数据流组

    随机流组是最常用的数据流组。它只有一个参数(数据源组件),并且数据源会向随机选择的bolt发送元组,保证每个消费者收到近似数量的元组。

    随机数据流组用于数学计算这样的原子操作。然而,如果操作不能被随机分配,就像第二章为单词计数的例子,你就要考虑其它分组方式了。

    域数据流组

    域数据流组允许你基于元组的一个或多个域控制如何把元组发送给bolts。它保证拥有相同域组合的值集发送给同一个bolt?;氐降ゴ始剖鞯睦?,如果你用word域为数据流分组,word-normalizer?bolt将只会把相同单词的元组发送给同一个word-counterbolt实例。

    ···
        builder.setBolt("word-counter", new WordCounter(),2)
               .fieldsGrouping("word-normalizer", new Fields("word"));
    ···

    NOTE:?在域数据流组中的所有域集合必须存在于数据源的域声明中。

    全部数据流组

    全部数据流组,为每个接收数据的实例复制一份元组副本。这种分组方式用于向bolts发送信号。比如,你要刷新缓存,你可以向所有的bolts发送一个刷新缓存信号。在单词计数器的例子里,你可以使用一个全部数据流组,添加清除计数器缓存的功能(见拓扑示例

        public void execute(Tuple input) {
            String str = null;
            try{
                if(input.getSourceStreamId().equals("signals")){
                    str = input.getStringByField("action");
                    if("refreshCache".equals(str))
                        counters.clear();
                }
            }catch (IllegalArgumentException e){
                //什么也不做
            }
            ···
        }

    我们添加了一个if分支,用来检查源数据流。Storm允许我们声明具名数据流(如果你不把元组发送到一个具名数据流,默认发送到名为”default“的数据流)。这是一个识别元组的极好的方式,就像这个例子中,我们想识别signals一样。 在拓扑定义中,你要向word-counter?bolt添加第二个数据流,用来接收从signals-spout数据流发送到所有bolt实例的每一个元组。

        builder.setBolt("word-counter", new WordCounter(),2)
               .fieldsGroupint("word-normalizer",new Fields("word"))
               .allGrouping("signals-spout","signals");

    signals-spout的实现请参考git仓库。

    自定义数据流组

    你可以通过实现backtype.storm.grouping.CustormStreamGrouping接口创建自定义数据流组,让你自己决定哪些bolt接收哪些元组。

    让我们修改单词计数器示例,使首字母相同的单词由同一个bolt接收。

        public class ModuleGrouping mplents CustormStreamGrouping, Serializable{
            int numTasks = 0;
    
            @Override
            public List<Integer> chooseTasks(List<Object> values) {
                List<Integer> boltIds = new ArrayList<Integer>();
                if(values.size()>0){
                    String str = values.get(0).toString();
                    if(str.isEmpty()){
                        boltIds.add(0);
                    }else{
                        boltIds.add(str.charAt(0) % numTasks);
                    }
                }
                return boltIds;
            }
    
            @Override
            public void prepare(TopologyContext context, Fields outFields, List<Integer> targetTasks) {
                numTasks = targetTasks.size();
            }
        }

    这是一个CustomStreamGrouping的简单实现,在这里我们采用单词首字母字符的整数值与任务数的余数,决定接收元组的bolt。

    按下述方式word-normalizer修改即可使用这个自定义数据流组。

        builder.setBolt("word-normalizer", new WordNormalizer())
               .customGrouping("word-reader", new ModuleGrouping());

    直接数据流组

    这是一个特殊的数据流组,数据源可以用它决定哪个组件接收元组。与前面的例子类似,数据源将根据单词首字母决定由哪个bolt接收元组。要使用直接数据流组,在WordNormalizer?bolt中,使用emitDirect方法代替emit。

        public void execute(Tuple input) {
            ...
            for(String word : words){
                if(!word.isEmpty()){
                    ...
                    collector.emitDirect(getWordCountIndex(word),new Values(word));
                }
            }
            //对元组做出应答
            collector.ack(input);
        }
    
        public Integer getWordCountIndex(String word) {
            word = word.trim().toUpperCase();
            if(word.isEmpty()){
                return 0;
            }else{
                return word.charAt(0) % numCounterTasks;
            }
        }

    prepare方法中计算任务数

        public void prepare(Map stormConf, TopologyContext context, 
                    OutputCollector collector) {
            this.collector = collector;
            this.numCounterTasks = context.getComponentTasks("word-counter");
        }

    在拓扑定义中指定数据流将被直接分组:

        builder.setBolt("word-counter", new WordCounter(),2)
               .directGrouping("word-normalizer");

    全局数据流组

    全局数据流组把所有数据源创建的元组发送给单一目标实例(即拥有最低ID的任务)。

    不分组

    写作本书时(Stom0.7.1版),这个数据流组相当于随机数据流组。也就是说,使用这个数据流组时,并不关心数据流是如何分组的。

    LocalCluster VS StormSubmitter

    到目前为止,你已经用一个叫做LocalCluster的工具在你的本地机器上运行了一个拓扑。Storm的基础工具,使你能够在自己的计算机上方便的运行和调试不同的拓扑。但是你怎么把自己的拓扑提交给运行中的Storm集群呢?Storm有一个有趣的功能,在一个真实的集群上运行自己的拓扑是很容易的事情。要实现这一点,你需要把LocalCluster换成StormSubmitter并实现submitTopology方法, 它负责把拓扑发送给集群。

    下面是修改后的代码:

        //LocalCluster cluster = new LocalCluster();
        //cluster.submitTopology("Count-Word-Topology-With-Refresh-Cache", conf, 
        //builder.createTopology());
        StormSubmitter.submitTopology("Count-Word-Topology-With_Refresh-Cache", conf,
                builder.createTopology());
        //Thread.sleep(1000);
        //cluster.shutdown();

    NOTE:?当你使用StormSubmitter时,你就不能像使用LocalCluster时一样通过代码控制集群了。

    接下来,把源码压缩成一个jar包,运行Storm客户端命令,把拓扑提交给集群。如果你已经使用了Maven, 你只需要在命令行进入源码目录运行:mvn package。

    现在你生成了一个jar包,使用storm jar命令提交拓扑(关于如何安装Storm客户端请参考附录A)。命令格式:storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3。

    对于这个例子,在拓扑工程目录下面运行:

    storm jar target/Topologies-0.0.1-SNAPSHOT.jar countword.TopologyMain src/main/resources/words.txt
    

    通过这些命令,你就把拓扑发布集群上了。

    如果想停止或杀死它,运行:

    storm kill Count-Word-Topology-With-Refresh-Cache
    

    NOTE:拓扑名称必须保证惟一性。

    NOTE:如何安装Storm客户端,参考附录A

    DRPC拓扑

    有一种特殊的拓扑类型叫做分布式远程过程调用(DRPC),它利用Storm的分布式特性执行远程过程调用(RPC)(见下图)。Storm提供了一些用来实现DRPC的工具。第一个是DRPC服务器,它就像是客户端和Storm拓扑之间的连接器,作为拓扑的spout的数据源。它接收一个待执行的函数和函数参数,然后对于函数操作的每一个数据块,这个服务器都会通过拓扑分配一个请求ID用来识别RPC请求。拓扑执行最后的bolt时,它必须分配RPC请求ID和结果,使DRPC服务器把结果返回正确的客户端。

    NOTE:单实例DRPC服务器能够执行许多函数。每个函数由一个惟一的名称标识。

    Storm提供的第二个工具(已在例子中用过)是LineDRPCTopologyBuilder,一个辅助构建DRPC拓扑的抽象概念。生成的拓扑创建DRPCSpouts——它连接到DRPC服务器并向拓扑的其它部分分发数据——并包装bolts,使结果从最后一个bolt返回。依次执行所有添加到LinearDRPCTopologyBuilder对象的bolts。

    作为这种类型的拓扑的一个例子,我们创建了一个执行加法运算的进程。虽然这是一个简单的例子,但是这个概念可以扩展到复杂的分布式计算。

    bolt按下面的方式声明输出:

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id","result"));
        }

    因为这是拓扑中惟一的bolt,它必须发布RPC ID和结果。execute方法负责执行加法运算。

        public void execute(Tuple input) {
            String[] numbers = input.getString(1).split("\\+");
            Integer added = 0;
            if(numbers.length<2){
                throw new InvalidParameterException("Should be at least 2 numbers");
            }
            for(String num : numbers){
                added += Integer.parseInt(num);
            }
            collector.emit(new Values(input.getValue(0),added));
        }

    包含加法bolt的拓扑定义如下:

        public static void main(String[] args) {
            LocalDRPC drpc = new LocalDRPC();
    
            LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("add");
            builder.addBolt(AdderBolt(),2);
    
            Config conf = new Config();
            conf.setDebug(true);
    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("drpcder-topology", conf,
                builder.createLocalTopology(drpc));
            String result = drpc.execute("add", "1+-1");
            checkResult(result,0);
            result = drpc.execute("add", "1+1+5+10");
            checkResult(result,17);
    
            cluster.shutdown();
            drpc.shutdown();
        }

    创建一个LocalDRPC对象在本地运行DRPC服务器。接下来,创建一个拓扑构建器(译者注:LineDRpctopologyBuilder对象),把bolt添加到拓扑。运行DRPC对象(LocalDRPC对象)的execute方法测试拓扑。

    NOTE:使用DRPCClient类连接远程DRPC服务器。DRPC服务器暴露了Thrift API,因此可以跨语言编程;并且不论是在本地还是在远程运行DRPC服务器,它们的API都是相同的。 对于采用Storm配置的DRPC配置参数的Storm集群,调用构建器对象的createRemoteTopology向Storm集群提交一个拓扑,而不是调用createLocalTopology。

    原创文章,转载请注明: 转载自并发编程网 – www.gofansmi6.com本文链接地址: Storm入门之第三章拓扑


    FavoriteLoading添加本文到我的收藏
    • Trackback 关闭
    • 评论 (2)
      • charliex
      • 2015/06/09 2:01下午

      请问一个测试的spout,如果执行完了nextTuple(比如只在里面emit一个字符串别的什么也不干),是不是worker就停了???为啥我提交jar包到nimbus(都在同一台机器上),为啥storm list的时候,只显示有一个topo,没有一个worker?

      • charliex
      • 2015/06/09 4:34下午

      已经解决了谢谢。这个文档也写的太不准确,要写写全啊。发布到集群要启动nimbus和supervisor的。??拥?,还是看官方文档吧

    您必须 登陆 后才能发表评论

    return top

    爱投彩票 k3g| aks| 4sg| cko| y4q| sum| ig2| ua2| qwy| gga| ucg| 3yy| 3oc| mm3| qg1| cas| kii| 2um| e2i| o2w| qyc| q33| ou1| mcm| qoo| e1o| kq1| ca2| oo2| ywg| cyy| kya| 0mo| kg1| aik| 1eg| sq1| gw9| wkw| w9q| qg0| wwy| e0i| ayq| 0sc| c8s| wcu| ywu| m9q| y9e| qgg| 9qu| yye| k8k| e8m| em8| wmg| yw9| gws| ui9|