并发编程原子类

2015/03/18 JUC

并发编程原子类

  • AtomicInteger的使用方法,并且为大家揭露了AtomicInteger的内部实现原理

    什么是原子性呢

    什么是原子性呢?原子性是指某个操作或者一系列操作要么都成功,要么都失败,不允许出现因中断而导致的部分成功或部分失败的情况。 比如,对int类型的加法操作就是原子性的,如x+1。但是我们在使用的过程中往往会将x+1的结果赋予另一个变量甚至是x变量本身,即进行x=x+1或者x++这样的操作,而这样的语句事实上是由若干个原子性的操作组合而来的,因此它们就不具备原子性。这样的语句的具体实现步骤如下。

  • 将主内存中x的值读取到CPU Cache中。
  • 对x进行加一运算。
  • 将结果写回到CPU Cache中。
  • 将x的值刷新到主内存中。

再比如,long类型的加法x+1的操作就不是原子性的。在Brian Goetz、Tim Peierls、Joshua Bloch、Joseph Bowbeer、David Holmes、Doug Lea合著的《Java Concurrency in Practice》一书的Nonatomic 64-bit operations章节中提到过:“a 64-bit write operation is basically performed as two separate 32-bit operations. This behavior can result in indeterminate values being read in code and that lacks atomicity.”一个64位写操作实际上将会被拆分为2个32位的操作,这一行为的直接后果将会导致最终的结果是不确定的并且缺少原子性的保证。 在Java虚拟机规范中同样也有类似的描述:“For the purposes of the Java programming language memory model, a single write to a non-volatile long or double value is treated as two separate writes: one to each 32-bit half. This can result in a situation where a thread sees the first 32 bits of a 64-bit value from one write, and the second 32 bits from another write.”详见虚拟机官方网址,地址如下: https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.7

在JDK 1.5版本之前,为了确保在多线程下对某基本数据类型或者引用数据类型运算的原子性,必须依赖于关键字synchronized,但是自JDK 1.5版本以后这一情况发生了改变,JDK官方为开发者提供了原子类型的工具集,比如AtomicInteger、AtomicBoolean等,这些原子类型都是Lock-Free及线程安全的,开发者将不再为一个数据类型的自增运算而增加synchronized的同步操作。 实际上在Java推出原子工具集之前,很多第三方库也提供了类似的解决方案,比如Google的Guava,甚至于JDK自身的原子类工具集也是来自Doug Lea的个人项目。

AtomicInteger详解

AtomicInteger也是Number类的一个子类,除此之外,AtomicInteger还提供了很多原子性的操作方法。在AtomicInteger的内部有一个被volatile关键字修饰的成员变量value,实际上,AtomicInteger所提供的所有方法主要都是针对该变量value进行的操作。

public class AtomicInteger extends Number implements java.io.Serializable {
     private static final long serialVersionUID = 6214790243416807050L;

     // setup to use Unsafe.compareAndSwapInt for updates
     private static final Unsafe unsafe = Unsafe.getUnsafe();
     private static final long valueOffset;

     static {
          try {
               valueOffset = unsafe.objectFieldOffset
                       (AtomicInteger.class.getDeclaredField("value"));
          } catch (Exception ex) {
               throw new Error(ex);
          }
     }

     private volatile int value;
     // ...
}

AtomicInteger的创建

  • public AtomicInteger(): 创建AtomicInteger的初始值为0。
  • public AtomicInteger(int initialValue): 创建AtomicInteger并且指定初始值,无参的AtomicInteger对象创建等价于AtomicInteger(0)。

AtomicInteger的Incremental操作

x++或者x=x+1这样的操作是非原子性的,要想使其具备原子性的特性,我们可以借助AtomicInteger中提供的原子性Incremental的操作方法。

  • int getAndIncrement(): 返回当前int类型的value值,然后对value进行自增运算,该操作方法能够确保对value的原子性增量操作。
    public static void main(String[] args)
    {
      final AtomicInteger ai = new AtomicInteger(5);
      // 返回AtomicInteger的int值,然后自增(在多线程的情况下,下面的断言未必正确)
      assert ai.getAndIncrement() == 5;
      // 获取自增后的结果(在多线程的情况下,下面的断言未必正确)
      assert ai.get() == 6;
    }
    
  • int incrementAndGet(): 直接返回自增后的结果,该操作方法能够确保对value的原子性增量操作。
    public static void main(String[] args)
    {
      // 定义AtomicInteger,初值为5
      final AtomicInteger ai = new AtomicInteger(5);
      // 返回value自增后的结果
      assert ai.incrementAndGet() == 6;
      assert ai.get() == 6;
    }
    

AtomicInteger的Decremental操作

x–或者x=x-1这样的自减操作同样也是非原子性的,要想使其具备原子性的特性,我们可以借助AtomicInteger中提供的原子性Decremental的操作方法。

  • int getAndDecrement(): 返回当前int类型的value值,然后对value进行自减运算(在2.1.3节中我们将学习到该方法的内部原理),该操作方法能够确保对value的原子性减量操作。
    AtomicInteger ai = new AtomicInteger(5);
    assert ai.getAndDecrement() == 5;
    assert ai.get() == 4;
    
  • int decrementAndGet(): 直接返回自减后的结果,该操作方法能够确保对value的原子性减量操作。
    AtomicInteger ai = new AtomicInteger(5);
    assert ai.decrementAndGet() == 4;
    assert ai.get() == 4;
    

原子性地更新value值

boolean compareAndSet(int expect, int update): 原子性地更新AtomicInteger的值,其中expect代表当前的AtomicInteger数值,update则是需要设置的新值,该方法会返回一个boolean的结果:当expect和AtomicInteger的当前值不相等时,修改会失败,返回值为false;若修改成功则会返回true。

// 定义一个AtomicInteger类型的对象ai并且指定初值为10
AtomicInteger ai = new AtomicInteger(10);
// 调用compareAndSet方法,expect的值为100,修改肯定会失败
assert !ai.compareAndSet(100, 12);
// 修改并未成功,因此新值不等于12
assert ai.get() != 12;
// 执行了compareAndSet更新方法之后,ai的返回值依然为10,因为修改失败
assert ai.get() == 10;

// 调用compareAndSet方法,expect的值为10,修改成功(多线程情况下并不能担保百分之百成功)
assert ai.compareAndSet(10, 12);
// 断言成功
assert ai.get() == 12;

boolean weakCompareAndSet(int expect, int update): 目前版本JDK中的该方法与compareAndSet完全一样,源码如下所示。

// compareAndSet方法源码
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
// weakCompareAndSet方法源码
public final boolean weakCompareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

通过源码我们不难发现两个方法的实现完全一样,那么为什么要有这两个方法呢?其实在JDK 1.6版本以前双方的实现是存在差异的,compareAndSet方法的底层主要是针对Intel x86架构下的CPU指令CAS:cmpxchg(sparc-TSO,ia64的CPU架构也支持),但是ARM CPU架构下的类似指令为LL/SC:ldrex/strex(ARM架构下的CPU主要应用于当下的移动互联网设备,比如在智能手机终端设备中,高通骁龙、华为麒麟等系列都是基于ARM架构和指令集下的CPU产品),或许在运行Android的JVM设备上这两个方法底层存在着差异。

  • int getAndAdd(int delta): 原子性地更新AtomicInteger 的value值,更新后的value为value和delta之和,方法的返回值为value的前一个值,该方法实际上是基于自旋+CAS算法实现的(Compare And Swap)原子性操作。
    // 定义一个AtomicInteger类型的对象ai并且指定初始值为10
    AtomicInteger ai = new AtomicInteger(10);
    // 调用getAndAdd方法,返回value的前一个值为10
    assert ai.getAndAdd(2) == 10;
    // 调用get方法返回AtomicInteger的value值,当前返回值为12
    assert ai.get() == 12;
    
  • int addAndGet(int delta): 该方法与getAndAdd(int delta) 一样,也是原子性地更新AtomicInteger的value值,更新后的结果value为value和delta之和,但是该方法会立即返回更新后的value值。
    // 定义一个AtomicInteger类型的对象ai并且指定初始值为10
    AtomicInteger ai = new AtomicInteger(10);
    // 调用addAndGet方法,返回当前value的值
    assert ai.addAndGet(2)==12;
    // 调用get方法返回AtomicInteger的value值,当前返回值为12
    assert ai.get() == 12;
    

AtomicInteger与函数式接口

