ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize:核心池的大小。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
maximumPoolSize:线程池最大线程数。这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性
workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue; PriorityBlockingQueue ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和SynchronousQueue。线程池的排队策略与BlockingQueue有关。
threadFactory:线程工厂,主要用来创建线程
handler:表示当拒绝处理任务时的策略。有四种取值:
AbortPolicy:丢弃任务并抛出RejectedExecutionException异常;
DiscardPolicy:也是丢弃任务,但是不抛出异常
DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程);
CallerRunsPolicy:由调用线程处理该任务。
介绍JDK提供的线程池:
newSingleThreadExecutor,
newFixedThreadPool,
newCachedThreadPool,
newScheduledThreadPool,
newSingleThreadScheduledExecutor
newWorkStealingPool
创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO,优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue()));
}
示例代码如下:
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
当核心线程为0,最大线程数为无穷大。
与其他Java线程池比较,当新任务到来时,线程池会创建一个线程。
如果此任务没有执行完,又往线程池添加任务,线程池又会创建新的线程处理新加任务。
如果有任务执行完毕,并且没超过1分钟,这时又有新任务进来,此时不会创建新线程,而是使用执行完测任务的线程进行对新任务进行处理。
但是假如线程池中任务都没处理完,又有大批量的任务进来,系统会由于大量线程创建执行而瘫痪。
这种类型的线程池特点是:
示例代码如下:
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
try {
Thread.sleep(index * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(new Runnable() {
public void run() {
System.out.println(index);
}
});
}
}
}
创建一个定长的线程池,而且支持定时的以及周期性的任务执行,支持定时及周期性任务执行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
延迟3秒执行,延迟执行示例代码如下:
package test;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable() {
public void run() {
System.out.println("delay 3 seconds");
}
}, 3, TimeUnit.SECONDS);
}
}
表示延迟1秒后每3秒执行一次,定期执行示例代码如下:
package test;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println("delay 1 seconds, and excute every 3 seconds");
}
}, 1, 3, TimeUnit.SECONDS);
}
}
5. newSingleThreadScheduledExecutor
创建只有一条线程的线程池,他可以在指定延迟后执行线程任务
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
创建一个拥有多个任务队列(以便减少连接数)的线程池
这是jdk1.8中新增加的一种线程池实现,先看一下它的无参实现
返回的ForkJoinPool从jdk1.7开始引进,个人感觉类似于mapreduce的思想。这个线程池较为特殊,将在后续博客中给出详细的使用说明和原理。
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
线程池对比:
线程池 | 特点 | 建议使用场景 |
---|---|---|
newCachedThreadPool | 1、线程数无上限 2、空闲线程存活60s 3、阻塞队列 |
1、任务执行时间短 2、任务要求响应时间短 |
newFixedThreadPool | 1、线程数固定 2、无界队列 |
1、任务比较平缓 2、控制最大的线程数 |
newScheduledThreadPool | 核心线程数量固定、非核心线程数量无限制(闲置时马上回收) | 执行定时 / 周期性 任务 |
newSingleThreadExecutor | 只有一个核心线程(保证所有任务按照指定顺序在一个线程中执行,不需要处理线程同步的问题) | 不适合并发但可能引起IO阻塞性及影响UI线程响应的操作,如数据库操作,文件操作等 |
现象 | 原因 |
---|---|
整个系统影响缓慢,大部分504 | 1、为设置最大的线程数,任务积压过多,线程数用尽 |
oom | 1、队列无界或者size设置过大 |
使用线程池对效率并没有明显的提升 | 1、线程池的参数设置过小,线程数过小或者队列过小,或者是服务器的cpu核数太低 |
全部评论