缓存协同工作指的是数个缓存间共享天然资源统计数据或天然资源,并协同各别的继续执行次序,以顺利完成某一各项任务的操作过程。
Java 下列三种形式同时实现缓存协同工作
wait() 和 notify()/notifyAll() 形式
透过共享天然资源第一类上的锁来同时实现缓存间的等候和通告。当两个缓存继续执行 wait() 形式时,它会释放出来掉所持的锁并步入等候状况,直至其它缓存初始化 notify()/notifyAll() 形式唤起它。此种形式须要特别注意避免互斥,和在等候和通告时对共享天然资源第一类的状况展开精确性检查和,防止不实唤起。
Condition USB
与 wait() 和 notify()/notifyAll() 形式相近,但更为灵巧,能同时实现更为繁杂的等候和通告监督机制。透过 Lock 第一类的 newCondition() 形式建立 Condition 第一类,并透过 await() 和 signal()/signalAll() 形式同时实现缓存的等候和通知。
alAll()形式唤起。具体内容采用形式如下表所示:
建立锁第一类
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
在须要等候的地方初始化await()形式
lock.lock();
try {
while (condition不成立) {
condition.await();
}
// 继续执行须要等候的操作
} finally {
lock.unlock();
}
在条件成立的地方初始化signal()或signalAll()形式
lock.lock();
try {
// 继续执行满足条件的操作
condition.signal(); // 或者condition.signalAll();
} finally {
lock.unlock();
}
须要特别注意的是,await()形式会释放出来当前缓存所持的锁,因此在初始化await()形式前放。
管道(PipedInputStream 和 PipedOutputStream)
在Java中,能采用PipedInputStream和PipedOutputStream来同时实现管道通信。PipedInputStream和PipedOutputStream是Java IO库中提供的两个类,分别用于从管道中读取统计数据和向管道中写入统计数据。
具体内容同时实现时,首先须要建立两个PipedInputStream和一个PipedOutputStream,并将它们连接起来,然后能在两个缓存中采用PipedOutputStream向管道中写入统计数据,在另两个缓存中采用PipedInputStream从管道中读取统计数据。
下面是两个简单的例子:
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class PipeExample {
public static void main(String[] args) throws IOException {
PipedInputStream in = new PipedInputStream();
PipedOutputStream out = new PipedOutputStream(in);
Thread t1 = new Thread(() -> {
try {
out.write(“hello world”.getBytes());
out.close();
} catch (IOException e) {
e.printStackTrace();
}
});
Thread t2 = new Thread(() -> {
try {
int data;
while ((data = in.read()) != -1) {
System.out.print((char) data);
}
in.close();
} catch (IOException e) {
e.printStackTrace();
}
});
t1.start();
t2.start();
}
}
在这个例子中,我们建立了两个PipedInputStream和两个PipedOutputStream,并将它们连接起来。然后我们启动了两个缓存,两个缓存采用PipedOutputStream向管道中写入统计数据,另两个缓存采用PipedInputStream从管道中读取统计数据。在写入统计数据的缓存中,我们采用write形式将字符串“hello world”写入管道,然后关闭输出流。在读取统计数据的缓存中,我们采用read形式从管道中读取统计数据,并将其打印到控制台上,直至读取到流的结尾,然后关闭输入流。
须要特别注意的是,采用PipedInputStream和PipedOutputStream展开管道通信时,如果没有正确地关闭流,可能会导致缓存阻塞或内存泄漏等问题。因此,建议在采用完PipedInputStream和PipedOutputStream后,初始化close形式来关闭流。
CountDownLatch 类
CountDownLatch是Java并发包提供的两个同步工具类,透过它能同时实现类似于计数器的功能,使得两个或数个缓存等候其它缓存继续执行完毕后再继续执行。采用CountDownLatch类,须要采用下列步骤:
1、建立两个CountDownLatch第一类,指定计数器的数量,即有多少个缓存须要等候。
2、在须要等候的缓存中,初始化CountDownLatch第一类的await()形式,使得该缓存等待。
3、在其它缓存继续执行完毕后,初始化CountDownLatch第一类的countDown()形式,将计数器的数量减1。
4、当计数器的数量减为0时,所有等候的缓存都会被唤起,继续继续执行。
下面是两个简单的示例代码,演示如何采用CountDownLatch类:
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
Thread t = new Thread(new Worker(latch));
t.start();
}
// 等候所有缓存继续执行完毕
latch.await();
System.out.println(“All workers have finished their work.”);
}
static class Worker implements Runnable {
private CountDownLatch latch;
public Worker(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
// 模拟耗时操作
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“Worker ” + Thread.currentThread().getName() + ” has finished its work.”);
latch.countDown(); // 计数器减1
}
}
}
在这个示例代码中,我们首先建立了两个CountDownLatch第一类,指定计数器的数量为5。然后建立5个缓存,每个缓存会模拟两个耗时操作,并在操作顺利完成后初始化CountDownLatch第一类的countDown()形式,将计数器减1。最后,主缓存初始化CountDownLatch第一类的await()形式,等候所有缓存继续执行完毕。当所有缓存都继续执行完毕后,主缓存会被唤起,输出一条消息。
CyclicBarrier 类
CyclicBarrier类是Java并发包提供的两个缓存同步工具类,它能协同一组缓存在某一点上同步,当所有缓存都到达这个点时,才能继续继续执行后面的操作。下面是采用CyclicBarrier类的基本步骤:
1、建立CyclicBarrier第一类,并指定参与同步的缓存数和到达同步点后要继续执行的操作(可选)。
CyclicBarrier barrier = new CyclicBarrier(parties, action);
2、在各个缓存中初始化await()形式,等候其它缓存到达同步点。
barrier.await();
3、当所有缓存都到达同步点后,继续执行指定的操作(如果有),然后继续继续执行后面的代码。
在采用CyclicBarrier时须要特别注意,所有缓存都必须初始化await()形式,否则会导致互斥。另外,CyclicBarrier只能重置一次,如果须要重复利用,须要重新建立新的第一类。
下面是两个简单的示例代码,演示了如何采用CyclicBarrier同时实现缓存同步:
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
int parties = 3;
CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
System.out.println(“All parties have arrived, lets continue!”);
});
new Thread(() -> {
try {
System.out.println(“Thread 1 is on the way…”);
Thread.sleep(3000);
System.out.println(“Thread 1 has arrived.”);
barrier.await();
System.out.println(“Thread 1 continues to run.”);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
System.out.println(“Thread 2 is on the way…”);
Thread.sleep(2000);
System.out.println(“Thread 2 has arrived.”);
barrier.await();
System.out.println(“Thread 2 continues to run.”);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
System.out.println(“Thread 3 is on the way…”);
Thread.sleep(1000);
System.out.println(“Thread 3 has arrived.”);
barrier.await();
System.out.println(“Thread 3 continues to run.”);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
Semaphore 类
Semaphore是Java并发包中的两个类,
在Java中采用Semaphore类,能透过下列步骤同时实现:
1、建立Semaphore第一类,指定许可证数量。
3、继续执行共享天然资源天然资源的代码。
4、在代码块结束时,初始化Semaphore的release()形式释放出来许可证。
下面是两个简单的例子,演示了如何采用Semaphore类控制对共享天然资源天然资源的访问:
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
private static final int THREAD_COUNT = 10;
private static Semaphore semaphore = new Semaphore(5);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(new Worker()).start();
}
}
private static class Worker implements Runnable {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + ” acquired a permit.”);
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + ” released a permit.”);
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
在上面的例子中,我们建立了两个Semaphore第一类,