Java Concurrent Programming (04): inter thread communication, waiting / notification mechanism

Source code of this article: GitHub · click here | gitee · click here

1、 Concept introduction

1. Thread communication

In the operating system, a thread is an independent individual, but in the process of thread execution, if the same business logic is processed, resource contention may occur, resulting in concurrency problems. Mutex lock is usually used to control the logic. However, in other scenarios, task execution is controlled sequentially, such as common report data generation:

This scenario is very common in relatively complex systems. If the process is described based on multithreading, communication and cooperation between threads are required to process the business of this scenario in an orderly manner.

2. Waiting for notification mechanism

In the above business scenario, if thread B is accessing the data container all the time during the data generation process of thread a to judge whether the data of the process has been generated, it will cause a waste of resources. The normal process should be as shown in the figure. Thread a and thread B start at the same time. Thread a starts to process the data generation task. Thread B attempts to obtain the container data. Before the data comes, thread B enters the waiting state. When the task processing of thread a is completed, thread B is notified to obtain the data from the container, so as to cooperate to complete the task based on the mechanism of thread waiting and notification.

3. Basic method

The relevant methods of the wait / notification mechanism are the basic methods of the object level in Java. Any object has this method:

The thread waiting notification mechanism is based on these basic methods.

2、 Waiting for notification principle

1. Basic principles

Wait / notify mechanism. In this mode, thread a calls the object wait() method to enter the wait state when it does not meet the task execution. Thread B modifies the execution conditions of thread a and calls the object notify() or notifyall() method. Thread a returns from the wait state after receiving the notification, and then performs subsequent operations. The two threads complete the interaction between waiting and notification through wait () / notify () / notifyAll () and other methods provided by the object, so as to improve the scalability of the program.

2. Implementation case

The decoupling process of the above data generation and storage tasks is solved through thread communication.

public class NotifyThread01 {

    static Object lock = new Object() ;
    static volatile List<String> dataList = new ArrayList<>();

    public static void main(String[] args) throws Exception {
        Thread saveThread = new Thread(new SaveData(),"SaveData");
        saveThread.start();
        TimeUnit.SECONDS.sleep(3);
        Thread dataThread = new Thread(new AnalyData(),"AnalyData");
        dataThread.start();
    }
    // 等待数据生成,保存
    static class SaveData implements Runnable {
        @Override
        public void run() {
            synchronized (lock){
                while (dataList.size()==0){
                    try {
                        System.out.println(Thread.currentThread().getName()+"等待...");
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("SaveData .."+ dataList.get(0)+dataList.get(1));
            }
        }
    }
    // 生成数据,通知保存
    static class AnalyData implements Runnable {
        @Override
        public void run() {
            synchronized (lock){
                dataList.add("hello,");
                dataList.add("java");
                lock.notify();
                System.out.println("AnalyData End...");
            }
        }
    }
}

Note: in addition to the datalist meeting the write conditions, the notification operation should also be performed in the analysis data thread.

3、 Pipeline flow communication

1. Introduction to pipeline flow

Basic concepts

Pipeline flow is mainly used to directly transfer data between different threads. One thread sends data to the output pipeline, and the other thread reads data from the input pipeline, so as to realize the communication between different threads.

Implementation classification

Pipeline byte stream: pipedinputstream and pipedoutputstream;

Pipeline character stream: pipedwriter and pipedreader;

New IO pipeline flow: pipe Sinkchannel and pipe sourceChannel;

2. Use case

public class NotifyThread02 {
    public static void main(String[] args) throws Exception {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();
        // 链接输入流和输出流
        pos.connect(pis);
        // 写数据线程
        new Thread(new Runnable() {
            public void run() {
                BufferedReader br = new BufferedReader(new InputStreamReader(system.in));
                // 将从键盘读取的数据写入管道流
                PrintStream ps = new PrintStream(pos);
                while (true) {
                    try {
                        System.out.print(Thread.currentThread().getName());
                        ps.println(br.readLine());
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        },"输入数据线程:").start();
        // 读数据线程
        new Thread(new Runnable() {
            public void run() {
                BufferedReader br = new BufferedReader(new InputStreamReader(pis));
                while (true) {
                    try {
                        System.out.println(Thread.currentThread().getName() + br.readLine());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        },"输出数据线程:").start();
    }
}

The write thread writes data to the pipeline flow, and the read thread reads data to complete the basic communication process.

4、 Production and consumption pattern

1. Business scenario

Thread based waiting notification mechanism: realize the business process of factory producing a commodity and notifying the store to sell a commodity.

2. Code implementation

public class NotifyThread03 {
    public static void main(String[] args) {
        Product product = new Product();
        ProductFactory productFactory = new ProductFactory(product);
        ProductShop productShop = new ProductShop(product);
        productFactory.start();
        productShop.start();
    }
}
// 产品
class Product {
    public String name ;
    public double price ;
    // 产品是否生产完毕,默认没有
    boolean flag ;
}
// 产品工厂:生产
class ProductFactory extends Thread {
    Product product ;
    public ProductFactory (Product product){
        this.product = product;
    }
    @Override
    public void run() {
        int i = 0 ;
        while (i < 20) {
            synchronized (product) {
                if (!product.flag){
                    if (i%2 == 0){
                        product.name = "鼠标";
                        product.price = 79.99;
                    } else {
                        product.name = "键盘";
                        product.price = 89.99;
                    }
                    System.out.println("产品:"+product.name+"【价格:"+product.price+"】出厂...");
                    product.flag = true ;
                    i++;
                    // 通知消费者
                    product.notifyAll();
                } else {
                    try {
                        // 进入等待状态
                        product.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
// 产品商店:销售
class ProductShop extends Thread {
    Product product ;
    public ProductShop (Product product){
        this.product = product ;
    }
    @Override
    public void run() {
        while (true) {
            synchronized (product) {
                if (product.flag == true ){
                    System.out.println("产品:"+product.name+"【价格"+(product.price*2)+"】卖出...");
                    product.flag = false ;
                    product.notifyAll(); //唤醒生产者
                } else {
                    try {
                        product.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

Process Description: the productfactory generates a commodity, notifies the store to sell it, judges whether the control enters the waiting state through the flag identification, and notifies the factory to produce the commodity again after the store sells the commodity.

5、 Source code address

GitHub·地址
https://github.com/cicadasmile/java-base-parent
GitEE·地址
https://gitee.com/cicadasmile/java-base-parent
The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>