<address id="xhxt1"><listing id="xhxt1"></listing></address><sub id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></sub>

    <thead id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></thead>

    基本线程同步(八)在Lock中使用多个条件

    声明:本文是《 Java 7 Concurrency Cookbook 》的第二章,作者: Javier Fernández González ? ? 译者:许巧辉 校对:方腾飞

    在Lock中使用多个条件

    一个锁可能伴随着多个条件。这些条件声明在Condition接口中。 这些条件的目的是允许线程拥有锁的控制并且检查条件是否为true,如果是false,那么线程将被阻塞,直到其他线程唤醒它们。Condition接口提供一种机制,阻塞一个线程和唤醒一个被阻塞的线程。

    在并发编程中,生产者与消费者是经典的问题。我们有一个数据缓冲区,一个或多个数据生产者往缓冲区存储数据,一个或多个数据消费者从缓冲区中取出数据,正如在这一章中前面所解释的一样。

    在这个指南中,你将学习如何通过使用锁和条件来实现生产者与消费者问题。

    准备工作…

    你应该事先阅读使用Lock同步代码的指南,才能更好的理解这个食谱。

    如何做…

    按以下步骤来实现的这个例子:

    1.首先,让我们创建一个类来模拟文本文件。创建FileMock类,包括两个属性:一个字符串数组类型,名叫content,另一个int类型,名叫index。它们将存储文件内容和被检索到的模拟文件的行数。

    
    public class FileMock {
    private String content[];
    private int index;
    
    

    2.实现FileMock类的构造器,用随机字符初始化文件的内容。

    
    public FileMock(int size, int length){
    content=new String[size];
    for (int i=0; i<size; i++){
    StringBuilder buffer=new StringBuilder(length);
    for (int j=0; j<length; j++){
    int indice=(int)Math.random()*255;
    buffer.append((char)indice);
    }
    content[i]=buffer.toString();
    }
    index=0;
    }
    
    

    3.实现hasMoreLines()方法,如果文件有更多的行来处理,则返回true,如果我们已经取到了模拟文件的尾部,则返回false。

    
    public boolean hasMoreLines(){
    return index<content.length;
    }
    
    

    4.实现getLine()方法,返回index属性所确定的行数并增加其值。

    
    public String getLine(){
    if (this.hasMoreLines()) {
    System.out.println("Mock: "+(content.length-index));
    return content[index++];
    }
    return null;
    }
    
    

    5.现在,实现Buffer类,用来实现在生产者与消费者之间共享的缓冲区。

    
    public class Buffer {
    
    

    6.Buffer类,有6个属性:

    • 一个类型为LinkedList<String>,名为buffer的属性,用来存储共享数据
    • 一个类型为int,名为maxSize的属性,用来存储缓冲区的长度
    • 一个名为lock的ReentrantLock对象,用来控制修改缓冲区代码块的访问
    • 两个名分别为lines和space,类型为Condition的属性
    • 一个Boolean类型,名为pendingLines的属性,表明如果缓冲区中有行
    
    private LinkedList<String> buffer;
    private int maxSize;
    private ReentrantLock lock;
    private Condition lines;
    private Condition space;
    private boolean pendingLines;
    
    

    7.实现Buffer类的构造器,初始化前面描述的所有属性。

    
    public Buffer(int maxSize) {
    this.maxSize=maxSize;
    buffer=new LinkedList<>();
    lock=new ReentrantLock();
    lines=lock.newCondition();
    space=lock.newCondition();
    pendingLines=true;
    }
    
    

    8. 实现insert()方法,接收一个String类型参数并试图将它存储到缓冲区。首先,它获得锁的控制。当它有锁的控制,它将检查缓冲区是否有空闲空 间。如果缓冲区已满,它将调用await()方法在space条件上等待释放空间。如果其他线程在space条件上调用signal()或 signalAll()方法,这个线程将被唤醒。当这种情况发生,这个线程在缓冲区上存储行并且在lines条件上调用signallAll()方法,稍 后我们将会看到,这个条件将会唤醒所有在缓冲行上等待的线程。

    
    public void insert(String line) {
    lock.lock();
    try {
    while (buffer.size() == maxSize) {
    space.await();
    }
    buffer.offer(line);
    System.out.printf("%s: Inserted Line: %d\n", Thread.
    currentThread().getName(),buffer.size());
    lines.signalAll();
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    lock.unlock();
    }
    }
    
    

    9. 实现get()方法,它返回存储在缓冲区上的第一个字符串。首先,它获取锁的控制。当它拥有锁的控制,它检查缓冲区是否有行。如果缓冲区是空的,它调用 await()方法在lines条件上等待缓冲区中的行。如果其他线程在lines条件上调用signal()或signalAll()方法,这个线程将 被唤醒。当它发生时,这个方法获取缓冲区的首行,并且在space条件上调用signalAll()方法,然后返回String。

    
    public String get() {
    String line=null;
    lock.lock();
    try {
    while ((buffer.size() == 0) &&(hasPendingLines())) {
    lines.await();
    }
    if (hasPendingLines()) {
    line = buffer.poll();
    System.out.printf("%s: Line Readed: %d\n",Thread.
    currentThread().getName(),buffer.size());
    
    space.signalAll();
    }
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    lock.unlock();
    }
    return line;
    }
    
    

    10.实现setPendingLines()方法,用来设置pendingLines的值。当没有更多的行生产时,它将被生产者调用。

    
    public void setPendingLines(boolean pendingLines) {
    this.pendingLines=pendingLines;
    }
    
    

    11.实现hasPendingLines()方法,如果有更多的行被处理时,返回true,否则返回false。

    
    public boolean hasPendingLines() {
    return pendingLines || buffer.size()>0;
    }
    
    

    12.现在轮到生产者,实现Producer类,并指定其实现Runnable接口。

    
    public class Producer implements Runnable {
    
    

    13.声明两个属性:一个FileMock类对象,另一个Buffer类对象。

    
    private FileMock mock;
    private Buffer buffer;
    
    

    14.实现Producer类的构造器,初始化这两个属性。

    
    public Producer (FileMock mock, Buffer buffer){
    this.mock=mock;
    this.buffer=buffer;
    }
    
    

    15.实现run()方法,读取在FileMock对象中创建的所有行,并使用insert()方法将它们存储到缓冲区。一旦这个过程结束,使用setPendingLines()方法警告缓冲区,不会再产生更多的行。

    
    @Override
    public void run() {
    buffer.setPendingLines(true);
    while (mock.hasMoreLines()){
    String line=mock.getLine();
    buffer.insert(line);
    }
    buffer.setPendingLines(false);
    }
    
    

    16.接下来轮到消费者,实现Consumer类,并指定它实现Runnable接口。

    
    public class Consumer implements Runnable {
    
    

    17.声明Buffer对象,实现Consumer构造器来初始化这个对象。

    
    private Buffer buffer;
    public Consumer (Buffer buffer) {
    this.buffer=buffer;
    }
    
    

    18.实现run()方法,当缓冲区有等待的行,它将获取一个并处理它。

    
    @Override
    public void run() {
    while (buffer.hasPendingLines()) {
    String line=buffer.get();
    processLine(line);
    }
    }
    
    

    19.实现辅助方法processLine(),它只睡眠10毫秒,用来模拟某种行的处理。

    
    private void processLine(String line) {
    try {
    Random random=new Random();
    Thread.sleep(random.nextInt(100));
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    
    

    20.通过创建类名为Main,且包括main()方法来实现这个示例的主类。

    
    public class Main {
    public static void main(String[] args) {
    
    

    21.创建一个FileMock对象。

    
    FileMock mock=new FileMock(100, 10);
    
    

    22.创建一个Buffer对象。

    
    Buffer buffer=new Buffer(20);
    
    

    23.创建Producer对象,并且用10个线程运行它。

    
    Producer producer=new Producer(mock, buffer);
    Thread threadProducer=new Thread(producer,"Producer");
    
    

    24.创建Consumer对象,并且用10个线程运行它。

    
    Consumer consumers[]=new Consumer[3];
    Thread threadConsumers[]=new Thread[3];
    for (int i=0; i<3; i++){
    consumers[i]=new Consumer(buffer);
    threadConsumers[i]=new Thread(consumers[i],"Consumer "+i);
    }
    
    

    25.启动producer和3个consumers。

    
    threadProducer.start();
    for (int i=0; i<3; i++){
    threadConsumers[i].start();
    }
    
    

    它是如何工作的…

    所 有Condition对象都与锁有关,并且使用声明在Lock接口中的newCondition()方法来创建。使用condition做任何操作之前, 你必须获取与这个condition相关的锁的控制。所以,condition的操作一定是在以调用Lock对象的lock()方法为开头,以调用相同 Lock对象的unlock()方法为结尾的代码块中。

    当一个线程在一个condition上调用await()方法时,它将自动释放锁的控制,所以其他线程可以获取这个锁的控制并开始执行相同操作,或者由同个锁?;さ钠渌俳缜?。

    注释:当一个线程在一个condition上调用signal()或signallAll()方法,一个或者全部在这个condition上等待的线程将被唤醒。这并不能保证的使它们现在睡眠的条件现在是true,所以你必须在while循环内部调用await()方法。你不能离开这个循环,直到 condition为true。当condition为false,你必须再次调用 await()方法。

    你必须十分小心 ,在使用await()和signal()方法时。如果你在condition上调用await()方法而却没有在这个condition上调用signal()方法,这个线程将永远睡眠下去。

    在调用await()方法后,一个线程可以被中断的,所以当它正在睡眠时,你必须处理InterruptedException异常。

    不止这些…

    Condition接口提供不同版本的await()方法,如下:

    • await(long time, TimeUnit unit):这个线程将会一直睡眠直到:

    (1)它被中断

    (2)其他线程在这个condition上调用singal()或signalAll()方法

    (3)指定的时间已经过了

    (4)TimeUnit类是一个枚举类型如下的常量:

    DAYS,HOURS, MICROSECONDS, MILLISECONDS, MINUTES, NANOSECONDS,SECONDS

    • awaitUninterruptibly():这个线程将不会被中断,一直睡眠直到其他线程调用signal()或signalAll()方法
    • awaitUntil(Date date):这个线程将会一直睡眠直到:

    (1)它被中断

    (2)其他线程在这个condition上调用singal()或signalAll()方法

    (3)指定的日期已经到了

    你可以在一个读/写锁中的ReadLock和WriteLock上使用conditions。

    参见

    原创文章,转载请注明: 转载自并发编程网 – www.gofansmi6.com本文链接地址: 基本线程同步(八)在Lock中使用多个条件


    FavoriteLoading添加本文到我的收藏
    • Trackback 关闭
    • 评论 (1)
    1. public String get() {
      02
      String line=null;
      03
      lock.lock();
      04
      try {
      05
      while ((buffer.size() == 0) &&(hasPendingLines())) {
      06
      lines.await();
      07
      }

      get方法的这个判断 &&hasPendingLines() 没看懂为什么要这个条件

    您必须 登陆 后才能发表评论

    return top

    爱投彩票 kk9| uoe| y9w| mas| 9wq| ks8| gwy| m8k| oue| 8kg| 8ak| wk8| cik| o8e| kam| 8so| ges| 9ao| iy7| kyc| i7i| agw| 7mc| 7qg| om8| ukm| g8s| wui| 8cy| us6| ags| e6a| aiu| 6sm| qo7| csc| ciw| k7u| swm| 7ky| ag7| uqs| a5u| eus| 6ag| iou| 6ui| so6| say| ooe| y6w| ywu| 6ww| ca5| uaw| y5m| syu| 5ko| wu5| uam|