Java concurrent reading notes: how to realize correct communication between threads
1、 Synchronized and volatile
The synchronized keyword is a mutually exclusive built-in lock provided by Java. The lock mechanism does not need to explicitly add or release locks. The feature of mutually exclusive execution can ensure that the execution of the whole critical area code is atomic, and the synchronization mechanism ensures that the shared data is only used by one thread at the same time.
Review the following underlying implementations of synchronized:
We can decompile the following code: javap - V testdata class。
public class TestData {
public static synchronized void m1(){}
public synchronized void m2(){}
public static void main(String[] args) {
synchronized (TestData.class){
}
}
}
The compilation results are as follows:
Although the implementation details of synchronization methods and code blocks are different, in the final analysis: the JVM's implementation of methods or code blocks is based on the entry and exit operations of monitor objects.
Take synchronization code block as an example:
The following figure is from the art of Java Concurrent Programming 4-2
The new JAVA memory model provides a more lightweight communication mechanism than lock. It enhances the memory semantics of volatile and makes volatile have the same semantics as lock: it tells the program that any access to volatile modified variables must be obtained from shared memory, and its changes must be synchronously refreshed back to shared memory to ensure the visibility of thread access to variables.
Focus on volatile, and then make a summary.
2、 Waiting / notification mechanism
Wait / notify related methods are defined in Java Lang. object, these methods must be called by the lock object. The synchronization instance method is this, the static method is class object, and the lock of code block is something in parentheses.
These methods must be called after obtaining the lock object, that is, they must be invoked in the synchronization block or synchronization method, otherwise the exception of IllegalMonitorStateException will be thrown.
Wait (): the thread calling this method enters the waiting state and releases the lock of the object. At this time, the current thread will return only after being notified or interrupted by other threads.
Wait (long), wait (long, int): enter timed_ In the waiting state, the lock is released. If the current thread has a notification or interrupt, it will return. When the time is up, it will also return.
notice
Notify(): the current thread notifies another thread waiting on the object that the awakened thread is moved from the waiting queue to the synchronized queue (blocked), which means that the awakened thread will not execute immediately. It needs to wait until the current thread releases the lock and the thread in the synchronized queue gets the lock. Notifyall(): the current thread notifies all threads waiting on the object and moves all threads in the waiting queue to the synchronization queue.
Suppose a and B need to obtain the same lock. After a enters, B enters the synchronization queue and falls into blocked.
If A calls the wait () method of the lock, A releases the lock and falls into the wait (WAITING). At this time, another thread B obtains the current lock, and B runs.
If the notify () method of calling the lock is invoked in B, A is waken up from the waiting queue to the synchronous queue. Only when B is finished, the lock is released, and the A gets the lock, and A comes out to run.
The wait / notification mechanism relies on the synchronization mechanism to ensure that the waiting thread can perceive the changes made by the notification thread when it returns from the wait () method.
Several questions often asked in an interview
The difference between sleep method and wait method
Both sleep () and wait () methods can make the thread abandon the CPU for a period of time and enter the waiting state.
The sleep () static method is defined in the thread class, and the wait () method is defined in the object class.
If the thread holds the monitor of an object, the current thread will release the lock after the wait () call, while sleep () will not release the lock.
About discarding object monitor
For the discard object monitor, the wait() method is different from notify() / notifyall():
After the lock object calls the wait () method, the object monitor is released immediately. Notify() / notifyall() will not release the monitor immediately, but will not release the monitor until the remaining code of the thread is executed.
3、 Waiting for notification typical
Wait() and notify() / notifyall() can effectively coordinate the work of multiple threads and improve the efficiency of thread communication.
Producer consumer model
The following code retains the main idea, which depends on the situation.
Define a simple product class product, which defines an identification bit to judge whether the product exists or not.
//产品
public class Product {
public boolean exist = false;
}
Then define the run method in the consumer. First, obtain the lock of the object. If the product does not exist, wait. Otherwise, consume it once, set the identification position to false, and wake up the production thread.
//消费方
synchronized (product) {
while (true) {
TimeUnit.SECONDS.sleep(1);
while (!product.exist) {
product.wait();
}
System.out.println("消费一次");
product.exist = false;
product.notifyAll();
}
}
Corresponding to the consumer, the producer still obtains the lock of the object first, and then judges the identification bit. If there is already a product, wait, otherwise produce it once, attach the identification bit to true, and finally wake up the waiting consumer.
//生产方
synchronized (product) {
while (true) {
TimeUnit.SECONDS.sleep(1);
while (product.exist) {
product.wait();
}
System.out.println("生产一次");
product.exist = true;
product.notifyAll();
}
}
This process is the most basic. We need to understand the convenience brought by wait (), notify () and other methods. When the real scene is more complex, for example, when the production and consumption speeds are not equal, a buffer needs to be created, and so on.
Possible error codes
//T1
synchronized (product) {
product.exist = false;
product.notifyAll();
}
}
//T2
synchronized(product){
while(product.exist)
product.wait();
}
//T3
while (product.exist) {
//A
synchronized(product){
product.wait();
}
}
Assuming that T1 and T3 are communication parties, the notification may be lost:
Therefore, we can learn that in order to eliminate the competition of multiple threads in the identification bit, we can lock the threads in the form of T2 to ensure that the conditions are met after being notified.
4、 Use explicit lock and condition
We have also learned before that if the explicit lock object is used to ensure thread synchronization, the implicit monitor does not exist, and the wait() and notify() / notifyall() cannot be used.
Java provides a condition interface to maintain coordinated communication between threads. Through the cooperation of condition object and lock object, the tasks completed by synchronized synchronization method and code block can be completed.
I was curious about how condition and lock established a relationship, so I checked their inheritance relationship:
Detailed explanation of methods in condition interface
The current thread enters the waiting state until it is notified or interrupted. The current thread enters the running state and returns from the await() method:
It should be noted that in all the above cases, in order to return from await (), the current thread must re acquire the lock associated with this condition.
The current thread enters the waiting state until it is notified. It is not sensitive to interrupts. Therefore, the scenario it returns from the waiting state is different from await (), with only the third interrupt scenario missing.
This method causes the current thread to wait until it is notified, interrupted, or timed out. This method returns the estimated number of nanoseconds remaining to wait according to the nanostimeout value provided when returning. If it times out, it returns a value less than or equal to zero. This value can be used to determine whether to wait again and how long to wait again when waiting for return but the waiting condition is still invalid.
There is another similar method, which will not be repeated.
This method causes the current thread to wait until it is notified or interrupted, or until the specified deadline is reached. If the deadline has passed when returning, it is false; otherwise, it is true.
Wake up a waiting thread. If the awakened thread wants to return from the await method, it needs to regain the lock associated with the condition.
Wake up all waiting threads. Similarly, if you want to return from the await method, you must regain the condition related interlock.
Condition works with lock
The condition interface depends on lock. We can create a condition instance of a specific lock instance as follows:
Lock lock = new reentrantlock(); //创建Lock对象
Condition condition = lock.newCondition(); //利用lock对象的newCondition()创建Condition对象
Since there is a dependency relationship, the method provided in condition can be called only when lock is obtained, that is, only in lock Lock() and lock Called between unlock().
A lock object can be associated with multiple condition objects to detect different conditions. Here is a demo of a simple producer consumer model:
public class Product {
//共享产品编号
private int count = 0;
//标识位,标识是否还有产品
private boolean flag = false;
//创建Lock锁对象
private Lock lock = new reentrantlock();
//创建两个Condition对象,作为两种条件检测
private Condition condProducer = lock.newCondition();
private Condition condConsumer = lock.newCondition();
//生产方法
public void produce() {
lock.lock(); //上锁
try {
//驱使线程等待的条件
while (flag) {
condProducer.await(); //如果flag为true,则不用生产
}
count++;
System.out.println(Thread.currentThread().getName() + "生产产品一件,产品编号" + count);
//生产完成,将标识为改为false
flag = true;
//唤醒conConsumer条件下的所有线程(当然,这里只有一个)
condConsumer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();//在finally中,保证解锁
}
}
//消费方法
public void consume() {
lock.lock();
try {
//驱使线程等待的条件
while (!flag) {
condConsumer.await(); //如果flag为false,则不用消费
}
//消费的逻辑
System.out.println(Thread.currentThread().getName() + "消费产品一件,产品编号" + count);
flag = false;
condProducer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
//生产者线程
class Producer implements Runnable {
private Product product;
Producer(Product product) {
this.product = product;
}
@Override
public void run() {
//每个生产者线程生产会生产五件产品
for (int i = 0; i < 5; i++) {
product.produce();
}
}
}
//消费者线程
class Consumer implements Runnable {
private Product product;
Consumer(Product product) {
this.product = product;
}
@Override
public void run() {
//每个消费者线程会消费五件产品
for (int i = 0; i < 5; i++) {
product.consume();
}
}
}
The same problem: we need to use the wait () method, which needs to be included in the while loop statement to prevent premature or unexpected notification and ensure that only those who do not meet the waiting conditions can exit the loop. In other words, using the while loop without if judgment can effectively prevent the phenomenon of "false wake-up".
Condition interface and object monitor
The following table takes off with reference to the art of Java Concurrent Programming
About condition and lock, there will be related articles to learn them in more detail. This article mainly understands their basic methods of thread communication.
5、 Pipeline input and output flow
Pipeline I / O stream is mainly used for data transmission between threads, and the transmission medium is memory.
Byte oriented: pipedoutputstream, pipedinputstream, character oriented: pipedwriter, pipedreader
The following is an example of inter thread communication through pipeline I / O stream:
public class Piped {
public static void main(String[] args) throws IOException {
//创建管道输入输出流
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
//将输入输出流连结起来,否则在使用的时候会抛出异常
out.connect(in);
Thread printThread = new Thread(new Print(in),"PrintThread");
printThread.start();
//标准输入流转化到管道输出流
int receive;
try{
while((receive = system.in.read())!=-1){
out.write(receive);
}
}finally {
out.close();
}
}
//定义线程类,接收管道输入流,写入标准输出流
static class Print implements Runnable{
private PipedReader in;
public Print(PipedReader in){
this.in = in;
}
@Override
public void run() {
int receive;
try{
while((receive = in.read())!=-1){
System.out.print((char)receive);
}
}catch (IOException e){
e.printStackTrace();
}
}
}
}
6、 Thread join()
The join method has been summarized before, so I won't explain it in detail here.
The official explanation is concise and clear: waits for this thread to die, Obviously, for threads, who calls, who dies. For example: when calling the join () method of the B thread in the A thread, the A thread will fall into wait or timeout wait until the execution of the B thread is completed, and then it will become a blockage.
The join () method has three specific steps:
//等待该线程消亡。
public final void join()
//等待该线程消亡,只不过最多等millis毫秒。
public final synchronized void join(long millis)
//等待该线程消亡,只不过最多等millis毫秒+nanos纳秒(毫微秒)。
public final synchronized void join(long millis,int nanos)
7、 Utilize ThreadLocal
Similarly, more detailed study on ThreadLocal will be released later. This article focuses on understanding communication methods.
ThreadLocal is a thread local variable. It is a storage structure with ThreadLocal object as the key and any object as the value. The structure is attached to the thread. The thread can query the value bound to the thread according to a thread object.
It provides a copy of the variable value for each thread using the variable, so that each thread can change its copy independently without the problem of data competition caused by multiple threads when operating shared data through main memory.
We can use set and get to set and retrieve the value of local variables. It should be clear that no matter how many threads there are, defining local variables with ThreadLocal will generate a copy in each thread. Since then, the read and write operations between threads are not related to each other. We can use this property to meet our special needs.
public class Profiler {
// 定义一个ThreadLocal类型的变量,该变量是一个线程局部变量
private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>(){
//重写方法,为该局部变量赋初始值
protected Long initialValue(){
return System.currentTimeMillis();
}
};
//public void set(T value),设置该局部变量值
public static final void begin(){
TIME_THREADLOCAL.set(System.currentTimeMillis());
}
//public T get() ,取出该局部变量的值
public static final long cost(){
return System.currentTimeMillis() - TIME_THREADLOCAL.get();
}
//测试
public static void main(String[] args) throws Exception{
Profiler.begin();
TimeUnit.SECONDS.sleep(1);
System.out.println("Cost: "+ Profiler.cost()+" mills");
}
}
The above use case is extracted from the art of Java Concurrent Programming. A more detailed analysis of ThreadLocal will be summarized later.
Reference: the art of Java Concurrent Programming, JDK official document