java源码

思维导图

Java 基础

Java 语言有哪些特点

  • 面向对象
    • 封装
      • 对象隐藏内部的属性和实现细节,对外部提供统一的访问方法
    • 继承
      • 子类继承父类的成员和方法,使子类也具有父类相同的行为
    • 多态
      • 对象类型和引用类型之间具有继承(类)/实现(接口)的关系
      • 重写
      • 重载
  • 跨平台
    • JVM虚拟机
    • 字节码
  • 可靠性
    • 强类型检查
    • 异常处理
    • 内存管理
  • 安全性
    • 字节码校验
    • 访问控制(public,private)
    • 安全沙箱

JDK、 JRE、 JVM

JDK

java开发环境

  • JRE Java运行环境
  • 开发工具
    • 编译器
    • 调试器

JRE

java 运行环境

  • 包含java虚拟机
  • java类库和Java命令(javac)

JVM

java 虚拟机,用于运行java字节码

字节码

字节码:通过 javac 命令编译后的 .class 文件就是字节码,字节码是介于源码机器码之间的一种编码,面向JVM

好处

  • java 是解释型语言,先把源码编译为字节码能够提高jvm的执行效率
  • 字节码是跨平台的组成部分,面向jvm执行,jvm是跨平台实现的根本

java为什么不直接编译为机器码

和java的生态有关,java作为一种灵活的代码,得益于 CGLIB 和 ASM 动态字节码技术,如果全部编译为机器码就会失去这种灵活性

编译与解释并存

java源码 --javac–> 字节码 --加载–> JVM --解释–> 机器码 --执行–> cpu

编译:将java源码编译为JVM可执行的字节码(.class文件)

解释:JVM加载字节码后一行行进行通过Java解释器解释成系统能够运行的机器码

continue、break 和 return 的区别

  • continue:用于跳出本次循环进入下次循环
  • break :用于跳出整个循环体
  • return:用于跳出所在方法

如何实现跨平台

  1. 将Java源码编译成字节码文件
  2. 使用Java虚拟机运行字节码文件
  3. 不同平台有不同的Java虚拟机实现

重载

发生在同一个类中(或者父类和子类之间)

  • 方法名称相同
  • 参数类型不同,个数不同,顺序不同,返回值和访问修饰符可以不同
1
2
3
4
5
6
7
8
9
10
public  class Test {
public void hello() {
System.out.println("hello");
}

// 重载 hello() 方法
public void hello(String name) {
System.out.println("hello: " + name);
}
}

重写

子类覆盖父类的方法,有重新的实现

两同两小一大

  • 方法名称相同,参数列表相同
  • 子类方法返回值类型要小于等于父类方法返回值类型;子类方法声明抛出的异常要小于等于父类方法抛出的异常
  • 子类方法的访问修饰符要大于等于父类方法的访问修饰符
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Parent {

protected Integer add(int a, int b) throws Exception {
return a + b;
}

}

public class Son extends Parent {

@Override
public Integer add(int a, int b) throws RuntimeException {
return 0;
}
}

数据类型

基本类型位数字节默认值取值范围
byte810-128 ~ 127
short1620-32768 ~ 32767
int3240-2147483648 ~ 2147483647
long6480L-9223372036854775808 ~ 9223372036854775807
char162‘u0000’0 ~ 65535
float3240f1.4E-45 ~ 3.4028235E38
double6480d4.9E-324 ~ 1.7976931348623157E308
boolean1falsetrue、false

封装类型

  • 成员变量包装类型不赋值就是 null ,而基本类型有默认值且不是 null
  • 包装类型可用于泛型,而基本类型不可以
  • Byte,Short,Integer,Long 这 4 种包装类默认创建了数值 [-128,127] 的相应类型的缓存数据

装箱、拆箱

  • 装箱将基本类型用它们对应的引用类型包装起来
  • 拆箱将包装类型转换为基本数据类型
1
2
3
4
5
// 自动装箱,相当于 Integer a = Integer.valueOf(10)
Integer a = 10;

// 自动拆箱, 相当于 int b = a.intValue()
int b = a;

注意:如果频繁拆装箱的话,也会严重影响系统的性能。我们应该尽量避免不必要的拆装箱操作

解决浮点精度丢失

BigDecimal 可以实现对浮点数的运算,不会造成精度丢失,必须使用其字符串类型的构造函数

1
BigDecimal a = new BigDecimal("1.0");

超过 long 整型的数据应该如何表示

BigInteger 内部使用 int[] 数组来存储任意大小的整形数据

1
2
3
BigInteger a=new BigInteger(“23”);
BigInteger b=new BigInteger(“34”);
a. add(b);

相对于常规整数类型的运算来说,BigInteger 运算的效率会相对较低

隐式转换(自动转换)

  • 数值型数据的转换:byte→short→int→long→float→double
  • 字符型转换为整型:char→int
  • 两种数据类型彼此兼容
  • 低级类型数据转换成高级类型数据
1
2
3
4
5
6
7
8
9
10
11
short a = 1;
// 类型自动转换为int,因为1是int
short b = (short) (a + 1);
// += -= 存在类型自动转换
short d = a += 1;
int c = a + 1;

// 数据溢出处理
int i = 900000 * 100000;
// 隐式转换转换为long
long k = 900000 * 100000L;

显式转换(强制转换)

两种数据类型不兼容,高级类型向低级类型转换,自动转换将无法进行,这时就需要进行强制类型转换

1
2
3
int a = 3;
double b = 5.0;
a = (int)b;

对象的相等和引用相等的区别

  • 对象的相等一般比较的是内存中存放的内容是否相等
    • 使用 equals 判断两个对象的内容是否相等
  • 引用相等一般比较的是他们指向的内存地址是否相等
    • 使用 == 判断两个变量是否指向同一个内存地址

构造方法

  • 如果不声明构造方法,默认会生成一个无参的构造方法
  • 如果我们自己添加了类的构造方法(无论是否有参),Java 就不会再添加默认的无参数的构造方法了
  • 主要作用是完成对象的初始化工作,在对象被创建后执行

特点

  • 名字与类名相同
  • 没有返回值,但不能用 void 声明构造函数
  • 生成类的对象时自动执行,无需调用
  • 构造方法不能被 override(重写),但是可以 overload(重载)

接口和抽象类有什么共同点和区别

相同

  • 都不能被实例化
  • 都可以包含抽象方法
  • 都可以有默认实现的方法(Java 8 可以用 default 关键字在接口中定义默认方法)

区别

  • 一个类只能继承一个类,但是可以实现多个接口
  • 抽象类可以有构造方法,接口不可以
  • 接口只能定义常量,抽象类可以定义变量

浅拷贝和深拷贝

浅拷贝

浅拷贝:浅拷贝会在堆上创建一个新的对象(区别于引用拷贝的一点),不过,如果原对象内部的属性是引用类型的话,浅拷贝会直接复制内部对象的引用地址,也就是说拷贝对象和原对象共用同一个内部对象

1
2
3
4
5
6
7
8
9
10
public class Test implements Cloneable {

private Integer age;

@Override
protected Object clone() throws CloneNotSupportedException {
Test clone = (Test) super.clone();
return clone;
}
}

深拷贝

深拷贝 :深拷贝会完全复制整个对象,包括这个对象所包含的内部对象

1
2
3
4
5
6
7
8
9
10
11
12
13
@Data
public class Test implements Cloneable {

private Integer age;

@Override
protected Object clone() throws CloneNotSupportedException {
Integer age = new Integer(this.getAge());
Test clone = new Test();
clone.setAge(age);
return clone;
}
}

深拷贝可以使用序列化和反序列化实现

Object 方法

== 和 equals() 的区别

  • 对于基本数据类型来说,== 比较的是值
  • 对于引用数据类型来说,== 比较的是对象的内存地址
  • 类没有重写 equals()方法 :通过equals()比较该类的两个对象时,等价于通过“==”比较这两个对象
  • 类重写了 equals()方法 :一般我们都重写 equals()方法来比较两个对象中的属性是否相等

为什么重写 equals() 时必须重写 hashCode() 方法

java约定

  • 两个对象相等其hashCode必定相等
  • 两个对象的hashCode相等对象不一定相等(hash冲突)

hashCode被广泛应用于Java核心类库的集合类中(HashSet,HashMap)。为了维护哈希表的正常运作并维持对象的一致性,确保通过hashCode能够找到正确的对象(HashMap源码有体现,先判断HashCode,存在冲突再遍历对象使用equals方法找出正确对象),因此重写 equals() 时必须重写 hashCode() 方法

字符串

String

String不可变

String 是不可变的 (每次变量拼接会生成新的对象,底层由StringBuilder#append实现),线程不安全的

String 类中使用 final 关键字修饰字符数组来保存字符串

1
2
3
4
public final class String implements java.io.Serializable, Comparable<String>, CharSequence {
private final char value[];
//...
}
  1. 保存字符串的数组被 final 修饰且为私有的,并且String 类没有提供/暴露修改这个字符串的方法。
  2. String 类被 final 修饰导致其不能被继承,进而避免了子类破坏 String 不可变。

intern()

String.intern()方法设计的初衷就是:重用字符串对象,以便节省内存

JDK1.8, 先判断常量池中当前字符串是否存在

  • 如果不存在:不会将当前字符串复制到常量池,而是将当前字符串的引用复制到常量池
  • 如果存在:不会改变常量池已经存在的引用,并直接返回常量池中字符串引用
1
2
3
String str1 = "java";
String str2 = new String("Ja") + new String("va");
str1 == str2.intern(); //false

思路分析:两者分别是堆中的两个对象

1
2
3
String str1 = "java";
String str2 = new String("ja") + new String("va");
str1 == str2.intern(); // true

思路分析:String str1 = “java” 会将字符串分配到常量池,str2.intern()直接返回常量池的引用。因此它们属于常量池的同一个引用

String 运算

1
2
String str = "R"; 
一共创建了几个对象

思路分析:"R"是一个字面量,会放在字符串常量池子中

  • 如果字符串常量池中已经存在“R“,那么创建0个对象,直接返回字符串常量池引用
  • 如果字符串常量池不存在"R",那么在常量池中创建1个对象
1
2
new String("R"); 
创建了几个对象

思路分析: new 关键字一定会在内存中创建一个对象,还要把字面量“R“提取出来分析

  • 如果字符串常量池中已经存在"R",那么创建1个对象(堆中1个)
  • 如果字符串常量池不存在“R“,那么在常量池中创建2个对象(字符串常量池1个,堆中1个)
1
2
3
String str = "Java";	// 常量池没有则在常量池创建一个 Java 对象,有则返回字符串的引用
String str2 = "Java"; // 直接返回常量池引用
为什么 str == str2 为true

思路分析:java对于常量(字面量)会存储在字符串常量池中,str1第一次赋值时在常量池中添加了“Java“并返回其引用, str2第二次赋值直接引用字符串常量池。两个都是常量池 “Java” 的引用

1
2
3
String str = "Java";
String str2 = "Ja" + "va"; // 编译器对字面量优化 str2 = "Java"
为什么 str == str2 为true

思路分析:参考上面,常量拼接还是常量(编译期优化)

1
2
3
String str = "Java";
String str2 = new String("Java");
为什么 str == str2 为false

思路分析:String str = “Java”; 内存存储区域在字符串常量池,new String(“Java”);内存存储区域在堆

1
2
3
4
String str1 = "java";
String str2 = "ja";
String str3 = str2 + "va"; // 编译器无法优化变量拼接,底层用的是 StringBuilder
为什么 str1 == str3 为false

思路分析:因为两者属于不同对象,第一个对象在常量池中,第二个对象在堆中

str2 + “va”; 编译器无法对变量拼接优化。字符串拼接底层使用的是 StringBuilder#append,最终使用StringBuilder#toString方法返回String对象,对象在堆里

1
2
3
4
public String toString() {
// 创建一个新的String对象
return new String(value, 0, count);
}

拼接运算

1
2
3
4
5
6
7
8
9
10
11
12
13
String str1 = "str";
String str2 = "ing";
String str3 = "str" + "ing";
String str4 = str1 + str2;
String str5 = "string";
String str6 = str1 + "ing";

System.out.println(str3 == str4);//false
System.out.println(str3 == str5);//true
System.out.println(str4 == str5);//false
System.out.println(str4.intern() == str6.intern()); // true
System.out.println(str5 == str6); // false
System.out.println(str4.intern() == str5); //true

StringBuilder

当对字符串进行修改的时候,需要使用 StringBuffer 和 StringBuilder 类

因为 StringBuffer 和 StringBuilder 都是可变的,调用append 不会产生新的String对象

  • 线程不安全,性能最好

  • 默认16个字节

  • 扩容:2倍 + 2个字节

toString

1
2
3
4
5
6
7
char[] value;

@Override
public String toString() {
// Create a copy, don't share the array
return new String(value, 0, count);
}

toString 直接使用缓冲区 char数组

StringBuffe

和StringBuilder一致,不同的是使用了 Synchronized 保证了线程安全,性能差于StringBuilder

  • 线程安全

  • 默认16个字节

  • 扩容:2倍 + 2个字节

toString

1
2
3
4
5
6
7
8
@Override
public synchronized String toString() {
if (toStringCache == null) {
// 并发场景,会先拷贝一次缓冲区
toStringCache = Arrays.copyOfRange(value, 0, count);
}
return new String(toStringCache, true);
}

异常

Throwable

所有的异常都有一个共同的祖先

Exception 和 Error 有什么区别

  • Exception :程序本身可以处理的异常,可以通过 catch 来进行捕获。Exception 又可以分为 Checked Exception (受检查异常,必须处理) 和 Unchecked Exception (不受检查异常,可以不处理)

    • Checked Exception 即 受检查异常 ,Java 代码在编译过程中,如果受检查异常没有被 catch或者throws 关键字处理的话,就没办法通过编译

      比如 SQLException IOException

    • Unchecked Exception 即 不受检查异常 ,Java 代码在编译过程中 ,我们即使不处理不受检查异常也可以正常通过编译

      RuntimeException 及其子类都统称为非受检查异常

      • NullPointerException(空指针错误)
      • IllegalArgumentException(参数错误比如方法入参类型错误)
      • NumberFormatException(字符串转换为数字格式错误,IllegalArgumentException的子类)
      • ArrayIndexOutOfBoundsException(数组越界错误)
      • ClassCastException(类型转换错误)
  • Error :属于程序无法处理的错误,例如 Java 虚拟机运行错误(Virtual MachineError)、虚拟机内存不够错误(OutOfMemoryError) 这些异常发生时,Java 虚拟机(JVM)一般会选择线程终止

try-catch-finally 如何使用

  • try块 : 用于捕获异常。其后可接零个或多个 catch 块,如果没有 catch 块,则必须跟一个 finally 块。
  • catch块 : 用于处理 try 捕获到的异常。
  • finally 块 : 无论是否捕获或处理异常,finally 块里的语句都会被执行。当在 try 块或 catch 块中遇到 return 语句时,finally 语句块将在方法返回之前被执行。
1
2
3
4
5
6
7
8
try {
System.out.println("Try to do something");
throw new RuntimeException("RuntimeException");
} catch (Exception e) {
System.out.println("Catch Exception -> " + e.getMessage());
} finally {
System.out.println("Finally");
}

输出:

1
2
3
Try to do something
Catch Exception -> RuntimeException
Finally

注意:不要在 finally 语句块中使用 return! 当 try 语句和 finally 语句中都有 return 语句时,try 语句块中的 return 语句会被忽略。这是因为 try 语句中的 return 返回值会先被暂存在一个本地变量中,当执行到 finally 语句中的 return 之后,这个本地变量的值就变为了 finally 语句中的 return 返回值

try具有缓存功能

在try语块中返回基本类型,会将return的值进行缓存,如果在finally语块修改返回值则不生效(基本类型)

1
2
3
4
5
6
7
8
9
10
    public static Integer test() {
int a = 0;
try {
a = 1;
return a;
} finally {
a = 10;
}
}
// 返回值还是1

catch语块的多种用法

catch块可以捕获多种类型的异常,但是需要注意以下几点:

  1. 异常的捕获顺序:在多个catch块中,应该将具体的异常类型放在前面,将父类异常放在后面。如果将父类异常放在前面,那么子类异常永远不会被捕获

    1
    2
    3
    4
    5
    6
    7
    8
    9
    try {
    // 代码块
    } catch (ArrayIndexOutOfBoundsException e) {
    // 处理数组越界异常
    } catch (IndexOutOfBoundsException e) {
    // 处理下标越界异常
    } catch (Exception e) {
    // 处理其他异常
    }
  2. 同时处理多个异常类型:在一个catch块中,可以同时捕获多个异常类型,多个异常类型之间使用管道符号 | 分隔

    1
    2
    3
    4
    5
    try {
    // 代码块
    } catch (IOException | SQLException e) {
    // 处理IO异常和SQL异常
    }

finally 中的代码一定会执行吗

不一定的!在某些情况下,finally 中的代码不会被执行。

就比如说 finally 之前虚拟机被终止运行的话,finally 中的代码就不会被执行。

1
2
3
4
5
6
7
8
9
10
try {
System.out.println("Try to do something");
throw new RuntimeException("RuntimeException");
} catch (Exception e) {
System.out.println("Catch Exception -> " + e.getMessage());
// 终止当前正在运行的Java虚拟机
System.exit(1);
} finally {
System.out.println("Finally");
}

Java SPI

SPI详解

SPI 全称为 Service Provider Interface(接口服务提供者),它是一种服务发现机制。SPI的本质是将接口的实现类配置在文件中,并由服务加载器读取配置文件,进行加载和实例化。

SPI机制的设计初衷是为了解决接口的扩展性问题。在传统的编程模型中,接口的实现类通常是由开发者手动指定或硬编码在代码中的。这种方式存在一个问题,即当需要替换或新增接口的实现类时,必须修改源代码并重新编译、部署应用程序。这种修改源代码的方式增加了耦合性,同时也不够灵活。

而SPI机制通过将接口的实现类配置在文件中,并由服务加载器动态加载,可以实现在不修改源代码的情况下,替换或新增接口的实现类。这样就提高了系统的扩展性和灵活性。

具体来说,SPI机制的使用步骤如下:

  1. 定义接口:首先需要定义一个接口,接口定义了一组方法或规范,表示一种功能或服务的抽象。

    1
    2
    3
    public interface MyService {
    void doSomething();
    }
  2. 实现接口:根据接口的定义,编写实现类来实现接口中的方法,提供具体的功能实现。

    1
    2
    3
    4
    5
    6
    public class MyServiceImpl implements MyService {
    @Override
    public void doSomething() {
    System.out.println("Doing something...");
    }
    }
  3. 创建配置文件:在resources/META-INF/services目录下创建一个以接口的全限定名命名的文件,文件内容为接口实现类的全限定名,每行一个。

    1
    com.example.MyServiceImpl
  4. 加载实现类:通过服务加载器(ServiceLoader)加载配置文件中的实现类,并进行实例化。

    1
    ServiceLoader<MyService> serviceLoader = ServiceLoader.load(MyService.class); // ServiceLoader 是一个可迭代对象,可以加在1-N个服务
  5. 使用服务:通过服务加载器获取已加载的实现类的实例,并调用其方法。

    1
    2
    3
    for (MyService service : serviceLoader) {
    service.doSomething();
    }

序列化

序列化 :将对象转为字节流的过程,让对象能够保存在本地文件或者在网络中传输

反序列化 :将字节流转回Java对象的过程,根据字节流的信息将流数据转成一个Java对象的过程

序列化详解

Serializable

1
2
3
4
5
6
7
@Data
public class Article implements Serializable {
private static final long serialVersionUID = 1L;
private Integer id;
private String title; //文章标题
private String content; // 文章内容
}

Externlizable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Data
public class ExternalizableDemo implements Externalizable {
private static final long serialVersionUID = 1L;
private String name;
private int number;

@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeUTF(name);
out.writeInt(number);
}

@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
this.name = in.readUTF();
this.number = in.readInt();
}

