不做大哥好多年 不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核

逍遥子

不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核
  • Java基础

  • 面向对象

  • Java进阶

    • 01.文件处理
    • 02.异常处理
    • 03.反射
    • 04.注解
    • 05.泛型
    • 06.日期与时间
    • 07.多线程基础
    • 08.多线程进阶
      • 01.Concurrent集合
        • 1、ConcurrentHashMap
        • 2、ConcurrentLinkedQueue
        • 3、CopyOnWriteArrayList
      • 02.线程池
        • 1、ExecutorService
        • 2、Future
        • 3、CompletableFuture
        • 4、ForkJoin
    • 10.设计模式
  • Web基础

  • Spring框架

  • 微服务

  • Java
  • Java进阶
xiaonaiqiang
2024-04-30
目录

08.多线程进阶

# 01.Concurrent集合

# 1、ConcurrentHashMap

  • ConcurrentHashMap 是 Java 中提供的一种并发哈希表,它可以在多线程环境下安全地访问和修改共享数据
  • 与 HashMap 不同,ConcurrentHashMap 不使用全局锁来保护共享数据,而是使用分段锁来保护不同段的数据
  • 在多个线程同时修改不同段的数据时,不需要等待其他线程释放锁,从而提高了并发性能
  • 说明
    • 们创建了一个 ConcurrentHashMap 实例,并在 increaseCount() 和 getCount() 方法中使用它来存储和检索计数
    • 我们创建了两个线程并发地增加计数,由于 ConcurrentHashMap 是线程安全的
    • 我们不需要使用锁或其他同步机制来保护计数
    • 在所有线程结束后,我们可以获取到正确的计数值
import java.util.concurrent.ConcurrentHashMap;

public class Main {

    private ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

    public void increaseCount(String key) {
        map.compute(key, (k, v) -> (v == null) ? 1 : v + 1);
    }

    public int getCount(String key) {
        return map.getOrDefault(key, 0);
    }

    public static void main(String[] args) throws InterruptedException {
        Main example = new Main();
        // 创建两个线程分别增加计数
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 5000; i++) {
                example.increaseCount("count");
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 5000; i++) {
                example.increaseCount("count");
            }
        });
        // 启动线程
        t1.start();
        t2.start();
        // 等待线程结束
        t1.join();
        t2.join();
        // 输出计数值
        System.out.println(example.getCount("count")); // 输出 10000
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

# 2、ConcurrentLinkedQueue

  • ConcurrentLinkedQueue 是一个适用于高并发场景下的队列,使用链表实现,可以保证线程安全的入队和出队
  • 说明
    • 我们创建了一个 ConcurrentLinkedQueue 实例,并创建了四个线程
    • 两个生产者线程往队列中添加元素,两个消费者线程从队列中取出元素
    • 由于 ConcurrentLinkedQueue 是线程安全的,我们不需要在 offer() 和 poll() 方法中使用锁或其他同步机制来保证线程安全
import java.util.concurrent.ConcurrentLinkedQueue;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        //创建两个线程往队列中添加元素
        Thread producer1 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                queue.offer("producer1's data " + i);
            }
        });
        Thread producer2 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                queue.offer("producer2's data " + i);
            }
        });
        //创建两个线程从队列中取出元素
        Thread consumer1 = new Thread(() -> {
            while (true) {
                String data = queue.poll();
                if (data != null) {
                    System.out.println("consumer1 consumed " + data);
                } else {
                    try {
                        Thread.sleep(1000); // 如果队列为空,暂停一会再尝试
                    } catch (InterruptedException e) {
                        break; // 如果被中断,结束线程
                    }
                }
            }
        });
        Thread consumer2 = new Thread(() -> {
            while (true) {
                String data = queue.poll();
                if (data != null) {
                    System.out.println("consumer2 consumed " + data);
                } else {
                    try {
                        Thread.sleep(1000); // 如果队列为空,暂停一会再尝试
                    } catch (InterruptedException e) {
                        break; // 如果被中断,结束线程
                    }
                }
            }
        });
        // 启动线程
        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
        // 让producer线程执行完毕
        producer1.join();
        producer2.join();
        // 让consumer线程在主线程中执行一段时间后停止
        Thread.sleep(1000);
        consumer1.interrupt();
        consumer2.interrupt();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

# 3、CopyOnWriteArrayList

  • CopyOnWriteArrayList 是一个线程安全的 ArrayList
  • 在每次修改操作(如 add,set 等)时,都会复制一份新的数据
  • 而读操作则直接在当前数据上进行,因此读操作是非阻塞的,非常适合读多写少的并发场景
  • 但是由于每次写操作都要复制一份新数据,所以在写多的场景下性能较差,且占用内存也较大
package cls;

import java.util.concurrent.CopyOnWriteArrayList;

public class Main {
    private CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();

