08.多线程进阶
# 01.Concurrent集合
# 1、ConcurrentHashMap
ConcurrentHashMap
是 Java 中提供的一种并发哈希表,它可以在多线程环境下安全地访问和修改共享数据- 与
HashMap
不同,ConcurrentHashMap
不使用全局锁来保护共享数据,而是使用分段锁来保护不同段的数据 - 在多个线程同时修改不同段的数据时,不需要等待其他线程释放锁,从而提高了并发性能
- 说明
- 们创建了一个
ConcurrentHashMap
实例,并在increaseCount()
和getCount()
方法中使用它来存储和检索计数 - 我们创建了两个线程并发地增加计数,由于
ConcurrentHashMap
是线程安全的 - 我们不需要使用锁或其他同步机制来保护计数
- 在所有线程结束后,我们可以获取到正确的计数值
- 们创建了一个
import java.util.concurrent.ConcurrentHashMap;
public class Main {
private ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
public void increaseCount(String key) {
map.compute(key, (k, v) -> (v == null) ? 1 : v + 1);
}
public int getCount(String key) {
return map.getOrDefault(key, 0);
}
public static void main(String[] args) throws InterruptedException {
Main example = new Main();
// 创建两个线程分别增加计数
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
example.increaseCount("count");
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
example.increaseCount("count");
}
});
// 启动线程
t1.start();
t2.start();
// 等待线程结束
t1.join();
t2.join();
// 输出计数值
System.out.println(example.getCount("count")); // 输出 10000
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 2、ConcurrentLinkedQueue
ConcurrentLinkedQueue
是一个适用于高并发场景下的队列,使用链表实现,可以保证线程安全的入队和出队- 说明
- 我们创建了一个
ConcurrentLinkedQueue
实例,并创建了四个线程 - 两个生产者线程往队列中添加元素,两个消费者线程从队列中取出元素
- 由于
ConcurrentLinkedQueue
是线程安全的,我们不需要在offer()
和poll()
方法中使用锁或其他同步机制来保证线程安全
- 我们创建了一个
import java.util.concurrent.ConcurrentLinkedQueue;
public class Main {
public static void main(String[] args) throws InterruptedException {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
//创建两个线程往队列中添加元素
Thread producer1 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
queue.offer("producer1's data " + i);
}
});
Thread producer2 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
queue.offer("producer2's data " + i);
}
});
//创建两个线程从队列中取出元素
Thread consumer1 = new Thread(() -> {
while (true) {
String data = queue.poll();
if (data != null) {
System.out.println("consumer1 consumed " + data);
} else {
try {
Thread.sleep(1000); // 如果队列为空,暂停一会再尝试
} catch (InterruptedException e) {
break; // 如果被中断,结束线程
}
}
}
});
Thread consumer2 = new Thread(() -> {
while (true) {
String data = queue.poll();
if (data != null) {
System.out.println("consumer2 consumed " + data);
} else {
try {
Thread.sleep(1000); // 如果队列为空,暂停一会再尝试
} catch (InterruptedException e) {
break; // 如果被中断,结束线程
}
}
}
});
// 启动线程
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
// 让producer线程执行完毕
producer1.join();
producer2.join();
// 让consumer线程在主线程中执行一段时间后停止
Thread.sleep(1000);
consumer1.interrupt();
consumer2.interrupt();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# 3、CopyOnWriteArrayList
CopyOnWriteArrayList
是一个线程安全的ArrayList
- 在每次修改操作(如
add
,set
等)时,都会复制一份新的数据 - 而读操作则直接在当前数据上进行,因此读操作是非阻塞的,非常适合读多写少的并发场景
- 但是由于每次写操作都要复制一份新数据,所以在写多的场景下性能较差,且占用内存也较大
package cls;
import java.util.concurrent.CopyOnWriteArrayList;
public class Main {
private CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
public void add(String e) {
list.add(e);
}
public String get(int index) {
return list.get(index);
}
public static void main(String[] args) throws InterruptedException {
Main example = new Main();
// 创建一个线程向列表中添加元素
Thread writer = new Thread(() -> {
for (int i = 0; i < 5; i++) {
example.add("String " + i);
}
});
// 创建一个线程从列表中读取元素
Thread reader = new Thread(() -> {
for (int i = 0; i < 5; i++) {
System.out.println(example.get(i));
}
});
// 启动线程
writer.start();
writer.join(); // 确保写线程先执行完
reader.start();
reader.join();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 02.线程池
# 1、ExecutorService
- Java通过
ExecutorService
接口提供了线程池的功能,它可以用来在后台执行任务 - 线程池可以有效地管理和控制线程的数量,避免大量的线程创建和销毁带来的资源浪费
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
// 创建一个线程池,包含2个线程
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 提交任务到线程池
for (int i = 0; i < 5; i++) {
final int taskID = i;
executorService.execute(new Runnable() {
public void run() {
System.out.println("Task " + taskID + " is running in " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
// 关闭线程池
executorService.shutdown();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 2、Future
Future
接口表示一个可能还没有完成的异步任务的结果- 它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
// 创建一个任务
Callable<Integer> task = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("Task started");
Thread.sleep(2000);
System.out.println("Task finished");
return 123;
}
};
System.out.println("Submitting Task");
Future<Integer> future = executorService.submit(task);
System.out.println("Task is submitted");
// 这会阻塞,直到任务完成
while (!future.isDone()) {
System.out.println("Task is not completed yet....");
Thread.sleep(1); //sleep for 1 millisecond before checking again
}
System.out.println("Task is completed, let's check the result");
// 获取任务的返回值
Integer result = future.get();
System.out.println("Result is: " + result);
executorService.shutdown();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 3、CompletableFuture
CompletableFuture
,它是Future
的增强版本,提供了更多的异步编程能力- 你可以把
CompletableFuture
看作是异步执行的任务,它可以包含任务的状态,比如是否成功,是否被取消等 - 并且它可以关联其他的
CompletableFuture
,一旦当前的任务完成,它可以触发其他的任务 - 说明
- 我们创建了一个
CompletableFuture
对象,然后在另一个线程中,模拟了一个耗时的计算 - 然后用
CompletableFuture.complete
方法来告诉CompletableFuture
这个耗时操作已经完成,结果是"Task Completed" - 在主线程中,我们使用
CompletableFuture.get
方法等待任务完成,并得到任务的结果
- 我们创建了一个
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
// 创建一个CompletableFuture对象
CompletableFuture<String> completableFuture = new CompletableFuture<>();
// 在另一个线程中完成此Future
new Thread( () -> {
// 模拟长时间的计算任务
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 告诉completableFuture任务已经完成
completableFuture.complete("Task Completed");
}).start();
// 等待Future完成
String result = completableFuture.get(10, TimeUnit.SECONDS);
System.out.println(result);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 4、ForkJoin
- 它是一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架
- 说明
- 我们创建了一个
SumTask
,它是一个RecursiveTask
,它表示一个可以分割的任务 - 我们在
compute
方法中检查任务是否足够小,如果足够小就直接计算任务 - 否则就分割任务,并递归调用
compute
方法,最后合并结果
- 我们创建了一个
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
class SumTask extends RecursiveTask<Long> {
static final int THRESHOLD = 100;
long[] array;
int start;
int end;
SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 如果任务足够小就计算任务
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
// 如果任务大则分裂成两个子任务
int middle = (start + end) / 2;
SumTask subTask1 = new SumTask(this.array, start, middle);
SumTask subTask2 = new SumTask(this.array, middle, end);
invokeAll(subTask1, subTask2);
Long subResult1 = subTask1.join();
Long subResult2 = subTask2.join();
return subResult1 + subResult2; //合并子任务
}
}
}
public class Main {
public static void main(String[] args) {
long[] array = new long[1000];
// 创建1000个随机数
for (int i = 0; i < array.length; i++) {
array[i] = (long) (Math.random() * 10000);
}
// 创建一个ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 创建一个大任务
SumTask task = new SumTask(array, 0, array.length);
// 提交大任务
Long result = forkJoinPool.invoke(task);
System.out.println(result);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
上次更新: 2024/5/31 11:18:42