异同

  1. 序列化内容,Externalizable自定义序列化可以控制序列化的过程和决定哪些属性不被序列化
  2. Serializable反序列化时不会调用默认的构造器,而Externalizable反序列化时会调用默认构造器的
  3. 使用Externalizable时,必须按照写入时的确切顺序读取所有字段状态。否则会产生异常。例如,如果更改ExternalizableDemo类中的number和name属性的读取顺序,则将抛出java.io.EOFException。而Serializable接口没有这个要求

transient修饰符

transient修饰符用于类属性、变量。表示该类的序列化过程用transient修饰该变量,可避免该变量被序列化

1
private transient String password;

为什么 Java 中只有值传递

基本类型:传递的是实际值的拷贝

引用类型: 传递的是内存地址值的拷贝

比较器

Comparable 内部比较器

类继承 Comparable 接口并且实现 compareTo 方法就可以使用 Collections.sort 进行排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
 @Data
@ToString
@AllArgsConstructor
public static class User implements Comparable<User> {

private String name;
private Integer age;

@Override
public int compareTo(User o) {
// 倒叙
return o.getAge() - this.getAge();
// 正序
//return this.getAge() - o.getAge();
}
}

public static void main(String[] args) {
List<User> userList = new ArrayList<>();
userList.add(new User("test-1", 12));
userList.add(new User("test-2", 15));
userList.add(new User("test-3", 18));
userList.add(new User("test-4", 14));
userList.add(new User("test-5", 13));

// 使用内部比较器排序
Collections.sort(userList);
// userList.forEach(System.out::println);
// OrderService.User(name=test-3, age=18)
// OrderService.User(name=test-2, age=15)
// OrderService.User(name=test-4, age=14)
// OrderService.User(name=test-5, age=13)
// OrderService.User(name=test-1, age=12)
}

外部比较器 Comparator

如果类没有实现 Comparable 接口又要进行排序可使用外部比较器 Comparator 外部比较器重写 compare 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
 @Data
@ToString
@AllArgsConstructor
public static class User {

private String name;
private Integer age;
}

public static void main(String[] args) {
List<User> userList = new ArrayList<>();
userList.add(new User("test-1", 12));
userList.add(new User("test-2", 15));
userList.add(new User("test-3", 18));
userList.add(new User("test-4", 14));
userList.add(new User("test-5", 13));

// 类内有实现Comparable 但又需要进行排序, 使用外部比较器
Comparator<User> comparator = new Comparator<User>() {
@Override
public int compare(User o1, User o2) {
return o2.getAge() - o1.getAge();
// 正序
//return o1.getAge() - o2.getAge();
}
};

// 排序
Collections.sort(userList, comparator);

userList.forEach(System.out::println);
// OrderService.User(name=test-3, age=18)
// OrderService.User(name=test-2, age=15)
// OrderService.User(name=test-4, age=14)
// OrderService.User(name=test-5, age=13)
// OrderService.User(name=test-1, age=12)
}

static

  • 静态内部类:不需要依赖外部类的实例而实例化
  • 静态代码块:类初始化时执行
  • 静态方法:使用类名调用
  • 静态变量:类变量分配到方法区

泛型

类型擦除

泛型信息只存在于代码编译阶段,但是在java的运行期(已经生成字节码文件后)与泛型相关的信息会被擦除掉,专业术语叫做类型擦除

1
2
3
4
ArrayList<Integer> l1 = new ArrayList(); 
ArrayList<String> l2 = new ArrayList();
System.out.println(l1.getClass()==l2.getClass());
//运行代码,结果为True

这是因为 ArrayList<String>ArrayList<Integer> 在 jvm 中的 Class 都是 List.class,二者在 jvm 中等同于 List<Object>,所有的泛型在编译后都会变成Object

Java的泛型其实是伪泛型,因为编译后都会进行类型擦除,它的作用就是在编码阶段对使用的类型作限制

泛型上下限

无边界擦除

1
2
3
4
5
6
7
8
9
10
List<Integer> list = new ArrayList<>();
list.add(1);

List<?> list2 = list;

// 获得类型为Object
Object o = list2.get(0);

// 编译不通过
list2.add(2);
  • 相当于 List<? extends Object>,知道父类是Object
  • 可以get获取值,返回的是Object
  • 不清楚使用的是哪种具体类型,因此不能写入数据

上界通配

  • <? extends 父类类型>

  • 可以确定父类型,所以返回的数据是父类型(向上转型)

  • 不能写入数据

1
2
3
4
5
6
7
8
9
List<Animal> list = new ArrayList<>();
list.add(new Cat());

List<? extends Animal> list2 = list;
// 返回的是父类型(向上转型)
Animal animal = list2.get(0);

// 不能写数据,编译不通过
list2.add(new Dog());

下界通配

  • <? super 子类类型>
  • 可以确定子类类型,但不确定其有多少父类,所以返回Object
  • 由于知道子类,所以可以写入父类对象(向上转型是安全的)
1
2
3
4
5
6
7
8
9
10
List<Animal> list = new ArrayList<>();
list.add(new Cat());
list.add(new Dog());

List<? super Cat> list2 = list;
// 只知道子类,所以返回的是Object
Object o = list2.get(0);

// 能写数据
list2.add(new Cat());

集合

fail-fast机制

多线程环境保证集合安全性和一致性的错误机制

fail-fast(快速失败)机制是java集合(Collection)中的一种错误机制

当多个线程对同一个集合的内容进行操作时,就可能产生fail-fast事件

例如:线程A通过Iterator迭代器去遍历某个集合的过程中,如果集合对象的内容被其他线程进行了修改(增加,删除,修改),则会抛出java.util.ConcurrentModificationException异常,产生 fail-fast事件

java.util包下的集合都是快速失败的,不能在多线程并发修改

fail-safe机制

fail-safe(安全失败)机制在遍历时不是在集合内容上访问的,而是先复制原有集合内容,在拷贝的集合上进行遍历,因此它不会抛出java.util.ConcurrentModificationException异常

juc包下的集合都是fail-safe安全失败的,可以在多线程并发下使用,并发修改

迭代器

Iterator

  • 支持 fail-fast 机制,更加安全
  • Iterator是JDK1.2添加的接口,Collection 集合类使用
  • 速度比Enumeration慢
  • 允许从集合中移除元素(从循环中删除元素)

Enumeration

  • 不支持 fail-fast 机制
  • Enumeration 是JDK 1.0添加的接口。使用到它的函数包括Vector、Hashtable等类,已过时
  • 速度比Iterator快
  • 基础的实现,不支持移除元素

List

  • 提供排序 sort 方法,传入 Comparator
  • List转Array:调用toArray方法
  • Array转List:new ArrayList(Arrays.asList(array))
  • List遍历:普通 for增强 for迭代器Stream.foreach

Arrays.asList

  • 它的add/remove/clear方法会抛出UnsupportedOperationException()
  • 返回的是 Arrays.ArrayList 内部类
  • 要正常使用必须使用 new ArrayList() 包裹

ArrayList

  • 数据结构:Object[]
  • 初始容量10
  • 最大容量:Integer.MAX_VALUE - 8
  • 扩容倍数oldCapacity + (oldCapacity >> 1) 原容量 + (原容量/2) 1.5倍
  • 扩容实现System.arraycopy() 数组复制
  • 非线程安全
  • 支持随机访问
  • 存空间占用:扩容后数组元素可能没有完全利用
  • 特点:数据检索快,数据插入、删除慢(需要移动数组元素)

Vector

  • 数据结构:Object[]

  • 初始容量:10

  • 最大容量:Integer.MAX_VALUE - 8

  • 扩容倍数2

  • 支持随机访问

  • 历史遗留的集合类,已经不推荐使用

  • 线程安全:可以看作线程安全的ArrayList

  • 线程安全保证:方法使用synchronized修饰

  • 性能比ArrayList差

  • 线程安全推荐使用 JUC的 CopyOnWriteArrayList

LinkedList

  • 数据结构双向链表,节点保存两个指针,前驱和后继
  • 最大容量:理论上没有,取决于内存大小
  • 头节点不存放数据,允许元素为null
  • 不支持随机访问(链表不允许)
  • 内存空间占用:维护额外的前驱后继指针
  • 特点:数据插入删除快,数据检索效率不高

Map

HashMap

  • 数据结构:数组 + 单向链表 + 红黑树(hash冲突严重用于优化查询性能)
  • 负载因子:0.75
  • 初始容量:16
  • 扩容条件:当前数组大小 > 当前数组大小 * 0.75 (默认12)
  • 扩容倍数:2
  • 扩容过程:
  • 不必将所有Key都重新计算一次hash
  • 将key的hash与数组长度做高位与运算(2n-1),结果为0位置不变,为1位置变为当前下标 + 扩容步长 比如(7 + 16)
  • 链表转红黑树:数组大小>= 64, 链表长度>=8
  • 红黑树退化链表:链表长度<=6
  • 允许一个key为null
  • 非线程安全
  • 数据添加无序
  • put:(n-1) & hash,计算出数组下标位置获取链表进行添加, (n为数组长度,hash为key的hash)
  • get: (n-1) & hash,计算素组下标位置获取链表进行遍历,先判断hash是否相等,相等则判断key
  • JDK1.8的变化:
    • 链表和红黑树转换
    • hash碰撞,1.7会在链表头部插入;1.8会在链表尾部插入
    • Entry被Node替代

HashTable

  • 方法使用Synchronized修改,线程安全
  • 初始容量:11
  • 扩容倍数:2n + 1
  • key,value都不能为null
  • 不支持链表和红黑树互转
  • 考虑线程安全使用ConcurrentHashMap Node锁效率更高

LinkedHashMap

  • 特点保证插入顺序,使用双向链表保证
  • 数据结构:数组加双向链表(前驱后继保证插入顺序)
  • 使用before,after指针保证插入顺序

HashMap的Entry是一个单向链表,用于解决Hash冲突

1
2
3
4
5
6
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
V value;
Node<K,V> next;
...

LinkedHashMap的Entry继承HashMap的Node, Entry是一个双向链表,用于保证插入顺序

1
2
3
4
5
6
static class Entry<K,V> extends HashMap.Node<K,V> {
Entry<K,V> before, after;
Entry(int hash, K key, V value, Node<K,V> next) {
super(hash, key, value, next);
}
}

TreeMap

  • 数据结构:红黑树
  • 实现key的自然排序
  • 可以在构造函数指定Comparator比较器对key排序

WeakHashMap

Java存在的引用

  • WeakHashMap:一个weak key的Map
  • 它能让Map释放其所持有的对象
  • 如果某个对象除了在Map当中充当键之外 ,在其他地方都没有引用的话,那它将被当作垃圾回收
  • 使用场景:二三级缓存
  • 也存在内存溢出可能
  • GC回收时进行回收
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* WeakHashMap:一个weak key的Map,
* 是为某些特殊问题而设计的。它能让Map释放其所持有的对象。
* 如果某个对象除了在Map当中充当键之外,
* 在其他地方都没有引用的话,那它将被当作垃圾回收。
* 使用场景:二三级缓存
*/
public class WeakHashMapTest {

public static WeakHashMap<Integer, Integer> map = new WeakHashMap<>();

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 1000; i++) {
map.put(i, i);
}
System.out.println(map.size());

System.gc();

TimeUnit.SECONDS.sleep(5);
System.out.println(map.size());
}
}

/**
* 输出
* 1000
* 128
*/

IdentityHashMap

  • HashMap判断key是否相等是根据k1.hashcode==k2.hashcode && k1.equals(k2)
  • IdentityHashMap是只判断内存地址是否相等 k1=k2,换而言之就是同个Class的不同对象都可以存放在map里
1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
// 不会判断key的hashcode是否相等,只判断了key的内存地址是否相等
IdentityHashMap<Integer, Object> map = new IdentityHashMap<>();
map.put(new Integer(1), 1);
map.put(new Integer(1), 2);
System.out.println(map);

// 输出
// {1=2, 1=1}
}

Set

HashSet

底层采用HashMap实现,value为空

LinkedHashSet

底层采用LinkedHashMap实现,value为空

TreeSet

底层采用TreeMap实现,value为空

PriorityQueue

