Java线程池 - ThreadPool自定义封装

  • 自定义线程池,便于调试和控制线程
  • 自定义线程数、拒绝策略、关闭等
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    @Slf4j
    public class CustomTaskThreadPool implements CustomThreadPool {
    /**
    * 核心大小
    */
    private static int CORE_POOLSIZE = Runtime.getRuntime().availableProcessors() * 2;

    /**
    * 上限
    */
    private static int MAX_POOLSIZE = CORE_POOLSIZE * 10;

    //留存时间ms
    private static long KEEP_ALIVETIME = 300000L;

    private static long SHUTDOWN_WAIT_TIME = 300000L;

    /**
    * 时间单位
    */
    private static TimeUnit UNIT = TimeUnit.MILLISECONDS;

    /**
    * 队列容量
    */
    private static int CAPACITY = 20;

    /**
    * 队列
    */
    private static BlockingQueue<Runnable> WORK_QUEUE;

    /**
    * 线程工厂
    */
    private static ThreadFactory THREAD_FACTORY;

    /**
    * 默认拒绝策略
    */
    private static RejectedExecutionHandler REJECTED_EXECUTION_HANDLER;


    private static ThreadPoolExecutor executor = null;

    private static class CustomTaskThreadPoolFactory {
    public static CustomTaskThreadPool CustomTaskThreadPool = new CustomTaskThreadPool();
    }

    /**
    * 线程池单例
    *
    * @return
    */
    public static CustomTaskThreadPool getInstance() {
    return CustomTaskThreadPoolFactory.CustomTaskThreadPool;
    }

    public CustomTaskThreadPool() {
    if (CORE_POOLSIZE <= 0 || CORE_POOLSIZE > 10) {
    CORE_POOLSIZE = 4;
    MAX_POOLSIZE = CORE_POOLSIZE * 10;
    }

    THREAD_FACTORY = new CustomThreadFactory();
    WORK_QUEUE = new ArrayBlockingQueue<>(CAPACITY); REJECTED_EXECUTION_HANDLER = new AbortPolicy();

    executor = new ThreadPoolExecutor(
    CORE_POOLSIZE,
    MAX_POOLSIZE,
    KEEP_ALIVETIME,
    UNIT,
    WORK_QUEUE,
    THREAD_FACTORY,
    REJECTED_EXECUTION_HANDLER
    );
    }

    static {
    Runtime.getRuntime().addShutdownHook(new Thread(new CustomRunnable() {
    @Override
    public void run() {
    close();
    }
    }));
    }

    private static void close() {
    try {
    log.warn("即将关闭线程池,等待:{}ms,活跃:{},未完成:{},最大完成:{},总完成:{}",
    SHUTDOWN_WAIT_TIME,
    executor.getActiveCount(),
    executor.getQueue().size(),
    executor.getLargestPoolSize(),
    executor.getCompletedTaskCount());
    executor.shutdown();

    //等待关闭
    executor.awaitTermination(SHUTDOWN_WAIT_TIME, UNIT);

    log.warn("已关闭线程池");

    //未完成
    List<Runnable> CustomRunnableList = executor.shutdownNow();
    if (!CollectionUtil.isEmpty(CustomRunnableList)) {
    CustomRunnableList.forEach(e -> log.warn("关闭未完成任务:" + e.toString()));
    }

    } catch (Exception e) {
    e.printStackTrace();
    }
    }

    @Override
    public boolean execute(Runnable runnable) {
    try {
    executor.execute(new CustomRunnable(runnable));
    } catch (Exception e) {
    log.error(e.getMessage(), e);
    return false;
    }

    return true;
    }

    @Override
    public <T> Future<T> submit(Callable<T> tCallable) {
    try {
    return executor.submit(new CustomCallable<T>(tCallable));
    } catch (RejectedExecutionException e) {
    log.error(e.getMessage(), e);
    }
    return null;
    }

    @Override
    public void shutDown() {
    executor.shutdown();
    }
    /**
    * 拒绝策略,丢弃不作任何处理
    */
    public static class AbortPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

    String info = "";

    try {
    Field field = FutureTask.class.getDeclaredField("callable");
    field.setAccessible(true);

    FutureTask futureTask = (FutureTask) r;
    Callable callable = (Callable) field.get(futureTask);
    CustomCallable CustomCallable = (CustomCallable) ((CustomCallable) callable).callable;
    info = CustomCallable.toString();
    } catch (Exception ex) {
    ex.printStackTrace();
    }

    log.error("AbortPolicy: " + info + " rejected from " + e.toString());
    }
    }
    }

注意

  • 计算密集型

    线程数 = CPU核数+1,也可以设置成CPU核数*2,但还要看JDK的版本以及CPU配置(服务器的CPU有超线程)。

  • IO密集型

    线程数 = CPU核心数/(1-阻塞系数) 这个阻塞系数一般为0.8~0.9之间,也可以取0.8或者0.9。
    套用公式,对于双核CPU来说,它比较理想的线程数就是20,当然这都不是绝对的,需要根据实际情况以及实际业务来调整:final int poolSize = (int)(cpuCore/(1-0.9))

    对于阻塞系数,我们可以先试着猜测,抑或采用一些性能分析工具或java.lang.management API 来确定线程花在系统/IO操作上的时间与CPU密集任务所耗的时间比值。

  • Runtime.getRuntime().availableProcessors()

    因为容器不是物理隔离的,使用Runtime.getRuntime().availableProcessors() ,会拿到物理CPU个数,而不是容器申请时的个数。

    例如宿主机器是4核CPU16G内存

    • java 6/7/8/9

      docker run –cpus 1 -m 1G -it adoptopenjdk/openjdk9:latest # 给1核

      jshell -J-Xmx512M -v # 启动jshell

      Runtime.getRuntime().availableProcessors() # 结果是不是1!!!

    • java 10

      docker run –cpus 1 -m 1G -it adoptopenjdk/openjdk10:latest # 给1核

      jshell -J-Xmx512M -v # 启动jshell

      Runtime.getRuntime().availableProcessors() # 结果是1

------ 本文结束------

本文标题:Java线程池 - ThreadPool自定义封装

文章作者:Perkins

发布时间:2019年08月16日

原始链接:https://perkins4j2.github.io/posts/33509/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。