线程池简介

Author Avatar
罗炜光 3月 26, 2016
  • 在其它设备中阅读本文章

一个线程的生命周期由三方面组成

  1. 创建线程的时间
  2. 线程执行的时间
  3. 执行销毁的时间

那么只要这三方面中的某一个所需的时间减少,就能够提高程序的性能
而线程池技术就是缩短创建与销毁线程的时间。

线程池由至少四部分组成

  1. 线程池管理器:用于创建并管理线程池
  2. 工作线程:线程池中的线程
  3. 任务接口:每个任务必须实现的而接口,以供工作线程调度任务的执行
  4. 任务队列:用于存放没有处理的任务。提供一种缓冲机制

简单线程池的实现

import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Test
{
    public static void main(String[] args)
    {
        ThreadPool t = ThreadPool.getThreadPool(3);
        t.execute(new Task[] { new TestTask(), new TestTask(), new TestTask() });
        System.out.println(t);
        t.execute(new Task[] { new TestTask(), new TestTask(), new TestTask() });
        System.out.println(t);
        t.destroy();// 所有线程都执行完成才destory
        System.out.println(t);
    }

    static class TestTask extends Task
    {
        @Override
        public void run()
        {
            try
            {
                Thread.sleep((long)(Math.random()*100));
            } catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            System.out.println("线程编号"+this.getTaskId()+"线程产生时间:"+this.getGenerateTime().getTime()+"线程提交到任务队列时间:"+this.getSubmitTime().getTime()+"线程从任务队列移除的时间:"+this.getRemoveFinishTime().getTime()+"线程开始执行时间:"+this.getBeginExceuteTime().getTime());
        }
    }
}

final class ThreadPool
{

    // 线程池中线程的默认数
    private static int worker_num = 5;
    // 工作线程
    private WorkThread[] workThreads;
    // 未处理的任务
    private static volatile int removefinished_task = 0;
    private static volatile int taskID = 0;
    // 任务队列
    private BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<Task>();
    private static ThreadPool threadPool;

    // 创建具有默认线程个数的线程池
    private ThreadPool()
    {
        this(worker_num);
    }

    // 创建线程池,worker_num为线程池中工作线程的个数
    private ThreadPool(int worker_num)
    {
        ThreadPool.worker_num = worker_num;
        workThreads = new WorkThread[worker_num];
        for (int i = 0; i < worker_num; i++)
        {
            workThreads[i] = new WorkThread();
            workThreads[i].start();// 开启线程池中的线程
        }
    }

    // 单态模式,获得一个默认线程个数的线程池
    public static ThreadPool getThreadPool()
    {
        return getThreadPool(ThreadPool.worker_num);
    }

    // 单态模式,获得一个指定线程个数的线程池,worker_num(>0)为线程池中工作线程的个数
    // worker_num<=0创建默认的工作线程个数
    public static ThreadPool getThreadPool(int worker_num)
    {
        if (worker_num <= 0)
        {
            worker_num = ThreadPool.worker_num;
        }
        if (threadPool == null)
        {
            threadPool = new ThreadPool(worker_num);
        }
        return threadPool;
    }

    // 执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定
    public void execute(Task task)
    {
        synchronized (taskQueue)
        {
            task.setTaskId(taskID++);
            task.setSubmitTime(new Date());
            taskQueue.add(task);
            taskQueue.notify();
        }
    }

    // 批量执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定
    public void execute(Task[] task)
    {
        synchronized (taskQueue)
        {
            for (Task t : task)
            {
                t.setTaskId(taskID++);
                t.setSubmitTime(new Date());
                taskQueue.add(t);
            }
            taskQueue.notify();
        }
    }

    // 批量执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定
    public void execute(List<Task> task)
    {
        synchronized (taskQueue)
        {
            for (Task t : task)
            {
                t.setTaskId(taskID++);
                t.setSubmitTime(new Date());
                taskQueue.add(t);
            }
            taskQueue.notify();
        }
    }

    // 销毁线程池,该方法保证在所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁
    public void destroy()
    {
        while (!taskQueue.isEmpty())// 如果还有任务没执行完成,就先睡会吧
        {
            try
            {
                Thread.sleep(10);
            } catch (Exception e)
            {
                e.printStackTrace();
            }
        }
        // 工作线程停止工作,且置为null
        for (int i = 0; i < worker_num; i++)
        {
            workThreads[i].stopWorker();
            workThreads[i] = null;
        }
        threadPool = null;
        taskQueue.clear();// 清空任务队列
    }

