<address id="xhxt1"><listing id="xhxt1"></listing></address><sub id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></sub>

    <thead id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></thead>

    并发实战题(一)

    作者:一粟

    实现一个流控程序??刂瓶突Ф嗣棵氲饔媚掣鲈冻谭癫怀齆次,客户端是会多线程并发调用,需要一个轻量简洁的实现,大家看看下面的一个实现,然后可以自己写一个实现。

    import java.util.Date;
    
    import java.util.concurrent.ExecutorService;
    
    import java.util.concurrent.Executors;
    
    import java.util.concurrent.Semaphore;
    
    import java.util.concurrent.TimeUnit;
    
    public class Test {
    
        final static int MAX_QPS = 10;
    
        final static Semaphore semaphore = new Semaphore(MAX_QPS);
    
        public static void main (String ... args) throws Exception {
    
            Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
    
                @Override
    
                public void run() {
    
                    semaphore.release(MAX_QPS/2);
    
                }
    
            }, 1000, 500, TimeUnit.MILLISECONDS);
    
            //lots of concurrent calls:100 * 1000
            ExecutorService pool = Executors.newFixedThreadPool(100);
    
            for (int i=100;i>0;i--) {
    
                final int x = i;
    
                pool.submit(new Runnable() {
    
                    @Override
    
                    public void run() {
    
                        for (int j=1000;j>0;j--) {
    
                            semaphore.acquireUninterruptibly(1);
                            remoteCall(x, j);
    
                        }
    
                    }
    
                });
    
            }
    
            pool.shutdown();
    
            pool.awaitTermination(1, TimeUnit.HOURS);
    
            System.out.println("DONE");
        }
    
        private static void remoteCall(int i, int j) {
            System.out.println(String.format("%s - %s: %d %d",new Date(),
                Thread.currentThread(), i, j));
        }
    
    }
    

    原创文章,转载请注明: 转载自并发编程网 – www.gofansmi6.com本文链接地址: 并发实战题(一)


    FavoriteLoading添加本文到我的收藏
    • Trackback 关闭
    • 评论 (28)
    1. 这个更实际的例子甚好,值得思考是否有更好的实现。

        • Yole
        • 2013/03/07 10:46上午

        同意,有实例代码效果更好,无论例子简单复杂

      • 这个代码中,如果有一秒的请求很少或请求数根本就是0,那么下一秒的许可就可能因为定时release导致许可超过MAX_QPS个,下一秒的请求数就可以超过MAX_QPS次。

          • 杰iter
          • 2013/03/09 10:47上午

          不会把,在Semaphore初始化的时候许可的数量就已经固定了

          • 这只是初始的一个许可,并没有说是最大限,不断的release,就可以不断的累积许可??碅PI描述可知,这个值甚至可以是负数。

            • 对的,结果测试会出现大于max的调用次数。

              • Phoenix.
              • 2019/03/04 12:10下午

              是的,有不断堆积许可的问题,导致后续周期有可能超过设定的最大请求数,可以通过semaphore.drainPermits(); 先进行重置再release颁发许可, 重整了下例子,可以通过注释掉重置代码来重现上述问题。

              /**
              * 每秒最大请求数10,发起100个请求.
              */
              public class Test {

              final static int MAX_QPS = 10;

              final static Semaphore semaphore = new Semaphore(0);

              final static AtomicInteger UNIT_CALL_COUNT = new AtomicInteger(0);

              final static AtomicInteger TOTAL_TASK_COUNT = new AtomicInteger(0);

              public static void main(String… args) throws Exception {
              ScheduledExecutorService poolSchedule = Executors.newScheduledThreadPool(1);
              poolSchedule.scheduleAtFixedRate(new Runnable() {
              public void run() {
              semaphore.drainPermits();//重置许可为0
              semaphore.release(MAX_QPS);
              System.out.println(“*****UNIT_CALL_COUNT=” + UNIT_CALL_COUNT.getAndSet(0));
              }
              }, 0, 1000, TimeUnit.MILLISECONDS);

              long startTime = System.currentTimeMillis();
              ExecutorService pool = Executors.newFixedThreadPool(10);
              for (int i = 0; i < 100; i++) {
              if (i == 5) {
              Thread.sleep(1000); //模拟请求量不饱和的周期
              }
              final int n = i;
              pool.submit(new Runnable() {
              public void run() {
              semaphore.acquireUninterruptibly(1);
              remoteCall(n);
              }
              });
              }
              pool.shutdown();
              pool.awaitTermination(1, TimeUnit.HOURS);
              System.out.println(String.format("Done: TOTAL_TASK_COUNT=%d TOTAL_TIME=%d", TOTAL_TASK_COUNT.get(), (System.currentTimeMillis() – startTime)));

              poolSchedule.shutdown();
              }

              private static void remoteCall(int i) {
              UNIT_CALL_COUNT.incrementAndGet();
              TOTAL_TASK_COUNT.incrementAndGet();
              }

              }

      • Snway
      • 2013/03/07 10:26上午

      不错的例子,学习了!

      • 匿名
      • 2013/03/07 1:49下午

      why use MAX_QPS/2 instead of MAX_QPS?

      • nicky
      • 2013/03/07 9:49下午

      semaphore.acquireUninterruptibly(1);这个是代表的将信号量-1么?

      • Snway
      • 2013/03/08 10:18上午

      前辈,有其他的示例吗?

      • 杰iter
      • 2013/03/09 11:01上午

      各位看这个是否可行?

       private static int MAX_EXE_COUNT = 10;
      
          private static AtomicInteger count = new AtomicInteger(MAX_EXE_COUNT);
      
          public static void main(String[] args) throws InterruptedException,
                  IOException
          {
              new Thread()
              {
                  @Override
                  public void run()
                  {
                      while (true)
                      {
                          try
                          {
                              Thread.sleep(1000);
                          }
                          catch (InterruptedException e)
                          {
                          }
                          System.out.println("1秒过去了");
                          count.set(MAX_EXE_COUNT);
                      }
                  }
              }.start();
              Executor e = Executors.newFixedThreadPool(100);
              for (int i = 0; i < 100; i++)
              {
                  e.execute(new Runnable()
                  {
                      public void run()
                      {
                          while (count.getAndDecrement() <= 0)
                          {
                          }
                          System.out.println(Thread.currentThread().getName()
                                  + "我调用了一次");
                      }
                  });
              }
          }
      
      • Snway
      • 2013/03/10 10:38下午

      丁 一 :
      这只是初始的一个许可,并没有说是最大限,不断的release,就可以不断的累积许可??碅PI描述可知,这个值甚至可以是负数。

      前辈,那该示例程序该如何调整一下,确保线程安全。

      • yl.w
      • 2013/03/24 11:41上午

      蛮好。
      尝试加了个闭锁,如下

      import java.util.Date;
      import java.util.concurrent.CountDownLatch;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.Semaphore;
      import java.util.concurrent.TimeUnit;
      
      public class TestConcrrent2 {
      
      	final static int MAX_QPS = 10;
      	final static int MAX_TASK_NUM = 100;
      	final static int MAX_THREAD_NUM = 100;
      
      	final static Semaphore semaphore = new Semaphore(MAX_QPS);	
      	final static CountDownLatch overLatch = new CountDownLatch(MAX_TASK_NUM);
      
      	public static void main(String... args) throws Exception {
      		Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
      			@Override
      			public void run() {
      				semaphore.release(MAX_QPS / 2);
      			}
      		}, 1000, 500, TimeUnit.MILLISECONDS);
      
      		// lots of concurrent calls:100 * 1000
      		ExecutorService pool = Executors.newFixedThreadPool(MAX_THREAD_NUM);
      		for (int i = MAX_TASK_NUM; i &gt; 0; i--) {
      			final int x = i;
      			pool.submit(new Runnable() {
      				@Override
      				public void run() {
      					for (int j = 5; j &gt; 0; j--) {
      						semaphore.acquireUninterruptibly(1);
      						remoteCall(x, j);
      					}
      					overLatch.countDown();
      				}
      			});
      		}
      		pool.shutdown();		
      //		pool.awaitTermination(1, TimeUnit.HOURS);
      		overLatch.await();
      		System.out.println("DONE");
      
      	}
      
      	private static void remoteCall(int i, int j) {
      		System.out.println(String.format("%s - %s: %d %d", new Date(),Thread.currentThread(), i, j));
      	}
      
      }
      
    2. 稍微优化了下,但无法做到非常精确。
      public class FlowConcurrentController {

      final static int MAX_QPS = 10;

      final static Semaphore semaphore = new Semaphore(MAX_QPS);

      final static AtomicInteger accessCount = new AtomicInteger(0);

      public static void main(String[] args) throws Exception {

      // release semaphore thread
      Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
      semaphore.release(accessCount.get());
      accessCount.set(0);
      }
      }, 1000, 1000, TimeUnit.MILLISECONDS);

      // lots of concurrent calls: 100 * 1000
      ExecutorService pool = Executors.newFixedThreadPool(100);
      for (int i=100;i>0;i–) {
      final int x = i;
      pool.submit(new Runnable() {
      @Override
      public void run() {
      for (int j=1000;j>0;j–) {
      try {
      Thread.sleep(5);
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      semaphore.acquireUninterruptibly(1);
      accessCount.incrementAndGet();

      remoteCall(x, j);
      }
      }
      });
      }

      pool.shutdown();
      pool.awaitTermination(1, TimeUnit.HOURS);
      System.out.println(“DONE”);
      }

      private static void remoteCall(int i, int j) {
      System.out.println(String.format(“%s – %s: %d %d”,new Date(),
      Thread.currentThread(), i, j));
      }
      }

    3. 如大家所说,给出的例子有个小问题;
      ‘杰iter’给出的方案比较漂亮了,不过对于当前没获得许可的请求线程,他采取的是在一个while循环里忙等,这可能不太符合需求;
      ‘淘宝天水’的方案,如果仔细分析一下,其实还是会出现例子中类似的问题,只不过是许可数越来越小,semaphore.release(accessCount.get());accessCount.set(0);这两条语句中间,如果accessCount增大了,那么semaphore的最大值将会减小,因为下一次释放的许可个数将小于请求的许可个数。如果要解决此处的问题,应该改成semaphore.release(accessCount.getAndSet(0));
      如果想让未获取到许可的线程暂阻塞起来,实现一个AbstractQueuedSynchronizer会是一个很漂亮的解决方案,用state来表示当前剩余许可数,重写tryAcquireShared方法,如果state已经小于等于0,则返回-1,如果用compareAndSetState方法对state减1执行成功,则返回1(此处应循环尝试),调用远程方法前,先调用该方法,另外,定时调用setState(MAX_QPS)来重置许可数
      PS:因为目前没有开发环境,没能一一验证,若分析得不对,欢迎指正。

    4. 上面的回复有个小错误,调用远程方法前,不是调用tryAcquireShared,而是调用acquireShared

    5. 有个细节没交待,setState并不会唤醒阻塞者,所以还需要借用releaseShared来完成唤醒,重写tryReleaseShared方法,返回true,调用setState之后再调用releaseShared,也可直接将setState写在tryReleaseShared方法里,直接调用releaseShared就可以了。

      AbstractQueuedSynchronizer是个很强大的工具,同步包里的大多数同步工具都是用它来处理同步问题的。

      PS:话说,怎么没人交流?

      • bobo
      • 2014/05/04 3:27下午

      一粟的例子里面,将那个重置信号量的线程池,改成1000ms允许一次,里面释放的代码
      int available = semaphore.availablePermits();
      //只释放用掉的许可证数量
      semaphore.release(MAX_QPS-available);
      是不是就可以了。

      另外有个疑问:
      多线程调度下,能精确保证那个重置信号量的线程,每秒钟都得到运行吗?

        • zhonglin
        • 2014/10/16 12:18下午

        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
        // /**
        // * 这里存在问题,即如果每秒访问量没有超过设定数的话,那么信号量的许可数会累积
        // */
        // semaphore.release(MAX_QPS / 2);
        int available = semaphore.availablePermits();
        //只释放用掉的许可证数量
        semaphore.release(MAX_QPS-available);
        }

        }, 1000, 1000, TimeUnit.MILLISECONDS);
        把计时的这个也改下,要不然就成没半秒允许10个了

      • 冬日阳光
      • 2015/08/18 10:37上午

      我也写了一个例子,不过没有采用锁来实现,只使用了一个原子类AtomicInteger来计数,不过在业务上是每秒调用超过10次会采用拒绝的方式来处理 。代码如下:

      import java.util.Date;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.ScheduledExecutorService;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.atomic.AtomicInteger;

      /**
      * 实现一个流控程序??刂瓶突Ф嗣棵氲饔媚掣鲈冻谭癫怀齆次,客户端是会多线程并发调用
      *
      * @version : Ver 1.0
      * @author : wenxing
      * @date : 2015年8月18日 上午10:31:47
      */
      public class RestrictedCallTest {
      /**
      * 每秒QPS限制
      */
      final static int MAX_QPS = 10;
      /**
      * 调用计数
      */
      final static AtomicInteger CALL_SIZE = new AtomicInteger(0);

      public static void main(String[] args) throws InterruptedException {

      // 如果是限制集群中的方法每秒调用次数,这种每秒定时重置的方法就需要独立到单独的应用中去,或者采用redis的过期机制
      ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(1);
      scheduledService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
      CALL_SIZE.getAndSet(0);
      }

      }, 1000, 1000, TimeUnit.MILLISECONDS);

      // 模拟每1秒调用100次,连续模拟5次,但是只有10次可以正确调用,其它的会被拒绝
      ExecutorService pool = Executors.newFixedThreadPool(100);
      for (int size = 0; size 0; i–) {
      final int x = i;
      pool.submit(new Runnable() {
      @Override
      public void run() {
      // 判断每秒的调用次数,在实际业务当中,不外就是一个拦截来实现
      if (getCallSize() >= MAX_QPS) {
      System.out.println(String.format(“每秒最多调用%s次该方法,第%d轮拒绝”, MAX_QPS, j));
      } else {
      incCall();
      // 调用业务方法
      remoteCall(x, j);
      }
      }

      });

      }
      // 休息一秒
      TimeUnit.MILLISECONDS.sleep(1000);
      }
      pool.shutdown();
      scheduledService.shutdown();
      }

      /**
      * 业务方法
      *
      * @author : wenxing 2015年8月18日 上午10:28:39
      * @param i
      * @param j
      */
      private static void remoteCall(int i, int j) {
      System.out.println(String.format(“%s – %s: 第%d轮调用,第%d次执行 “, new Date(), Thread.currentThread(), j, i));
      }

      /**
      * 增加方法调用次数
      * 如果是限制集群中的方法每秒调用次数,可以采用redis进行集中存储,不过采用redis集中存储,限制的调用次数,可能会超出调用次数最大限制
      *
      * @author : wenxing 2015年8月18日 上午10:28:39
      * @return 返回最新的调用次数
      */
      private static int incCall() {
      return CALL_SIZE.incrementAndGet();
      }

      /**
      * 获取方法的调用次数 如果是限制集群中的方法每秒调用次数,可以采用redis进行集中存储
      *
      * @author :wenxing 2015年8月18日 上午10:30:01
      * @return 返回当前的调用次数
      */
      private static int getCallSize() {
      return CALL_SIZE.get();
      }
      }

      • 冬日阳光
      • 2015/08/18 11:03上午

      继续写了一个例子,控制业务方法每秒最多MAX_QPS = 10,超过的访问请求等待,下面的代码可以看到,每秒最多只有10次的业务方法调用。
      import java.util.Date;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.ScheduledExecutorService;
      import java.util.concurrent.Semaphore;
      import java.util.concurrent.TimeUnit;
      /**
      * TODO wenxing 类描述.
      *
      * @version : Ver 1.0
      * @author : wenxing
      * @date : 2015年8月18日 上午10:54:09
      */
      public class SemaphoreTest {

      /**
      * 每秒QPS限制
      */
      final static int MAX_QPS = 10;
      /**
      * 调用计数
      */
      final static Semaphore CALL_SIZE = new Semaphore(MAX_QPS);

      public static void main(String[] args) throws InterruptedException {

      // 如果是限制集群中的方法每秒调用次数,这种每秒定时重置的方法就需要独立到单独的应用中去,或者采用redis的过期机制
      ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(1);

      scheduledService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
      //释放最多MAX_QPS个许可
      CALL_SIZE.release(MAX_QPS-CALL_SIZE.availablePermits());
      }

      }, 1000, 1000, TimeUnit.MILLISECONDS);//执行间隔为1秒,初次延时1秒执行

      // 模拟每1秒调用100次,连续模拟5次,但是只有10次可以正确调用,其它的等待下一秒的许可发放
      ExecutorService pool = Executors.newFixedThreadPool(100);
      for (int size = 0; size 0; i–) {
      final int x = i;
      pool.submit(new Runnable() {
      @Override
      public void run() {
      // 调用业务方法
      try {
      //获取许可,最久等待2秒,可以避免等待线程过多
      //CALL_SIZE.tryAcquire(2000, TimeUnit.MILLISECONDS);
      CALL_SIZE.acquire();
      remoteCall(x, j);
      } catch (InterruptedException e) {
      System.out.println(e.getMessage());
      }

      }

      });

      }
      // 休息一秒
      TimeUnit.MILLISECONDS.sleep(1000);
      }
      pool.shutdown();
      //scheduledService.shutdown();
      }

      /**
      * 业务方法
      *
      * @author : wenxing 2015年8月18日 上午10:28:39
      * @param i
      * @param j
      */
      private static void remoteCall(int i, int j) {
      System.out.println(String.format(“%s – %s: 第%d轮调用,第%d次执行 “, new Date(), Thread.currentThread(), j, i));
      }

      }

      • 当猪真快活
      • 2016/07/12 12:21下午

      如果将semaphore.release(MAX_QPS / 2);替换成semaphore.release(MAXQPS – semaphore.availablePermits());会有什么问题吗?

    您必须 登陆 后才能发表评论

    return top

    爱投彩票 oay| 3yy| ycc| uc4| kui| a4w| uck| 4gk| ew2| aqa| o2c| oeo| 3oy| ow3| iyq| a3u| o3s| ssk| 3sw| eu1| ucm| u2e| csm| k2g| usm| 2sw| aa2| qas| w2u| g2k| usi| 3ua| uc1| wei| q1a| mci| 1ec| sa1| eee| q1u| aaq| 2ky| 2es| ki0| ouc| w0i| aqo| 0aw| si0| iom| ks1| yme| i1u| ccy| 1yc| 9kg| ss9| qoq| u9m| kse|