线程池简介
一个线程的生命周期由三方面组成
- 创建线程的时间
- 线程执行的时间
- 执行销毁的时间
那么只要这三方面中的某一个所需的时间减少,就能够提高程序的性能
而线程池技术就是缩短创建与销毁线程的时间。
线程池由至少四部分组成
- 线程池管理器:用于创建并管理线程池
- 工作线程:线程池中的线程
- 任务接口:每个任务必须实现的而接口,以供工作线程调度任务的执行
- 任务队列:用于存放没有处理的任务。提供一种缓冲机制
简单线程池的实现
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class Test
{
public static void main(String[] args)
{
ThreadPool t = ThreadPool.getThreadPool(3);
t.execute(new Task[] { new TestTask(), new TestTask(), new TestTask() });
System.out.println(t);
t.execute(new Task[] { new TestTask(), new TestTask(), new TestTask() });
System.out.println(t);
t.destroy();// 所有线程都执行完成才destory
System.out.println(t);
}
static class TestTask extends Task
{
@Override
public void run()
{
try
{
Thread.sleep((long)(Math.random()*100));
} catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println("线程编号"+this.getTaskId()+"线程产生时间:"+this.getGenerateTime().getTime()+"线程提交到任务队列时间:"+this.getSubmitTime().getTime()+"线程从任务队列移除的时间:"+this.getRemoveFinishTime().getTime()+"线程开始执行时间:"+this.getBeginExceuteTime().getTime());
}
}
}
final class ThreadPool
{
// 线程池中线程的默认数
private static int worker_num = 5;
// 工作线程
private WorkThread[] workThreads;
// 未处理的任务
private static volatile int removefinished_task = 0;
private static volatile int taskID = 0;
// 任务队列
private BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<Task>();
private static ThreadPool threadPool;
// 创建具有默认线程个数的线程池
private ThreadPool()
{
this(worker_num);
}
// 创建线程池,worker_num为线程池中工作线程的个数
private ThreadPool(int worker_num)
{
ThreadPool.worker_num = worker_num;
workThreads = new WorkThread[worker_num];
for (int i = 0; i < worker_num; i++)
{
workThreads[i] = new WorkThread();
workThreads[i].start();// 开启线程池中的线程
}
}
// 单态模式,获得一个默认线程个数的线程池
public static ThreadPool getThreadPool()
{
return getThreadPool(ThreadPool.worker_num);
}
// 单态模式,获得一个指定线程个数的线程池,worker_num(>0)为线程池中工作线程的个数
// worker_num<=0创建默认的工作线程个数
public static ThreadPool getThreadPool(int worker_num)
{
if (worker_num <= 0)
{
worker_num = ThreadPool.worker_num;
}
if (threadPool == null)
{
threadPool = new ThreadPool(worker_num);
}
return threadPool;
}
// 执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定
public void execute(Task task)
{
synchronized (taskQueue)
{
task.setTaskId(taskID++);
task.setSubmitTime(new Date());
taskQueue.add(task);
taskQueue.notify();
}
}
// 批量执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定
public void execute(Task[] task)
{
synchronized (taskQueue)
{
for (Task t : task)
{
t.setTaskId(taskID++);
t.setSubmitTime(new Date());
taskQueue.add(t);
}
taskQueue.notify();
}
}
// 批量执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定
public void execute(List<Task> task)
{
synchronized (taskQueue)
{
for (Task t : task)
{
t.setTaskId(taskID++);
t.setSubmitTime(new Date());
taskQueue.add(t);
}
taskQueue.notify();
}
}
// 销毁线程池,该方法保证在所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁
public void destroy()
{
while (!taskQueue.isEmpty())// 如果还有任务没执行完成,就先睡会吧
{
try
{
Thread.sleep(10);
} catch (Exception e)
{
e.printStackTrace();
}
}
// 工作线程停止工作,且置为null
for (int i = 0; i < worker_num; i++)
{
workThreads[i].stopWorker();
workThreads[i] = null;
}
threadPool = null;
taskQueue.clear();// 清空任务队列
}
// 返回工作线程的个数
public int getWorkThreadNumber()
{
return worker_num;
}
// 返回已完成任务的个数,这里的已完成是只出了任务队列的任务个数,可能该任务并没有实际执行完成
public int getFinishedTasknumber()
{
return removefinished_task;
}
// 返回任务队列的长度,即还没处理的任务个数
public int getWaitTasknumber()
{
return taskQueue.size();
}
public String toString()
{
return "WorkThread number:" + worker_num + " finished task number:"
+ removefinished_task + " wait task number:" + getWaitTasknumber();
}
/**
* 内部类,工作线程
*/
private class WorkThread extends Thread
{
// 该工作线程是否有效,用于结束该工作线程
private boolean isRunning = true;
/*
* 关键所在啊,如果任务队列不空,则取出任务执行,若任务队列空,则等待
*/
@Override
public void run()
{
Task r = null;
while (isRunning)// 注意,若线程无效则自然结束run方法,该线程就没用了
{
synchronized (taskQueue)
{
while (isRunning && taskQueue.isEmpty())// 队列为空
{
try
{
taskQueue.wait(20);
} catch (Exception e)
{
e.printStackTrace();
}
}
if (!taskQueue.isEmpty())
{
removefinished_task++;
r = taskQueue.remove();// 取出任务
r.setRemoveFinishTime(new Date());
}
}
if (r != null)
{
r.setBeginExceuteTime(new Date());
r.run();// 执行任务
}
r = null;
}
}
// 停止工作,让该线程自然执行完run方法,自然结束
public void stopWorker()
{
isRunning = false;
}
}
}
/**
* 所有任务接口 其他任务必须继承访类
*
* @author obullxl
*/
abstract class Task implements Runnable
{
/* 产生时间 */
private Date generateTime = null;
/* 提交执行时间 */
private Date submitTime = null;
/* 开始执行时间 */
private Date beginExceuteTime = null;
/* 出任务队列的时间 */
private Date removeFinishTime = null;
private long taskId;
public Task()
{
this.generateTime = new Date();
}
/**
* 任务执行入口
*/
abstract public void run();
public Date getGenerateTime()
{
return generateTime;
}
public Date getBeginExceuteTime()
{
return beginExceuteTime;
}
public void setBeginExceuteTime(Date beginExceuteTime)
{
this.beginExceuteTime = beginExceuteTime;
}
public Date getRemoveFinishTime()
{
return removeFinishTime;
}
public void setRemoveFinishTime(Date finishTime)
{
this.removeFinishTime = finishTime;
}
public Date getSubmitTime()
{
return submitTime;
}
public void setSubmitTime(Date submitTime)
{
this.submitTime = submitTime;
}
public long getTaskId()
{
return taskId;
}
public void setTaskId(long taskId)
{
this.taskId = taskId;
}
}
Java中的线程池
Java中的几种线程池最终都会调用此方法。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
- maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
- keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
- unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
- TimeUnit.DAYS; //天
- TimeUnit.HOURS; //小时
- TimeUnit.MINUTES; //分钟
- TimeUnit.SECONDS; //秒
- TimeUnit.MILLISECONDS; //毫秒
- TimeUnit.MICROSECONDS; //微妙
- TimeUnit.NANOSECONDS; //纳秒
- workQueue:线程池所使用的缓冲队列,该缓冲队列的长度决定了能够缓冲的最大数量,缓冲队列有三种通用策略:
- 直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性;
- 无界队列。使用无界队列(例如,不具有预定义容量的LinkedBlockingQueue)将导致在所有 corePoolSize线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性;
- 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量.
- threadFactory:线程工厂,主要用来创建线程;
- handler:表示当拒绝处理任务时的策略,有以下四种取值:
- ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
corePoolSize与maximumPoolSize
由于ThreadPoolExecutor 将根据 corePoolSize和 maximumPoolSize设置的边界自动调整池大小,当新任务在方法 execute(java.lang.Runnable) 中提交时:1. 如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的; 2. 如果设置的corePoolSize 和 maximumPoolSize相同,则创建的线程池是大小固定的,如果运行的线程与corePoolSize相同,当有新请求过来时,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理 3. 如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新的线程去处理请求; 4. 如果运行的线程多于corePoolSize 并且等于maximumPoolSize,若队列已经满了,则通过handler所指定的策略来处理新请求; 5. 如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务
也就是说,处理任务的优先级为:
- 核心线程corePoolSize > 任务队列workQueue > 最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。 - 当池中的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁。
newCachedThreadPool()
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程.
线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
参数类型 | 参数名 | 参数值 |
---|---|---|
int | corePoolSize | 0 |
int | maximumPoolSize | Integer.MAX_VALUE |
long | keepAliveTime | 60L |
TimeUnit | unit | TimeUnit.SECONDS |
BlockingQueue |
workQueue | new SynchronousQueue |
ThreadFactory | threadFactory | Executors.defaultThreadFactory() |
RejectedExecutionHandler | handler | defaultHandler |
newFixedThreadPool(int nThreads)
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。定长线程池的大小最好根据系统资源进行设置。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
参数类型 参数名 参数值 int corePoolSize nThreads int maximumPoolSize nThreads long keepAliveTime 0L TimeUnit unit TimeUnit.MILLISECONDS BlockingQueue workQueue new LinkedBlockingQueue () ThreadFactory threadFactory Executors.defaultThreadFactory() RejectedExecutionHandler handler defaultHandler
newScheduledThreadPool(int corePoolSize)
创建一个定长线程池,支持定时及周期性任务执行。
``` java
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
{
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService
{
public ScheduledThreadPoolExecutor(int corePoolSize)
{
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}
}
|参数类型|参数名|参数值|
|:---:|:----:|:---:|
|int|corePoolSize|corePoolSize|
|int|maximumPoolSize|Integer.MAX_VALUE|
|long|keepAliveTime|0|
|TimeUnit|unit|NANOSECONDS|
|BlockingQueue<Runnable>|workQueue|new DelayedWorkQueue()|
|ThreadFactory|threadFactory| Executors.defaultThreadFactory()|
|RejectedExecutionHandler|handler|defaultHandler|
**newSingleThreadExecutor()**
> 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
``` java
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
参数类型 | 参数名 | 参数值 |
---|---|---|
int | corePoolSize | 1 |
int | maximumPoolSize | 1 |
long | keepAliveTime | 0 |
TimeUnit | unit | TimeUnit.MILLISECONDS |
BlockingQueue |
workQueue | new LinkedBlockingQueue |
ThreadFactory | threadFactory | Executors.defaultThreadFactory() |
RejectedExecutionHandler | handler | defaultHandler |
※defaultHandler即AbortPolicy
※threadFactory也可以自定义,以下是默认线程工厂的源码:
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
}
// 为线程池创建新的任务执行线程
public Thread newThread(Runnable r) {
// 线程对应的任务是Runnable对象r
Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(), 0);
// 设为非守护线程
if (t.isDaemon())
t.setDaemon(false);
// 将优先级设为Thread.NORM_PRIORITY
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}