自JDK1.8增加了函数式接口之后,AtomicInteger也提供了对函数式接口的支持。

  • int getAndUpdate(IntUnaryOperator updateFunction): 原子性地更新AtomicInteger的值,方法入参为IntUnaryOperator接口,返回值为value更新之前的值。
    @FunctionalInterface
    public interface IntUnaryOperator {
      // 入参为被操作数,对应于AtomicInteger的当前value值
      int applyAsInt(int operand);
    }
    

    IntUnaryOperator为函数式接口,有且仅有一个接口方法(非静态,非default),接口方法的返回值即AtomicInteger被更新后的value的最新值。

    // 定义一个AtomicInteger类型的对象ai并且指定初始值为10
    AtomicInteger ai = new AtomicInteger(10);
    // 调用getAndUpdate方法并且传入lambda表达式,返回结果为value的前一个值
    assert ai.getAndUpdate(x -> x + 2) == 10;
    // 调用get方法返回AtomicInteger的value值,当前返回值为12
    assert ai.get() == 12;
    
  • int updateAndGet(IntUnaryOperator updateFunction): 原子性地更新AtomicInteger的值,方法入参为IntUnaryOperator接口,该方法会立即返回更新后的value值。
    // 定义一个AtomicInteger类型的对象ai并且指定初始值为10
    AtomicInteger ai = new AtomicInteger(10);
    // 调用updateAndGet方法并且传入lambda表达式,返回结果为value更新后的值
    assert ai.updateAndGet(x -> x + 2) == 12;
    // 调用get方法返回AtomicInteger的value值,当前返回值为12
    assert ai.get() == 12;
    
  • int getAndAccumulate(int x, IntBinaryOperator accumulatorFunction): 原子性地更新AtomicInteger的值,方法入参为IntBinaryOperator接口和delta值x,返回值为value更新之前的值。
    @FunctionalInterface
    public interface IntBinaryOperator {
      // 该接口在getAndAccumulate方法中,left为AtomicInteger value的当前值,    // right为delta值,返回值将被用于更新AtomicInteger的value值
      int applyAsInt(int left, int right);
    }
    

    IntBinaryOperator为函数式接口,有且仅有一个接口方法(非静态,非default),接口方法的返回值即AtomicInteger被更新后的value的最新值。

    // 定义一个AtomicInteger类型的对象ai并且指定初值为10
    AtomicInteger ai = new AtomicInteger(10);
    int result = ai.getAndAccumulate(5, new IntBinaryOperator()
    {
      @Override
      public int applyAsInt(int left, int right)
      {
          assert left == 10;
          assert right == 5;
          return left + right;
      }
    });
    assert result == 10;
    assert ai.get() == 15;
    

    上面的代码片段可以用lambda表达式简化,简写后的代码如下。

    // 定义一个AtomicInteger类型的对象ai并且指定初值为10
    AtomicInteger ai = new AtomicInteger(10);
    int result = ai.getAndAccumulate(5, Integer::sum);
    assert result == 10;
    assert ai.get() == 15;
    
  • int accumulateAndGet(int x, IntBinaryOperator accumulatorFunction): 该方法与getAndAccumulate类似,只不过会立即返回AtomicInteger的更新值。
    // 定义一个AtomicInteger类型的对象ai并且指定初值为10
    AtomicInteger ai = new AtomicInteger(10);
    int result = ai.accumulateAndGet(5, Integer::sum);
    assert result == 15;
    assert ai.get() == 15;
    

    其他方法

  • void set(int newValue): 为AtomicInteger的value设置一个新值,通过对前面内容的学习,我们知道在AtomicInteger中有一个被volatile关键字修饰的value成员属性,因此调用set方法为value设置新值后其他线程就会立即看见。
  • void lazySet(int newValue): set方法修改被volatile关键字修饰的value值会被强制刷新到主内存中,从而立即被其他线程看到,这一切都应该归功于volatile关键字底层的内存屏障。内存屏障虽然足够轻量,但是毕竟还是会带来性能上的开销,比如,在单线程中对AtomicInteger的value进行修改时没有必要保留内存屏障,而value又是被volatile关键字修饰的,这似乎是无法调和的矛盾。幸好追求性能极致的JVM开发者们早就考虑到了这一点,lazySet方法的作用正在于此。

AtomicInteger内幕

看看AtomicInteger类的内部原理,以更加深入地了解AtomicInteger的内幕。

// Unsafe是由C++实现的,其内部存在着大量的汇编 CPU指令等代码,JDK实现的
// Lock Free几乎完全依赖于该类
private static final Unsafe unsafe = Unsafe.getUnsafe();
// valueOffset将用于存放value的内存地址偏移量
private static final long valueOffset;
static {
    try {
        // 获取value的内存地址偏移量
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
    } catch (Exception ex) { throw new Error(ex); }
}
// 我们不止一次地说过,在AtomicInteger的内部有一个volatile修饰的int类型成员属性value private volatile int value;

compareAndSwapInt源码分析——CAS算法

CAS包含3个操作数:内存值V、旧的预期值A、要修改的新值B。当且仅当预期值A与内存值V相等时,将内存值V修改为B,否则什么都不需要做。

compareAndSwapInt方法是一个native方法,提供了CAS(Compare And Swap)算法的实现,AtomicInteger类中的原子性方法几乎都借助于该方法实现。

...
public final boolean weakCompareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);


}
...
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);


}
...
public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}
...
// Unsafe 内部方法getAndAddInt源码
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)

);
    return var5;
}

进入Unsafe源码中我们会看到compareAndSwapInt 源码。

/**
*由于该方法无法正常反编译,因此笔者在此将方法的入参名进行了一下修改,也许与大家看到的
*的源码存在一些出入
* object:该入参是地址偏移量所在的宿主对象
* valueOffSet:该入参是object对象某属性的地址偏移量,是由Unsafe对象获得的
* expectValue:该值是我们期望value当前的值,如果expectValue与实际的当前
*               值不相等,那么对value的修改将会失败,方法的返回值也会变为false
* newValue:新值
*/
public final native boolean compareAndSwapInt(Object object, long valueOffSet,     int expectValue, int newValue);

通过对compareAndSwapInt 方法的简单分析,我们不禁会产生一个疑问,既然可以通过AtomicInteger获得当前值,那么为什么还会出现expectValue和AtomicInteger当前值不相等的情况呢?比如下面的代码片段。

AtomicInteger ai = new AtomicInteger(2);
ai.compareAndSet(ai.get(),10);

原因是相对于synchronized关键字、显式锁Lock,AtomicInteger所提供的方法不具备排他性,当A线程通过get()方法获取了AtomicInteger value的当前值后,B线程对value的修改已经顺利完成;A线程试图再次修改的时候就会出现expectValue与value的当前值不相等的情况,因此会出现修改失败,这种方式也被称为乐观锁。对数据进行修改的时候,首先需要进行比较。

由于compareAndSwapInt 是本地方法,因此我们必须打开JDK的源码才能看到相关的C++源码,打开openjdk-jdk8u/hotspot/src/share/vm/prims/unsafe.cpp文件我们会找到相关的C++代码。

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe,              jobject obj, jlong offset, jint e, jint x))
    UnsafeWrapper("Unsafe_CompareAndSwapInt");
    oop p = JNIHandles::resolve(obj);
    // 根据地址偏移量获取内存地址
    jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
    // 调用Atomic的成员方法
    return (jint)(Atomic::cmpxchg(x, addr, e)

) == e;
UNSAFE_END

在C++代码中,我们不难发现Unsafe_CompareAndSwapInt方法依赖于Atomic::cmpxchg方法,该方法实际上会调用不同CPU架构下的汇编代码(汇编代码主要用于执行相关的CPU指令)。下面打开基于x86架构的Atomic::cmpxchg源码文件openjdk-jdk8u/hotspot/src/os_cpu/bsd_x86/vm/atomic_bsd_x86.inline.hpp。

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl

 %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
}

cmpxchg是C++的一个内联函数,在其内部主要执行相关的汇编指令cmpxchgl,对汇编指令感兴趣的读者可以参阅Intel的CPU指令手册,其中就有对该指令的详细说明,地址如下:

http://heather.cs.ucdavis.edu/~matloff/50/PLN/lock.pdf

自旋方法addAndGet源码分析

由于compareAndSwapInt 方法的乐观锁特性,会存在对value修改失败的情况,但是有些时候对value的更新必须要成功,比如调用incrementAndGet、addAndGet 等方法,本节就来分析一下addAndGet 方法的实现。

public final int addAndGet(int delta) {
    // 调用Unsafe的getAndAddInt方法
    return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}

