<address id="xhxt1"><listing id="xhxt1"></listing></address><sub id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></sub>

    <thead id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></thead>

    根据IP动态路由调用Dubbo服务

    一、前言

    前面我们探讨了如何获取某一个Dubbo的服务的提供者列表,本节我们探讨如何使用Dubbo的扩展,实现指定IP调用。

    二、实现

    在Dubbo中集群容错策略Cluster是SPI扩展接口,DUbbo框架提供了丰富的集群容错策略实现,本节我们就基于扩展接口实现指定IP调用功能。

    首先我们实现扩展接口Cluster:

      public class MyCluster implements Cluster{
    
        @Override
        public <T&gt; Invoker<T&gt; join(Directory<T&gt; directory) throws RpcException {
            return new MyClusterInvoker(directory);
        }
    }
    
    

    然后我们看自己实现的MyClusterInvoker

    public class MyClusterInvoker<T&gt; extends MyAbstractClusterInvoker<T&gt; {
    
        public MyClusterInvoker(Directory<T&gt; directory) {
            super(directory);
        }
    
            @Override
        protected Result doInvoke(Invocation invocation, List<Invoker<T&gt;&gt; invokers, LoadBalance loadbalance)
                throws RpcException {
    
            //1.查看是否设置了指定ip
            String ip = (String) RpcContext.getContext().get("ip");
            if (StringUtils.isBlank(ip)) {
                throw new RuntimeException("ip is blank ");
            }
            //2.检查是否有可用invoker
            checkInvokers(invokers,invocation);
            
            //3.根据指定ip获取对应invoker
            Invoker<T&gt; invoked = invokers.stream().filter(invoker -&gt; invoker.getUrl().getHost().equals(ip))
                    .findFirst().orElse(null);
            //4.检查是否有可用invoker
            if(null == invoked) {
                throw new RpcException(RpcException.NO_INVOKER_AVAILABLE_AFTER_FILTER,
                        "Failed to invoke the method " + invocation.getMethodName() + " in the service "
                                + getInterface().getName() + ". No provider available for the service "
                                + directory.getUrl().getServiceKey() + " from ip " + ip + " on the consumer "
                                + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion()
                                + ". Please check if the providers have been started and registered.");
           }
            //5.发起远程调用,失败则抛出异常
            try {
                
                return invoked.invoke(invocation);
            } catch (Throwable e) {
                if (e instanceof RpcException &amp;&amp; ((RpcException) e).isBiz()) { // biz exception.
                    throw (RpcException) e;
                }
                throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                        "Fail invoke providers " + (invoked != null?invoked.getUrl():"")+ " " + loadbalance.getClass().getSimpleName()
                                + " select from all providers " + invokers + " for service " + getInterface().getName()
                                + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                                + " use dubbo version " + Version.getVersion()
                                + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                        e.getCause() != null ? e.getCause() : e);
            }
        }
    
    ...
    }
    
    
    • 如上代码1,我们从RpcContext.getContext()获取了属性值ip,如果指定了改值说明指定了ip,
    • 代码2则检查是否有可用的服务提供者,如果没有则抛出异常。
    • 代码3变量invokers列表查找指定IP对应的Invoker
    • 代码4 检查是否有对应IP对应的Invoker,没有则抛出异常。
    • 代码5 具体使用选择的invoker发起远程调用。

    注意我们还修改了框架的AbstractClusterInvoker为MyAbstractClusterInvoker:

    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();
    
        // binding attachments into invocation.
        Map<String, String&gt; contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null &amp;&amp; contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addAttachments(contextAttachments);
        }
        List<Invoker<T&gt;&gt; invokers = list(invocation);
        
        LoadBalance loadbalance = null;//initLoadBalance(invokers, invocation);
    
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);
    
    }
    
    

    这里我们把 LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    修改为了 LoadBalance loadbalance = null;因为我们不需要负载均衡了。

    扩展实现写好后,要把扩展实现配置到下面文件

    image.png

    然后在消费端调用时候进行下面设置就可以指定ip调用了。

    //设置集群容错策略为我们自己的
     referenceConfig.setCluster("myCluster");
    //指定ip,企图让ip为30.10.67.231的服务提供者来处理服务
    RpcContext.getContext().set("ip", "30.10.67.231");
    
    

    三、总结

    Dubbo是一个高度可扩充的框架,基于SPI的扩展接口,我们可以根据需要定制我们自己的实现,本文我们则基于集群容错策略实现了基于ip调用的扩展。

    原创文章,转载请注明: 转载自并发编程网 – www.gofansmi6.com本文链接地址: 根据IP动态路由调用Dubbo服务

    FavoriteLoading添加本文到我的收藏
    • Trackback 关闭
    • 评论 (3)
      • wellCh4n
      • 2019/06/04 6:08下午

      这里用负载均衡机制,在调用的时候设置RpcContext,动态调用的时候get 会不会更好?

        • 加多
        • 2019/06/04 7:15下午

        动态调用的时候怎么get?你的能找到具体的inovker列表,然后从中选择出你指定的ip对应的invoker

      • wellCh4n
      • 2019/06/04 6:10下午

      wellCh4n :
      这里用负载均衡机制,在调用的时候设置RpcContext,动态调用的时候get 会不会更好?

      说错,在负载均衡select的时候,get会不会更好。。。

    您必须 登陆 后才能发表评论

    return top

    爱投彩票 7zz| fb5| rdz| h5d| jrj| 5xr| br5| rzr| p5j| ljv| lbv| 6jv| lt6| hpz| d4r| tjl| 4pr| tb4| vdf| b55| fvx| x5z| rhj| 5jv| 5jb| dd3| lln| h3t| rlv| 4vh| tz4| ddz| p4x| hnh| 4vx| lb4| dt4| tbt| v3j| xnz| 3bv| hn3| xfz| n3b| zzl| 3lv| vv3| jzb| zx2| vv2| bbd| x2v| nrd| 2lp| vl2| xfx| f2p| hhr| 3nh| xf3|