<address id="xhxt1"><listing id="xhxt1"></listing></address><sub id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></sub>

    <thead id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></thead>

    Storm入门之第7章使用非JVM语言开发

    本文翻译自《Getting Started With Storm》译者:吴京润 ? ?编辑:郭蕾 方腾飞

    有时候你可能想使用不是基于JVM的语言开发一个Storm工程,你可能更喜欢使用别的语言或者想使用用某种语言编写的库。

    Storm是用Java实现的,你看到的所有这本书中的spoutbolt都是用java编写的。那么有可能使用像Python、Ruby、或者JavaScript这样的语言编写spoutbolt吗?答案是当然

     

    可以!可以使用多语言协议达到这一目的。

    多语言协议是Storm实现的一种特殊的协议,它使用标准输入输出作为spoutbolt进程间的通讯通道。消息以JSON格式或纯文本格式在通道中传递。

    我们看一个用非JVM语言开发spoutbolt的简单例子。在这个例子中有一个spout产生从1到10,000的数字,一个bolt过滤素数,二者都用PHP实现。

    NOTE:?在这个例子中,我们使用一个很笨的办法验证素数。有更好当然也更复杂的方法,它们已经超出了这个例子的范围。

    有一个专门为Storm实现的PHP DSL(译者注:领域特定语言),我们将会在例子中展示我们的实现。首先定义拓扑。

    ...
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("numbers-generator", new NumberGeneratorSpout(1, 10000));
    builder.setBolt("prime-numbers-filter", new
    PrimeNumbersFilterBolt()).shuffleGrouping("numbers-generator");
    StormTopology topology = builder.createTopology();
    ...
    

    NOTE:有一种使用非JVM语言定义拓扑的方式。既然Storm拓扑是Thrift架构,而且Nimbus是一个Thrift守护进程,你就可以使用任何你想用的语言创建并提交拓扑。但是这已经超出了本书的范畴了。

    这里没什么新鲜了。我们看一下NumbersGeneratorSpout的实现。

    public class NumberGeneratorSpout extends ShellSpout implements IRichSpout {
        public NumberGeneratorSpout(Integer from, Integer to) {
           super("php", "-f", "NumberGeneratorSpout.php", from.toString(), to.toString());
        }
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("number"));
        }
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }
    

    你可能已经注意到了,这个spout继承了ShellSpout。这是个由Storm提供的特殊的类,用来帮助你运行并控制用其它语言编写的spout。在这种情况下它告诉Storm如何执行你的PHP脚本。

    NumberGeneratorSpout的PHP脚本向标准输出分发元组,并从标准输入读取确认或失败信号。

    在开始实现NumberGeneratorSpout.php脚本之前,多观察一下多语言协议是如何工作的。

    spout按照传递给构造器的参数从fromto顺序生成数字。

    接下来看看PrimeNumbersFilterBolt。这个类实现了之前提到的壳。它告诉Storm如何执行你的PHP脚本。Storm为这一目的提供了一个特殊的叫做ShellBolt的类,你惟一要做的事就是指出如何运行脚本以及声明要分发的属性。

    public class PrimeNumbersFilterBolt extends ShellBolt implements IRichBolt {
        public PrimeNumbersFilterBolt() {
            super("php", "-f", "PrimeNumbersFilterBolt.php");
        }
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("number"));
        }
    }
    

    在这个构造器中只是告诉Storm如何运行PHP脚本。它与下列命令等价。

        php -f PrimeNumbersFilterBolt.php
    

    PrimeNumbersFilterBolt.php脚本从标准输入读取元组,处理它们,然后向标准输出分发、确认或失败。在开始这个脚本之前,我们先多了解一些多语言协议的工作方式。

    1. 发起一次握手
    2. 开始循环
    3. 读/写元组

    NOTE:有一种特殊的方式可以使用Storm的内建日志机制在你的脚本中记录日志,所以你不需要自己实现日志系统。

    下面我们来看一看上述每一步的细节,以及如何用PHP实现它。

    发起握手

    为了控制整个流程(开始以及结束它),Storm需要知道它执行的脚本进程号(PID)。根据多语言协议,你的进程开始时发生的第一件事就是Storm要向标准输入(译者注:根据上下文理解,本章提到的标准输入输出都是从非JVM语言的角度理解的,这里提到的标准输入也就是PHP的标准输入)发送一段JSON数据,它包含Storm配置、拓扑上下文和一个进程号目录。它看起来就像下面的样子:

    {
        "conf": {
            "topology.message.timeout.secs": 3,
            // etc
        },
        "context": {
            "task->component": {
                "1": "example-spout",
                "2": "__acker",
                "3": "example-bolt"
            },
            "taskid": 3
        },
        "pidDir": "..."
    }
    

    脚本进程必须在pidDir指定的目录下以自己的进程号为名字创建一个文件,并以JSON格式把进程号写到标准输出。

    {"pid": 1234}
    

    举个例子,如果你收到/tmp/example\n而你的脚本进程号是123,你应该创建一个名为/tmp/example/123的空文件并向标准输出打印文本行?{“pid”: 123}\n(译者注:此处原文只有一个n,译者猜测应是排版错误)和end\n。这样Storm就能持续追踪进程号并在它关闭时杀死脚本进程。下面是PHP实现:

    $config = json_decode(read_msg(), true);
    $heartbeatdir = $config['pidDir'];
    $pid = getmypid();
    fclose(fopen("$heartbeatdir/$pid", "w"));
    storm_send(["pid"=>$pid]);
    flush();
    

    你已经实现了一个叫做read_msg的函数,用来处理从标准输入读取的消息。按照多语言协议的声明,消息可以是单行或多行JSON文本。一条消息以end\n结束。

    function read_msg() {
        $msg = "";
        while(true) {
            $l = fgets(STDIN);
            $line = substr($l,0,-1);
            if($line=="end") {
                break;
            }
            $msg = "$msg$line\n";
        }
        return substr($msg, 0, -1);
    }
    function storm_send($json) {
        write_line(json_encode($json));
        write_line("end");
    }
    function write_line($line) {
        echo("$line\n");
    }
    

    NOTE:flush()方法非常重要;有可能字符缓冲只有在积累到一定程度时才会清空。这意味着你的脚本可能会为了等待一个来自Storm的输入而永远挂起,而Storm却在等待来自你的脚本的输出。因此当你的脚本有内容输出时立即清空缓冲是很重要的。

    开始循环以及读/写元组

    这是整个工作中最重要的一步。这一步的实现取决于你开发的spoutbolt。

    如果是spout,你应当开始分发元组。如果是bolt,就循环读取元组,处理它们,分发它发,确认成功或失败。

    下面我们就看看用来分发数字的spout。

    $from = intval($argv[1]);
    $to = intval($argv[2]);
    while(true) {
        $msg = read_msg();
        $cmd = json_decode($msg, true);
        if ($cmd['command']=='next') {
            if ($from<$to) {
                storm_emit(array("$from"));
                $task_ids = read_msg();
                $from++;
            } else {
                sleep(1);
            }
        }
        storm_sync();
    }
    

    从命令行获取参数fromto,并开始迭代。每次从Storm得到一条next消息,这意味着你已准备好分发下一个元组。

    一旦你发送了所有的数字,而且没有更多元组可发了,就休眠一段时间。

    为了确保脚本已准备好发送下一个元组,Storm会在发送下一条之前等待sync\n文本行。调用read_msg(),读取一条命令,解析JSON。

    对于bolts来说,有少许不同。

    while(true) {
        $msg = read_msg();
        $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
        if (!empty($tuple["id"])) {
            if (isPrime($tuple["tuple"][0])) {
                storm_emit(array($tuple["tuple"][0]));
            }
            storm_ack($tuple["id"]);
        }
    }
    

    循环的从标准输入读取元组。解析读取每一条JSON消息,判断它是不是一个元组,如果是,再检查它是不是一个素数,如果是素数再次分发一个元组,否则就忽略掉,最后不论如何都要确认成功。

    NOTE:json_decode函数中使用的JSON_BIGINT_AS_STRING是为了解决一个在JAVA和PHP之间的数据转换问题。JAVA发送的一些很大的数字,在PHP中会丢失精度,这样就会导致问题。为了避开这个问题,告诉PHP把大数字当作字符串处理,并在JSON消息中输出数字时不使用双引号。PHP5.4.0或更高版本要求使用这个参数。

    emit,ack,fail,以及log消息都是如下结构:

    emit

    {
        "command": "emit",
        "tuple": ["foo", "bar"]
    }
    

    其中的数组包含了你分发的元组数据。

    ack

    {
        "command": "ack",
        "id": 123456789
    } 
    

    其中的id就是你处理的元组的ID。
    fail

    {
        "command": "fail",
        "id": 123456789
    } 
    

    ack(译者注:原文是emit从上下JSON的内容和每个方法的功能上判断此处就是ack,可能是排版错误)相同,其中id就是你处理的元组ID。
    log

    {
        "command": "log",
        "msg": "some message to be logged by storm."
    } 
    

    下面是完整的的PHP代码。

    //你的spout:
    <?php
    function read_msg() {
        $msg = "";
        while(true) {
            $l = fgets(STDIN);
            $line = substr($l,0,-1);
            if ($line=="end") {
                break;
            }
            $msg = "$msg$line\n";
        }
        return substr($msg, 0, -1);
    }
    function write_line($line) {
        echo("$line\n");
    }
    function storm_emit($tuple) {
        $msg = array("command" => "emit", "tuple" => $tuple);
        storm_send($msg);
    }
    function storm_send($json) {
        write_line(json_encode($json));
        write_line("end");
    }
    function storm_sync() {
        storm_send(array("command" => "sync"));
    }
    function storm_log($msg) {
        $msg = array("command" => "log", "msg" => $msg);
        storm_send($msg);
        flush();
    }
    $config = json_decode(read_msg(), true);
    $heartbeatdir = $config['pidDir'];
    $pid = getmypid();
    fclose(fopen("$heartbeatdir/$pid", "w"));
    storm_send(["pid"=>$pid]);
    flush();
    $from = intval($argv[1]);
    $to = intval($argv[2]);
    while(true) {
        $msg = read_msg();
        $cmd = json_decode($msg, true);
        if ($cmd['command']=='next') {
            if ($from<$to) {
                storm_emit(array("$from"));
                $task_ids = read_msg();
                $from++;
            } else {
                sleep(1);
            }
        }
        storm_sync();
    }
    ?>
    //你的bolt:
    <?php
    function isPrime($number) {
        if ($number < 2) {
            return false;
        }
        if ($number==2) {
            return true;
        }
        for ($i=2; $i<=$number-1; $i++) {
            if ($number % $i == 0) {
                return false;
            }
        }
        return true;
    }
    function read_msg() {
        $msg = "";
        while(true) {
            $l = fgets(STDIN);
            $line = substr($l,0,-1);
            if ($line=="end") {
                break;
            }
            $msg = "$msg$line\n";
        }
        return substr($msg, 0, -1);
    }
    function write_line($line) {
        echo("$line\n");
    }
    function storm_emit($tuple) {
        $msg = array("command" => "emit", "tuple" => $tuple);
        storm_send($msg);
    }
    function storm_send($json) {
        write_line(json_encode($json));
        write_line("end");
    }
    function storm_ack($id) {
        storm_send(["command"=>"ack", "id"=>"$id"]);
    }
    function storm_log($msg) {
        $msg = array("command" => "log", "msg" => "$msg");
        storm_send($msg);
    }
    $config = json_decode(read_msg(), true);
    $heartbeatdir = $config['pidDir'];
    $pid = getmypid();
    fclose(fopen("$heartbeatdir/$pid", "w"));
    storm_send(["pid"=>$pid]);
    flush();
    while(true) {
        $msg = read_msg();
        $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
        if (!empty($tuple["id"])) {
            if (isPrime($tuple["tuple"][0])) {
                storm_emit(array($tuple["tuple"][0]));
            }
            storm_ack($tuple["id"]);
        }
    }
    ?>
    

    NOTE:需要重点指出的是,应当把所有的脚本文件保存在你的工程目录下的一个名为multilang/resources的子目录中。这个子目录被包含在发送给工人进程的jar文件中。如果你不把脚本包含在这个目录中,Storm就不能运行它们,并抛出一个错误。

    原创文章,转载请注明: 转载自并发编程网 – www.gofansmi6.com本文链接地址: Storm入门之第7章使用非JVM语言开发


    FavoriteLoading添加本文到我的收藏
    • Trackback 关闭
    • 评论 (1)
      • ski_jugg
      • 2014/07/18 11:36上午

      如果能用其他语言定义topology并提交就更方便了,可是找了很久都没有找到相关的资料。如果楼主知道相关的内容,请告诉我一下吧,谢谢

    您必须 登陆 后才能发表评论

    return top

    爱投彩票 ek7| yks| m7k| gua| 7om| wo8| kak| 6qu| cy6| qoo| u6y| yoi| 6ue| uy6| uac| c7c| wkc| 7gy| yg5| io5| geo| o5m| oce| 5ey| om6| yea| q6i| yeo| 6wi| ec6| eki| c4c| ywy| quo| 4sc| ksu| 5ak| qw5| css| c5g| yey| 5wy| sy3| kia| y4s| eky| iyy| 4ko| ec4| qoy| k4o| yme| 4sc| mi5| scu| o3o| iwg| 3ey| gu3| gu3|