// Unsafe类中的getAndAddInt方法
public final int getAndAddInt(Object object, long valueOffset, int delta) {
    int currentValue;
    do {
        // ①
        currentValue= this.getIntVolatile(object, valueOffset);
        // ②
    } while(!this.compareAndSwapInt(object, valueOffset, currentValue,                                     currentValue+ delta));
    return currentValue;
}
  • 在getAndAddInt 方法中有一个直到型do..while循环控制语句,首先在注释①处获取当前被volatile关键字修饰的value值(通过内存偏移量的方式读取内存)。
  • 在注释②处执行compareAndSwapInt 方法,如果执行成功则直接返回,如果执行失败则再次执行下一轮的compareAndSwapInt 方法。 通过上面源码的分析,incrementAndGet 的执行结果有可能是11也有可能是比11更大的值。
    AtomicInteger ai = new AtomicInteger(10);
    //这句断言在多线程的情况下未必会成功
    assert ai.incrementAndGet() == 11;
    

AtomicBoolean详解


本文主要内容

  1. JUC中的原子类介绍
  2. 介绍基本类型原子类
  3. 介绍数组类型原子类
  4. 介绍引用类型原子类
  5. 介绍对象属性修改相关原子类

预备知识

JUC中的原子类都是都是依靠volatileCASUnsafe类配合来实现的,需要了解的请移步:

volatile与Java内存模型

java中的CAS

JUC底层工具类Unsafe

JUC中原子类介绍

什么是原子操作?

atomic 翻译成中文是原子的意思。在化学上,我们知道原子是构成一般物质的最小单位,在化学反应中是不可分割的。在我们这里 atomic 是指一个操作是不可中断的。即使是在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰,所以,所谓原子类说简单点就是具有原子操作特征的类,原子操作类提供了一些修改数据的方法,这些方法都是原子操作的,在多线程情况下可以确保被修改数据的正确性

JUC中对原子操作提供了强大的支持,这些类位于java.util.concurrent.atomic包中,如下图:

img

JUC中原子类思维导图

img

基本类型原子类

使用原子的方式更新基本类型

  • AtomicInteger:int类型原子类
  • AtomicLong:long类型原子类
  • AtomicBoolean :boolean类型原子类

上面三个类提供的方法几乎相同,这里以 AtomicInteger 为例子来介绍。

AtomicInteger 类常用方法

public final int get() //获取当前的值
public final int getAndSet(int newValue)//获取当前的值,并设置新的值
public final int getAndIncrement()//获取当前的值,并自增
public final int getAndDecrement() //获取当前的值,并自减
public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
public final void lazySet(int newValue)//最终设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。

部分源码

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
    try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
    } catch (Exception ex) { throw new Error(ex); }
}

private volatile int value;

2个关键字段说明: value:使用volatile修饰,可以确保value在多线程中的可见性。 valueOffset:value属性在AtomicInteger中的偏移量,通过这个偏移量可以快速定位到value字段,这个是实现AtomicInteger的关键。

getAndIncrement源码:

public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}

内部调用的是Unsafe类中的getAndAddInt方法,我们看一下getAndAddInt源码:

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

说明: this.getIntVolatile:可以确保从主内存中获取变量最新的值。

compareAndSwapInt:CAS操作,CAS的原理是拿期望的值和原本的值作比较,如果相同则更新成新的值,可以确保在多线程情况下只有一个线程会操作成功,不成功的返回false。

上面有个do-while循环,compareAndSwapInt返回false之后,会再次从主内存中获取变量的值,继续做CAS操作,直到成功为止。

getAndAddInt操作相当于线程安全的count++操作,如同: synchronize(lock){ count++; } count++操作实际上是被拆分为3步骤执行:

  1. 获取count的值,记做A:A=count
  2. 将A的值+1,得到B:B = A+1
  3. 让B赋值给count:count = B

    多线程情况下会出现线程安全的问题,导致数据不准确。

synchronize的方式会导致占时无法获取锁的线程处于阻塞状态,性能比较低。CAS的性能比synchronize要快很多。

示例

使用AtomicInteger实现网站访问量计数器功能,模拟100人同时访问网站,每个人访问10次,代码如下:

package com.itsoku.chat23;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 跟着阿里p7学并发,微信公众号:javacode2018
 */
public class Demo1 {
    //访问次数
    static AtomicInteger count = new AtomicInteger();

    //模拟访问一次
    public static void request() throws InterruptedException {
        //模拟耗时5毫秒
        TimeUnit.MILLISECONDS.sleep(5);
        //对count原子+1
        count.incrementAndGet();
    }