优先级队列使用

  • 优先级队列
  • 数据结构:利用了二叉堆的数据结构来实现的,底层使用可变长的数组来存储数据
  • 初始容量:11
  • 扩容倍数:1.5
  • 其与 Queue 的区别在于元素出队顺序是与优先级相关的,即总是优先级最高的元素先出队
  • 构造器允许传入 Comparator 实现自定义优先级
  • 非线程安全的,且不支持存储 NULL 和 non-comparable 的对象
  • 默认是小顶堆,但可以接收一个 Comparator 作为构造参数

ArrayDeque

双端队列使用

  • 双端队列,支持两边进两边出

  • 数据结构:可变长数组加双指针(头,尾)

  • 扩容倍数:2

  • 初始容量:16

  • 不允许数据为null(因为需要比较)

  • 先进先出

    • addFirst() 方法 配合pollLast() 方法
    • addLast() 方法 配合 pollFirst()方法
  • 先进后出(栈)

    • addFirst() 方法配合 pollFirst()方法
    • addLast()方法配合pollLast()方法

CAS

  • Compare and Swap(比较并交换),是一种无锁 原子算法,也是一条CPU的原子指令 MESI协议

  • Atomic 内部使用了 volatile 保证了自身的可见性

  • CAS涉及到三个属性

    • 内存读写位置
    • 旧值
    • 需要写入的新值

    CAS具体执行时,当内存中的值等于传入的旧值,就将内存值修改为新值,否则不替换

  • Unsafe是CAS的核心类,Java无法直接访问底层操作系统,而是通过本地(native)方法来访问。不过尽管如此,JVM还是开了一个后门:Unsafe,它提供了硬件级别的原子操作,一条汇编指令完成数据比较和交换保证了其原子性

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) {
AtomicInteger ac = new AtomicInteger(0);

/**
* ac: 需要写入的内存位置
* 第一个参数:预期值
* 第二个参数:修改值
*/
ac.compareAndSet(0, 1);
System.out.println(ac.get()); // 1
}

ABA

  • 如果一个值原来是A,变成了B,然后又变成了A,那么在CAS检查的时候会发现没有改变,但是实质上它已经发生了改变,这就是所谓的ABA问题
  • 对于ABA问题其解决方案是加上版本号,即在每个变量都加上一个版本号,每次改变时加1,即A —> B —> A,变成1A —> 2B —> 3A

ABA解决

AtomicMarkableReference

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class AtomicMarkableReferenceTest {

/**
* 用boolean 值做版本戳
*/
private static AtomicMarkableReference<Integer> mark = new AtomicMarkableReference(1, false);

public static void main(String[] args) throws InterruptedException {

AtomicBoolean first = new AtomicBoolean(true);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 5; j++) {

if (first.getAndSet(false)) {
// CAS需要版本号
mark.compareAndSet(1, 10, false, false);
} else {
// 版本戳不一致,CAS不会成功
mark.compareAndSet(10, 100, true, true);
}

}
}).start();
}

TimeUnit.SECONDS.sleep(2);
System.out.println(mark.getReference());
}
}

/**
* 输出10
*/

AtomicStampedReference

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 根据一个版本戳解决ABA问题
*/
public class AtomicStampedReferenceTest {

/**
* 设置版本戳为1
*/
private static AtomicStampedReference<Integer> ref = new AtomicStampedReference(10, 1);

public static void main(String[] args) throws InterruptedException {
AtomicBoolean first = new AtomicBoolean(true);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
if (first.getAndSet(false)) {
// 增加 2 个参数 一个是期望版本戳 一个是新版本戳
ref.compareAndSet(10, 100, ref.getStamp(), ref.getStamp() + 1);
} else {
// 版本戳不一直, CAS失败
ref.compareAndSet(10, 100, 0, 0);
}
}).start();
}

TimeUnit.SECONDS.sleep(2);
System.out.println(ref.getReference());
}
}

/**
* 输出 100
*/

AtomicBoolean

API文档

  • 需要注意的方法 public final void set(boolean newValue); 无条件设置值,不保证原子性
  • public final void lazySet(boolean newValue);懒设置值,不会像set一样马上将值刷到主内存中,但最终还是会刷新到主内存,非原子
  • public boolean weakCompareAndSet(boolean expect,boolean update);在正常情况下weak版本比compareAndSet 更高效,但是不同的是任何给定的weakCompareAndSet方法的调用都可能会返回一个虚假的失败( 无任何明显的原因 )。一个失败的返回意味着,操作将会重新执行如果需要的话

AtomicInteger

API文档

  • public final int getAndUpdate(IntUnaryOperator updateFunction);实现IntUnaryOperator 接口的applyAsInt 可以实现复杂的原子性算法

  • getAndAccumulate(int x, IntBinaryOperator accumulatorFunction);给指定值x,和实现IntBinaryOperator函数式接口实现当前值和给定值x的运算结果,最终通过cas重新设置新值

AtomicIntegerArray

API文档

  • public final int getAndUpdate(int i,IntUnaryOperator updateFunction); 和AtomicInteger的类似,只是多了个下标
  • public final int getAndAccumulate(int i,int x,IntBinaryOperator accumulatorFunction);和AtomicInteger的类似,只是多了个下标
  • 初始化 new AtomicIntegerArray(new int[]{1, 2});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class AtomicIntegerArrayTest {

public static AtomicIntegerArray array = new AtomicIntegerArray(new int[]{1, 2});

public static void main(String[] args) throws InterruptedException {

for (int i = 0; i < 10; i++) {
new Thread(() -> {
// 第一个参数是数组下标
// 第二个参数是指定值
// 第三个参数是一个函数式接口,根据数组下标当前值和执行值运算返回的新值
// 最后将计算出来的新值设置到对应的数组下标中
while (array.accumulateAndGet(0, 1, (x, y) -> x + y) < 10) {
}
}).start();

new Thread(() -> {
while (array.accumulateAndGet(1, 1, (x, y) -> x + y) < 10) {
}
}).start();
}

TimeUnit.SECONDS.sleep(2);
System.out.println(array.get(0));
System.out.println(array.get(1));
}
}

/**
* 输出
* 19
* 19
* <p>
* 分析,第一个线程累加1-10,在这累加的过程中其他9条线程也在进行cas操作,最后执行完成
* 第一个线程的执行结果为10
* 第二个线程cas执行结果后为 11
* ...
* 第十个线程cas执行结果后为 19
* 原因是 array.accumulateAndGet(0, 1, (x, y) -> x + y) < 10 这个判断不是一个原子操作,在
* 多线程并发不能保证数据的准确性
*/

AtomicIntegerFieldUpdater

API文档

  • 主要对对象里的 public volatile int 变量进行原子性操作,其API和AtomicInteger一致
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class AtomicIntegerFieldUpdaterTest {

// 通过反射创建
private static AtomicIntegerFieldUpdater<Test> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Test.class, "num");

public static void main(String[] args) throws InterruptedException {
// 创建对象
Test test = new Test();

for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 10; j++) {
fieldUpdater.incrementAndGet(test);
}
}).start();
}

TimeUnit.SECONDS.sleep(2);
System.out.println(test.num);
System.out.println(test.getStr());
}
}

/**
* 输出
* 100
* null
*/

class Test {
// 一定要使用 volatile 保证其他线程的可见性
// 一定要使用public保证外界只直接访问
// 必须为int类型,Integer封装类型也不允许
public volatile int num;

private String str;


public String getStr() {
return str;
}

public void setStr(String str) {
this.str = str;
}
}

AtomicLong

API文档

和AtomicInteger使用一致,累加建议使用 LongAdder

AtomicLongArray

API文档

和AtomicIntegerArray 使用一致

AtomicReference

API文档

  • 在低并发情况下可以得到正确的结果,但是高并发情况下就会出现差异.因为自定义的对象在访问时用的是set,get没有CAS,所以导致线程不安全.
  • atomic包中提供AtomicReferenceFieldUpdaterAtomicIntegerFieldUpdaterAtomicLongFieldUpdater,原子性的更新某一个类实例的指定的某一个字段
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* 保证对象的原子性
*/
public class AtomicReferenceTest {
private static AtomicReference<Simple> ref = new AtomicReference<>(new Simple(20, ""));

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 100; i++) {
new Thread(() -> {
for (int j = 0; j < 100; j++) {
ref.compareAndSet(ref.get(), new Simple(ref.get().getAge() + 1, ""));
}
}).start();
}

TimeUnit.SECONDS.sleep(5);
System.out.println(ref.get().getAge());
}
}

/**
* 不一定输出 10020
*/

class Simple {
public Simple(int age, String name) {
this.age = age;
this.name = name;
}

private int age;
private String name;

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}

AtomicReferenceFieldUpdater

API文档

AtomicReferenceFieldUpdater 是基于反射的工具类,用来将指定类型的指定的volatile引用字段进行原子更新,对应的原子引用字段不能是private的。通常一个类volatile成员属性获取值、设定为某个值两个操作时非原子的,若想将其变为原子的,则可通过AtomicReferenceFieldUpdater来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
 private static AtomicReferenceFieldUpdater<Simple, Integer> updater = AtomicReferenceFieldUpdater.newUpdater(Simple.class, Integer.class, "age");

public static void main(String[] args) throws InterruptedException {
Simple simple = new Simple(20, "");
for (int i = 0; i < 100; i++) {
new Thread(() -> {
for (int j = 0; j < 100; j++) {
// 自旋修改
while (!updater.compareAndSet(simple, simple.getAge(),simple.getAge() + 1));
// 存在ABA问题, 最终不等于 10020, 原因是simple.getAge()只保证可见性,不保证原子性,多个线程会读到同一个值
// 如果要修改的值不依赖于 simple.getAge() 则可以直接使用 updater.getAndSet
//Integer temp1 = simple.getAge();
//Integer temp2 = updater.getAndSet(simple, simple.getAge() + 1);
//if (temp1 >= temp2) {
// System.out.println("ABA");
// System.out.println("temp1=" + temp1);
// System.out.println("temp2=" + temp2);
//}
}
}).start();
}

TimeUnit.SECONDS.sleep(5);
System.out.println(simple.getAge());
}
}


class Simple {
public Simple(Integer age, String name) {
this.age = age;
this.name = name;
}

public volatile Integer age;
private String name;

public Integer getAge() {
return age;
}

public void setAge(Integer age) {
this.age = age;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}

LongAdder

JDK1.8新增一个原子性操作类LongAdder,用于代替AtomicLong的功能,因为在非常高并发的请求下,AtomicLong的性能是一个很大的瓶颈,因为AtomicLong采用的CAS算法失败后还是通过无限循环的自旋锁不断的尝试

为了解决这个问题,JDK的开发组就创建了LongAdder,性能要高于AtomicLong很多

缺点:在统计的时候,如果有并发更新,可能会导致统计数据有些误差

实现

LongAdder内部维护一个Cell[] as数组,每个Cell里面有一个初始值为0的long型变量,在同等并发量的情况下,争夺单个变量的线程会减少,这是变相的减少了争夺共享资源的并发量,另外多个线程在争夺同一个原子变量时候,如果失败并不是自旋CAS重试,而是尝试获取其他原子变量的锁,最后当获取当前值时候是把所有变量的值累加后再加上base的值返回的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
   // 底层使用cell数组,每个数组元素都是一个原子资源,高并发只要获得其中一个元素后进行自增即可
// 调用get时,返回的是所有cell数组的累加值
// 缺点:数值存在偏差
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}

/**
* Equivalent to {@code add(1)}.
*/
public void increment() {
add(1L);
}

JUC

非阻塞

Queue

单端队列