    // 返回工作线程的个数
    public int getWorkThreadNumber()
    {
        return worker_num;
    }

    // 返回已完成任务的个数,这里的已完成是只出了任务队列的任务个数,可能该任务并没有实际执行完成
    public int getFinishedTasknumber()
    {
        return removefinished_task;
    }

    // 返回任务队列的长度,即还没处理的任务个数
    public int getWaitTasknumber()
    {
        return taskQueue.size();
    }

    public String toString()
    {
        return "WorkThread number:" + worker_num + "  finished task number:"
                + removefinished_task + "  wait task number:" + getWaitTasknumber();
    }

    /**
     * 内部类,工作线程
     */
    private class WorkThread extends Thread
    {
        // 该工作线程是否有效,用于结束该工作线程
        private boolean isRunning = true;
        /*
         * 关键所在啊,如果任务队列不空,则取出任务执行,若任务队列空,则等待
         */
        @Override
        public void run()
        {
            Task r = null;
            while (isRunning)// 注意,若线程无效则自然结束run方法,该线程就没用了
            {
                synchronized (taskQueue)
                {
                    while (isRunning && taskQueue.isEmpty())// 队列为空
                    {
                        try
                        {
                            taskQueue.wait(20);
                        } catch (Exception e)
                        {
                            e.printStackTrace();
                        }
                    }
                    if (!taskQueue.isEmpty())
                    {
                        removefinished_task++;
                        r = taskQueue.remove();// 取出任务
                        r.setRemoveFinishTime(new Date());
                    }
                }
                if (r != null)
                {
                    r.setBeginExceuteTime(new Date());
                    r.run();// 执行任务
                }
                r = null;

            }

        }

        // 停止工作,让该线程自然执行完run方法,自然结束
        public void stopWorker()
        {
            isRunning = false;
        }
    }

}

/**
 * 所有任务接口 其他任务必须继承访类
 * 
 * @author obullxl
 */
abstract class Task implements Runnable
{
    /* 产生时间 */
    private Date generateTime = null;
    /* 提交执行时间 */
    private Date submitTime = null;
    /* 开始执行时间 */
    private Date beginExceuteTime = null;
    /* 出任务队列的时间 */
    private Date removeFinishTime = null;

    private long taskId;

    public Task()
    {
        this.generateTime = new Date();
    }

    /**
     * 任务执行入口
     */
     abstract public void run();

    public Date getGenerateTime()
    {
        return generateTime;
    }

    public Date getBeginExceuteTime()
    {
        return beginExceuteTime;
    }

    public void setBeginExceuteTime(Date beginExceuteTime)
    {
        this.beginExceuteTime = beginExceuteTime;
    }

    public Date getRemoveFinishTime()
    {
        return removeFinishTime;
    }

    public void setRemoveFinishTime(Date finishTime)
    {
        this.removeFinishTime = finishTime;
    }

    public Date getSubmitTime()
    {
        return submitTime;
    }

    public void setSubmitTime(Date submitTime)
    {
        this.submitTime = submitTime;
    }

    public long getTaskId()
    {
        return taskId;
    }

    public void setTaskId(long taskId)
    {
        this.taskId = taskId;
    }

}

Java中的线程池

Java中的几种线程池最终都会调用此方法。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

  • corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
  • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
    • TimeUnit.DAYS; //天
    • TimeUnit.HOURS; //小时
    • TimeUnit.MINUTES; //分钟
    • TimeUnit.SECONDS; //秒
    • TimeUnit.MILLISECONDS; //毫秒
    • TimeUnit.MICROSECONDS; //微妙
    • TimeUnit.NANOSECONDS; //纳秒
  • workQueue:线程池所使用的缓冲队列,该缓冲队列的长度决定了能够缓冲的最大数量,缓冲队列有三种通用策略:
  1. 直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性;
  2. 无界队列。使用无界队列(例如,不具有预定义容量的LinkedBlockingQueue)将导致在所有 corePoolSize线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性;
  3. 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量.
  • threadFactory:线程工厂,主要用来创建线程;
  • handler:表示当拒绝处理任务时的策略,有以下四种取值:
    • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