    public static void main(String[] args) throws InterruptedException {
        long starTime = System.currentTimeMillis();
        int threadSize = 100;
        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
        for (int i = 0; i < threadSize; i++) {
            Thread thread = new Thread(() -> {
                try {
                    for (int j = 0; j < 10; j++) {
                        request();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            });
            thread.start();
        }

        countDownLatch.await();
        long endTime = System.currentTimeMillis();
        System.out.println(Thread.currentThread().getName() + ",耗时:" + (endTime - starTime) + ",count=" + count);
    }
}

输出:

main耗时158,count=1000

通过输出中可以看出incrementAndGet在多线程情况下能确保数据的正确性。

数组类型原子类介绍

使用原子的方式更新数组里的某个元素,可以确保修改数组中数据的线程安全性。

  • AtomicIntegerArray:整形数组原子操作类
  • AtomicLongArray:长整形数组原子操作类
  • AtomicReferenceArray :引用类型数组原子操作类

上面三个类提供的方法几乎相同,所以我们这里以 AtomicIntegerArray 为例子来介绍。

AtomicIntegerArray 类常用方法

public final int get(int i) //获取 index=i 位置元素的值
public final int getAndSet(int i, int newValue)//返回 index=i 位置的当前的值,并将其设置为新值:newValue
public final int getAndIncrement(int i)//获取 index=i 位置元素的值,并让该位置的元素自增
public final int getAndDecrement(int i) //获取 index=i 位置元素的值,并让该位置的元素自减
public final int getAndAdd(int delta) //获取 index=i 位置元素的值,并加上预期的值
boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将 index=i 位置的元素值设置为输入值(update)
public final void lazySet(int i, int newValue)//最终 将index=i 位置的元素设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。

示例

统计网站页面访问量,假设网站有10个页面,现在模拟100个人并行访问每个页面10次,然后将每个页面访问量输出,应该每个页面都是1000次,代码如下:

package com.itsoku.chat23;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

/**
 * 跟着阿里p7学并发,微信公众号:javacode2018
 */
public class Demo2 {

    static AtomicIntegerArray pageRequest = new AtomicIntegerArray(new int[10]);

    /**
     * 模拟访问一次
     *
     * @param page 访问第几个页面
     * @throws InterruptedException
     */
    public static void request(int page) throws InterruptedException {
        //模拟耗时5毫秒
        TimeUnit.MILLISECONDS.sleep(5);
        //pageCountIndex为pageCount数组的下标,表示页面对应数组中的位置
        int pageCountIndex = page - 1;
        pageRequest.incrementAndGet(pageCountIndex);
    }

    public static void main(String[] args) throws InterruptedException {
        long starTime = System.currentTimeMillis();
        int threadSize = 100;
        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
        for (int i = 0; i < threadSize; i++) {
            Thread thread = new Thread(() -> {
                try {

                    for (int page = 1; page <= 10; page++) {
                        for (int j = 0; j < 10; j++) {
                            request(page);
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            });
            thread.start();
        }

        countDownLatch.await();
        long endTime = System.currentTimeMillis();
        System.out.println(Thread.currentThread().getName() + ",耗时:" + (endTime - starTime));

        for (int pageIndex = 0; pageIndex < 10; pageIndex++) {
            System.out.println("第" + (pageIndex + 1) + "个页面访问次数为" + pageRequest.get(pageIndex));
        }
    }
}

输出:

main耗时635
第1个页面访问次数为1000
第2个页面访问次数为1000
第3个页面访问次数为1000
第4个页面访问次数为1000
第5个页面访问次数为1000
第6个页面访问次数为1000
第7个页面访问次数为1000
第8个页面访问次数为1000
第9个页面访问次数为1000
第10个页面访问次数为1000

说明:

代码中将10个面的访问量放在了一个int类型的数组中,数组大小为10,然后通过AtomicIntegerArray来操作数组中的每个元素,可以确保操作数据的原子性,每次访问会调用incrementAndGet,此方法需要传入数组的下标,然后对指定的元素做原子+1操作。输出结果都是1000,可以看出对于数组中元素的并发修改是线程安全的。如果线程不安全,则部分数据可能会小于1000。

其他的一些方法可以自行操作一下,都非常简单。

引用类型原子类介绍

基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用 引用类型原子类。

  • AtomicReference:引用类型原子类
  • AtomicStampedRerence:原子更新引用类型里的字段原子类
  • AtomicMarkableReference :原子更新带有标记位的引用类型

AtomicReferenceAtomicInteger 非常类似,不同之处在于 AtomicInteger是对整数的封装,而AtomicReference则是对应普通的对象引用,它可以确保你在修改对象引用时的线程安全性。在介绍AtomicReference的同时,我们先来了解一个有关原子操作逻辑上的不足。

ABA问题

之前我们说过,线程判断被修改对象是否可以正确写入的条件是对象的当前值和期望值是否一致。这个逻辑从一般意义上来说是正确的,但是可能出现一个小小的例外,就是当你获得当前数据后,在准备修改为新值钱,对象的值被其他线程连续修改了两次,而经过这2次修改后,对象的值又恢复为旧值,这样,当前线程就无法正确判断这个对象究竟是否被修改过,这就是所谓的ABA问题,可能会引发一些问题。

举个例子

有一家蛋糕店,为了挽留客户,决定为贵宾卡客户一次性赠送20元,刺激客户充值和消费,但条件是,每一位客户只能被赠送一次,现在我们用AtomicReference来实现这个功能,代码如下:

package com.itsoku.chat22;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * 跟着阿里p7学并发,微信公众号:javacode2018
 */
public class Demo3 {
    //账户原始余额
    static int accountMoney = 19;
    //用于对账户余额做原子操作
    static AtomicReference<Integer> money = new AtomicReference<>(accountMoney);

    /**
     * 模拟2个线程同时更新后台数据库,为用户充值
     */
    static void recharge() {
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                for (int j = 0; j < 5; j++) {
                    Integer m = money.get();
                    if (m == accountMoney) {
                        if (money.compareAndSet(m, m + 20)) {
                            System.out.println("当前余额:" + m + ",小于20,充值20元成功,余额:" + money.get() + "元");
                        }
                    }
                    //休眠100ms
                    try {
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }

    /**
     * 模拟用户消费
     */
    static void consume() throws InterruptedException {
        for (int i = 0; i < 5; i++) {
            Integer m = money.get();
            if (m > 20) {
                if (money.compareAndSet(m, m - 20)) {
                    System.out.println("当前余额:" + m + ",大于10,成功消费10元,余额:" + money.get() + "元");
                }
            }
            //休眠50ms
            TimeUnit.MILLISECONDS.sleep(50);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        recharge();
        consume();
    }

}

输出:

当前余额:19,小于20,充值20元成功,余额:39元
当前余额:39,大于10,成功消费10元,余额:19元
当前余额:19,小于20,充值20元成功,余额:39元
当前余额:39,大于10,成功消费10元,余额:19元
当前余额:19,小于20,充值20元成功,余额:39元
当前余额:39,大于10,成功消费10元,余额:19元
当前余额:19,小于20,充值20元成功,余额:39元

从输出中可以看到,这个账户被先后反复多次充值。其原因是账户余额被反复修改,修改后的值和原有的数值19一样,使得CAS操作无法正确判断当前数据是否被修改过(是否被加过20)。虽然这种情况出现的概率不大,但是依然是有可能出现的,因此,当业务上确实可能出现这种情况时,我们必须多加防范。JDK也为我们考虑到了这种情况,使用AtomicStampedReference可以很好地解决这个问题。

使用AtomicStampedRerence解决ABA的问题

AtomicReference无法解决上述问题的根本原因是,对象在被修改过程中丢失了状态信息,比如充值20元的时候,需要同时标记一个状态,用来标注用户被充值过。因此我们只要能够记录对象在修改过程中的状态值,就可以很好地解决对象被反复修改导致线程无法正确判断对象状态的问题。

AtomicStampedRerence正是这么做的,他内部不仅维护了对象的值,还维护了一个时间戳(我们这里把他称为时间戳,实际上它可以使用任何一个整形来表示状态值),当AtomicStampedRerence对应的数值被修改时,除了更新数据本身外,还必须要更新时间戳。当AtomicStampedRerence设置对象值时,对象值及时间戳都必须满足期望值,写入才会成功。因此,即使对象值被反复读写,写回原值,只要时间戳发生变量,就能防止不恰当的写入。

AtomicStampedRerence的几个Api在AtomicReference的基础上新增了有关时间戳的信息。

//比较设置,参数依次为:期望值、写入新值、期望时间戳、新时间戳
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp);
//获得当前对象引用
public V getReference();
//获得当前时间戳
public int getStamp();
//设置当前对象引用和时间戳
public void set(V newReference, int newStamp);

现在我们使用AtomicStampedRerence来修改一下上面充值的问题,代码如下:

package com.itsoku.chat22;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;

/**
 * 跟着阿里p7学并发,微信公众号:javacode2018
 */
public class Demo4 {
    //账户原始余额
    static int accountMoney = 19;
    //用于对账户余额做原子操作
    static AtomicStampedReference<Integer> money = new AtomicStampedReference<>(accountMoney, 0);

    /**
     * 模拟2个线程同时更新后台数据库,为用户充值
     */
    static void recharge() {
        for (int i = 0; i < 2; i++) {
            int stamp = money.getStamp();
            new Thread(() -> {
                for (int j = 0; j < 50; j++) {
                    Integer m = money.getReference();
                    if (m == accountMoney) {
                        if (money.compareAndSet(m, m + 20, stamp, stamp + 1)) {
                            System.out.println("当前时间戳:" + money.getStamp() + ",当前余额:" + m + ",小于20,充值20元成功,余额:" + money.getReference() + "元");
                        }
                    }
                    //休眠100ms
                    try {
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }

    /**
     * 模拟用户消费
     */
    static void consume() throws InterruptedException {
        for (int i = 0; i < 50; i++) {
            Integer m = money.getReference();
            int stamp = money.getStamp();
            if (m > 20) {
                if (money.compareAndSet(m, m - 20, stamp, stamp + 1)) {
                    System.out.println("当前时间戳:" + money.getStamp() + ",当前余额:" + m + ",大于10,成功消费10元,余额:" + money.getReference() + "元");
                }
            }
            //休眠50ms
            TimeUnit.MILLISECONDS.sleep(50);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        recharge();
        consume();
    }

}

输出:

当前时间戳1,当前余额19小于20充值20元成功余额39
当前时间戳2,当前余额39大于10成功消费10元余额19

结果正常了。

关于这个时间戳的,在数据库修改数据中也有类似的用法,比如2个编辑同时编辑一篇文章,同时提交,只允许一个用户提交成功,提示另外一个用户:博客已被其他人修改,如何实现呢?

博客表:t_blog(id,content,stamp),stamp默认值为0,每次更新+1

A、B 二个编辑同时对一篇文章进行编辑,stamp都为0,当点击提交的时候,将stamp和id作为条件更新博客内容,执行的sql如下:

update t_blog set content = 更新的内容,stamp = stamp+1 where id = 博客id and stamp = 0;

这条update会返回影响的行数,只有一个会返回1,表示更新成功,另外一个提交者返回0,表示需要修改的数据已经不满足条件了,被其他用户给修改了。这种修改数据的方式也叫乐观锁。

对象的属性修改原子类介绍

如果需要原子更新某个类里的某个字段时,需要用到对象的属性修改原子类。

  • AtomicIntegerFieldUpdater:原子更新整形字段的值
  • AtomicLongFieldUpdater:原子更新长整形字段的值
  • AtomicReferenceFieldUpdater :原子更新应用类型字段的值

要想原子地更新对象的属性需要两步:

  1. 第一步,因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法 newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。
  2. 第二步,更新的对象属性必须使用 public volatile 修饰符。

上面三个类提供的方法几乎相同,所以我们这里以AtomicReferenceFieldUpdater为例子来介绍。

调用AtomicReferenceFieldUpdater静态方法newUpdater创建AtomicReferenceFieldUpdater对象

public static <U,W> AtomicReferenceFieldUpdater<U,W> newUpdater(Class<U> tclass,
                                                                    Class<W> vclass,
                                                                    String fieldName) 

说明:

三个参数

tclass:需要操作的字段所在的类 vclass:操作字段的类型 fieldName:字段名称

示例

多线程并发调用一个类的初始化方法,如果未被初始化过,将执行初始化工作,要求只能初始化一次

代码如下:

package com.itsoku.chat22;

import com.sun.org.apache.xpath.internal.operations.Bool;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
 * 跟着阿里p7学并发,微信公众号:javacode2018
 */
public class Demo5 {

    static Demo5 demo5 = new Demo5();
    //isInit用来标注是否被初始化过
    volatile Boolean isInit = Boolean.FALSE;
    AtomicReferenceFieldUpdater<Demo5, Boolean> updater = AtomicReferenceFieldUpdater.newUpdater(Demo5.class, Boolean.class, "isInit");

    /**
     * 模拟初始化工作
     *
     * @throws InterruptedException
     */
    public void init() throws InterruptedException {
        //isInit为false的时候,才进行初始化,并将isInit采用原子操作置为true
        if (updater.compareAndSet(demo5, Boolean.FALSE, Boolean.TRUE)) {
            System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",开始初始化!");
            //模拟休眠3秒
            TimeUnit.SECONDS.sleep(3);
            System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",初始化完毕!");
        } else {
            System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",有其他线程已经执行了初始化!");
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    demo5.init();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

输出:

1565159962098,Thread-0开始初始化!
1565159962098,Thread-3有其他线程已经执行了初始化!
1565159962098,Thread-4有其他线程已经执行了初始化!
1565159962098,Thread-2有其他线程已经执行了初始化!
1565159962098,Thread-1有其他线程已经执行了初始化!
1565159965100,Thread-0初始化完毕!

说明:

  1. isInit属性必须要volatille修饰,可以确保变量的可见性
  2. 可以看出多线程同时执行init()方法,只有一个线程执行了初始化的操作,其他线程跳过了。多个线程同时到达updater.compareAndSet,只有一个会成功。

ThreadLocal、InheritableThreadLocal(通俗易懂)

本文内容

  1. 需要解决的问题
  2. 介绍ThreadLocal
  3. 介绍InheritableThreadLocal

需要解决的问题

我们还是以解决问题的方式来引出ThreadLocalInheritableThreadLocal,这样印象会深刻一些。

目前java开发web系统一般有3层,controller、service、dao,请求到达controller,controller调用service,service调用dao,然后进行处理。

我们写一个简单的例子,有3个方法分别模拟controller、service、dao。代码如下:

package com.itsoku.chat24;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 跟着阿里p7学并发,微信公众号:javacode2018
 */
public class Demo1 {

    static AtomicInteger threadIndex = new AtomicInteger(1);
    //创建处理请求的线程池子
    static ThreadPoolExecutor disposeRequestExecutor = new ThreadPoolExecutor(3,
            3,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(),
            r -> {
                Thread thread = new Thread(r);
                thread.setName("disposeRequestThread-" + threadIndex.getAndIncrement());
                return thread;
            });

    //记录日志
    public static void log(String msg) {
        StackTraceElement stack[] = (new Throwable()).getStackTrace();
        System.out.println("****" + System.currentTimeMillis() + ",[线程:" + Thread.currentThread().getName() + "]," + stack[1] + ":" + msg);
    }

    //模拟controller
    public static void controller(List<String> dataList) {
        log("接受请求");
        service(dataList);
    }

    //模拟service
    public static void service(List<String> dataList) {
        log("执行业务");
        dao(dataList);
    }

    //模拟dao
    public static void dao(List<String> dataList) {
        log("执行数据库操作");
        //模拟插入数据
        for (String s : dataList) {
            log("插入数据" + s + "成功");
        }
    }

    public static void main(String[] args) {
        //需要插入的数据
        List<String> dataList = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            dataList.add("数据" + i);
        }

        //模拟5个请求
        int requestCount = 5;
        for (int i = 0; i < requestCount; i++) {
            disposeRequestExecutor.execute(() -> {
                controller(dataList);
            });
        }

        disposeRequestExecutor.shutdown();
    }
}

运行结果:

****1565338891286,[线程:disposeRequestThread-2],com.itsoku.chat24.Demo1.controller(Demo1.java:36):接受请求
****1565338891286,[线程:disposeRequestThread-1],com.itsoku.chat24.Demo1.controller(Demo1.java:36):接受请求
****1565338891287,[线程:disposeRequestThread-2],com.itsoku.chat24.Demo1.service(Demo1.java:42):执行业务
****1565338891287,[线程:disposeRequestThread-1],com.itsoku.chat24.Demo1.service(Demo1.java:42):执行业务
****1565338891287,[线程:disposeRequestThread-3],com.itsoku.chat24.Demo1.controller(Demo1.java:36):接受请求
****1565338891287,[线程:disposeRequestThread-1],com.itsoku.chat24.Demo1.dao(Demo1.java:48):执行数据库操作
****1565338891287,[线程:disposeRequestThread-1],com.itsoku.chat24.Demo1.dao(Demo1.java:51):插入数据数据0成功
****1565338891287,[线程:disposeRequestThread-1],com.itsoku.chat24.Demo1.dao(Demo1.java:51):插入数据数据1成功
****1565338891287,[线程:disposeRequestThread-2],com.itsoku.chat24.Demo1.dao(Demo1.java:48):执行数据库操作
****1565338891287,[线程:disposeRequestThread-1],com.itsoku.chat24.Demo1.dao(Demo1.java:51):插入数据数据2成功
****1565338891287,[线程:disposeRequestThread-3],com.itsoku.chat24.Demo1.service(Demo1.java:42):执行业务
****1565338891288,[线程:disposeRequestThread-1],com.itsoku.chat24.Demo1.controller(Demo1.java:36):接受请求
****1565338891287,[线程:disposeRequestThread-2],com.itsoku.chat24.Demo1.dao(Demo1.java:51):插入数据数据0成功
****1565338891288,[线程:disposeRequestThread-1],com.itsoku.chat24.Demo1.service(Demo1.java:42):执行业务
****1565338891288,[线程:disposeRequestThread-3],com.itsoku.chat24.Demo1.dao(Demo1.java:48):执行数据库操作
****1565338891288,[线程:disposeRequestThread-1],com.itsoku.chat24.Demo1.dao(Demo1.java:48):执行数据库操作
****1565338891288,[线程:disposeRequestThread-2],com.itsoku.chat24.Demo1.dao(Demo1.java:51):插入数据数据1成功
****1565338891288,[线程:disposeRequestThread-1],com.itsoku.chat24.Demo1.dao(Demo1.java:51):插入数据数据0成功
****1565338891288,[线程:disposeRequestThread-3],com.itsoku.chat24.Demo1.dao(Demo1.java:51):插入数据数据0成功
****1565338891288,[线程:disposeRequestThread-1],com.itsoku.chat24.Demo1.dao(Demo1.java:51):插入数据数据1成功
****1565338891288,[线程:disposeRequestThread-2],com.itsoku.chat24.Demo1.dao(Demo1.java:51):插入数据数据2成功
****1565338891288,[线程:disposeRequestThread-1],com.itsoku.chat24.Demo1.dao(Demo1.java:51):插入数据数据2成功
****1565338891288,[线程:disposeRequestThread-3],com.itsoku.chat24.Demo1.dao(Demo1.java:51):插入数据数据1成功
****1565338891288,[线程:disposeRequestThread-2],com.itsoku.chat24.Demo1.controller(Demo1.java:36):接受请求
****1565338891288,[线程:disposeRequestThread-3],com.itsoku.chat24.Demo1.dao(Demo1.java:51):插入数据数据2成功
****1565338891288,[线程:disposeRequestThread-2],com.itsoku.chat24.Demo1.service(Demo1.java:42):执行业务
****1565338891289,[线程:disposeRequestThread-2],com.itsoku.chat24.Demo1.dao(Demo1.java:48):执行数据库操作
****1565338891289,[线程:disposeRequestThread-2],com.itsoku.chat24.Demo1.dao(Demo1.java:51):插入数据数据0成功
****1565338891289,[线程:disposeRequestThread-2],com.itsoku.chat24.Demo1.dao(Demo1.java:51):插入数据数据1成功
****1565338891289,[线程:disposeRequestThread-2],com.itsoku.chat24.Demo1.dao(Demo1.java:51):插入数据数据2成功

代码中调用controller、service、dao 3个方法时,来模拟处理一个请求。main方法中循环了5次模拟发起5次请求,然后交给线程池去处理请求,dao中模拟循环插入传入的dataList数据。

问题来了:开发者想看一下哪些地方耗时比较多,想通过日志来分析耗时情况,想追踪某个请求的完整日志,怎么搞?

上面的请求采用线程池的方式处理的,多个请求可能会被一个线程处理,通过日志很难看出那些日志是同一个请求,我们能不能给请求加一个唯一标志,日志中输出这个唯一标志,当然可以。

如果我们的代码就只有上面示例这么简单,我想还是很容易的,上面就3个方法,给每个方法加个traceId参数,log方法也加个traceId参数,就解决了,代码如下:

package com.itsoku.chat24;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 跟着阿里p7学并发,微信公众号:javacode2018
 */
public class Demo2 {

    static AtomicInteger threadIndex = new AtomicInteger(1);
    //创建处理请求的线程池子
    static ThreadPoolExecutor disposeRequestExecutor = new ThreadPoolExecutor(3,
            3,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(),
            r -> {
                Thread thread = new Thread(r);
                thread.setName("disposeRequestThread-" + threadIndex.getAndIncrement());
                return thread;
            });

    //记录日志
    public static void log(String msg, String traceId) {
        StackTraceElement stack[] = (new Throwable()).getStackTrace();
        System.out.println("****" + System.currentTimeMillis() + "[traceId:" + traceId + "],[线程:" + Thread.currentThread().getName() + "]," + stack[1] + ":" + msg);
    }

    //模拟controller
    public static void controller(List<String> dataList, String traceId) {
        log("接受请求", traceId);
        service(dataList, traceId);
    }

    //模拟service
    public static void service(List<String> dataList, String traceId) {
        log("执行业务", traceId);
        dao(dataList, traceId);
    }

    //模拟dao
    public static void dao(List<String> dataList, String traceId) {
        log("执行数据库操作", traceId);
        //模拟插入数据
        for (String s : dataList) {
            log("插入数据" + s + "成功", traceId);
        }
    }

    public static void main(String[] args) {
        //需要插入的数据
        List<String> dataList = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            dataList.add("数据" + i);
        }

        //模拟5个请求
        int requestCount = 5;
        for (int i = 0; i < requestCount; i++) {
            String traceId = String.valueOf(i);
            disposeRequestExecutor.execute(() -> {
                controller(dataList, traceId);
            });
        }

        disposeRequestExecutor.shutdown();
    }
}

输出:

****1565339559773[traceId:0],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo2.controller(Demo2.java:36):接受请求
****1565339559773[traceId:1],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo2.controller(Demo2.java:36):接受请求
****1565339559773[traceId:2],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo2.controller(Demo2.java:36):接受请求
****1565339559774[traceId:1],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo2.service(Demo2.java:42):执行业务
****1565339559774[traceId:0],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo2.service(Demo2.java:42):执行业务
****1565339559774[traceId:1],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo2.dao(Demo2.java:48):执行数据库操作
****1565339559774[traceId:2],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo2.service(Demo2.java:42):执行业务
****1565339559774[traceId:1],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo2.dao(Demo2.java:51):插入数据数据0成功
****1565339559774[traceId:0],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo2.dao(Demo2.java:48):执行数据库操作
****1565339559774[traceId:1],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo2.dao(Demo2.java:51):插入数据数据1成功
****1565339559774[traceId:2],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo2.dao(Demo2.java:48):执行数据库操作
****1565339559774[traceId:1],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo2.dao(Demo2.java:51):插入数据数据2成功
****1565339559774[traceId:0],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo2.dao(Demo2.java:51):插入数据数据0成功
****1565339559775[traceId:3],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo2.controller(Demo2.java:36):接受请求
****1565339559775[traceId:2],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo2.dao(Demo2.java:51):插入数据数据0成功
****1565339559775[traceId:3],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo2.service(Demo2.java:42):执行业务
****1565339559775[traceId:0],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo2.dao(Demo2.java:51):插入数据数据1成功
****1565339559775[traceId:3],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo2.dao(Demo2.java:48):执行数据库操作
****1565339559775[traceId:2],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo2.dao(Demo2.java:51):插入数据数据1成功
****1565339559775[traceId:3],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo2.dao(Demo2.java:51):插入数据数据0成功
****1565339559775[traceId:0],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo2.dao(Demo2.java:51):插入数据数据2成功
****1565339559775[traceId:3],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo2.dao(Demo2.java:51):插入数据数据1成功
****1565339559775[traceId:2],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo2.dao(Demo2.java:51):插入数据数据2成功
****1565339559775[traceId:3],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo2.dao(Demo2.java:51):插入数据数据2成功
****1565339559775[traceId:4],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo2.controller(Demo2.java:36):接受请求
****1565339559776[traceId:4],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo2.service(Demo2.java:42):执行业务
****1565339559776[traceId:4],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo2.dao(Demo2.java:48):执行数据库操作
****1565339559776[traceId:4],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo2.dao(Demo2.java:51):插入数据数据0成功
****1565339559776[traceId:4],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo2.dao(Demo2.java:51):插入数据数据1成功
****1565339559776[traceId:4],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo2.dao(Demo2.java:51):插入数据数据2成功

上面我们通过修改代码的方式,把问题解决了,但前提是你们的系统都像上面这么简单,功能很少,需要改的代码很少,可以这么去改。但事与愿违,我们的系统一般功能都是比较多的,如果我们都一个个去改,岂不是要疯掉,改代码还涉及到重新测试,风险也不可控。那有什么好办法么?

ThreadLocal

还是拿上面的问题,我们来分析一下,每个请求都是由一个线程处理的,线程就相当于一个人一样,每个请求相当于一个任务,任务来了,人来处理,处理完毕之后,再处理下一个请求任务。人身上是不是有很多口袋,人刚开始准备处理任务的时候,我们把任务的编号放在处理者的口袋中,然后处理中一路携带者,处理过程中如果需要用到这个编号,直接从口袋中获取就可以了。那么刚好java中线程设计的时候也考虑到了这些问题,Thread对象中就有很多口袋,用来放东西。Thread类中有这么一个变量:

ThreadLocal.ThreadLocalMap threadLocals = null;

这个就是用来操作Thread中所有口袋的东西,ThreadLocalMap源码中有一个数组(有兴趣的可以去看一下源码),对应处理者身上很多口袋一样,数组中的每个元素对应一个口袋。

如何来操作Thread中的这些口袋呢,java为我们提供了一个类ThreadLocal,ThreadLocal对象用来操作Thread中的某一个口袋,可以向这个口袋中放东西、获取里面的东西、清除里面的东西,这个口袋一次性只能放一个东西,重复放东西会将里面已经存在的东西覆盖掉。

常用的3个方法:

//向Thread中某个口袋中放东西
public void set(T value);
//获取这个口袋中目前放的东西
public T get();
//清空这个口袋中放的东西
public void remove()

我们使用ThreadLocal来改造一下上面的代码,如下:

package com.itsoku.chat24;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 跟着阿里p7学并发,微信公众号:javacode2018
 */
public class Demo3 {

    //创建一个操作Thread中存放请求任务追踪id口袋的对象
    static ThreadLocal<String> traceIdKD = new ThreadLocal<>();

    static AtomicInteger threadIndex = new AtomicInteger(1);
    //创建处理请求的线程池子
    static ThreadPoolExecutor disposeRequestExecutor = new ThreadPoolExecutor(3,
            3,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(),
            r -> {
                Thread thread = new Thread(r);
                thread.setName("disposeRequestThread-" + threadIndex.getAndIncrement());
                return thread;
            });

    //记录日志
    public static void log(String msg) {
        StackTraceElement stack[] = (new Throwable()).getStackTrace();
        //获取当前线程存放tranceId口袋中的内容
        String traceId = traceIdKD.get();
        System.out.println("****" + System.currentTimeMillis() + "[traceId:" + traceId + "],[线程:" + Thread.currentThread().getName() + "]," + stack[1] + ":" + msg);
    }

    //模拟controller
    public static void controller(List<String> dataList) {
        log("接受请求");
        service(dataList);
    }

    //模拟service
    public static void service(List<String> dataList) {
        log("执行业务");
        dao(dataList);
    }

    //模拟dao
    public static void dao(List<String> dataList) {
        log("执行数据库操作");
        //模拟插入数据
        for (String s : dataList) {
            log("插入数据" + s + "成功");
        }
    }

    public static void main(String[] args) {
        //需要插入的数据
        List<String> dataList = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            dataList.add("数据" + i);
        }

        //模拟5个请求
        int requestCount = 5;
        for (int i = 0; i < requestCount; i++) {
            String traceId = String.valueOf(i);
            disposeRequestExecutor.execute(() -> {
                //把traceId放入口袋中
                traceIdKD.set(traceId);
                try {
                    controller(dataList);
                } finally {
                    //将tranceId从口袋中移除
                    traceIdKD.remove();
                }
            });
        }

        disposeRequestExecutor.shutdown();
    }
}

输出:

****1565339644214[traceId:1],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo3.controller(Demo3.java:41):接受请求
****1565339644214[traceId:2],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo3.controller(Demo3.java:41):接受请求
****1565339644214[traceId:0],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo3.controller(Demo3.java:41):接受请求
****1565339644214[traceId:2],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo3.service(Demo3.java:47):执行业务
****1565339644214[traceId:1],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo3.service(Demo3.java:47):执行业务
****1565339644214[traceId:2],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo3.dao(Demo3.java:53):执行数据库操作
****1565339644214[traceId:0],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo3.service(Demo3.java:47):执行业务
****1565339644214[traceId:2],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo3.dao(Demo3.java:56):插入数据数据0成功
****1565339644214[traceId:0],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo3.dao(Demo3.java:53):执行数据库操作
****1565339644214[traceId:1],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo3.dao(Demo3.java:53):执行数据库操作
****1565339644215[traceId:0],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo3.dao(Demo3.java:56):插入数据数据0成功
****1565339644215[traceId:2],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo3.dao(Demo3.java:56):插入数据数据1成功
****1565339644215[traceId:0],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo3.dao(Demo3.java:56):插入数据数据1成功
****1565339644215[traceId:1],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo3.dao(Demo3.java:56):插入数据数据0成功
****1565339644215[traceId:0],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo3.dao(Demo3.java:56):插入数据数据2成功
****1565339644215[traceId:2],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo3.dao(Demo3.java:56):插入数据数据2成功
****1565339644215[traceId:1],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo3.dao(Demo3.java:56):插入数据数据1成功
****1565339644215[traceId:4],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo3.controller(Demo3.java:41):接受请求
****1565339644215[traceId:3],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo3.controller(Demo3.java:41):接受请求
****1565339644215[traceId:4],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo3.service(Demo3.java:47):执行业务
****1565339644215[traceId:1],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo3.dao(Demo3.java:56):插入数据数据2成功
****1565339644215[traceId:4],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo3.dao(Demo3.java:53):执行数据库操作
****1565339644215[traceId:3],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo3.service(Demo3.java:47):执行业务
****1565339644215[traceId:4],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo3.dao(Demo3.java:56):插入数据数据0成功
****1565339644215[traceId:3],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo3.dao(Demo3.java:53):执行数据库操作
****1565339644215[traceId:4],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo3.dao(Demo3.java:56):插入数据数据1成功
****1565339644215[traceId:3],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo3.dao(Demo3.java:56):插入数据数据0成功
****1565339644215[traceId:4],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo3.dao(Demo3.java:56):插入数据数据2成功
****1565339644215[traceId:3],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo3.dao(Demo3.java:56):插入数据数据1成功
****1565339644215[traceId:3],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo3.dao(Demo3.java:56):插入数据数据2成功

可以看出输出和刚才使用traceId参数的方式结果一致,但是却简单了很多。不用去修改controller、service、dao代码了,风险也减少了很多。

代码中创建了一个ThreadLocal traceIdKD,这个对象用来操作Thread中一个口袋,用这个口袋来存放tranceId。在main方法中通过traceIdKD.set(traceId)方法将traceId放入口袋,log方法中通traceIdKD.get()获取口袋中的traceId,最后任务处理完之后,main中的finally中调用traceIdKD.remove();将口袋中的traceId清除。

ThreadLocal的官方API解释为:

“该类提供了线程局部 (thread-local) 变量。这些变量不同于它们的普通对应物,因为访问某个变量(通过其 get 或 set 方法)的每个线程都有自己的局部变量,它独立于变量的初始化副本。ThreadLocal 实例通常是类中的 private static 字段,它们希望将状态与某一个线程(例如,用户 ID 或事务 ID)相关联。”

InheritableThreadLocal

继续上面的实例,dao中循环处理dataList的内容,假如dataList处理比较耗时,我们想加快处理速度有什么办法么?大家已经想到了,用多线程并行处理dataList,那么我们把代码改一下,如下:

package com.itsoku.chat24;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 跟着阿里p7学并发,微信公众号:javacode2018
 */
public class Demo4 {

    //创建一个操作Thread中存放请求任务追踪id口袋的对象
    static ThreadLocal<String> traceIdKD = new ThreadLocal<>();

    static AtomicInteger threadIndex = new AtomicInteger(1);
    //创建处理请求的线程池子
    static ThreadPoolExecutor disposeRequestExecutor = new ThreadPoolExecutor(3,
            3,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(),
            r -> {
                Thread thread = new Thread(r);
                thread.setName("disposeRequestThread-" + threadIndex.getAndIncrement());
                return thread;
            });

    //记录日志
    public static void log(String msg) {
        StackTraceElement stack[] = (new Throwable()).getStackTrace();
        //获取当前线程存放tranceId口袋中的内容
        String traceId = traceIdKD.get();
        System.out.println("****" + System.currentTimeMillis() + "[traceId:" + traceId + "],[线程:" + Thread.currentThread().getName() + "]," + stack[1] + ":" + msg);
    }

    //模拟controller
    public static void controller(List<String> dataList) {
        log("接受请求");
        service(dataList);
    }

    //模拟service
    public static void service(List<String> dataList) {
        log("执行业务");
        dao(dataList);
    }

    //模拟dao
    public static void dao(List<String> dataList) {
        CountDownLatch countDownLatch = new CountDownLatch(dataList.size());

        log("执行数据库操作");
        String threadName = Thread.currentThread().getName();
        //模拟插入数据
        for (String s : dataList) {
            new Thread(() -> {
                try {
                    //模拟数据库操作耗时100毫秒
                    TimeUnit.MILLISECONDS.sleep(100);
                    log("插入数据" + s + "成功,主线程:" + threadName);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }).start();
        }
        //等待上面的dataList处理完毕
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        //需要插入的数据
        List<String> dataList = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            dataList.add("数据" + i);
        }

        //模拟5个请求
        int requestCount = 5;
        for (int i = 0; i < requestCount; i++) {
            String traceId = String.valueOf(i);
            disposeRequestExecutor.execute(() -> {
                //把traceId放入口袋中
                traceIdKD.set(traceId);
                try {
                    controller(dataList);
                } finally {
                    //将tranceId从口袋中移除
                    traceIdKD.remove();
                }
            });
        }

        disposeRequestExecutor.shutdown();
    }
}

输出:

****1565339904279[traceId:0],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo4.controller(Demo4.java:42):接受请求
****1565339904279[traceId:0],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo4.service(Demo4.java:48):执行业务
****1565339904279[traceId:1],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo4.controller(Demo4.java:42):接受请求
****1565339904279[traceId:1],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo4.service(Demo4.java:48):执行业务
****1565339904279[traceId:2],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo4.controller(Demo4.java:42):接受请求
****1565339904279[traceId:2],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo4.service(Demo4.java:48):执行业务
****1565339904279[traceId:0],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo4.dao(Demo4.java:56):执行数据库操作
****1565339904279[traceId:1],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo4.dao(Demo4.java:56):执行数据库操作
****1565339904279[traceId:2],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo4.dao(Demo4.java:56):执行数据库操作
****1565339904281[traceId:null],[线程:Thread-3],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:62):插入数据数据0成功,主线程disposeRequestThread-1
****1565339904281[traceId:null],[线程:Thread-5],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:62):插入数据数据0成功,主线程disposeRequestThread-2
****1565339904281[traceId:null],[线程:Thread-4],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:62):插入数据数据0成功,主线程disposeRequestThread-3
****1565339904281[traceId:null],[线程:Thread-6],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:62):插入数据数据1成功,主线程disposeRequestThread-3
****1565339904281[traceId:null],[线程:Thread-9],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:62):插入数据数据2成功,主线程disposeRequestThread-3
****1565339904282[traceId:null],[线程:Thread-8],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:62):插入数据数据1成功,主线程disposeRequestThread-1
****1565339904282[traceId:null],[线程:Thread-10],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:62):插入数据数据2成功,主线程disposeRequestThread-1
****1565339904282[traceId:3],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo4.controller(Demo4.java:42):接受请求
****1565339904282[traceId:null],[线程:Thread-7],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:62):插入数据数据1成功,主线程disposeRequestThread-2
****1565339904282[traceId:null],[线程:Thread-11],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:62):插入数据数据2成功,主线程disposeRequestThread-2
****1565339904282[traceId:3],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo4.service(Demo4.java:48):执行业务
****1565339904282[traceId:4],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo4.controller(Demo4.java:42):接受请求
****1565339904283[traceId:3],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo4.dao(Demo4.java:56):执行数据库操作
****1565339904283[traceId:4],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo4.service(Demo4.java:48):执行业务
****1565339904283[traceId:4],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo4.dao(Demo4.java:56):执行数据库操作
****1565339904283[traceId:null],[线程:Thread-12],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:62):插入数据数据0成功,主线程disposeRequestThread-3
****1565339904283[traceId:null],[线程:Thread-13],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:62):插入数据数据1成功,主线程disposeRequestThread-3
****1565339904283[traceId:null],[线程:Thread-14],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:62):插入数据数据0成功,主线程disposeRequestThread-1
****1565339904284[traceId:null],[线程:Thread-15],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:62):插入数据数据2成功,主线程disposeRequestThread-3
****1565339904284[traceId:null],[线程:Thread-17],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:62):插入数据数据2成功,主线程disposeRequestThread-1
****1565339904284[traceId:null],[线程:Thread-16],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:62):插入数据数据1成功,主线程disposeRequestThread-1