ConcurrentLinkedQueue

  • API文档 基于链接节点的无界线程安全queue
  • FIFO,先进先出
  • 数据结构:单向链表
  • 采用cas算法保证线程安全,底层时Unsafe
  • 元素不能为空(cas)
  • add过程
    • 如果链表为空,将当前节点通过CAS设置为链表的 head 和 tail
    • 如果链表不为空,则通过CAS将当前节点追加到链表尾部
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* 基于链接节点的无界线程安全queue 。 这个队列排列元素FIFO(先进先出),
* 像大多数其他并发集合实现一样,此类不允许使用null元素,否则会空指针异常
*/
public class ConcurrentLinkedQueueTest {

private static ConcurrentLinkedQueue<Integer> query = new ConcurrentLinkedQueue<>();

public static void main(String[] args) {
// 生产者线程
new Thread(() -> {
Random random = new Random();
for (int i = 0; i < 10; i++) {
int num = random.nextInt(50);
try {
TimeUnit.MICROSECONDS.sleep(num);
System.out.println(String.format("%s 生产产品编号:%s", Thread.currentThread().getName(), num));
query.add(num);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

// 消费者线程
new Thread(() -> {
int count = 0;
while (count < 100) {
// 队列为空则返回null
Integer num = query.poll();
if (Objects.nonNull(num)) {
System.out.println(String.format("%s 消费产品编号:%s", Thread.currentThread().getName(), num));
count++;
}
}
}).start();
}
}

/**
* 输出
* Thread-0 生产产品编号:1
* Thread-0 生产产品编号:36
* Thread-1 消费产品编号:1
* Thread-1 消费产品编号:36
* Thread-0 生产产品编号:39
* Thread-1 消费产品编号:39
* Thread-0 生产产品编号:34
* Thread-1 消费产品编号:34
* Thread-0 生产产品编号:36
* Thread-1 消费产品编号:36
* Thread-0 生产产品编号:49
* Thread-1 消费产品编号:49
* Thread-0 生产产品编号:32
* Thread-1 消费产品编号:32
* Thread-0 生产产品编号:28
* Thread-1 消费产品编号:28
* Thread-0 生产产品编号:27
* Thread-1 消费产品编号:27
* Thread-0 生产产品编号:13
* Thread-1 消费产品编号:13
*/

List

线程安全的List

CopyOnWriteArrayList

API文档 一个线程安全的变体ArrayList ,其中所有可变操作( add , get ,等等)通过对底层数组的最新副本实现

  • 底层数据结构采用Object[]
  • 实现 List 接口
  • 使用和ArrayList类似
  • 线程安全:使用 ReentrantLock 保证
  • CopyOnWrite 简称 COW先复制再写入,就是在添加元素的时候,先把原List列表复制一份,再添加新的元素,添加元素时,先枷锁,再进行复制替换操作,最后释放锁
  • 适合读多写少场景,写操作代价昂贵
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public boolean add(E e) {
// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 复制副本
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
// 数组替换
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
1
2
3
4
  // get 不加锁
public E get(int index) {
return get(getArray(), index);
}

Set

单端

线程安全的Set

CopyOnWriteArraySet

API文档

  • 线程安全的 Set
  • 底层数据:CopyOnWriteArrayList,Object[]
  • ReentrantLock 保证线程安全
  • 去重:每次插入先遍历元素是否存在

双端

ConcurrentSkipListSet

API文档

  • key有序:可以当作线程安全的 TreeSet
  • 提供获取第一个或最后一个元素的能力
  • 底层数据结构:ConcurrentSkipListMap
1
2
3
4
5
6
7
8
9
10
   public boolean add(E e) {
return al.addIfAbsent(e);
}

public boolean addIfAbsent(E e) {
Object[] snapshot = getArray();
return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
// 加锁新增
addIfAbsent(e, snapshot);
}

Map

ConcurrentHashMap

API文档

在线源码

  • 线程安全,使用Node锁替换分段锁

    • 如果链表为空采用cas设置链表头
    • 否则使用synchronized锁住整个桶
  • 放弃分段锁: 分段锁会锁几条链表(数组的几个元素),锁粒度太大,不能很好支持并发

  • 支持链表和红黑树互转

  • 懒加载,Map创建不会初始化,当put第一个值才会初始化 initTable()

  • sizeCtl

    • -1:正在初始化
    • -n:正在扩容
    • n:下次扩容的阈值
  • 扩容时机

    • 桶数量 >= 数组长度 * 0.75
  • 扩容

    • ForwardingNode

      • 转义节点
      • 存放 nextTable 新扩容的Tab
      • 转义节点Key的 hash = -1
      • 完成扩容的旧桶放置转义节点,这样做时为了在扩容期间其他线程调用get转义节点能跳转到新Tab获取数据
    • 多线程协助扩容

      • 当调用Put方法定位桶,桶是一个转义节点时协助扩容
      • 每个协助扩容线程最少负责16个桶
      • 流程
        • 在老的Tab里分配最少16个要扩容的桶
        • 如果老桶为空,在老Tab插入一个转义节点,用于告诉其他线程当前在扩容
        • 如果当前是转义节点说明这个桶已经完成扩容,跳过
        • 如果老桶不为空也不是转义节点,开始扩容
          • 对当前桶加锁
          • 便利链表,进行高位与运算
          • 在先Tab里分裂为2个链表
          • 扩容完成,当前桶设置为转义节点
    • 扩容时get

      • get不需要加锁
      • 如果桶不为空,直接加锁链表后遍历i获取数据
      • 如果为转义节点,则通过转义节点去新的Tab获取数据
      • 如果桶正在扩容,那么它被其他线程加锁,则等待
  • 进化红黑树

    • hash值为-2,说明当前hash位置下挂的是红黑树
    • 数组长度>=64
  • 链表长度大于8

  • 退化红黑树

    • 红黑树节点数量 <= 6
  • K,V 不能为空

  • 计数器优化,思路和 LongAdder 类似,底层采用 CountCell[]

  • HASH_BITS

    • 第一位是0, 01111111…
    • 保证运算出来的hash一定是个正数,负数在Map中有特殊含义

分段锁

ReentrantLook + N个桶

Node锁

JDK1.8放弃分段锁的原因是因为粒度太大,影响并发性能

node锁

  • 如果是链表第一个元素,则采用cas算法
  • 否则就锁链表的第一个node节点 synchroinzed

扩容场景

  • hash冲突严重,链表节点数超过8,此时先判断数组是否超过64,没有超过就先扩容,超过就转红黑树
  • putAll方法,放入一个长度大于本身的Map
  • put方法,数组长度超过扩容的阈值

ConcurrentSkipListMap

  • 1、ConcurrentSkipListMap 的key是有序的。
  • 2、构造函数支持 Comparator比较器自定义实现key排序
  • 3、ConcurrentSkipListMap 支持更高的并发。ConcurrentSkipListMap 的存取时间是log(N),和线程数几乎无关。也就是说在数据量一定的情况下,并发的线程越多,ConcurrentSkipListMap越能体现出他的优势。

Deque

双端队列

ConcurrentLinkedDeque

API文档 基于链表实现的无界双端队列

  • 双端对列可以做 FIFO对列,也可以做
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
* 基于链接节点的无界并发deque 。 并发插入,删除和访问操作可以跨多个线程安全执行
* 像大多数其他并发集合实现一样,此类不允许使用null元素
* 迭代器和分配器是weakly consistent
*/
public class ConcurrentLinkedDequeTest {

private static ConcurrentLinkedDeque<Integer> deque = new ConcurrentLinkedDeque();
private static final Random random = new Random();

public static void main(String[] args) {
// 生产者线程
new Thread(() -> {
for (int i = 0; i < 10; i++) {
deque.add(i);
}
}).start();

// 队列头部消费线程
new Thread(() -> {
int count = 0;

while (count < 5) {
int stop = random.nextInt(100);

try {
TimeUnit.MILLISECONDS.sleep(stop);
// 获取队列头部元素
Integer num = deque.pollFirst();
if (Objects.nonNull(num)) {
System.out.println(String.format("头部线程消费元素:%d", num));
count++;
}

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();


// 队列尾部消费线程
new Thread(() -> {
int count = 0;

while (count < 5) {
int stop = random.nextInt(100);

try {
TimeUnit.MILLISECONDS.sleep(stop);
// 获取队列尾部元素
Integer num = deque.pollLast();
if (Objects.nonNull(num)) {
System.out.println(String.format("尾部线程消费元素:%d", num));
count++;
}

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}

/**
* 输出
* 尾部线程消费元素:9
* 头部线程消费元素:0
* 头部线程消费元素:1
* 头部线程消费元素:2
* 头部线程消费元素:3
* 尾部线程消费元素:8
* 尾部线程消费元素:7
* 头部线程消费元素:4
* 尾部线程消费元素:6
* 尾部线程消费元素:5
*/

阻塞

put: 队列满了阻塞

take: 队列空了阻塞

Queue

ArrayBlockingQueue

API文档 由数组创建的有界的单端对列

  • 有界队列

  • 底层数据类型:Object[]

  • 采用ReentrantLock保证多线程安全

  • FIFO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* 一个有限的blocking queue由数组支持。 这个队列排列元素FIFO(先进先出)
* 底层采用的数据结构是数组,所以在初始化时需要指定长度,创建后,容量无法更改
* 尝试put成满的队列的元件将导致在操作阻挡;
* 尝试take从空队列的元件将类似地阻塞。
*/
public class ArrayBlockingQueueTest {
private static ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

public static void main(String[] args) {
// 生产者线程
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
// 用add 队列满了会抛出异常
queue.put(i);
System.out.println(String.format("生产线程生产元素:%d", i));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

// 消费者线程
new Thread(() -> {
for (int i = 0; i < 10; i++) {
// 底层采用ReentrantLock实现线程安全
try {
// 队列为空会阻塞
TimeUnit.MILLISECONDS.sleep(100);
int num = queue.take();
System.out.println(String.format("消费线程消费元素:%d", num));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}

/**
* 输出
* 生产线程生产元素:0
* 生产线程生产元素:1
* 消费线程消费元素:0
* 生产线程生产元素:2
* 生产线程生产元素:3
* 消费线程消费元素:1
* 消费线程消费元素:2
* 生产线程生产元素:4
* 生产线程生产元素:5
* 消费线程消费元素:3
* 生产线程生产元素:6
* 消费线程消费元素:4
* 消费线程消费元素:5
* 生产线程生产元素:7
* 生产线程生产元素:8
* 消费线程消费元素:6
* 消费线程消费元素:7
* 生产线程生产元素:9
* 消费线程消费元素:8
* 消费线程消费元素:9
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 对列满了则阻塞
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 对列空了阻塞
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

LinkedBlockingQueue

API文档 基于链表实现的有界阻塞的单端对列

采用ReentrantLock保证多线程安全

底层数据结构:单向链表

FIFO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* 基于链接节点的可选限定的blocking queue 。 这个队列排列元素FIFO(先进先出)
* <p>
* LinkedBlockingQueue()
* 创建一个 LinkedBlockingQueue ,容量为 Integer.MAX_VALUE 。
* <p>
* LinkedBlockingQueue(Collection<? extends E> c)
* 创建一个 LinkedBlockingQueue ,容量为 Integer.MAX_VALUE ,最初包含给定集合的元素,以集合的迭代器的遍历顺序添加。
* <p>
* LinkedBlockingQueue(int capacity)
* 创建一个具有给定(固定)容量的 LinkedBlockingQueue 。
* <p>
* 队列满了继续往队列添加元素则阻塞,队列空继续往队列获取元素则阻塞
*/
public class LinkedBlockingQueueTest {
private static LinkedBlockingQueue<Integer> query = new LinkedBlockingQueue<>(2);

public static void main(String[] args) {
// 生产线程
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
// 队列满了则阻塞
query.put(i);
System.out.println(String.format("生产线程生成元素:%d", i));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

// 消费线程
new Thread(() -> {
for (int i = 0; i < 10; i++) {
// 队列为空则阻塞
try {
TimeUnit.MILLISECONDS.sleep(100);
int num = query.take();
System.out.println(String.format("消费线程消费元素:%d", num));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();

int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {

while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

PriorityBlockingQueue

API文档 无边界具有优先级的阻塞队列

  • 具有优先级的队列,可根据Comparator自定义优先级排序
  • 采用ReentrantLock保证多线程安全
  • 底层数据结构:Object[]
  • 优先级队列可用于实现大顶堆小顶堆
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/**
* 无边界具有优先级的阻塞队列
* 元素加入队列会根据自实现的Comparator比较器自动排序实现优先级
*/
public class PriorityBlockingQueueTest {
public static void main(String[] args) {
// 实现正序比较器,按照年龄排序
Comparator<User> comparator = (User u1, User u2) -> {
if (u1 == u2) {
return 0;
}
return u1.getAge() - u2.getAge();
};

// 设置初始长度和比较器
PriorityBlockingQueue<User> queue = new PriorityBlockingQueue(2, comparator);

// 生产者线程
new Thread(() -> {
Random random = new Random();
for (int i = 0; i < 10; i++) {
int num = random.nextInt(50);
User user = new User(String.valueOf(num), num);
queue.add(user);
}
}).start();

// 消费者线程
new Thread(() -> {
for (int i = 0; i < 10; i++) {
User user = null;
try {
user = queue.take();
System.out.println(String.format("消费元素:%s", user));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}

class User {
public User(String name, int age) {
this.name = name;
this.age = age;
}

private String name;
private int age;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}

@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}

/**
* 输出
* 消费元素:User{name='2', age=2}
* 消费元素:User{name='6', age=6}
* 消费元素:User{name='15', age=15}
* 消费元素:User{name='17', age=17}
* 消费元素:User{name='21', age=21}
* 消费元素:User{name='22', age=22}
* 消费元素:User{name='23', age=23}
* 消费元素:User{name='32', age=32}
* 消费元素:User{name='34', age=34}
* 消费元素:User{name='45', age=45}
*/

DelayQueue

API文档 具有延迟效果的单端队列,只有元素过期才能被获取

  • 延迟队列,延迟时间到达才能释放元素
  • 底层数据结构:PriorityQueue(优先级队列)
  • ReentrantLock 实现线程安全
  • 元素要实现 Delayed 接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/**
* delayQueue 延迟队列
* DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,
* 其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象
* 的延迟到期时间最长。注意:不能将null元素放置到这种队列中。
*/
public class DelayQueueTest {
private static DelayQueue<Order> queue = new DelayQueue<>();

public static void main(String[] args) {
// 生产者线程
new Thread(() -> {
for (int i = 0; i < 10; i++) {
Order order = new Order(i, System.currentTimeMillis() + new Random().nextInt(1000 * 5));
queue.put(order);
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

// 消费者线程
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
Order order = queue.take();
System.out.println(String.format("释放订单:%s", order));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}

class Order implements Delayed {
// 超时时间为20秒
public static final int TIME_OUT = 10 * 1000;
private int orderId;
private long payTime;

public Order(int orderId, long payTime) {
this.orderId = orderId;
this.payTime = payTime;
}

public int getOrderId() {
return orderId;
}

public void setOrderId(int orderId) {
this.orderId = orderId;
}

public long getPayTime() {
return payTime;
}

public void setPayTime(long payTime) {
this.payTime = payTime;
}

/**
* 用来判断是否到了截止时间
*
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return payTime + TIME_OUT - System.currentTimeMillis();
}

/**
* 排序,这个方法很重要,根据下单时间正序排序,保证队列头部一定是最需要释放的元素
*
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
Order order = (Order) o;
return (int) (this.getPayTime() - order.getPayTime());
}

@Override
public String toString() {
return "Order{" +
"orderId=" + orderId +
", payTime=" + payTime +
'}';
}
}
/**
* 输出
* 释放订单:Order{orderId=0, payTime=1595155223599}
* 释放订单:Order{orderId=1, payTime=1595155223901}
* 释放订单:Order{orderId=2, payTime=1595155224201}
* 释放订单:Order{orderId=3, payTime=1595155224501}
* 释放订单:Order{orderId=4, payTime=1595155224801}
* 释放订单:Order{orderId=5, payTime=1595155225102}
* 释放订单:Order{orderId=6, payTime=1595155225402}
* 释放订单:Order{orderId=7, payTime=1595155225702}
* 释放订单:Order{orderId=8, payTime=1595155226002}
* 释放订单:Order{orderId=9, payTime=1595155226303}
*/

SynchronousQueue

API文档 具有单个元素容量的阻塞队列

  • SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素
  • 底层数据结构:单向链表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
* SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。
* 如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞
* 直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列
* 中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素
*/
public class SynchronousQueueTest {
private static SynchronousQueue<Integer> queue = new SynchronousQueue<>();

public static void main(String[] args) {
// 生产队列
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
// 用add会抛队列满异常
queue.put(i);
System.out.println(String.format("生产线程生产元素:%d", i));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

// 消费线程
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
int num = queue.take();
System.out.println(String.format("消费线程消费元素:%d", num));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}

/**
* 输出
* 生产线程生产元素:0
* 消费线程消费元素:0
* 消费线程消费元素:1
* 生产线程生产元素:1
* 生产线程生产元素:2
* 消费线程消费元素:2
* 消费线程消费元素:3
* 生产线程生产元素:3
* 生产线程生产元素:4
* 消费线程消费元素:4
* 消费线程消费元素:5
* 生产线程生产元素:5
* 消费线程消费元素:6
* 生产线程生产元素:6
* 消费线程消费元素:7
* 生产线程生产元素:7
* 消费线程消费元素:8
* 生产线程生产元素:8
* 消费线程消费元素:9
* 生产线程生产元素:9
*/

LinkedTransferQueue

LinkedTransferQueue采用一种预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为null的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。我们称这种节点操作为“匹配”方式

Deque

LinkedBlockingDeque

API文档

  • 可以做 先进先出的队列,也可以做先进后出的栈
  • 数据结构:双向链表
  • 默认容量:Integer.MAX_VALUE
  • ReentrantLock 保证线程安全

AQS

API文档

在线源码

什么是AQS?

AQS指的是Java中的AbstractQueuedSynchronizer类,翻译过来的意思就是抽象队列同步器它是Java并发用来构建锁和其他同步组件的基础框架,它的核心是一个双端队列cas算法一个使用volatile修改的int类型的共享变量state, 当state为0时,表示可以申请锁,为1时表示其他线程获得锁

锁的模式:共享锁,排他锁

加锁过程:使用cas算法将state修改为1,并设置同步器的线程归属

未获得锁线程:被封装为一个node节点,并加入等待队列然后调用LockSupport.park挂起线程

解锁过程:将state修改为0,将同步器的线程归属置空,唤醒队列后继节点获取锁

为什么AQS是从后往前唤醒节点的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 唤醒节点
private void unparkSuccessor(Node node) {

int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);


Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 丛队列尾部向前开始查找第一个需要唤醒的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 添加节点,node是当前节点
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {//将当前节点追加到队列尾部
//将新节点的前驱指向上一个节点;AQS节点从后往前遍历就是因为先赋值了prev指针,如果从前往后在高并发情况会发生next指针来不及指向而漏掉后面节点的扫描
node.prev = t;
if (compareAndSetTail(t, node)) {//cas设置队尾节点
t.next = node;//将上一节点的后继指向当前节点
return t;
}
}
}
}

原因是AQS在添加节点的时候,如果队列存在节点

  • 先将插入节点的 prev 指针指向队列尾部节点
  • 然后CAS将插入节点替换为队列的 tail
  • 将原 tail 节点的 next 指针指向当前节点

由此可知 prev 指针比 next 指针有更高的赋值优先级,并发情况下,如果节点从前往后扫描,如果此时新增节点来没来得及对上一个节点的 next 指针赋值,AQS可能出现后继节点漏扫描情况

ReentrantLock

API文档

在线源码

可重入锁:每次重入 AQS 的 state + 1

底层实现:在AQS基础上构建

  • 锁模式
    • 公平锁
    • 非公平锁(默认)

JUC包下很多同步机制都使用ReentrantLock实现,比如阻塞队列,CopyOnWriteArrayList

可重入

可重入是如果当前线程获得了锁,那么在获得锁的情况下继续调用lock方法获得锁不会进行阻塞

调用了多少次lock方法就必须调用多少次unlock方法,因为每次调用lock方法时,state值都会+1,只有state = 0 才是无锁状态

公平锁

  • tryAcquire 尝试获取锁
  • 公平锁调用lock方法时会调用tryAcquire方法尝试获取锁
  • 如果state = 0,尝试获取锁
  • 如果队列存在节点,则排队
1
2
3
4
5
6
7
8
9
// 默认为非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}

// 传入 fair 获取公平或非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

非公平锁

  • nonfairTryAcquire 尝试获取锁
  • 非公平锁调用lock时会通过cas先尝试是否能获取锁,不能才调用nonfairTryAcquire方法
    • 排队前插队,再次尝试通过cas将 state由0修改为1,尝试获取锁
    • 获取不到则排队

用法

1
2
3
4
5
6
7
8
9
10
11
12
private static ReentrantLock lock = new ReentrantLock();
private static volatile Integer a = 0;

public static void main(String[] args) {
for (int i = 0; i < 15; i++) {
new Thread(() -> {
lock.lock();
log.info("Thread = {}, {}", Thread.currentThread().getName(), a++);
lock.unlock();
}).start();
}
}

ReentrantLock.Condition

API文档

Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备( signal 或者 signalAll方法被带调用)时 ,这些等待线程才会被唤醒,从而重新争夺锁

作用:有条件地 唤醒线程 休眠线程

  • Condition.await() 让线程休眠,并自动释放Condition关联的锁
  • Condition.signal() 唤醒休眠线程,如果线程能够恢复,则说明当前线程获得Condition关联的锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
 // 锁
private static ReentrantLock lock = new ReentrantLock();

// 读条件
private static Condition read = lock.newCondition();

// 写条件
private static Condition write = lock.newCondition();

private static volatile Object[] arr = new Object[10];

// 读写索引,数组元素总量
private static int putIdx, takeIdx, count;


// 写操作
public static void put(Object obj) throws InterruptedException {
// 获取锁
lock.lock();
try {
TimeUnit.MILLISECONDS.sleep(100);
while (count == arr.length) {
// 数组满了,等待
write.await();
}

arr[putIdx] = obj;
log.info("生产者生产了:{}", obj);
// 如果数组写到最后一个元素后则索引复位
if (++putIdx == arr.length) {
putIdx = 0;
}
count++;
// 唤醒读取线程
read.signal();
} finally {
lock.unlock();
}
}


// 读操作
public static Object take() throws InterruptedException {
// 获取锁
lock.lock();
try {
TimeUnit.MILLISECONDS.sleep(200);
// 数组为空
while (count == 0) {
read.await();
}

Object result = arr[takeIdx];

// 读取到最后一个元素则复位
if (++takeIdx == arr.length) {
takeIdx = 0;
}

count--;
// 唤醒写线程
write.signal();
log.info("消费者消费了:{}", result);
return result;
} finally {
lock.unlock();
}
}

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 2; i++) {
new Thread(() -> {
try {
while (true) {
put(System.currentTimeMillis());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}

new Thread(() -> {
try {
while (true) {
take();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

Thread.currentThread().join();
}

// 01:04:49.824 [Thread-0] INFO com.macro.mall.controller.MinioController - 生产者生产了:1661706289722
// 01:04:49.929 [Thread-0] INFO com.macro.mall.controller.MinioController - 生产者生产了:1661706289829
// 01:04:50.029 [Thread-0] INFO com.macro.mall.controller.MinioController - 生产者生产了:1661706289929
// 01:04:50.129 [Thread-0] INFO com.macro.mall.controller.MinioController - 生产者生产了:1661706290029
// 01:04:50.229 [Thread-0] INFO com.macro.mall.controller.MinioController - 生产者生产了:1661706290129
// 01:04:50.329 [Thread-1] INFO com.macro.mall.controller.MinioController - 生产者生产了:1661706289722
// 01:04:50.429 [Thread-1] INFO com.macro.mall.controller.MinioController - 生产者生产了:1661706290329
// 01:04:50.529 [Thread-1] INFO com.macro.mall.controller.MinioController - 生产者生产了:1661706290429
// 01:04:50.629 [Thread-1] INFO com.macro.mall.controller.MinioController - 生产者生产了:1661706290529
// 01:04:50.729 [Thread-1] INFO com.macro.mall.controller.MinioController - 生产者生产了:1661706290629
// 01:04:51.029 [Thread-2] INFO com.macro.mall.controller.MinioController - 消费者消费了:1661706289722
// 01:04:51.229 [Thread-2] INFO com.macro.mall.controller.MinioController - 消费者消费了:1661706289829
// 01:04:51.429 [Thread-2] INFO com.macro.mall.controller.MinioController - 消费者消费了:1661706289929

ReentrantReadWriteLock

ReentrantReadWriteLock API文档

ReentrantReadWriteLock.ReadLock API文档

ReentrantReadWriteLock.WriteLock API文档

适合读多写少场景

读写锁底层有两把锁,ReadLockWriteLock。读写锁之间是互斥的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Counter {
private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
private final Lock rlock = rwlock.readLock();
private final Lock wlock = rwlock.writeLock();
private int[] counts = new int[10];

public void inc(int index) {
wlock.lock(); // 加写锁
try {
counts[index] += 1;
} finally {
wlock.unlock(); // 释放写锁
}
}

public int[] get() {
rlock.lock(); // 加读锁
try {
return Arrays.copyOf(counts, counts.length);
} finally {
rlock.unlock(); // 释放读锁
}
}
}

共享锁(读锁)

多个线程可以获得同一把锁,不阻塞,但是数据不能修改 (不能写是代码上规范的,共享锁不是真的不让写数据)

排他锁(写锁)

多只能一个线程获得锁,其他线程阻塞

读写互斥

当获得共享锁时,需要获得排他锁就必须等待共享锁释放,反之亦然

底层实现

ReentrantReadWriteLock 还是基于 AQS 实现的,还是对state进行操作,拿到锁资源就去干活,如果没有拿到,依然去AQS队列中排队

  • AQS 中 int (32位)类型的 state 变量标记为读锁或写锁

  • 读锁操作:基于state的高16位进行操作

  • 写锁操作:基于state的低16为进行操作

  • ReentrantReadWriteLock依然是可重入锁

写锁重入:和 ReentrantLock一致,依然是对 state 进行 +1 操作,只不过范围变小了(低16位)

读锁重入:因为读锁是共享锁。读锁在获取锁资源操作时,是要对state的高16位进行 + 1操作。因为读锁是共享锁,所以同一时间会有多个读线程持有读锁资源。这样一来,多个读操作在持有读锁时,无法确认每个线程读锁重入的次数(多个线程操作 state)。为了去记录读锁重入的次数,每个读操作的线程,都会有一个 ThreadLocal 记录锁重入的次数,当 ThreadLocal 的重入次数为0则释放读锁

写锁的饥饿问题:读锁是共享锁,当有线程持有读锁资源时,再来一个线程想要获取读锁,直接对state修改即可。在读锁资源先被占用后,来了一个写锁资源,然后,大量的需要获取读锁的线程来请求锁资源,如果可以绕过写锁,直接拿资源,会造成写锁长时间无法获取到写锁资源

读锁在拿到锁资源后,如果再有读线程需要获取读锁资源,需要去AQS队列排队。如果队列的前面需要写锁资源的线程,那么后续读线程是无法拿到锁资源的。持有读锁的线程,只会让写锁线程之前的读线程拿到锁资源

StampedLock

参考 廖雪峰文章

StampedLock是对读写锁的一种优化,先通过获取乐观锁后获取数据,然后再进行版本号验证。如果版本号验证通过则说明获取的数据是安全的,否则申请悲观锁重新获取数据,是乐观锁悲观锁结合的解决方案,能够有效提高并发(不支持锁的重入)。非常适合在读多写少的场景下使用。底层使用Unsafe的内存屏障保证数据读写的一致性

StampedLockReadWriteLock相比,改进之处在于:读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁

先获取乐观锁,在读写冲突时,读取数据先获取乐观锁,然后再通过乐观锁版本号判断读取的数据是否最新版本

  • 版本比较一致,说明当前数据为最新版本,直接返回数据
  • 版本比较不一致,说明数据被修改,获取悲观锁后重新获取最新版本数据

多线程

并发三大特性

原子性

JMM规定所有变量都会存储在主内存中,在操作的时候,需要从主内存中复制一份到线程内存(CPU内存),在线程内部做计算。然后再写回主内存中

定义:原子性指一个操作是不可分割的,不可中断的,一个线程在执行时,另一个线程不会影响到他

  • synchronized 排它锁,某一时刻只有一个线程能获得锁
  • CAS Compare And Swap也就是比较和交换,他是一条CPU的并发原语
  • Lock 底层是CAS + AQS排队
  • JUC 原子类,比如AtomicInteger

可见性

MESI协议

可见性问题是基于CPU位置出现的,CPU处理速度非常快,相对CPU来说,去主内存获取数据这个事情太慢了,CPU就提供了L1,L2,L3的三级缓存,每次去主内存拿完数据后,就会存储到CPU的三级缓存,每次去三级缓存拿数据,效率肯定会提升

这就带来了问题,现在CPU都是多核,每个线程的工作内存(CPU三级缓存)都是独立的会告知每个线程中做修改时,只改自己的工作内存,没有及时的同步到主内存,导致数据不一致问题

  • volatile
    • 读屏障:将对应的CPU缓存置为无效,强制去主内存读取共享变量
    • 写屏障:将写入缓存中的数据更新写入主内存,让其他线程可见
  • synchronized
    • 获取锁:将内部(同步块、方法)涉及的变量重新去主内存中获取
    • 释放锁:立即将缓存同步到主内存
  • final 常量不允许修改,因此可以保证可见性

有序性

指令重排序

  • volatile
    • 内存屏障,内存屏障是一条CPU指令
    • 通过插入内存屏障防止内存屏障前后发生指令重排序

线程

线程状态

  • 新建(new):当线程对象被创建时,线程处于新建状态。此时,该线程还没有启动,也没有分配系统资源
  • 就绪(Runnable):当线程调用start()方法后,线程进入就绪状态。此时,线程已经分配到了系统资源,但还没有开始执行。在就绪状态下,线程可能会等待其他线程的执行
  • 运行(Running):当线程获取到CPU资源后,开始执行run()方法,线程处于运行状态。线程会不断地执行run()方法中的代码,直到线程被阻塞或者执行完毕
  • 阻塞(Blocked):线程在某些情况下可能会被阻塞,例如等待某个资源的释放(synchronized)或者线程调用了sleep()方法暂停执行。在阻塞状态下,线程会暂时停止执行,直到满足某个条件后进入就绪状态
  • 等待(Waiting):线程在等待某个特定条件发生时,会进入等待状态。例如,线程调用了wait()方法,线程会一直等待直到其他线程调用了notify()或者notifyAll()方法唤醒它们
  • 超时等待(Timed Waiting):在等待状态中,线程可以设置等待的超时时间。一旦超过时间限制,线程会自动唤醒并进入就绪状态。例如,线程调用了sleep()方法,线程会在指定的时间内暂停执行
  • 终止状态(TERMINATED):线程任务执行完毕,线程被终止

API文档

创建线程的几种方式

继承Thread类 , 重写run方法

1
2
3
4
5
6
7
8
9
10
11
12
static class MyThread extends Thread {
@Override
public void run() {
System.out.println("myThread");
}
}

public static void main(String[] args) throws InterruptedException {
MyThread myThread = new MyThread();
myThread.start();
myThread.join();
}

实现Runnable接口,实现run方法

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("myThread");
}
});

thread.start();
thread.join();
}

Future / Callable 配合线程池方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static class MyCallable implements Callable<Integer> {

@Override
public Integer call() throws Exception {
int a = 0;
for (int i = 0; i < 10; i++) {
TimeUnit.MILLISECONDS.sleep(100);
a ++;
}

return a;
}
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
MyCallable myCallable = new MyCallable();
MyCallable myCallable2 = new MyCallable();

Future<Integer> submit = executorService.submit(myCallable);
Future<Integer> submit2 = executorService.submit(myCallable2);

// 阻塞等待结果
Integer integer = submit.get();
Integer integer2 = submit2.get();
System.out.println(integer + integer2); // 20

// 关闭线程池
executorService.shutdown();
}

通过线程池创建

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);

executorService.execute(() -> {
for (int i = 0; i < 5; i++) {
System.out.println(i);
}
});

// 关闭线程池
executorService.shutdown();
}

构造函数

父子关系

  • 线程的创建是由另一个线程完成的
  • 被创建线程的父线程是创建它的线程

ThreadGroup

构建一个线程如果没有指定ThreadGroup,则它会用父线程的ThreadGroup

Runnable

  • Thread负责线程相关职责和控制
  • Runnable负责执行逻辑单元

stackSize

-Xss

线程的堆栈大小,如果没有设置则为0

设置了更小的堆栈大小,JVM能支持同时存活更多的线程,如果设置大了单个线程拥有更大的递归深度,但是同时存活的线程更少了

守护线程

  • 守护线程有自动退出特性,当JVM没有非守护线程运行时,守护线程会自动退出
  • 设置线程为守护线程 Thread.setDaemon(boolean),必须在 start()之前设置
  • 守护线程不能持有任何需要关闭的资源,例如打开文件等,因为虚拟机退出时,守护线程没有任何机会来关闭文件,这会导致数据丢失
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
    public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(()-> {
try {
for (;;) {
TimeUnit.MILLISECONDS.sleep(500);
System.out.println(1);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread.setDaemon(true);
thread.start();

TimeUnit.SECONDS.sleep(2);
}
//输出
1
1
1
1
1
2
3
4
public Thread(ThreadGroup group, Runnable target, String name,
long stackSize) {
init(group, target, name, stackSize);
}

sleep

线程休眠

  • sleep不会释放锁,阻塞线程
  • 睡眠的时间不是一个准确时间,和cpu任务调度有关
  • 可以使用TimeUnit.SECONDS.sleep(3);更优雅睡眠线程
  • 一个线程调用sleep,另一个线程调用 interrupt 能捕获中断信号

yield

让出CPU执行时间片

  • yield 不会释放对象锁
  • 让出cpu资源(不一定会让),让同等优先级的线程拥有可执行机会
  • 让执行中的线程返回就绪状态,和同等优先级线程竞争执行机会
  • 另外线程调用interrupt不能捕获到中断信号

suspend / resume (已废弃)

suspend

  • 线程会被挂起,但不会释放锁
  • 不推荐使用,在同步块或同步方法等调用suspend 会造成死锁

resume

  • 恢复suspend挂起的线程

死锁场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private static String message = null;
private static int i=0;

public static void main(String[] args) {

Thread consumer = new Thread(() -> {
while (true) {
synchronized (Object.class) {
while (message == null) {
System.out.println("等待接受消息");
Thread.currentThread().suspend(); // 这里消费者挂起线程没有释放锁
}
System.out.println("接受消息 => " + message);
message = null;
}
}
});
consumer.start();

Thread producer = new Thread(() -> {
while (true) {
synchronized (Object.class) { // 这里死锁,一直获取不到锁
try {
Thread.sleep(100);
message = "Hello , this is " + i++;
consumer.resume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
producer.start();
}
// 输出 等待接受消息

stop

  • 暴力停止,不推荐使用
  • 一个线程几乎可以在任何地方抛出一个ThreadDeath异常,导致逻辑执行不完整
  • 释放该线程所持有的所有的锁

join

串行执行

  • 当一个线程调用另一个线程的join()方法时,调用线程会被阻塞,直到被调用线程执行完毕或者超时。这意味着join()方法可以用来实现线程的串行执行
  • 被调用join()线程执行完毕,当前线程再继续往下执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Test {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
System.out.println("等待执行完毕后,主线程退出");
try {
for (int i = 0; i < 5; i++) {
System.out.println(i);
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});

thread.start();
// 等待调用join 方法的线程执行完后再执行
thread.join();
System.out.println("主线程继续执行");
}
}

等待执行完毕后,主线程退出
0
1
2
3
4
主线程继续执行

interrupt

  • 用于打断阻塞线程,打断调用waitsleepjoin的线程
  • 调用interrupt 方法后,在该线程的run方法会接收到 InterruptedException 异常(信号)
  • 在线程内部可用 interrupted 判断当前线程是否被中断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
try {
// 休眠60秒
TimeUnit.SECONDS.sleep(60);
} catch (InterruptedException e) {
// 外部调用interrupt,线程体内部会触发InterruptedException
e.printStackTrace();
}

int index = 0;
while (true) {
try {
System.out.println(index++);
} catch (InterruptedException e) {
System.out.println("线程被中断");
}
}
});

thread.start();
TimeUnit.SECONDS.sleep(3);
// 线程提内部会触发 InterruptedException
thread.interrupt();
}
  • 实际上线程休眠3秒后就被主线程打断继续执行

interrupted

interrupt具有清除状态的功能,线程中断后,连续两次调用,第二次会返回false

setPriority

  • 取值范围 1~10
  • 设置线程优先级不一定会让线程拥有更多的执行机会
  • 默认线程优先级都是5,因为Main线程的优先级就是5,其他线程优先级都是派生mail线程
  • 线程的优先级设置可以理解为线程抢占CPU时间片的概率,虽然概率比较大,但是它不一定就是按照优先级的顺序去抢占CPU时间片的,具体的执行顺序还是要根据谁先抢到了CPU的时间片,谁就先来执行
1
2
3
4
5
public static void main(String[] args) {
Thread thread = new Thread(() -> {});
thread.setPriority(8);
thread.start();
}

suspend和wait的区别

wait(Object方法)

  • 暂停当前线程执行并释放锁,进入对象等待池(被监视对象的对象等待池),调用wait方法必须获取对象锁,否则抛出IllegalMonitorStateException异常
  • 调用wait 必须是获得对象锁的情况下才能调用,只有在获取该对象的锁才能使用

notify(Object方法)

  • 调用notify会获取对象锁,只有在synchronized方法或块中才能调用,调用notify方法必须获取对象锁,否则抛出IllegalMonitorStateException异常
  • 在对象(被监视对象)等待池中随机移走一个线程并放到锁标志等待池

notify all

  • notify all 被唤醒的线程会再次去竞争对象锁,必须在synchronized方法或数据块中调用
  • 从对象(被监视对象)等待池中移走所有等待那个对象的线程并放到锁标志等待池中

死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
for (;;) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(1);
}
});
thread.start();
thread.wait(); //抛出 java.lang.IllegalMonitorStateException,需要获得对象锁才能调用
TimeUnit.SECONDS.sleep(3);
thread.notify();
TimeUnit.SECONDS.sleep(3);
}

正确用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static void main(String[] args) throws InterruptedException {
final Object object = new Object();
Thread t1 = new Thread() {
public void run() {
synchronized (object) {
try {
System.out.println("使用wait必须获取对象锁,因为要释放对象锁");
object.wait();
System.out.println("继续执行");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
Thread t2 = new Thread() {
public void run() {
synchronized (object) {
System.out.println("获取对象锁 另外的线程调用 notify 唤醒线程");
object.notify();
}
}
};

t1.start();
t2.start();
}

输出
使用wait必须获取对象锁,因为要释放对象锁
获取对象锁 另外的线程调用 notify 唤醒线程
继续执行

区别

  • supend和resume是Thread的方法,wait和notify是Object的方法
  • supend不释放对象锁,容易造成死锁,wait和notify必须要获取对象锁才能调用,并且会主动释放对象锁,不会造死锁

正确关闭线程

  • 线程结束生命周期正常结束

  • 等待线程体执行完任务,正常退出

捕获中断信号结束线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args)
throws InterruptedException {
Thread thread = new Thread(() -> {

try {
// 接收线程中断信号
while (!Thread.interrupted()) {
TimeUnit.SECONDS.sleep(1);

System.out.println(Thread.currentThread().getName());
}
} catch (InterruptedException e) {
// 必须在循环外捕获 InterruptedException
System.out.println("线程中断");
}
});

thread.start();
TimeUnit.SECONDS.sleep(3);
// 中断线程
thread.interrupt();
}

使用 volatile 开关控制

使用 volatile修饰是要保证多线程环境下的可见性

由于interrupted有清除状态的功能,可以使用一个flag来控制线程结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Test {

public static void main(String[] args)
throws InterruptedException {

TestThread thread = new TestThread();
thread.start();
TimeUnit.SECONDS.sleep(10);
thread.closedThread();
}
}

class TestThread extends Thread {
private volatile boolean closed = false;

@Override
public void run() {
while (!closed) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(this.getName());
}
}

public void closedThread() {
this.closed = true;
}
}

线程通信

单线程间通信

  • join / yield
  • 单线程通信可以使用 synchronizedwaitnotify 完成线程间通信
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
public class Test {

/**
* 模拟异步非阻塞事件处理
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
EventQuery eventQuery = new EventQuery();

new Thread(() -> {
for (int i=0; i<100; i++) {
eventQuery.offer(new EventQuery.Event(i));
}
}).start();

new Thread(() -> {
for (int i=0; i<100; i++) {
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
eventQuery.take();
}
}).start();
}
}

/**
* 事件队列
*/
class EventQuery {

private final static int MAX_NUM = 10;
private int maxSize;
private final LinkedList<Event> taskQuery = new LinkedList<>();

static class Event {
private int eventId;

public Event(int eventId) {
this.eventId = eventId;
}

@Override
public String toString() {
return "Event{" +
"eventId=" + eventId +
'}';
}
}


public EventQuery() {
this(MAX_NUM);
}

public EventQuery(int maxSize) {
this.maxSize = maxSize;
}

/**
* 提交事件
* offer 单线程通信
* @param event
*/
public void offer(Event event) {
synchronized (this.taskQuery) {
if (this.taskQuery.size() >= this.maxSize) {
System.out.println("this query full");
try {
this.taskQuery.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

System.out.println("this event commit " + event);
this.taskQuery.add(event);

// 这一步很重要,实现自旋
this.taskQuery.notify();
}
}

/**
* 消费事件
* take 单线程通信
* @return
*/
public Event take() {
Event event = null;
synchronized (this.taskQuery) {
if (CollectionUtils.isEmpty(this.taskQuery)) {
System.out.println("this query is empty");
try {
this.taskQuery.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// 多线程下,可能会执行空的removeFirst,原因是eventQuery 两个线程同时
// wait,另外一个线程执行offer后顺利执行 take,然后又唤起了wait的take线程
event = this.taskQuery.removeFirst();
System.out.println("this event take " + event);

// 这一步很重要,实现自旋
this.taskQuery.notify();
}

return event;
}
}

多线程间通信

  • synchronizedwaitnotifynotifyAll
  • ReentrantLock + Condition
  • CountDownLatch 计数器
  • CyclicBarrier 同步屏障
  • Semaphore 信号量
  • join / yield
  • BlockingQueue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
public class Test {

/**
* 模拟异步非阻塞事件处理
*
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
EventQuery eventQuery = new EventQuery();

new Thread(() -> {
for (int i = 0; i < 100; i++) {
eventQuery.offer(new EventQuery.Event(i));
}
}).start();

new Thread(() -> {
for (int i = 100; i < 200; i++) {
eventQuery.offer(new EventQuery.Event(i));
}
}).start();

new Thread(() -> {
for (int i = 200; i < 300; i++) {
eventQuery.offer(new EventQuery.Event(i));
}
}).start();

new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
eventQuery.take();
}
}).start();

new Thread(() -> {
for (int i = 100; i < 200; i++) {
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
eventQuery.take();
}
}).start();

new Thread(() -> {
for (int i = 200; i < 300; i++) {
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
eventQuery.take();
}
}).start();
}
}

/**
* 事件队列
*/
class EventQuery {

private final static int MAX_NUM = 10;
private int maxSize;
private final LinkedList<Event> taskQuery = new LinkedList<>();

static class Event {
private int eventId;

public Event(int eventId) {
this.eventId = eventId;
}

@Override
public String toString() {
return "Event{" +
"eventId=" + eventId +
'}';
}
}


public EventQuery() {
this(MAX_NUM);
}

public EventQuery(int maxSize) {
this.maxSize = maxSize;
}

/**
* 提交事件
* offer 单线程通信
*
* @param event
*/
public void offer(Event event) {
synchronized (this.taskQuery) {

// 多线程的场景下,线程每次被重新唤醒必须重新校验一次
while (this.taskQuery.size() >= this.maxSize) {
System.out.println("this query full");
try {
this.taskQuery.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

System.out.println("this event commit " + event);
this.taskQuery.add(event);

// 唤醒所有线程进行锁竞争
this.taskQuery.notifyAll();
}
}

/**
* 消费事件
* take 单线程通信
*
* @return
*/
public Event take() {
Event event = null;
synchronized (this.taskQuery) {

// 多线程的场景下,线程每次被重新唤醒必须重新校验一次
while (CollectionUtils.isEmpty(this.taskQuery)) {
System.out.println("this query is empty");
try {
this.taskQuery.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// 多线程下,可能会执行空的removeFirst,原因是eventQuery 两个线程同时
// wait,另外一个线程执行offer后顺利执行 take,然后又唤起了wait的take线程
event = this.taskQuery.removeFirst();
System.out.println("this event take " + event);

// 这一步很重要,实现自旋
this.taskQuery.notifyAll();
}

return event;
}
}

CountDownLatch

计数器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 多线程计数器
*/
public class CountDownLatchTest {

private static int num = 0;

public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(10);

for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
synchronized (CountDownLatchTest.class) {
num++;
}
}

// 线程完成计数器减1
latch.countDown();
}).start();
}

// 阻塞去监听计数器的值是否为0,是则往下执行
latch.await();
System.out.println("所有子线程执行完毕");
System.out.println(num);
}
}

底层实现

  • 底层使用 AQS
  • 计数器的数量其实就是 AQS 的 state 值
  • 当计数器为0(state = 0),调用await()的线程被唤醒

CyclicBarrier

主要用于阻塞子线程,让多个子线程在同一时刻开始执行

线程栅栏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public static void main(String[] args) throws InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

new Thread(() -> {
System.out.println("员工进入会议室");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("会议开始");
}).start();

new Thread(() -> {
System.out.println("组长进入会议室");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("会议开始");
}).start();

new Thread(() -> {
System.out.println("CEO进入会议室");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("会议开始");
}).start();

// 员工进入会议室
// 组长进入会议室
// CEO进入会议室
// 会议开始
// 会议开始
// 会议开始
}

底层实现

1
2
3
   private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private int count;
  • 底层使用 ReentrantLockCondition 实现
  • 线程都会调用 Condition 的 await 进行阻塞
  • 当 count 被线程减到0会调用 Condition 的 signalAll() 唤醒所有阻塞线程

Semaphore

互联网项目不能使用 Semaphore 限流,单机版使用 Guava RateLimiter

Semaphore也叫信号量,在JDK1.5被引入,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.example.luckdraw.test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

/**
* 信号量有限的资源共享
*/
public class SemaphoreTest {

/**
* 每次只有2个线程有许可证获取共享资源
*/
private static Semaphore semaphore = new Semaphore(2);
private static CountDownLatch latch = new CountDownLatch(10);
private static int num;


public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
// 获取凭证
semaphore.acquire();
System.out.println(String.format("%s 线程获取资源", Thread.currentThread().getName()));
num++;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放凭证
System.out.println(String.format("%s 线程释放资源", Thread.currentThread().getName()));
semaphore.release();
latch.countDown();
}
}).start();
}

latch.await();
System.out.println(num);
}
}
/**
* 输出
* Thread-2 线程获取资源
* Thread-0 线程获取资源
* Thread-2 线程释放资源
* Thread-0 线程释放资源
* Thread-6 线程获取资源
* Thread-1 线程获取资源
* Thread-6 线程释放资源
* Thread-1 线程释放资源
* Thread-3 线程获取资源
* Thread-4 线程获取资源
* Thread-4 线程释放资源
* Thread-3 线程释放资源
* Thread-5 线程获取资源
* Thread-7 线程获取资源
* Thread-5 线程释放资源
* Thread-7 线程释放资源
* Thread-8 线程获取资源
* Thread-9 线程获取资源
* Thread-8 线程释放资源
* Thread-9 线程释放资源
* 10
*

底层实现

  • AQS 的 state 进行控制
  • 当线程获得凭证后,state 会进行 -1 操作
  • 当 state = 0 时,不允许线程申请凭证(阻塞)
  • 当线程释放凭证时,state 会进行 +1 操作

Hook线程(钩子)

不建议在线程池中使用异常回调

  • 用于处理线程异常回调优雅停机
  • 线程异常回调UncaughtExceptionHandler
  • JVM退出监控 Hook线程

UncaughtExceptionHandler

线程异常回调需要实现的接口,它是一个函数式接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) throws InterruptedException {

Thread thread = new Thread(() -> {
int a = 10 / 0;
});

// 对单独线程设置异常回调
thread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
System.out.println(t.getName());
System.out.println(e.getMessage());
});

thread.start();
}

DefaultUncaughtExceptionHandler

设置线程全局异常回调

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) {

// 注入全局线程设置异常接口回调
Thread.setDefaultUncaughtExceptionHandler((Thread t, Throwable e) -> {
System.out.println(t.getName());
System.out.println(e.getMessage());
});

new Thread(() -> {
int a = 10 / 0;
}).start();
}

注入钩子线程[JVM]

实现优雅停机

JVM之所以运行是因为存在活跃的非守护进程,当JVM的所有非守护进程结束,程序退出,这时候就会触发Hook线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) {

// 注入Hook线程
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("The Hook Thread 1 is Run");
}));

// Hook线程可以注入多个
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("The Hook Thread 2 is Run");
}));
}

ThreadGroup钩子

如果没有在线程注入异常回调接口,线程异常又会如何处理?

  • getUncaughtExceptionHandler 方法先会判断自身是否存在回调接口
  • 若线程本身不存在则获取所在ThreadGroup的UncaughtExceptionHandler
  • 若该ThreadGroup有parent则获取parent的UncaughtExceptionHandler
  • 一直向上递归,若没有在ThreadGroup找到回调就找Thread的DefaultUncaughtExceptionHandle,若没有就输出异常堆栈

Thread 获取回调接口源码

1
2
3
4
public UncaughtExceptionHandler getUncaughtExceptionHandler() {
return uncaughtExceptionHandler != null ?
uncaughtExceptionHandler : group;
}

ThreadGroup获取回调接口源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void uncaughtException(Thread t, Throwable e) {
if (parent != null) {
parent.uncaughtException(t, e);
} else {
Thread.UncaughtExceptionHandler ueh =
Thread.getDefaultUncaughtExceptionHandler();
if (ueh != null) {
ueh.uncaughtException(t, e);
} else if (!(e instanceof ThreadDeath)) {
System.err.print("Exception in thread \""
+ t.getName() + "\" ");
e.printStackTrace(System.err);
}
}
}

守护线程

守护线程,专门用于服务其他的线程,守护线程有自动退出特性,当JVM没有非守护线程运行时,守护线程会自动退出。比如垃圾回收线程,就是最典型的守护线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
while (true) {
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("守护线程");
}
});

// 设置为守护线程
thread.setDaemon(true);
thread.start();

// 主线程休眠1秒
TimeUnit.SECONDS.sleep(1);
}

// 输出
守护线程
守护线程
守护线程

线程池

​ 为什么使用线程池

  • 减少系统维护线程的开销
  • 解耦,运行和创建分开
  • 线程可复用

ThreadPoolExecutor

源码

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

核心参数说明

参数名作用
corePoolSize核心线程池大小
maximumPoolSize最大线程池大小
keepAliveTime非核心线程最大存活时间
TimeUnit非核心线程最大存活时间单位
workQueue阻塞任务队列
threadFactory(可选)新建线程工厂
RejectedExecutionHandler(可选)当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理

拒绝策略

​ 当提交的任务数大于corePoolSize时,会优先放到队列缓冲区,只有填满了缓冲区后,才会判断当前运行的任务是否大于maxPoolSize,小于时会新建线程处理,大于时就触发了拒绝策略

拒绝策略说明
ThreadPoolExecutor.AbortPolicy (默认拒绝策略)丢弃任务并抛出RejectedExecutionException异常
ThreadPoolExecutor.DiscardPolicy也是丢弃任务,但是不抛出异常
ThreadPoolExecutor.DiscardOldestPolicy丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy由调用线程处理该任务

重点讲解

  1. 当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
  2. 当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
  3. 当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
  4. 当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
  5. 当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程

非核心线程创建时机

当每个核心线程数都在执行时,等待队列已满,就会启用非核心线程处理堆积的任务

任务队列没满时就只有核心线程在执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

for (int i = 0; i < 10; i++) {
pool.execute(() -> {System.out.println(Thread.currentThread().getName());});
}

// pool-1-thread-1
// pool-1-thread-1
// pool-1-thread-1
// pool-1-thread-1
// pool-1-thread-1
// pool-1-thread-1
// pool-1-thread-1
// pool-1-thread-1
// pool-1-thread-1
// pool-1-thread-1
}

任务队列满了就创建额外线程执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
    public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 3, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2)
, Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

for (int i = 0; i < 10; i++) {
pool.execute(() -> {System.out.println(Thread.currentThread().getName());});
}

// main
// pool-1-thread-3
// pool-1-thread-1
// pool-1-thread-2
// pool-1-thread-1
// pool-1-thread-3
// main
// main
// pool-1-thread-2
// pool-1-thread-1
}

线程池状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ThreadPoolExecutor extends AbstractExecutorService {

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// 线程状态
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
...

ctl:一个Integr占4个字节,也就是32位,前三位用来表示线程池的状态,后29位用来表示线程池的工作线程数

RUNNING

  • 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理
  • 状态切换:线程池的初始化状态是RUNNING。线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0

SHUTDOWN

  • 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务
  • 状态切换:调用线程池的 shutdown() 方法时,线程池由RUNNING -> SHUTDOWN

STOP

  • 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务
  • 状态切换:调用线程池的 shutdownNow() 方法时,线程池由(RUNNING or SHUTDOWN ) -> STOP

TIDYING

  • 状态说明:当所有的任务已终止,ctl记录的工作线程数为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数 terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现
  • 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池工作线程也为空,就会由 SHUTDOWN -> TIDYING,当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING

TERMINATED

  • 状态说明:线程池彻底终止,就变成TERMINATED状态
  • 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED

线程池执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void execute(Runnable command) { 
if (command == null) // 健壮性,任务判空
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 判断当前线程数是否少于核心线程数
if (addWorker(command, true)) // 如果当前线程数少于核心线程则创建一个新的核心线程执行当前任务,并把线程放入池子
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 如果当前核心线程满了就将任务放入队列
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) // 如果线程池状态停止运行,则行拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0) // 工作线程数量为0
addWorker(null, false); // 创建一个任务为空的非核心线程
}
else if (!addWorker(command, false)) // 任务队列满了就尝试创建一个非核心线程执行任务,否则就执行拒绝策略
reject(command);
}

addWorker(null, false) 为什么要添加一个任务为空的非核心线程?

  • 线程池的核心线程数是允许为0的
  • 那么就存在线程饥饿问题,就是任务队列里有任务,但是线程池里没有线程处理
  • 创建一个空任务线程就是防止线程饥饿,先处理堆积任务

添加工作线程逻辑

  • 校验线程池的状态以及工作线程个数
  • 添加工作线程并且启动工作线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
private boolean addWorker(Runnable firstTask, boolean core) { // 提交的任务以及是否核心线程
retry: // 外层for的标记,方便在内层for跳出外层for循环
for (;;) { // 外层for在校验线程池的状态
int c = ctl.get();
int rs = runStateOf(c); // 拿到ctl的高3位

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && //判断线程池是否调用了 shutdown
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) { // 内层for在校验工作线程的个数
int wc = workerCountOf(c); //获取当前线程数
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // 判断当前是否核心线程,当前线程数是否小于核心线程
return false; // 判断是否大于核心线程数或最大线程数
if (compareAndIncrementWorkerCount(c)) // cas 线程数 + 1
break retry; // 跳出循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 创建一个工作线程
final Thread t = w.thread;
if (t != null) { // 判断 Worker 线程是否为空,担心线程工厂有问题
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 加锁,存储工作线程的对象是HashSet
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get()); // 再次判断线程池运行状态

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // 将当前线程加入线程池
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true; // 工作线程创建成功
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 开始执行线程
workerStarted = true; // 工作线程启动成功
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w); // 如果工作线程启动失败,则进行补偿处理
}
return workerStarted; // 返回工作线程是否启动成功
}

Executors

java官方提供用于创建线程池的工具类

这个类阿里巴巴的规范不推荐使用,原因是容易造成资源浪费和存在大量任务的时候会OOM

1
2
3
4
5
public interface Executor {

// 提交一个无返回值的任务以供执行
void execute(Runnable command);
}

ExecutorService

Executors创建线程池返回的对象,是ThreadPoolExecutor的实现接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public interface ExecutorService extends Executor {

// 启动有序关机,执行以前提交的任务,但不接受新任务
void shutdown();

// 尝试停止所有正在执行的任务,停止正在等待的任务的处理,并返回正在等待执行的任务的列表
List<Runnable> shutdownNow();

// 如果此执行器已关闭,则返回true
boolean isShutdown();

// 如果关闭后所有任务都已完成,则返回true
boolean isTerminated();

// 阻塞,直到所有任务在关闭请求后完成执行,或超时发生,或当前线程中断(以先发生的为准)
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

// 提交一个有返回值的任务以供执行,并返回一个表示任务执行结果的Future, Future的get()方法将在成功完成后返回任务的结果
<T> Future<T> submit(Callable<T> task);

// 提交一个无返回值的任务和返回结果,Future的get()方法得到的结果是传入的result
<T> Future<T> submit(Runnable task, T result);

// 提交一个无返回值的任务,Future的get()方法得到的结果是null
Future<?> submit(Runnable task);

// 提交一个任务的集合,返回一个Future的集合
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

// 提交一个任务的集合,返回一个Future的集合,多了任务执行超时功能
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

// 执行集合中任务,返回任意一个任务执行结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

// 执行集合中任务,返回任意一个任务执行结果,多了任务执行超时功能
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

execute方法 [无返回值]

实现原理

  • ThreadPoolExecutor维护一组工作线程 Worker
  • execute 提交的 Runnable会被加入到任务队列中
  • Worker run 方法阻塞循环获取任务队列,然后在其run方法中执行Runnable的 run方法(非start)
  • 最后启动线程池中 Worker(实现Runnable)线程

submit方法 [有返回值]

  • Callable提交到线程池
  • 线程池转为FutureTask并返回,把任务提交到队列中进行计算
  • FutureTask 继承了Runnable并且重写了run方法,run方法调用了Callable的call方法能够获取返回值,其本身就定义了一个返回值成员变量,当调用get的时候就时自旋判断是否有返回值,没有则调用LockSuppot挂起线程
  • 调用get方法,此时如果线程池已计算完任务结果则直接返回,否则睡眠调用线程,直到任务计算完毕返回结果
1
2
3
4
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
1
2
3
4
5
6
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task); // 封装成 FutureTask
execute(ftask); // FutureTask 继承了Runnable并且重写了run方法,其本身就定义了一个返回值成员变量,当调用get的时候就时自旋判断是否有返回值,没有则调用LockSuppot挂起线程
return ftask;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
// 调用set回写返回值
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
1
2
3
4
5
6
7
8
protected void set(V v) {
// 修改状态回写返回值
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}

FutureTask#get() 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 计算等待截止时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
// 自旋
for (;;) {
// 1. 判断阻塞线程是否被中断,如果被中断则在等待队列中删除该节点并抛出InterruptedException异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
// 2. 获取当前状态,如果状态大于COMPLETING
// 说明任务已经结束(要么正常结束,要么异常结束,要么被取消)
// 则把thread显示置空,并返回结果
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 3. 如果状态处于中间状态COMPLETING
// 表示任务已经结束但是任务执行线程还没来得及给outcome赋值
// 这个时候让出执行权让其他线程优先执行
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 4. 如果等待节点为空,则构造一个等待节点
else if (q == null)
q = new WaitNode();
// 5. 如果还没有入队列,则把当前节点加入waiters首节点并替换原来waiters
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
// 如果需要等待特定时间,则先计算要等待的时间
// 如果已经超时,则删除对应节点并返回对应的状态
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 6. 阻塞等待特定时间
LockSupport.parkNanos(this, nanos);
}
else
// 6. 阻塞等待直到被其他线程唤醒
LockSupport.park(this);
}

// 等待节点,链表节点
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
}

newFixedThreadPool

固定数量的线程池

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
  • 创建一个核心线程数和最大线程数一样
  • 核心线程最大存活时间无限 (容易造成资源浪费)
  • 使用无边界的链表队列存放待执行的任务(如果存在大量任务容易造成任务堆积导致OOM)

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);

for (int i = 0; i < 10; i++) {
Runnable runnable = () -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(String.format("%s %s", Thread.currentThread().getName(),
DateFormatUtils.format(new Date(), "yyyy-MM-dd hh:mm:ss")));
} catch (InterruptedException e) {
e.printStackTrace();
}
};

// 提交任务
executorService.execute(runnable);
}
}
// 输出
pool-1-thread-1 2020-07-22 01:25:06
pool-1-thread-2 2020-07-22 01:25:06
pool-1-thread-2 2020-07-22 01:25:07
pool-1-thread-1 2020-07-22 01:25:07
pool-1-thread-2 2020-07-22 01:25:08
pool-1-thread-1 2020-07-22 01:25:08
pool-1-thread-2 2020-07-22 01:25:09
pool-1-thread-1 2020-07-22 01:25:09
pool-1-thread-2 2020-07-22 01:25:10
pool-1-thread-1 2020-07-22 01:25:10

newSingleThreadExecutor

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,提交的任务按照FIFO顺序执行

  • 创建一个核心线程数,最大线程数为1的线程池
  • 任何时候都只有一个活跃的线程
  • 适用于单个任务顺序执行场景, 但是使用了无边界的BlockingQueue还是存在OOM风险
1
2
3
4
5
6
7
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();

for (int i = 0; i < 10; i++) {
Runnable runnable = () -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(String.format("%s %s", Thread.currentThread().getName(),
DateFormatUtils.format(new Date(), "yyyy-MM-dd hh:mm:ss")));
} catch (InterruptedException e) {
e.printStackTrace();
}
};

// 提交任务
executorService.execute(runnable);
}
}

// 输出
pool-1-thread-1 2020-07-22 01:27:05
pool-1-thread-1 2020-07-22 01:27:06
pool-1-thread-1 2020-07-22 01:27:07
pool-1-thread-1 2020-07-22 01:27:08
pool-1-thread-1 2020-07-22 01:27:09
pool-1-thread-1 2020-07-22 01:27:10
pool-1-thread-1 2020-07-22 01:27:11
pool-1-thread-1 2020-07-22 01:27:12
pool-1-thread-1 2020-07-22 01:27:13
pool-1-thread-1 2020-07-22 01:27:14

newCachedThreadPool

具有缓存功能的线程池,无限制创建线程

  • 创建一个核心线程数为0,没有最大线程数的线程池
  • 线程的活跃时间为60秒,线程池会根据实际需求创建线程
  • 如果线程池中有空闲线程未被销毁,则新提交的任务会重用这些线程
  • 适用于高并发,任务处理时间短场景。长期空闲的池子不会消耗系统资源
  • 使用SynchronousQueue任务队列只能存储一个任务,其他全部通过创建新的非核心线程执行,大量任务涌入会OOM
1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();

for (int i = 0; i < 10; i++) {
Runnable runnable = () -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(String.format("%s %s", Thread.currentThread().getName(),
DateFormatUtils.format(new Date(), "yyyy-MM-dd hh:mm:ss")));
} catch (InterruptedException e) {
e.printStackTrace();
}
};

// 提交任务
executorService.execute(runnable);
}
}
// 输出
pool-1-thread-7 2020-07-22 01:25:53
pool-1-thread-6 2020-07-22 01:25:53
pool-1-thread-9 2020-07-22 01:25:53
pool-1-thread-8 2020-07-22 01:25:53
pool-1-thread-5 2020-07-22 01:25:53
pool-1-thread-1 2020-07-22 01:25:53
pool-1-thread-2 2020-07-22 01:25:53
pool-1-thread-4 2020-07-22 01:25:53
pool-1-thread-3 2020-07-22 01:25:53
pool-1-thread-10 2020-07-22 01:25:53

newScheduledThreadPool

创建一个定长线程池,支持定时及周期性任务执行

  • 创建一个可配置的核心线程数延迟队列
  • 队列是按延时时间升序排序,不是按照submit时间排序的了以延时时间作为优先级排序,延时时间短的,优先级高,放在前面先执行
1
2
3
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);

for (int i = 0; i < 10; i++) {
final int flag = i;
Runnable runnable = () -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(String.format("%s %s %s", Thread.currentThread().getName(),
flag,
DateFormatUtils.format(new Date(), "yyyy-MM-dd hh:mm:ss:SSS")));
} catch (InterruptedException e) {
e.printStackTrace();
}
};

/**
* 第一个参数: Runnable
* 第二个参数: initialDelay 初始延迟时间
* 第三个参数: period 间隔时长
* 第四个参数: unit 时间单位
*/
executorService.scheduleAtFixedRate(runnable, 10, 10, TimeUnit.MILLISECONDS);
}
}

//输出
pool-1-thread-2 1 2020-07-22 01:36:17:944
pool-1-thread-1 0 2020-07-22 01:36:17:937
pool-1-thread-2 2 2020-07-22 01:36:19:057
pool-1-thread-1 3 2020-07-22 01:36:19:057
pool-1-thread-1 4 2020-07-22 01:36:20:057
pool-1-thread-2 5 2020-07-22 01:36:20:057
pool-1-thread-1 6 2020-07-22 01:36:21:058
pool-1-thread-2 7 2020-07-22 01:36:21:058
pool-1-thread-1 8 2020-07-22 01:36:22:058
pool-1-thread-2 9 2020-07-22 01:36:22:058
pool-1-thread-1 0 2020-07-22 01:36:23:059
pool-1-thread-2 1 2020-07-22 01:36:23:059
pool-1-thread-2 3 2020-07-22 01:36:24:059
pool-1-thread-1 2 2020-07-22 01:36:24:059
pool-1-thread-1 5 2020-07-22 01:36:25:059
pool-1-thread-2 4 2020-07-22 01:36:25:059
pool-1-thread-1 6 2020-07-22 01:36:26:062
pool-1-thread-2 7 2020-07-22 01:36:26:063

CompletableFuture

API文档

example1

example2

CompletionService

CompletionService的实现目标是任务先完成可优先获取到,即结果按照完成先后顺序排序

API文档(https://www.matools.com/file/manual/jdk_api_1.8_google/java/util/concurrent/CompletionService.html)

example

Fork Join

核心类

  • RecursiveTask 递归任务类
  • ForkJoinPool 线程池
  • Future 获取结果

Fork/Join是Java中用于并行计算的框架,它基于工作窃取算法,用于将一个大任务切分为多个子任务,并行地执行,最后合并子任务的执行结果得到最终结果

工作窃取算法

​ 工作窃取算法是指某个线程从其他队列里窃取任务来执行。对于一个比较大的任务,可以把它分割为若干个互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。但是,有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务需要处理,于是它就去其他线程的队列里窃取一个任务来执行。由于此时它们访问同一个队列,为了减小竞争,通常会使用双端队列。被窃取任务的线程永远从双端队列的头部获取任务,窃取任务的线程永远从双端队列的尾部获取任务

窃取算法优缺点

  • 优点:充分利用线程进行并行计算,减少了线程间的竞争
  • 缺点:双端队列只存在一个任务时会导致竞争,会消耗更多的系统资源,因为需要创建多个线程和多个双端队列

Fork/Join 框架的异常处理

ForkJoinTask 在执行的时候可能抛出异常,但没有办法在主线程中直接捕获异常,所以 ForkJoinTask 提供了 isCompletedAbnormally() 方法检查任务是否已经抛出异常或已经被取消。getException() 方法返回 Throwable 对象,如果任务被取消了则返回 CancellationException如果任务没有完成或者没有抛出异常则返回 null

fork() 方法的实现原理

​ 当调用 ForkJoinTask 的 fork() 方法时,程序会调用 ForkJoinPool.WorkQueuepush() 方法异步地执行这个任务,然后立即返回结果。代码如下:

1
2
3
4
5
6
7
8
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}

​ push() 方法把当前任务存放在一个 ForkJoinTask 数组队列里,然后再调用 ForkJoinPoolsignalWork() 方法唤醒或创建一个工作线程来执行任务。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}

join() 方法的实现原理

​ 当调用 ForkJoinTask 的 join() 方法时,程序会调用 doJoin() 方法,通过 doJoin() 方法来判断返回什么结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
private void reportException(int s) {
if (s == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL)
rethrow(getThrowableException());
}
public abstract V getRawResult();

1~10累加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* RecursiveTask 递归任务
*/
public class Calculator extends RecursiveTask<Integer> {
// 任务递归分解停止的阈值
private static final int THRESHOLD = 1;

// 最小计算元素
private int start;
// 最大计算元素
private int end;

public Calculator(int start, int end) {
System.out.println("min:" + start + " max:" + end);
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
int sum = 0;

// 当数据分解的范围小于设定的阈值后才会真正执行计算求sum
if ((end - start) < THRESHOLD) {
for (int i = start; i <= end; i++) {
sum += i;
}
// 任务拆解范围如果还大于阈值,线程工作不参与求值计算,而是进行任务分解后求sum
} else {
int middle = (end + start) / 2;
Calculator left = new Calculator(start, middle);
Calculator rigth = new Calculator(middle + 1, end);

//分解
left.fork();
rigth.fork();

//合并
sum = left.join() + rigth.join();
}
return sum;
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
Future<Integer> result = forkJoinPool.submit(new Calculator(0, 10));
System.out.println(result.get());
}
}

ThreadLocal

在线参考

ThreadLocal是Java中的一个线程局部变量,用于在多线程环境下实现线程的数据隔离。它主要解决的问题是让每个线程都拥有自己的局部变量副本,避免了线程间的数据共享问题

ThreadLocal数据存储

数据直接存储在线程对象中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Thread implements Runnable {
...

// ThreadLocalMap用于支持一个线程使用多个ThreadLocal
/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;

/*可继承的ThreadLocalMap,用于在父子线程间传递,在Thread构造函数中,如果父线程的这个属性不为空,子线程会复制一个一样的Map
* InheritableThreadLocal values pertaining to this thread. This map is
* maintained by the InheritableThreadLocal class.
*/
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
...

用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private ThreadLocal<User> threadLocal = ThreadLocal.withInitial(() -> new User());

/**
* 测试是否每个线程都有副本
* 结果每个hashCode都不同
*/
@Test
public void hashCodeTest() {

for (int i = 0; i < 5; i++) {
new Thread(() -> {
log.info("thread:{} hashCode:{}", Thread.currentThread().getName(), threadLocal.get().hashCode());
}).start();
}
}

父子线程不共享ThreadLocal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
public void subThreadTest() throws InterruptedException {
User user = threadLocal.get();
user.setName("hello");
threadLocal.set(user);

new Thread(() -> {
log.info("name: {}", threadLocal.get().getName()); // null
}).start();
TimeUnit.SECONDS.sleep(1);
}

@Data
class User {
private String name;
}

核心源码

1
2
3
4
5
6
7
8
9
public
class Thread implements Runnable {
...
ThreadLocal.ThreadLocalMap threadLocals = null;

//可继承的ThreadLocalMap,用于在父子线程间传递,在Thread构造函数中,如果父线程的这个属性不为空,子线程会复制一个一样的Map
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class ThreadLocal<T> {
// 设置初始值
public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {
return new SuppliedThreadLocal<>(supplier);
}

static final class SuppliedThreadLocal<T> extends ThreadLocal<T> {

private final Supplier<? extends T> supplier;

SuppliedThreadLocal(Supplier<? extends T> supplier) {
this.supplier = Objects.requireNonNull(supplier);
}

// 其实就是返回用户在创建Threadlocal时指定的初始值
@Override
protected T initialValue() {
return supplier.get();
}
}

public T get() {
// 获取当前线程
Thread t = Thread.currentThread();
// 获取线程里的 threadLocals
ThreadLocalMap map = getMap(t);
// 如果 ThreadLocalMap 不为空,则根据当前 ThreadLocal 去map中获取value
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// 线程第一次使用ThreadLocal则创建ThreadLocalMap 并赋予初始值
return setInitialValue();
}

// 初始化
private T setInitialValue() {
// 获取用户定义withInitial方法给的初始值
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
// 创建ThreadLocalMap并赋值到线程里的 threadLocals
createMap(t, value);
return value;
}

public void set(T value) {
// 通过Thread获取Map
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
// 如果map不为空,则通过当前ThreadLocal作为key去获取value
if (map != null)
map.set(this, value);
else
// map为空则创建value
createMap(t, value);
}
}

ThreadLocal Key 的内存泄露

java存在的引用

如果 ThreadLocal 的引用丢失了(退出了定义ThreadLocal的方法),但是线程还存活,那ThreadLocal是否不会被释放?因为ThreadLocal被当作Key存在线程的 ThreadLocalMap 中,而导致内存泄漏?

防止Key的内存泄露:答案是否定的,当ThreadLocal 的引用丢失线程依然存活,丢失的ThreadLocal 能被回收!原因就是ThreadLocalMap 的Key是一个 弱引用,就算被GC回收了,再次调用 ThreadLocal 的 get 方法生成一个副本即可

DK建议ThreadLocal定义为private static,这样ThreadLocal的弱引用问题则不存在了

1
2
3
4
5
6
7
8
9
10
11
12
   // Thread 源码 
static class ThreadLocalMap {
// key是一个弱引用
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}

ThreadLocal value 的内存泄露

弱引用 的key(ThreadLocal)被GC线程回收了但是线程依然存活,此时 ThreadLocalMap 里的 value 会因为 key 被回收无法使用(再内存中但是无法通过key获取)从而造成内存泄漏问题,JDK是如何解决的?

只要在使用完 ThreadLocal 对象之后,及时调用 remove 方法, 移除 Entry 即可

InheritableThreadLocal

在Thread的构造函数调用init函数中,如果父线程inheritableThreadLocals的属性不为空,子线程就会复制一份保存到当前的inheritableThreadLocals(浅拷贝,重新创建一个map)属性中,以便于实现ThreadLocal的继承

线程间传值

父线程共享数据给子线程,但是父子线程之间相互独立,互不影响

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private InheritableThreadLocal<User> inheritableThreadLocal = new InheritableThreadLocal();

@Test
public void inheritableThreadLocalTest() throws InterruptedException {
User user = new User();
user.setName("hello");
inheritableThreadLocal.set(user);
log.info("name: {}", inheritableThreadLocal.get().getName()); // hello

new Thread(() -> {
User us = inheritableThreadLocal.get();
log.info("name: {}", us.getName()); // hello
us.setName("test2");
}).start();
TimeUnit.SECONDS.sleep(1);
}

@Data
class User {
private String name;
}

TransmittableThreadLocal

解决池化线程复用,实现父线程对ThreadLocal的修改池化线程能够感知到

​ 在异步线程之间,实现跨线程的ThreadLocal传递, 简单场景可以用InheritableThreadLocal,但ITL在线程池化场景下不适用,因为ITL是在子线程初始化时,拷贝了父线程的ThreadLocal,但池化场景,子线程是会被多次复用的,但ITL只能在子线程第一次创建时,传递ThreadLocal,之后的复用都无法重新设置ThreadLocal。于是TransmittableThreadLocal出现了,可以解决ThreadLocal在线程池化场景下的传递问题

​ 若希望 TransmittableThreadLocal 在线程池与主线程间传递,需配合 TtlRunnableTtlCallable 使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private TransmittableThreadLocal<Integer> transmittableThreadLocal = new TransmittableThreadLocal<>();

@SneakyThrows
@Test
public void transmittableThreadLocalTest() {
ExecutorService executorService = Executors.newFixedThreadPool(1);

transmittableThreadLocal.set(1);
// transmittableThreadLocal 输出1, InheritableThreadLocal 输出1
executorService.submit(TtlRunnable.get(() -> log.info("value={}", transmittableThreadLocal.get())));
TimeUnit.SECONDS.sleep(1);

transmittableThreadLocal.set(2);
// transmittableThreadLocal 输出2, InheritableThreadLocal 输出1
executorService.submit(TtlRunnable.get(() -> log.info("value={}", transmittableThreadLocal.get())));
TimeUnit.SECONDS.sleep(1);

transmittableThreadLocal.set(3);
// transmittableThreadLocal 输出3, InheritableThreadLocal 输出1
executorService.submit(TtlRunnable.get(() -> log.info("value={}", transmittableThreadLocal.get())));
TimeUnit.SECONDS.sleep(1);
}

Exchanger

详解

Exchanger是Java中的一个同步工具类,用于实现两个线程之间的数据交换。它提供了一个同步点,在这个同步点,两个线程可以交换彼此的数据。当一个线程调用exchange()方法时,它会被阻塞,直到另一个线程也调用了exchange()方法,然后两个线程将彼此的数据进行交换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Slf4j
public class ExchangerTest {

@SneakyThrows
@Test
public void test() {
// 定义交换机
Exchanger<String> exchanger = new Exchanger<>();

new Thread(() -> {
String value1 = "value1";
try {
// 交换数据
value1 = exchanger.exchange(value1);
log.info("线程1交换数据 value1 = {}", value1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

new Thread(() -> {
String value2 = "value2";
try {
// 交换数据
value2 = exchanger.exchange(value2);
log.info("线程2交换数据 value2 = {}", value2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

TimeUnit.SECONDS.sleep(1);
}

// 线程1交换数据 value1 = value2
// 线程1交换数据 value2 = value1
}

Unsafe 魔法类

Java guide

在线example参考

LockSupport

参考

LockSupport是Java中提供的一个用于线程阻塞和唤醒的工具类。可以实现更加灵活的线程同步和通信。

最重要的方法是park()和unpark()。park()方法用于使当前线程进入阻塞状态,而unpark()方法则用于唤醒被阻塞的线程。

LockSupport中的blocker

获取其偏移量源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
parkBlockerOffset = UNSAFE.objectFieldOffset
(tk.getDeclaredField("parkBlocker"));
SEED = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomSeed"));
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
SECONDARY = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomSecondarySeed"));
} catch (Exception ex) { throw new Error(ex); }
}

根据对Unsafe的理解可知,blocker其实就是通过Unsafe去操作ThreadparkBlocker属性

parkBlockerThread类中的成员变量,记录了当前线程阻塞时是被谁阻塞的,用于线程监控和分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Slf4j
public class LockSupportTest {
@Test
public void parkTest() {
Thread mainThread = Thread.currentThread();

// 子线程3秒后唤醒父线程
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(3);
log.info("唤醒父线程");
LockSupport.unpark(mainThread);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

log.info("父线程开始阻塞");
LockSupport.park();
log.info("父线程被唤醒");
}
// 16:38:00.410 [main] INFO com.wgf.unit.lock.LockSupportTest - 父线程开始阻塞
// 16:38:03.417 [Thread-1] INFO com.wgf.unit.lock.LockSupportTest - 唤醒父线程
// 16:38:03.418 [main] INFO com.wgf.unit.lock.LockSupportTest - 父线程被唤醒

@Test
public void parkNanosTest() {
// 指定阻塞多少纳秒
log.info("线程开始阻塞");
LockSupport.parkNanos(2 * 1000000000L);
log.info("线程唤醒");
}
// 16:40:35.576 [main] INFO com.wgf.unit.lock.LockSupportTest - 线程开始阻塞
// 16:40:37.583 [main] INFO com.wgf.unit.lock.LockSupportTest - 线程唤醒

@Test
public void test() {
// 指定休眠多少ms, ms = 现在的毫秒数 + 需要阻塞的 step
long ms = System.currentTimeMillis();
ms += 2 * 1000;

log.info("线程开始阻塞");
LockSupport.parkUntil(ms);
log.info("线程唤醒");
}
// 16:43:01.519 [main] INFO com.wgf.unit.lock.LockSupportTest - 线程开始阻塞
// 16:43:03.524 [main] INFO com.wgf.unit.lock.LockSupportTest - 线程唤醒
}

网络IO

何为 I/O?

I/O(Input/Outpu) 即输入/输出

我们先从计算机结构的角度来解读一下 I/O

根据冯.诺依曼结构,计算机结构分为 5 大部分:运算器、控制器、存储器、输入设备、输出设备

输入设备(比如键盘)和输出设备(比如显示器)都属于外部设备。网卡、硬盘这种既可以属于输入设备,也可以属于输出设备

输入设备向计算机输入数据,输出设备接收计算机输出的数据

从计算机结构的视角来看的话, I/O 描述了计算机系统与外部设备之间通信的过程

我们在平常开发过程中接触最多的就是 磁盘 IO(读写文件)网络 IO(网络请求和响应)


  • read系统调用,并不是把数据直接从物理设备,读数据到内存。write系统调用,也不是直接把数据,写入到物理设备
  • read系统调用,是把数据从内核缓冲区复制到进程缓冲区;而write系统调用,是把数据从进程缓冲区复制到内核缓冲区。这个两个系统调用,都不负责数据在内核缓冲区和磁盘之间的交换。底层的读写交换,是由操作系统kernel内核完成的

内核缓冲与进程缓冲区

缓冲区的目的,是为了减少频繁的系统IO调用。大家都知道,系统调用需要保存之前的进程数据和状态等信息,而结束调用之后回来还需要恢复之前的信息,为了减少这种损耗时间、也损耗性能的系统调用,于是出现了缓冲区。

有了缓冲区,操作系统使用read函数把数据从内核缓冲区复制到进程缓冲区,write把数据从进程缓冲区复制到内核缓冲区中。等待缓冲区达到一定数量的时候,再进行IO的调用,提升性能。至于什么时候读取和存储则由内核来决定,用户程序不需要关心。

在linux系统中,系统内核也有个缓冲区叫做内核缓冲区。每个进程有自己独立的缓冲区,叫做进程缓冲区。

所以,用户程序的IO读写程序,大多数情况下,并没有进行实际的IO操作,而是在读写自己的进程缓冲区。



BIO(blocking IO )

BIO 属于同步阻塞 IO 模型

同步阻塞 IO 模型中,应用程序发起 read 调用后,会一直阻塞,直到内核把数据拷贝到用户空间 (比如读一个大文件,需要把文件都读取完毕后才能继续往下执行)

​ 在客户端连接数量不高的情况下,是没问题的。但是,当面对十万甚至百万级连接的时候,传统的 BIO 模型是无能为力的。因此,我们需要一种更高效的 I/O 处理模型来应对更高的并发量



NIO (nonblocking IO)

同步非阻塞 I/O

10 分钟看懂, Java NIO 底层原理

架构图

  • 选择器Selector: 可以理解为一个IO事件的监听与查询器。通过选择器,一个线程可以查询多个通道的IO事件的就绪状态

  • 通道 Channel:在NIO中,一个网络连接使用一个通道表示,所有NIO的IO操作都是通过连接通道完成的。一个通道类似于BIO中两个流的结合体,既可以从通道读取数据,也可以向通道写入数据(代表用户的一个链接)

  • 缓冲区 Buffer: 缓冲区本质上是一个可以读写数据的内存块,可以理解为是一个容器对象(含数组),Channel要读写数据必须经过缓冲区


IO多路复用: 服务端的一个线程可以处理多个客户端请求

​ 从编程实现维度来说,IO多路复用编程的第一步是把通道注册到选择器中,第二步是通过选择器所提供的事件查询(select)方法来查询这些注册的通道是否有已经就绪的IO事件(例如可读、可写、网络连接完成等)。由于一个选择器只需要一个线程进行监控,因此我们可以很简单地使用一个线程,通过选择器去管理多个连接通道


  • BIO 在读不到请求数据的情况下会选择阻塞,这样就无法处理其他客户端的请求
    • 当然BIO也可以对每个客户端创建一个线程,做伪异步,在新建的线程里阻塞请求(read调用),直到能读取数据。但是这样如果有1000个链接就需要创建1000个线程,对系统开销太大,无法满足高并发场景
  • 由于BIO的阻塞问题,NIO就提出了多路复用,每次都先调用select、epoll函数去查询有没有活跃的链接,如果没有活跃的链接也不进行阻塞操作,本次空循环进入下次循环继续查看是否有活跃的链接,有的话调用read函数将数据从内核缓存区复制到进程缓冲区(阻塞),执行业务逻辑后再将数据由进程缓冲区复制到内核缓冲区再到网卡,最终响应给客户端。这样的好处就是做到IO多路复用,服务器的一个线程能够处理多个客户端的请求(selector组件)


AIO (asynchronous IO)

异步 I/O

AIO 也就是 NIO 2。Java 7 中引入了 NIO 的改进版 NIO 2,它是异步 IO 模型



TCP UPD



序列化


java源码
https://wugengfeng.cn/2022/04/13/java源码/
作者
wugengfeng
发布于
2022年4月13日
许可协议