    public void add(String e) {
        list.add(e);
    }

    public String get(int index) {
        return list.get(index);
    }

    public static void main(String[] args) throws InterruptedException {
        Main example = new Main();

        // 创建一个线程向列表中添加元素
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                example.add("String " + i);
            }
        });

        // 创建一个线程从列表中读取元素
        Thread reader = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println(example.get(i));
            }
        });

        // 启动线程
        writer.start();
        writer.join(); // 确保写线程先执行完
        reader.start();
        reader.join();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# 02.线程池

# 1、ExecutorService

  • Java通过ExecutorService接口提供了线程池的功能,它可以用来在后台执行任务
  • 线程池可以有效地管理和控制线程的数量,避免大量的线程创建和销毁带来的资源浪费
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

    public static void main(String[] args) {
        // 创建一个线程池,包含2个线程
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 提交任务到线程池
        for (int i = 0; i < 5; i++) {
            final int taskID = i;
            executorService.execute(new Runnable() {
                public void run() {
                    System.out.println("Task " + taskID + " is running in " + Thread.currentThread().getName());
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }

        // 关闭线程池
        executorService.shutdown();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 2、Future

  • Future接口表示一个可能还没有完成的异步任务的结果
  • 它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果
import java.util.concurrent.*;

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(1);

        // 创建一个任务
        Callable<Integer> task = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("Task started");
                Thread.sleep(2000);
                System.out.println("Task finished");
                return 123;
            }
        };

        System.out.println("Submitting Task");

        Future<Integer> future = executorService.submit(task);

        System.out.println("Task is submitted");

        // 这会阻塞,直到任务完成
        while (!future.isDone()) {
            System.out.println("Task is not completed yet....");
            Thread.sleep(1); //sleep for 1 millisecond before checking again
        }

        System.out.println("Task is completed, let's check the result");

        // 获取任务的返回值
        Integer result = future.get();

        System.out.println("Result is: " + result);

        executorService.shutdown();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

# 3、CompletableFuture

  • CompletableFuture,它是Future的增强版本,提供了更多的异步编程能力
  • 你可以把CompletableFuture看作是异步执行的任务,它可以包含任务的状态,比如是否成功,是否被取消等
  • 并且它可以关联其他的CompletableFuture,一旦当前的任务完成,它可以触发其他的任务
  • 说明
    • 我们创建了一个CompletableFuture对象,然后在另一个线程中,模拟了一个耗时的计算
    • 然后用CompletableFuture.complete方法来告诉CompletableFuture这个耗时操作已经完成,结果是"Task Completed"
    • 在主线程中,我们使用CompletableFuture.get方法等待任务完成,并得到任务的结果
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Main {
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        // 创建一个CompletableFuture对象
        CompletableFuture<String> completableFuture = new CompletableFuture<>();

        // 在另一个线程中完成此Future
        new Thread( () -> {
            // 模拟长时间的计算任务
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成
            completableFuture.complete("Task Completed");
        }).start();

        // 等待Future完成
        String result = completableFuture.get(10, TimeUnit.SECONDS);
        System.out.println(result);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 4、ForkJoin

  • 它是一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架
  • 说明
    • 我们创建了一个SumTask,它是一个RecursiveTask,它表示一个可以分割的任务
    • 我们在compute方法中检查任务是否足够小,如果足够小就直接计算任务
    • 否则就分割任务,并递归调用compute方法,最后合并结果
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Long> {
    static final int THRESHOLD = 100;
    long[] array;
    int start;
    int end;

    SumTask(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // 如果任务足够小就计算任务
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        } else {
            // 如果任务大则分裂成两个子任务
            int middle = (start + end) / 2;
            SumTask subTask1 = new SumTask(this.array, start, middle);
            SumTask subTask2 = new SumTask(this.array, middle, end);
            invokeAll(subTask1, subTask2);
            Long subResult1 = subTask1.join();
            Long subResult2 = subTask2.join();
            return subResult1 + subResult2; //合并子任务
        }
    }
}

public class Main {
    public static void main(String[] args) {
        long[] array = new long[1000];
        // 创建1000个随机数
        for (int i = 0; i < array.length; i++) {
            array[i] = (long) (Math.random() * 10000);
        }
        // 创建一个ForkJoinPool
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 创建一个大任务
        SumTask task = new SumTask(array, 0, array.length);
        // 提交大任务
        Long result = forkJoinPool.invoke(task);
        System.out.println(result);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
上次更新: 2024/5/31 11:18:42
07.多线程基础
10.设计模式

← 07.多线程基础 10.设计模式→

最近更新
01
04.数组双指针排序_子数组
03-25
02
08.动态规划
03-25
03
06.回溯算法
03-25
更多文章>
Theme by Vdoing | Copyright © 2019-2025 逍遥子 技术博客 京ICP备2021005373号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式