看一下上面的输出,有些traceId为null,这是为什么呢?这是因为dao中为了提升处理速度,创建了子线程来并行处理,子线程调用log的时候,去自己的存放traceId的口袋中拿去东西,肯定是空的了。

那有什么办法么?可不可以这样?

父线程相当于主管,子线程相当于干活的小弟,主管让小弟们干活的时候,将自己兜里面的东西复制一份给小弟们使用,主管兜里面可能有很多牛逼的工具,为了提升小弟们的工作效率,给小弟们都复制一个,丢到小弟们的兜里,然后小弟就可以从自己的兜里拿去这些东西使用了,也可以清空自己兜里面的东西。

Thread对象中有个inheritableThreadLocals变量,代码如下:

ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

inheritableThreadLocals相当于线程中另外一种兜,这种兜有什么特征呢,当创建子线程的时候,子线程会将父线程这种类型兜的东西全部复制一份放到自己的inheritableThreadLocals兜中,使用InheritableThreadLocal对象可以操作线程中的inheritableThreadLocals兜。

InheritableThreadLocal常用的方法也有3个:

//向Thread中某个口袋中放东西
public void set(T value);
//获取这个口袋中目前放的东西
public T get();
//清空这个口袋中放的东西
public void remove()

使用InheritableThreadLocal解决上面子线程中无法输出traceId的问题,只需要将上一个示例代码中的ThreadLocal替换成InheritableThreadLocal即可,代码如下:

package com.itsoku.chat24;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 跟着阿里p7学并发,微信公众号:javacode2018
 */
public class Demo4 {

    //创建一个操作Thread中存放请求任务追踪id口袋的对象,子线程可以继承父线程中内容
    static InheritableThreadLocal<String> traceIdKD = new InheritableThreadLocal<>();

    static AtomicInteger threadIndex = new AtomicInteger(1);
    //创建处理请求的线程池子
    static ThreadPoolExecutor disposeRequestExecutor = new ThreadPoolExecutor(3,
            3,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(),
            r -> {
                Thread thread = new Thread(r);
                thread.setName("disposeRequestThread-" + threadIndex.getAndIncrement());
                return thread;
            });

    //记录日志
    public static void log(String msg) {
        StackTraceElement stack[] = (new Throwable()).getStackTrace();
        //获取当前线程存放tranceId口袋中的内容
        String traceId = traceIdKD.get();
        System.out.println("****" + System.currentTimeMillis() + "[traceId:" + traceId + "],[线程:" + Thread.currentThread().getName() + "]," + stack[1] + ":" + msg);
    }

    //模拟controller
    public static void controller(List<String> dataList) {
        log("接受请求");
        service(dataList);
    }

    //模拟service
    public static void service(List<String> dataList) {
        log("执行业务");
        dao(dataList);
    }

