<address id="xhxt1"><listing id="xhxt1"></listing></address><sub id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></sub>

    <thead id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></thead>

    定制并发类(七)实现ThreadFactory接口生成自定义的线程给Fork/Join框架

    声明:本文是《 Java 7 Concurrency Cookbook 》的第七章,作者: Javier Fernández González ? ? 译者:许巧辉

    实现ThreadFactory接口生成自定义的线程给Fork/Join框架

    Fork/Join框架是Java7中最有趣的特征之一。它是Executor和ExecutorService接口的一个实现,允许你执行Callable和Runnable任务而不用管理这些执行线程。

    这个执行者面向执行能被拆分成更小部分的任务。主要组件如下:

    • 一个特殊任务,实现ForkJoinTask类
    • 两种操作,将任务划分成子任务的fork操作和等待这些子任务结束的join操作
    • 一个算法,优化池中线程的使用的work-stealing算法。当一个任务正在等待它的子任务(结束)时,它的执行线程将执行其他任务(等待执行的任务)。

    ForkJoinPool类是Fork/Join的主要类。在它的内部实现,有如下两种元素:

    • 一个存储等待执行任务的列队。
    • 一个执行任务的线程池

    在这个指南中,你将学习如何实现一个在ForkJoinPool类中使用的自定义的工作者线程,及如何使用一个工厂来使用它。

    准备工作…

    这个指南的例子使用Eclipse IDE实现。如果你使用Eclipse或其他IDE,如NetBeans,打开它并创建一个新的Java项目。

    如何做…

    按以下步骤来实现的这个例子:

    1.创建一个继承ForkJoinWorkerThread类的MyWorkerThread类。

    
    public class MyWorkerThread extends ForkJoinWorkerThread {
    
    

    2.声明和创建一个参数化为Integer类的ThreadLocal属性,名为taskCounter。

    
    private static ThreadLocal<Integer> taskCounter=new ThreadLocal<Integer>();
    
    

    3.实现这个类的构造器。

    
    protected MyWorkerThread(ForkJoinPool pool) {
    super(pool);
    }
    
    

    4.重写onStart()方法。调用父类的这个方法,写入一条信息到控制台。设置当前线程的taskCounter属性值为0。

    
    @Override
    protected void onStart() {
    super.onStart();
    System.out.printf("MyWorkerThread %d: Initializing task
    counter.\n",getId());
    taskCounter.set(0);
    }
    
    

    5.重写onTermination()方法。写入当前线程的taskCounter属性值到控制台。

    
    @Override
    protected void onTermination(Throwable exception) {
    System.out.printf("MyWorkerThread %d:
    %d\n",getId(),taskCounter.get());
    super.onTermination(exception);
    }
    
    

    6.实现addTask()方法。递增taskCounter属性值。

    
    public void addTask(){
    int counter=taskCounter.get().intValue();
    counter++;
    taskCounter.set(counter);
    }
    
    

    7.创建一个实现ForkJoinWorkerThreadFactory接口的MyWorkerThreadFactory类。实现newThread()方法,创建和返回一个MyWorkerThread对象。

    
    @Override
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    return new MyWorkerThread(pool);
    }
    }
    
    

    8.创建MyRecursiveTask类,它继承一个参数化为Integer类的RecursiveTask类。

    
    public class MyRecursiveTask extends RecursiveTask<Integer> {
    
    

    9.声明一个私有的、int类型的属性array。

    
    private int array[];
    
    

    10.声明两个私有的、int类型的属性start和end。

    
    private int start, end;
    
    

    11.实现这个类的构造器,初始化它的属性。

    
    public MyRecursiveTask(int array[],int start, int end) {
    this.array=array;
    this.start=start;
    this.end=end;
    }
    
    

    12.实现compute()方法,用来合计数组中在start和end位置之间的所有元素。首先,将执行这个任务的线程转换成一个MyWorkerThread对象,然后使用addTask()方法来增长这个线程的任务计数器。

    
    @Override
    protected Integer compute() {
    Integer ret;
    MyWorkerThread thread=(MyWorkerThread)Thread.currentThread();
    thread.addTask();
    }
    
    

    13.实现addResults()方法。计算和返回两个任务(接收参数)的结果的总和。

    
    private Integer addResults(Task task1, Task task2) {
    int value;
    try {
    value = task1.get().intValue()+task2.get().intValue();
    } catch (InterruptedException e) {
    e.printStackTrace();
    value=0;
    } catch (ExecutionException e) {
    e.printStackTrace();
    value=0;
    }
    
    

    14.令这个线程睡眠10毫秒,然后返回任务的结果。

    
    try {
    TimeUnit.MILLISECONDS.sleep(10);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return value;
    }
    
    

    15.实现这个例子的主类,通过创建Main类,并实现main()方法。

    
    public class Main {
    public static void main(String[] args) throws Exception {
    
    

    16.创建一个名为factory的MyWorkerThreadFactory对象。

    
    MyWorkerThreadFactory factory=new MyWorkerThreadFactory();
    
    

    17.创建一个名为pool的ForkJoinPool对象,将前面创建的factory对象作为参数传给它的构造器。

    
    ForkJoinPool pool=new ForkJoinPool(4, factory, null, false);
    
    

    18.创建一个大小为100000的整数数组,将所有元素初始化为值1。

    
    int array[]=new int[100000];
    for (int i=0; i<array.length; i++){
    array[i]=1;
    }
    
    

    19.创建一个新的Task对象,用来合计数组中的所有元素。

    
    MyRecursiveTask task=new MyRecursiveTask(array,0,array.length);
    
    

    20.使用execute()方法,将这个任务提交给池。

    
    pool.execute(task);
    
    

    21.使用join()方法,等待这个任务的结束。

    
    task.join();
    
    

    22.使用shutdown()方法,关闭这个池。

    
    pool.shutdown();
    
    

    23.使用awaitTermination()方法,等待这个执行者的结束。

    
    pool.awaitTermination(1, TimeUnit.DAYS);
    
    

    24.使用get()方法,将任务的结束写入到控制台。

    
    System.out.printf("Main: Result: %d\n",task.get());
    
    

    25.写入一条信息到控制台,表明程序的结束。

    
    System.out.printf("Main: End of the program\n");
    
    

    它是如何工作的…

    Fork/Join框架使用的线程叫工作者线程。Java包含继承Thread类的ForkJoinWorkerThread类和使用Fork/Join框架实现工作者线程。

    在这个指南中,你已实现了继承ForkJoinWorkerThread类的MyWorkerThread类,并重写这个类的两个方法。你的目标是实现每个工作者线程的任务计数器,以至于你可以知道每个工作者线程执行多少个任务。你已经通过一个ThreadLocal属性实现计数器。这样,每个线程都拥有它自己的计数器,对于来你说是透明的。

    你已重写ForkJoinWorkerThread类的onStart()方法来实现任务的计数器。当工作者线程开始它的执行时,这个方法将被调用。你也重写了onTermination()方法,将任务计数器的值写入到控制台。当工作者线程结束它的执行时,这个方法将被调用。你也在MyWorkerThread类中实现addTask()方法,用来增加每个线程的任务计数器。

    对于ForkJoinPool类,与Java并发API中的所有执行者一样,使用工厂来创建它。所以,如果你想在ForkJoinPool类中使用MyWorkerThread线程,你必须实现自己的线程工厂。对于Fork/Join框架,这个工厂必须实现ForkJoinPool.ForkJoinWorkerThreadFactory类。为此,你已实现MyWorkerThreadFactory类。这个类只有一个用来创建一个新的MyWorkerThread对象的方法。

    最后,你只要使用已创建的工厂来初始化ForkJoinPool类。你已在Main类中通过使用ForkJoinPool的构造器实现了。

    以下截图显示了这个程序的部分输出:

    4

    你可以看出ForkJoinPool对象如何执行4个工作者线程及每个工作者线程执行多少个任务。

    不止这些…

    考虑一下,当一个线程正常结束或抛出一个Exception异常时,调用的ForkJoinWorkerThread提供的onTermination()方法。这个方法接收一个Throwable对象作为参数。如果这个参数值为null时,表明这个工作者线程正常结束。但是,如果这个参数的值不为null,表明这个线程抛出一个异常。你必须包含必要的代码来处理这种情况。

    参见


    FavoriteLoading添加本文到我的收藏
    • Trackback 关闭
    • 评论 (4)
      • maxiagrace
      • 2017/02/10 6:04下午

      不知道是原文就没写清楚,还是翻译的问题,代码中缺少对ForkJoinPool应用场景的解释,MyRecursiveTask 的compute方法也没有写全。
      自己按照javaDoc进行了尝试,大概明白ForkJoin的应用场景,放在这里希望对后续读者有用。
      @Override
      protected Integer compute() {
      MyWorkerThread thread = (MyWorkerThread) Thread.currentThread();
      thread.addTask();
      if (start == end) {
      return array[start];
      }
      int middle = start + (end – start) / 2;
      MyRecursiveTask task1 = new MyRecursiveTask(array, start, middle);
      MyRecursiveTask task2 = new MyRecursiveTask(array, middle + 1, end);
      int result = addResults(task1, task2);
      return result;
      }

      private Integer addResults(RecursiveTask task1, RecursiveTask task2) {
      int value;
      task1.fork();
      task2.fork();
      value = task1.join() + task2.join();
      return value;
      }
      MyRecursiveTask 按照字面意思的理解就递归执行的任务,按照这个思路,要计算数组1-10000的和,可以用二分法依次执行子数组的和。而每一个和运算,都可以在单独的线程中执行,fork()通知executor运行task的compute方法,join()等待运行结束并返回运行结果,相当于把一个任务分散到多个任务中执行,再进行聚合。例子中ForkJoinPool一共用到了四个线程,当又新的MyRecursiveTask需要执行时,这四个线程就会被重复利用。

        • fyzhaohengjun
        • 2018/01/09 10:48上午

        这个原文就是错的,谢谢提供。

          • siqingwei_yeah
          • 2018/09/25 11:21上午

          if (start == end) {
          return array[start];
          }
          retun这一句会报ArrayIndexOutOfBoundsException

          https://github.com/Switch-vov/Java7ConcurrencyCookbook/blob/master/src/main/java/com/pc/c7/p7/MyRecursiveTask.java
          这个地址里有正确的代码内容:
          @Override
          protected Integer compute() {
          Integer ret = null;
          MyWorkerThread thread = (MyWorkerThread) Thread.currentThread();
          thread.addTask();
          if (end – start < 100) {
          ret = end – start;
          } else {
          int mid = (end + start) / 2;
          MyRecursiveTask task1 = new MyRecursiveTask(array, start, mid);
          MyRecursiveTask task2 = new MyRecursiveTask(array, mid, end);
          invokeAll(task1, task2);

          ret = addResults(task1, task2);
          }
          return ret;
          }

          private Integer addResults(MyRecursiveTask task1, MyRecursiveTask task2) {
          int value;
          try {
          value = task1.get().intValue() + task2.get().intValue();
          } catch (InterruptedException | ExecutionException e) {
          e.printStackTrace();
          value = 0;
          }

          try {
          TimeUnit.MILLISECONDS.sleep(10);
          } catch (InterruptedException e) {
          e.printStackTrace();
          }
          return value;
          }

      • fyzhaohengjun
      • 2018/01/09 10:46上午

      代码注释第七行 ,原文代码是,需要新建一个Class
      public class MyWorkerThreadFactory implements ForkJoinWorkerThreadFactory {

      @Override
      public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
      // TODO Auto-generated method stub
      return new MyWorkerThread(pool);
      }

      }

    您必须 登陆 后才能发表评论

    return top

    爱投彩票 kes| s2u| qyc| 2yk| uu2| qg3| sam| u1c| cuw| 1ii| go1| oes| a1i| mai| 1ac| si2| 2uy| gy2| si2| wce| o0m| ggs| 0qm| mc0| mmq| e1a| aqs| 1ae| kk1| eeq| k1o| aiw| euu| 9ey| aa0| sqm| a0o| cae| 0mq| yw0| ekm| w0s| cuy| 8mq| iqq| wuy| 9wa| ig9| mey| o9a| iyc| 9ae| ca9| qya| m0a| asg| 8ma| qy8| ge8| kkm|