[Java Concurrent Programming] Atomic classes and the ABA problem of CAS

Atomic classes

An atomic class is a class whose operations are atomic.

Atomic classes live in the java.util.concurrent.atomic package.

Depending on the type of data they operate on, atomic classes fall into the following categories.

Basic type

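Common methods of AtomicInteger

public final int get() // get the current value
public final int getAndSet(int newValue) // get the current value, then set the new value
public final int getAndIncrement() // get the current value, then increment it
public final int getAndDecrement() // get the current value, then decrement it
public final int getAndAdd(int delta) // add the given value and return the previous value
public final int addAndGet(int delta) // add the given value and return the updated value
public final boolean compareAndSet(int expect, int update) // if the current value equals expect, atomically set it to update
public final void lazySet(int newValue) // eventually sets the value to newValue; other threads may still read the old value for a short time afterwards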

Usage of common AtomicInteger methods

@Test
public void AtomicIntegerT() {

    AtomicInteger c = new AtomicInteger();

    c.set(10);
    System.out.println("Initial value ==> " + c.get());

    int andAdd = c.getAndAdd(10);
    System.out.println("Add 10 and return the previous value; previous value ==> " + andAdd + ", value after adding ==> " + c.get());

    int finalVal = c.addAndGet(5);
    System.out.println("Value after adding 5 ==> " + finalVal);

    int i = c.incrementAndGet();
    System.out.println("Value after incrementing ==> " + i);

    int result = c.updateAndGet(e -> e + 3);
    System.out.println("Result of the functional update (+ 3) ==> " + result);

    int res = c.accumulateAndGet(10, (x, y) -> x + y);
    System.out.println("Result of applying the given function ==> " + res);
}

Output:

Initial value ==> 10
Add 10 and return the previous value; previous value ==> 10, value after adding ==> 20
Value after adding 5 ==> 25
Value after incrementing ==> 26
Result of the functional update (+ 3) ==> 29
Result of applying the given function ==> 39
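The walkthrough above does not exercise compareAndSet, which the rest of the article relies on. A minimal sketch of its behavior follows; the class name CompareAndSetSketch and the helper demo() are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CompareAndSetSketch {

    // Hypothetical helper: performs two CAS attempts and returns the final value.
    static int demo() {
        AtomicInteger c = new AtomicInteger(10);
        boolean first = c.compareAndSet(10, 20);  // current value is 10: succeeds
        boolean second = c.compareAndSet(10, 30); // current value is 20, not 10: fails
        System.out.println(first + " " + second + " " + c.get());
        return c.get();
    }

    public static void main(String[] args) {
        demo(); // prints: true false 20
    }
}
```

The second call leaves the value untouched, so a caller has to check the boolean result and decide whether to retry.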

AtomicInteger guarantees atomicity

We know that volatile guarantees visibility and ordering but not atomicity. Therefore the following code can produce an incorrect result under concurrency: the final value may be less than 10000.

import java.util.concurrent.CountDownLatch;

public class AtomicTest {

    static CountDownLatch c = new CountDownLatch(10);
    public volatile int inc = 0;

    public static void main(String[] args) throws InterruptedException {

        final AtomicTest test = new AtomicTest();
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    test.increase();
                }
                c.countDown();
            }).start();
        }
        c.await();
        System.out.println(test.inc);

    }
    // Not an atomic operation: it reads inc, adds 1, then writes the result back to memory
    public void increase() {
        inc++;
    }
}

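There are two ways to make the final result always 10000.

The first is to make increase() synchronized:

    public synchronized void increase() {
        inc++;
    }

The second is to use AtomicInteger:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicTest {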

    static CountDownLatch c = new CountDownLatch(10);

    // Use the integer atomic class to guarantee atomicity
    public AtomicInteger inc = new AtomicInteger();

    public static void main(String[] args) throws InterruptedException {

        final AtomicTest test = new AtomicTest();
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    test.increase();
                }
                c.countDown();
            }).start();
        }
        c.await();
        System.out.println(test.getCount());
    }

    // Get the current value, then increment it
    public void increase() {
        inc.getAndIncrement();
    }

    // Get the current value
    public int getCount() {
        return inc.get();
    }
}

Implementation of the getAndIncrement() method

How does the getAndIncrement() method ensure that the operation is atomic?

    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            // objectFieldOffset is a native method that returns the memory offset of the value field within the object
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
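    // value is volatile, so every thread always reads its latest value
    private volatile int value;

    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }

    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }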

OpenJDK 1.8 source code of the Unsafe class (Unsafe.java):

    /**
     * Atomically adds the given value to the current value of a field
     * or array element within the given object <code>o</code>
     * at the given <code>offset</code>.
     *
     * @param o object/array to update the field/element in
     * @param offset field/element offset
     * @param delta the value to add
     * @return the previous value
     * @since 1.8
     */
    public final int getAndAddInt(Object o,long offset,int delta) {
        int v;
        do {
            v = getIntVolatile(o,offset);
        } while (!compareAndSwapInt(o,offset,v,v + delta));
        return v;
    }

The JDK source has changed between versions. The older implementation excerpted in the book The Art of Java Concurrency Programming is easier to follow:

    public final int getAndIncrement() {
        for ( ; ; ) {
            // read the currently stored value
            int current = get();
            // add 1
            int next = current + 1;
            // CAS makes the update atomic: the value is set to next only if it still equals current
            if (compareAndSet(current, next)) {
                return current;
            }
        }
    }

    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
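The same read, compute, compareAndSet retry loop can be written against the public AtomicInteger API for operations the class does not offer directly. The following is only a sketch: the class CasLoopSketch and the helper multiplyAndGet are hypothetical names, not part of the JDK.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasLoopSketch {

    // Hypothetical helper: atomically multiplies the value and returns the result.
    static int multiplyAndGet(AtomicInteger value, int factor) {
        for (;;) {
            int current = value.get();      // read the current value
            int next = current * factor;    // compute the new value
            if (value.compareAndSet(current, next)) {
                return next;                // nobody interfered: done
            }
            // Another thread changed the value in between: retry with a fresh read.
        }
    }

    public static void main(String[] args) {
        AtomicInteger v = new AtomicInteger(5);
        System.out.println(multiplyAndGet(v, 3)); // prints 15
    }
}
```

In practice updateAndGet(x -> x * factor) does the same thing; the explicit loop is shown only to make the retry visible.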

Array type

Usage of common AtomicIntegerArray methods

@Test
public void AtomicIntegerArrayT() {

    int[] nums = {1,2,3,4,5};
    AtomicIntegerArray c = new AtomicIntegerArray(nums);

    for (int i = 0; i < nums.length; i++) {
        System.out.print(c.get(i) + " ");
    }
    System.out.println();

    int finalVal = c.addAndGet(0,10);
    System.out.println("Value at index 0 after adding 10 ==> " + finalVal);

    int i = c.incrementAndGet(0);
    System.out.println("Value at index 0 after incrementing ==> " + i);

    int result = c.updateAndGet(0, e -> e + 3);
    System.out.println("Result of the functional update (+ 3) at index 0 ==> " + result);

    int res = c.accumulateAndGet(0, 10, (x, y) -> x * y);
    System.out.println("Result of applying the given function ==> " + res);
}

Reference type

A basic type atomic class can only update one variable. If you need to update multiple variables atomically, wrap them in one object and use a reference type atomic class.

Usage of common AtomicReference methods

@Test
public void AtomicReferenceT(){

    AtomicReference<Person> ar = new AtomicReference<>();
    Person p = new Person(18,"summer");

    ar.set(p);

    Person pp = new Person(50,"dan");
    ar.compareAndSet(p,pp);// expect = p  update = pp

    System.out.println(ar.get().getName());
    System.out.println(ar.get().getAge());

}
@Data
@AllArgsConstructor
@NoArgsConstructor
class Person{

    int age;
    String name;
}
//dan
//50
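To illustrate the point about updating several variables at once, here is a small sketch that keeps two values in an immutable holder and swaps the whole holder atomically. The names RangeSketch, Range and shift are assumptions made for this example.

```java
import java.util.concurrent.atomic.AtomicReference;

class RangeSketch {

    // Hypothetical immutable holder for two values that must change together.
    static final class Range {
        final int lower;
        final int upper;

        Range(int lower, int upper) {
            this.lower = lower;
            this.upper = upper;
        }
    }

    private final AtomicReference<Range> range = new AtomicReference<>(new Range(0, 10));

    // Shifts both bounds in one atomic step by swapping in a new Range object.
    Range shift(int delta) {
        return range.updateAndGet(r -> new Range(r.lower + delta, r.upper + delta));
    }

    public static void main(String[] args) {
        Range r = new RangeSketch().shift(5);
        System.out.println(r.lower + ".." + r.upper); // prints 5..15
    }
}
```

Because the holder is immutable, no thread can ever observe a lower bound from one update combined with an upper bound from another.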

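Object field updater type

If you need to atomically update a field of an object, use one of the field updater atomic classes.

Updating a field of an object atomically requires two steps. First, create an updater with the static newUpdater() method, passing the class and the name of the field. Second, declare the field volatile and make it accessible to the code that creates the updater (the example below uses public volatile).

Usage of common AtomicIntegerFieldUpdater methods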

@Test
public void AtomicIntegerFieldUpdatetest(){
    AtomicIntegerFieldUpdater<Person> a =
        AtomicIntegerFieldUpdater.newUpdater(Person.class,"age");
    Person p = new Person(18,"summer");
    System.out.println(a.getAndIncrement(p)); //18
    System.out.println(a.get(p)); //19
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class Person{

    public volatile int age;
    private String name;
}
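The volatile requirement is enforced when the updater is created. The sketch below, which avoids Lombok and uses the made-up names FieldUpdaterSketch and Counter, shows an updater working on a volatile int field and newUpdater rejecting a plain int field with an IllegalArgumentException.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class FieldUpdaterSketch {

    // Hypothetical class with one volatile and one plain int field.
    static class Counter {
        volatile int hits;
        int plain;
    }

    // Increments the volatile field twice through the updater and returns the result.
    static int incrementHits() {
        AtomicIntegerFieldUpdater<Counter> updater =
            AtomicIntegerFieldUpdater.newUpdater(Counter.class, "hits");
        Counter c = new Counter();
        updater.incrementAndGet(c);
        return updater.incrementAndGet(c);
    }

    // Returns true if creating an updater for the non-volatile field is rejected.
    static boolean plainFieldRejected() {
        try {
            AtomicIntegerFieldUpdater.newUpdater(Counter.class, "plain");
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(incrementHits());      // prints 2
        System.out.println(plainFieldRejected()); // prints true
    }
}
```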

New atomic operation classes in Java 8

AtomicLong provides non-blocking atomic operations through CAS and performs well under low contention. Under high concurrency, however, many threads compete to update the same atomic variable and only one CAS can succeed at a time; the other threads spin and retry, which wastes a lot of CPU.

LongAdder solves this problem by spreading the contention over multiple Cell objects. Under high concurrency, threads update elements of the cell array instead of the single base field. When there are not enough cells, the array is doubled in size. As a result, LongAdder outperforms AtomicLong under high concurrency.
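As a rough usage sketch, LongAdder can replace the counter from the earlier AtomicTest example. The class LongAdderSketch and the method countWithThreads are hypothetical names; the thread and iteration counts mirror the earlier example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.LongAdder;

class LongAdderSketch {

    // Hypothetical helper: increments one LongAdder from several threads and returns the total.
    static long countWithThreads(int threads, int perThread) {
        LongAdder adder = new LongAdder();
        CountDownLatch done = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    adder.increment(); // may land on base or on one of the cells
                }
                done.countDown();
            }).start();
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return adder.sum(); // base plus the values of all cells
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(10, 1000)); // prints 10000
    }
}
```

Note that sum() is only an exact total once all updating threads have finished, which the latch ensures here.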

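How the ABA problem arises in CAS

Suppose two threads access the same variable x. One thread reads the value A. Before it performs its CAS, the other thread changes x from A to B and then back to A. The first thread's CAS still succeeds, because CAS only compares values and cannot tell that x was modified in between.

Let's walk through the ABA problem with a numeric variable: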

@SneakyThrows
@Test
public void test1() {
    AtomicInteger atomicInteger = new AtomicInteger(10);

    CountDownLatch countDownLatch = new CountDownLatch(2);

    new Thread(() -> {
        atomicInteger.compareAndSet(10,11);
        atomicInteger.compareAndSet(11,10);
        System.out.println(Thread.currentThread().getName() + ":10->11->10");
        countDownLatch.countDown();
    }).start();

    new Thread(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
            boolean isSuccess = atomicInteger.compareAndSet(10,12);
            System.out.println("CAS succeeded: " + isSuccess + ", new value: " + atomicInteger.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        countDownLatch.countDown();
    }).start();

    countDownLatch.await();
}
// Output: thread 2 does not notice that the initial value was changed in between
// Thread-0:10->11->10
// CAS succeeded: true, new value: 12

The ABA problem exists here, but for a plain numeric value it may not affect the result. Consider, however, a special case:

https://zhuanlan.zhihu.com/p/237611535

How to solve the ABA problem

AtomicStampedReference updates a reference together with a version number. The class associates an integer stamp with the reference and updates both atomically, which solves the ABA problem that can occur when atomic updates are performed with plain CAS.

@SneakyThrows
@Test
public void test2() {
    AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(10, 1);

    CountDownLatch countDownLatch = new CountDownLatch(2);

    new Thread(() -> {
        System.out.println(Thread.currentThread().getName() + " first version: " + atomicStampedReference.getStamp());
        atomicStampedReference.compareAndSet(10, 11, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
        System.out.println(Thread.currentThread().getName() + " second version: " + atomicStampedReference.getStamp());
        atomicStampedReference.compareAndSet(11, 10, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
        System.out.println(Thread.currentThread().getName() + " third version: " + atomicStampedReference.getStamp());
        countDownLatch.countDown();
    }).start();

    new Thread(() -> {
        System.out.println(Thread.currentThread().getName() + " first version: " + atomicStampedReference.getStamp());
        try {
            TimeUnit.SECONDS.sleep(2);
            boolean isSuccess = atomicStampedReference.compareAndSet(10, 12, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
            System.out.println(Thread.currentThread().getName() + " CAS succeeded: " + isSuccess + " current version: " + atomicStampedReference.getStamp() + " current value: " + atomicStampedReference.getReference());
            countDownLatch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();

    countDownLatch.await();
}
// Output
Thread-0 first version: 1
Thread-0 second version: 2
Thread-0 third version: 3
Thread-1 first version: 3
Thread-1 CAS succeeded: true current version: 4 current value: 12

In this run Thread-1 reads the stamp only after Thread-0 has finished, so it passes the current version 3 and its CAS succeeds. The protection against ABA applies when a thread captures the value and the stamp together before other threads intervene: a CAS that still carried the stale stamp 1 would fail, even though the value is 10 again.
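The case where a stale stamp is rejected can be sketched deterministically. The example below is an assumption-laden simplification: it runs on a single thread and only simulates the interleaving of two threads, and the names StaleStampSketch and staleCasSucceeds are hypothetical.

```java
import java.util.concurrent.atomic.AtomicStampedReference;

class StaleStampSketch {

    // Returns true if a CAS carrying a stale stamp succeeds (it should not).
    static boolean staleCasSucceeds() {
        AtomicStampedReference<Integer> ref = new AtomicStampedReference<>(10, 1);

        // "Thread 1" captures the value and the stamp together before anything changes.
        int[] stampHolder = new int[1];
        Integer seenValue = ref.get(stampHolder);
        int seenStamp = stampHolder[0];

        // "Thread 2" performs A -> B -> A, bumping the stamp on every update.
        ref.compareAndSet(10, 11, 1, 2);
        ref.compareAndSet(11, 10, 2, 3);

        // The value is 10 again, but the stamp is now 3, so the CAS with stamp 1 fails.
        return ref.compareAndSet(seenValue, 12, seenStamp, seenStamp + 1);
    }

    public static void main(String[] args) {
        System.out.println(staleCasSucceeds()); // prints false
    }
}
```

AtomicStampedReference compares references with ==, so the sketch relies on small Integer values such as 10 and 11 being cached by autoboxing; for arbitrary objects, keep and pass the same reference that was read.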

AtomicMarkableReference uses a boolean mark instead of a version number. Because the mark has only two states, true and false, after two updates it is back to its initial state, so this class cannot fully solve the ABA problem.

@SneakyThrows
@Test
public void test3() {
    AtomicMarkableReference<Integer> markableReference = new AtomicMarkableReference<>(10,false);

    CountDownLatch countDownLatch = new CountDownLatch(2);

    new Thread(() -> {
        System.out.println(Thread.currentThread().getName() + " first mark: " + markableReference.isMarked());
        markableReference.compareAndSet(10, 11, markableReference.isMarked(), true);
        System.out.println(Thread.currentThread().getName() + " second mark: " + markableReference.isMarked());
        markableReference.compareAndSet(11, 10, markableReference.isMarked(), false);
        System.out.println(Thread.currentThread().getName() + " third mark: " + markableReference.isMarked());
        countDownLatch.countDown();
    }).start();

    new Thread(() -> {
        System.out.println(Thread.currentThread().getName() + " first mark: " + markableReference.isMarked());
        try {
            TimeUnit.SECONDS.sleep(2);
            boolean isSuccess = markableReference.compareAndSet(10, 12, false, true);
            System.out.println(Thread.currentThread().getName() + " CAS succeeded: " + isSuccess + " current mark: " + markableReference.isMarked() + " current value: " + markableReference.getReference());
            countDownLatch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();

    countDownLatch.await();
}
// Output
Thread-0 first mark: false
Thread-0 second mark: true
Thread-0 third mark: false
Thread-1 first mark: false
Thread-1 CAS succeeded: true current mark: true current value: 12
