Learning Multithreaded Programming, Part 8: Atomic Operation Classes

Introduction

An atom is "the smallest particle that cannot be further divided", and an atomic operation is "one operation, or a series of operations, that cannot be interrupted".

Since JDK 1.5, Java has provided the java.util.concurrent.atomic package. The atomic operation classes in this package offer a simple, efficient and thread-safe way to update a variable. The package covers four kinds of atomic updates: primitive types, arrays, references and fields.

The atomic classes rely mostly on the compare-and-swap (CAS) methods of the Unsafe class to guarantee thread safety.

public final class Unsafe {
    ...

    // Arguments: target object, field offset within that object, expected value, new value
    public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

    public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

    ...
}
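
To make the role of CAS more concrete, here is a minimal sketch of the retry loop that an atomic update boils down to. It goes through the public AtomicInteger.compareAndSet method rather than Unsafe directly, and the names CasLoopSketch, addWithCas and runConcurrently are made up for illustration. It is not the JDK's actual implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: all names in this class are hypothetical.
final class CasLoopSketch {

    // Adds delta to counter with an explicit CAS retry loop and returns the new value.
    static int addWithCas(AtomicInteger counter, int delta) {
        while (true) {
            int current = counter.get();   // read the current value
            int next = current + delta;    // compute the desired value
            // Succeeds only if no other thread changed the value since it was read.
            if (counter.compareAndSet(current, next)) {
                return next;
            }
            // Another thread won the race: loop and try again with a fresh read.
        }
    }

    // Starts `threads` threads that each add 1 `perThread` times; returns the final value.
    static int runConcurrently(int threads, int perThread) {
        AtomicInteger counter = new AtomicInteger(0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    addWithCas(counter, 1);
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        // No increments are lost, so this prints threads * perThread.
        System.out.println(runConcurrently(4, 10_000));
    }
}
```

The loop never blocks a thread, but a thread that keeps losing the race keeps retrying, which is the cost discussed later for heavily contended counters.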

In JDK 1.8, Doug Lea added LongAccumulator and other parallel accumulators to the atomic package, providing a more efficient lock-free solution.

Atomically updating primitive types

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerTest {

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    private static AtomicInteger atomicInteger = new AtomicInteger(1);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(() -> {
                try {
                    countDownLatch.await();
                    // Atomically add 1 to the current value and return the previous value
                    System.out.print(atomicInteger.getAndIncrement() + " ");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            thread.start();
        }
        // Release all threads at once so that they contend for the counter
        countDownLatch.countDown();
        // Crude wait for the worker threads to finish
        Thread.sleep(2000);
        System.out.println();
        // Atomically add the given value to the current value and return the result (11 + 10 = 21)
        System.out.println(atomicInteger.addAndGet(10));
        // CAS: set the value to 30 only if it is currently 21
        atomicInteger.compareAndSet(21, 30);
        System.out.println(atomicInteger.get());
    }
}
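
The return values of these methods are easy to mix up, so the following single-threaded sketch records what each call returns. The class name AtomicIntegerReturnValues and its demo method are made up for this illustration; the AtomicInteger methods themselves are the standard JDK 8 ones.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: the class and method names are hypothetical.
final class AtomicIntegerReturnValues {

    // Runs a fixed sequence of calls and returns what each one produced.
    static int[] demo() {
        AtomicInteger n = new AtomicInteger(1);
        int a = n.getAndIncrement();             // returns the old value 1; n is now 2
        int b = n.incrementAndGet();             // n becomes 3; returns the new value 3
        int c = n.addAndGet(10);                 // n becomes 13; returns 13
        boolean hit = n.compareAndSet(13, 30);   // expected value matches: true, n is 30
        boolean miss = n.compareAndSet(13, 99);  // expected value is stale: false, n stays 30
        int d = n.updateAndGet(x -> x * 2);      // applies the function atomically: 60
        return new int[] {a, b, c, hit ? 1 : 0, miss ? 1 : 0, d, n.get()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // [1, 3, 13, 1, 0, 60, 60]
    }
}
```

Note that compareAndSet reports failure through its boolean result instead of throwing, so code that ignores the result cannot tell whether the update happened.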

Atomically updating arrays

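import java.util.concurrent.atomic.AtomicReferenceArray;

// Stu is the author's simple student class; its definition is not shown in the article.
public class AtomicReferenceArrayTest {

    // AtomicReferenceArray copies the array passed to its constructor (VALUE), so modifying
    // an element through the AtomicReferenceArray does not affect the original array.
    private static Stu[] VALUE = new Stu[]{new Stu(System.currentTimeMillis(), "张三"), new Stu(System.currentTimeMillis(), "李四")};

    private static AtomicReferenceArray<Stu> REFERENCE_ARRAY = new AtomicReferenceArray<>(VALUE);

    public static void main(String[] args) {
        // Atomically replace the element at the given index
        REFERENCE_ARRAY.getAndSet(0, new Stu(System.currentTimeMillis(), "王五"));
        System.out.println(REFERENCE_ARRAY.get(0));
        System.out.println(VALUE[0]);
    }
}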
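
The same copy-on-construction behaviour applies to the primitive array classes. The sketch below uses AtomicIntegerArray so that it is self-contained and does not depend on the Stu class; the class name AtomicArrayCopyDemo is made up for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Illustrative sketch only: the class and method names are hypothetical.
final class AtomicArrayCopyDemo {

    // Returns {source[0], atomic[0], result of addAndGet, atomic[2], CAS success flag}.
    static int[] demo() {
        int[] source = {1, 2, 3};
        // The constructor copies the elements, so `source` is never touched afterwards.
        AtomicIntegerArray atomic = new AtomicIntegerArray(source);

        atomic.getAndSet(0, 100);                          // element 0: 1 -> 100
        int afterAdd = atomic.addAndGet(1, 5);             // element 1: 2 + 5 = 7
        boolean swapped = atomic.compareAndSet(2, 3, 30);  // element 2: 3 -> 30

        return new int[] {source[0], atomic.get(0), afterAdd, atomic.get(2), swapped ? 1 : 0};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // [1, 100, 7, 30, 1]
    }
}
```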

Atomically updating references

import java.util.concurrent.atomic.AtomicStampedReference;

public class AtomicStampedReferenceTest {

    private static Stu stu = new Stu(System.currentTimeMillis(), "张三");
    /**
     * Carrying a version number (stamp) with each update guards against the ABA problem in CAS:
     * the comparison checks both the expected reference and the expected stamp,
     * and a successful update must also advance the stamp.
     */
    private static AtomicStampedReference<Stu> stampedReference = new AtomicStampedReference<>(stu, 1);

    public static void main(String[] args) {
        System.out.println(stampedReference.getReference());
        Stu newStu = new Stu(System.currentTimeMillis(), "李四");
        int stamp = stampedReference.getStamp();
        // Use stamp + 1, not stamp++: post-increment would pass the old stamp as the new one
        stampedReference.compareAndSet(stu, newStu, stamp, stamp + 1);
        System.out.println(stampedReference.getReference());
    }
}
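
To show what the stamp buys, the following sketch replays an A -> B -> A sequence against both a plain AtomicReference and an AtomicStampedReference. It is single-threaded on purpose: the "other thread" is simulated by the calls in the middle, so that the outcome is deterministic. The class name AbaDemo is made up for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;

// Illustrative sketch only: the class and method names are hypothetical.
final class AbaDemo {

    // Returns {plain CAS succeeded, stamped CAS succeeded}.
    static boolean[] demo() {
        String a = "A";
        String b = "B";
        AtomicReference<String> plain = new AtomicReference<>(a);
        AtomicStampedReference<String> stamped = new AtomicStampedReference<>(a, 0);

        // "Thread 1" reads the current value and stamp, then is delayed.
        String seen = stamped.getReference();
        int seenStamp = stamped.getStamp();

        // Meanwhile "thread 2" changes A -> B -> A.
        plain.set(b);
        plain.set(a);
        stamped.compareAndSet(a, b, 0, 1);
        stamped.compareAndSet(b, a, 1, 2);

        // "Thread 1" resumes. The plain CAS cannot see that anything happened.
        boolean plainOk = plain.compareAndSet(seen, "C");
        // The stamped CAS fails: the reference matches, but the stamp is now 2, not 0.
        boolean stampedOk = stamped.compareAndSet(seen, "C", seenStamp, seenStamp + 1);

        return new boolean[] {plainOk, stampedOk};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("plain CAS: " + r[0] + ", stamped CAS: " + r[1]);
    }
}
```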

Atomically updating fields

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public class AtomicReferenceFieldUpdaterTest {

    // Create the atomic updater, specifying the target class, the field type and the field name
    private static AtomicReferenceFieldUpdater<Stu, String> atomicUserFieldRef = AtomicReferenceFieldUpdater.newUpdater(Stu.class, String.class, "name");

    public static void main(String[] args) {
        Stu stu = new Stu(System.currentTimeMillis(), "张三");
        atomicUserFieldRef.set(stu, "李四");
        System.out.println(stu.getName());
    }
}

Note that the field being updated must be declared volatile and must be accessible to the class that creates the updater (for example, public volatile). The volatile requirement is enforced in the AtomicReferenceFieldUpdater source code:

            if (vclass.isPrimitive())
                throw new IllegalArgumentException("Must be reference type");

            if (!Modifier.isVolatile(modifiers))
                throw new IllegalArgumentException("Must be volatile type");
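
The effect of that check can be observed directly. In the sketch below, Account is a hypothetical class invented for this example, with one volatile field and one non-volatile field; creating an updater for the non-volatile field is expected to throw IllegalArgumentException.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Illustrative sketch only: FieldUpdaterDemo and Account are hypothetical names.
final class FieldUpdaterDemo {

    static class Account {
        volatile String owner = "nobody"; // eligible: volatile reference field
        String nickname = "none";         // not eligible: missing volatile
    }

    // Updates the volatile field through an updater and returns the new value.
    static String updateOwner() {
        AtomicReferenceFieldUpdater<Account, String> updater =
                AtomicReferenceFieldUpdater.newUpdater(Account.class, String.class, "owner");
        Account account = new Account();
        updater.compareAndSet(account, "nobody", "alice");
        return account.owner;
    }

    // Returns true if creating an updater for the non-volatile field is rejected.
    static boolean nonVolatileRejected() {
        try {
            AtomicReferenceFieldUpdater.newUpdater(Account.class, String.class, "nickname");
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(updateOwner());
        System.out.println(nonVolatileRejected());
    }
}
```

A field updater is mainly useful when many instances exist and wrapping every field in its own AtomicReference would cost too much memory; one static updater serves all instances.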

Parallel accumulators in JDK 1.8

AtomicLong maintains a single value and updates it with non-blocking CAS operations. The drawback is that when a CAS fails, the thread keeps retrying in a spin loop, which wastes a lot of CPU time when many threads contend for the same variable. (The other atomic classes share this problem.)

So if one variable is split into several, and the same number of threads now compete for several resources instead of one, doesn't that relieve the performance problem? Yes, and this is exactly the idea behind the LongAdder class introduced in JDK 8.
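
The splitting idea can be sketched in a few lines. The class below is a deliberately simplified, hypothetical striped counter built on AtomicLongArray; it is not how LongAdder is implemented. In particular, choosing the stripe from the thread id and using a fixed number of stripes are assumptions made to keep the example short.

```java
import java.util.concurrent.atomic.AtomicLongArray;

// Illustrative sketch only: a simplified striped counter, not the LongAdder implementation.
final class StripedCounterSketch {

    private final AtomicLongArray cells;

    StripedCounterSketch(int stripes) {
        this.cells = new AtomicLongArray(stripes);
    }

    // Each thread adds to "its" stripe, so threads on different stripes do not contend.
    void add(long x) {
        int index = (int) (Thread.currentThread().getId() % cells.length());
        cells.addAndGet(index, x);
    }

    // The total is the sum over all stripes; it is exact only when no add is in flight.
    long sum() {
        long total = 0;
        for (int i = 0; i < cells.length(); i++) {
            total += cells.get(i);
        }
        return total;
    }

    // Starts `threads` threads that each add 1 `perThread` times; returns the final sum.
    static long runConcurrently(int threads, int perThread) {
        StripedCounterSketch counter = new StripedCounterSketch(8);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.add(1);
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return counter.sum();
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(4, 10_000));
    }
}
```

The trade-off is visible even in this toy version: writes get cheaper under contention, while reading the total becomes a loop over all stripes.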

The core idea of LongAdder is segmentation. It extends Striped64, which has two key fields: long base and Cell[] cells. Next, let's look at the core code of LongAdder:

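public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    // If cells has not been created yet, first try to CAS the value straight onto base;
    // under low contention this succeeds and the method returns.
    // If cells is null and the CAS on base fails (contention), enter the if block, where cells will be created.
    // If cells is not null, go straight to the cells array and look for a cell to add to.
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        // Fall back to longAccumulate if cells is uninitialized, this thread's cell is missing,
        // or the CAS on that cell fails
        if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}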

Therefore, to get the accumulated result you call LongAdder's sum() method, which returns base plus the values of all elements of the cells array. Note that updates made concurrently while the sum is being computed might not be included in the result.
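
As a usage sketch, the example below counts with a LongAdder from several threads and reads the total with sum() only after all of them have finished, which is when the result is exact. It also shows LongAccumulator tracking a running maximum. The class name LongAdderUsage and its method names are made up for illustration.

```java
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

// Illustrative sketch only: the class and method names are hypothetical.
final class LongAdderUsage {

    // Starts `threads` threads that each increment `perThread` times; returns the total.
    static long countWithAdder(int threads, int perThread) {
        LongAdder adder = new LongAdder();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    adder.increment();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        // All writers have finished, so sum() sees every update.
        return adder.sum();
    }

    // LongAccumulator generalizes LongAdder to any function; here it keeps the maximum.
    static long maxWithAccumulator(long... values) {
        LongAccumulator max = new LongAccumulator(Long::max, Long.MIN_VALUE);
        for (long v : values) {
            max.accumulate(v);
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithAdder(4, 10_000));
        System.out.println(maxWithAccumulator(3, 42, 7));
    }
}
```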