corePoolSize与maximumPoolSize
由于ThreadPoolExecutor 将根据 corePoolSize和 maximumPoolSize设置的边界自动调整池大小,当新任务在方法 execute(java.lang.Runnable) 中提交时:

1. 如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的;
2. 如果设置的corePoolSize 和 maximumPoolSize相同,则创建的线程池是大小固定的,如果运行的线程与corePoolSize相同,当有新请求过来时,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理
3. 如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新的线程去处理请求;
4. 如果运行的线程多于corePoolSize 并且等于maximumPoolSize,若队列已经满了,则通过handler所指定的策略来处理新请求;
5. 如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务

也就是说,处理任务的优先级为:

- 核心线程corePoolSize > 任务队列workQueue > 最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
- 当池中的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁。

newCachedThreadPool()

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程.
线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。

    public static ExecutorService newCachedThreadPool() 
    {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
参数类型 参数名 参数值
int corePoolSize 0
int maximumPoolSize Integer.MAX_VALUE
long keepAliveTime 60L
TimeUnit unit TimeUnit.SECONDS
BlockingQueue workQueue new SynchronousQueue()
ThreadFactory threadFactory Executors.defaultThreadFactory()
RejectedExecutionHandler handler defaultHandler

newFixedThreadPool(int nThreads)

创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。定长线程池的大小最好根据系统资源进行设置。

    public static ExecutorService newFixedThreadPool(int nThreads) 
    {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
参数类型 参数名 参数值
int corePoolSize nThreads
int maximumPoolSize nThreads
long keepAliveTime 0L
TimeUnit unit TimeUnit.MILLISECONDS
BlockingQueue workQueue new LinkedBlockingQueue()
ThreadFactory threadFactory Executors.defaultThreadFactory()
RejectedExecutionHandler handler defaultHandler

newScheduledThreadPool(int corePoolSize)

创建一个定长线程池,支持定时及周期性任务执行。
``` java
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
{
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService
{
public ScheduledThreadPoolExecutor(int corePoolSize)
{
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}
}

|参数类型|参数名|参数值|
|:---:|:----:|:---:|
|int|corePoolSize|corePoolSize|
|int|maximumPoolSize|Integer.MAX_VALUE|
|long|keepAliveTime|0|
|TimeUnit|unit|NANOSECONDS|
|BlockingQueue<Runnable>|workQueue|new DelayedWorkQueue()|
|ThreadFactory|threadFactory| Executors.defaultThreadFactory()|
|RejectedExecutionHandler|handler|defaultHandler|


**newSingleThreadExecutor()**
> 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
``` java
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>()));
}
参数类型 参数名 参数值
int corePoolSize 1
int maximumPoolSize 1
long keepAliveTime 0
TimeUnit unit TimeUnit.MILLISECONDS
BlockingQueue workQueue new LinkedBlockingQueue()
ThreadFactory threadFactory Executors.defaultThreadFactory()
RejectedExecutionHandler handler defaultHandler

※defaultHandler即AbortPolicy
※threadFactory也可以自定义,以下是默认线程工厂的源码:

static class DefaultThreadFactory implements ThreadFactory {  
    private static final AtomicInteger poolNumber = new AtomicInteger(1);  
    private final ThreadGroup group;  
    private final AtomicInteger threadNumber = new AtomicInteger(1);  
    private final String namePrefix;  

    DefaultThreadFactory() {  
        SecurityManager s = System.getSecurityManager();  
        group = (s != null) ? s.getThreadGroup() :  Thread.currentThread().getThreadGroup();  
        namePrefix = "pool-" +  poolNumber.getAndIncrement() +  "-thread-";  
    }  
    // 为线程池创建新的任务执行线程  
    public Thread newThread(Runnable r) {  
        // 线程对应的任务是Runnable对象r  
        Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(), 0);  
        // 设为非守护线程  
        if (t.isDaemon())  
            t.setDaemon(false);  
        // 将优先级设为Thread.NORM_PRIORITY  
        if (t.getPriority() != Thread.NORM_PRIORITY)  
            t.setPriority(Thread.NORM_PRIORITY);  
        return t;  
    }  
}

参考资料

线程池的实现原理
Java并发编程:线程池的使用
Java 7之多线程线程池 - 线程池原理(1)