    //模拟dao
    public static void dao(List<String> dataList) {
        CountDownLatch countDownLatch = new CountDownLatch(dataList.size());

        log("执行数据库操作");
        String threadName = Thread.currentThread().getName();
        //模拟插入数据
        for (String s : dataList) {
            new Thread(() -> {
                try {
                    //模拟数据库操作耗时100毫秒
                    TimeUnit.MILLISECONDS.sleep(100);
                    log("插入数据" + s + "成功,主线程:" + threadName);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }).start();
        }
        //等待上面的dataList处理完毕
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        //需要插入的数据
        List<String> dataList = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            dataList.add("数据" + i);
        }

        //模拟5个请求
        int requestCount = 5;
        for (int i = 0; i < requestCount; i++) {
            String traceId = String.valueOf(i);
            disposeRequestExecutor.execute(() -> {
                //把traceId放入口袋中
                traceIdKD.set(traceId);
                try {
                    controller(dataList);
                } finally {
                    //将tranceId从口袋中移除
                    traceIdKD.remove();
                }
            });
        }

        disposeRequestExecutor.shutdown();
    }
}

输出:

****1565341611454[traceId:1],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo4.controller(Demo4.java:42):接受请求
****1565341611454[traceId:2],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo4.controller(Demo4.java:42):接受请求
****1565341611454[traceId:0],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo4.controller(Demo4.java:42):接受请求
****1565341611454[traceId:2],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo4.service(Demo4.java:48):执行业务
****1565341611454[traceId:1],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo4.service(Demo4.java:48):执行业务
****1565341611454[traceId:0],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo4.service(Demo4.java:48):执行业务
****1565341611454[traceId:2],[线程:disposeRequestThread-3],com.itsoku.chat24.Demo4.dao(Demo4.java:56):执行数据库操作
****1565341611454[traceId:1],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo4.dao(Demo4.java:56):执行数据库操作
****1565341611454[traceId:0],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo4.dao(Demo4.java:56):执行数据库操作
****1565341611557[traceId:2],[线程:Thread-5],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:64):插入数据数据0成功,主线程disposeRequestThread-3
****1565341611557[traceId:0],[线程:Thread-4],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:64):插入数据数据0成功,主线程disposeRequestThread-1
****1565341611557[traceId:1],[线程:Thread-11],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:64):插入数据数据2成功,主线程disposeRequestThread-2
****1565341611557[traceId:1],[线程:Thread-3],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:64):插入数据数据0成功,主线程disposeRequestThread-2
****1565341611557[traceId:1],[线程:Thread-8],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:64):插入数据数据1成功,主线程disposeRequestThread-2
****1565341611557[traceId:0],[线程:Thread-6],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:64):插入数据数据1成功,主线程disposeRequestThread-1
****1565341611557[traceId:0],[线程:Thread-10],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:64):插入数据数据2成功,主线程disposeRequestThread-1
****1565341611557[traceId:3],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo4.controller(Demo4.java:42):接受请求
****1565341611557[traceId:2],[线程:Thread-9],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:64):插入数据数据2成功,主线程disposeRequestThread-3
****1565341611558[traceId:2],[线程:Thread-7],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:64):插入数据数据1成功,主线程disposeRequestThread-3
****1565341611557[traceId:3],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo4.service(Demo4.java:48):执行业务
****1565341611557[traceId:4],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo4.controller(Demo4.java:42):接受请求
****1565341611558[traceId:3],[线程:disposeRequestThread-2],com.itsoku.chat24.Demo4.dao(Demo4.java:56):执行数据库操作
****1565341611558[traceId:4],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo4.service(Demo4.java:48):执行业务
****1565341611558[traceId:4],[线程:disposeRequestThread-1],com.itsoku.chat24.Demo4.dao(Demo4.java:56):执行数据库操作
****1565341611659[traceId:3],[线程:Thread-15],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:64):插入数据数据2成功,主线程disposeRequestThread-2
****1565341611659[traceId:4],[线程:Thread-14],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:64):插入数据数据0成功,主线程disposeRequestThread-1
****1565341611659[traceId:3],[线程:Thread-13],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:64):插入数据数据1成功,主线程disposeRequestThread-2
****1565341611659[traceId:3],[线程:Thread-12],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:64):插入数据数据0成功,主线程disposeRequestThread-2
****1565341611660[traceId:4],[线程:Thread-16],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:64):插入数据数据1成功,主线程disposeRequestThread-1
****1565341611660[traceId:4],[线程:Thread-17],com.itsoku.chat24.Demo4.lambda$dao$1(Demo4.java:64):插入数据数据2成功,主线程disposeRequestThread-1

输出中都有traceId了,和期望的结果一致。

希望通过这篇文章可以学会使用InheritableThreadLocalInheritableThreadLocal。有问题可以加我微信itsoku交流,也可以留言,谢谢。

Search

    微信好友

    博士的沙漏

    Table of Contents