线程创建方式
2020.04.20 阅读量次创建线程
在Java中创建一个线程,有且仅有一种方式,创建一个Thread
类实例,并调用它的start
方法。
Thread
最经典也是最常见的方式是通过继承Thread
类,重写run()
方法来创建线程。适用于需要直接控制线程生命周期的情况。
public class MainTest {
public static void main(String[] args) {
ThreadDemo thread1 = new ThreadDemo();
thread1.start();
}
}
class ThreadDemo extends Thread {
@Override
public void run() {
System.out.printf("通过继承Thread类的方式创建线程,线程 %s 启动",Thread.currentThread().getName());
}
}
Runnable
实现Runnale
接口,将它作为target
参数传递给Thread
类构造函数的方式创建线程。
public class MainTest {
public static void main(String[] args) {
new Thread(() -> {
System.out.printf("通过实现Runnable接口的方式,重写run方法创建线程;线程 %s 启动",Thread.currentThread().getName());
}).start();
}
}
Callable
Callable
接口与Runnable
类似,但它可以返回结果,并且可以抛出异常,需要配合Future
接口使用。通过实现Callable
接口,来创建一个带有返回值的线程。
在Callable
执行完之前的这段时间,主线程可以先去做一些其他的事情,事情都做完之后,再获取Callable
的返回结果。可以通过isDone()
来判断子线程是否执行完。
public class MainTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<>(() -> {
System.out.printf("通过实现Callable接口的方式,重写call方法创建线程;线程 %s 启动", Thread.currentThread().getName());
System.out.println();
Thread.sleep(10000);
return "我是call方法返回值";
});
new Thread(futureTask).start();
System.out.println("主线程工作中 ...");
String callRet = null;
while (callRet == null){
if(futureTask.isDone()){
callRet = futureTask.get();
}
System.out.println("主线程继续工作 ...");
}
System.out.println("获取call方法返回值:"+ callRet);
}
}
在实际开发中,通常使用异步编程工具,如CompletableFuture
。
CompletableFuture
是JDK8的新特性。CompletableFuture
实现了CompletionStage
接口和Future
接口,前者是对后者的一个扩展,增加了异步会点、流式处理、多个Future
组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
public class CompletableFutureRunAsyncExample {
public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 异步执行的任务,没有返回值
System.out.println("Running asynchronously");
});
future.thenRun(() -> {
System.out.println("After running asynchronously");
});
future.join(); // 等待任务完成
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(result -> result + " CompletableFuture!");
}
}
线程池
线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务。如果线程数量超过了最大数量超出数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。 线程池是一种用于管理和复用线程的机制,可以有效地提高应用程序的性能和资源利用率。它的主要特点为:线程复用,提高响应速度,管理线程。
- 降低资源消耗,通过重复利用己创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度,当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的管理性,线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
创建方式
在Java中,使用java.util.concurrent
包中的Executor
来创建和管理线程池。几种常见的线程池创建方式:
Executors.newSingleThreadExecutor()
:创建只有一个线程的线程池。Executors.newFixedThreadPool(int)
:创建固定线程的线程池。Executors.newCachedThreadPool()
:创建一个可缓存的线程池,线程数量随着处理业务数量变化。
这三种常用创建线程池的方式,底层代码都是用ThreadPoolExecutor
创建的。
-
使用
Executors.newSingleThreadExecutor()
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行。newSingleThreadExecutor
将corePoolSize
和maximumPoolSize
都设置为1,它使用的LinkedBlockingQueue
。public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
public class MainTest { public static void main(String[] args) { ExecutorService executor1 = null; try { executor1 = Executors.newSingleThreadExecutor(); for (int i = 1; i <= 10; i++) { executor1.execute(() -> { System.out.println(Thread.currentThread().getName() + "执行了"); }); } } finally { executor1.shutdown(); } } }
-
使用
Executors.newFixedThreadPool(int)
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。newFixedThreadPool
创建的线程池corePoolSize
和maximumPoolSize
值是相等的,它使用的LinkedBlockingQueue
。public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
public class MainTest { public static void main(String[] args) { ExecutorService executor1 = null; try { executor1 = Executors.newFixedThreadPool(10); for (int i = 1; i <= 10; i++) { executor1.execute(() -> { System.out.println(Thread.currentThread().getName() + "执行了"); }); } } finally { executor1.shutdown(); } } }
-
使用
Executors.newCachedThreadPool()
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收则新建线程。newCachedThreadPool
将corePoolSize
设置为0,将maximumPoolSize
设置为Integer.MAX_VALUE
,使用的SynchronousQueue
,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
public class MainTest { public static void main(String[] args) { ExecutorService executor1 = null; try { executor1 = Executors.newCachedThreadPool(); for (int i = 1; i <= 10; i++) { executor1.execute(() -> { System.out.println(Thread.currentThread().getName() + "执行了"); }); } } finally { executor1.shutdown(); } } }
自定义线程池
在实际开发中用哪个线程池?上面的三种一个都不用,我们生产上只能使用自定义的。Executors
类中已经给你提供了,为什么不用?
摘自《阿里巴巴开发手册》 【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。 说明:线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。 如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
【强制】线程池不允许使用Executors
去创建,而是通过ThreadPoolExecutor
的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors
返回的线程池对象的弊端如下: 1)FixedThreadPool
和SingleThreadPool
: 允许的请求队列长度为Integer.MAX_VALUE
,可能会堆积大量的请求,从而导致OOM。 2)CachedThreadPool
: 允许的创建线程数量为Integer.MAX_VALUE
,可能会创建大量的线程,从而导致 OOM。
自定义线程池代码演示:
public class MainTest {
public static void main(String[] args) {
ExecutorService executor1 = null;
try {
executor1 = new ThreadPoolExecutor(
2,
5,
1L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 1; i <= 20; i++) {
executor1.execute(() -> {
System.out.println(Thread.currentThread().getName() + "执行了");
});
}
} finally {
executor1.shutdown();
}
}
}
SpringBoot异步配置,自定义线程池代码演示:
@EnableAsync
@Configuration
public class AsyncConfig {
/**
* 线程空闲存活的时间 单位: TimeUnit.SECONDS
*/
public static final int KEEP_ALIVE_TIME = 60 * 60;
/**
* CPU 核心数量
*/
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
/**
* 核心线程数量
*/
public static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
/**
* 线程池最大容纳线程数量
* IO密集型:即存在大量堵塞; 公式: CPU核心数量 / 1- 阻塞系数 (阻塞系统在 0.8~0.9 之间)
* CPU密集型: 需要大量运算,没有堵塞或很少有; 公式:CPU核心数量 + 1
*/
public static final int IO_MAXIMUM_POOL_SIZE = (int) (CPU_COUNT / (1 - 0.9));
public static final int CPU_MAXIMUM_POOL_SIZE = CPU_COUNT + 2;
/**
* 执行写入请求时的线程池
*
* @return 线程池
*/
@Bean(name = "iSaveTaskThreadPool")
public Executor iSaveTaskThreadPool() {
return getThreadPoolTaskExecutor("iSaveTaskThreadPool-",IO_MAXIMUM_POOL_SIZE,100000,new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
* 执行读请求时的线程池
*
* @return 线程池
*/
@Bean(name = "iQueryThreadPool")
public Executor iQueryThreadPool() {
return getThreadPoolTaskExecutor("iQueryThreadPool-",CPU_MAXIMUM_POOL_SIZE,10000,new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
* 创建一个线程池对象
* @param threadNamePrefix 线程名称
* @param queueCapacity 堵塞队列长度
* @param refusePolicy 拒绝策略
*/
private ThreadPoolTaskExecutor getThreadPoolTaskExecutor(String threadNamePrefix,int maxPoolSize,int queueCapacity,ThreadPoolExecutor.CallerRunsPolicy refusePolicy) {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
taskExecutor.setMaxPoolSize(maxPoolSize);
taskExecutor.setKeepAliveSeconds(KEEP_ALIVE_TIME);
taskExecutor.setThreadNamePrefix(threadNamePrefix);
// 拒绝策略; 既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量
taskExecutor.setRejectedExecutionHandler(refusePolicy);
// 阻塞队列 长度
taskExecutor.setQueueCapacity(queueCapacity);
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(60);
taskExecutor.initialize();
return taskExecutor;
}
}
线程池工作原理
在创建了线程池后,等待提交过来的任务请求,当调用execute
方法添加一个请求任务时,线程池会做如下判断:
- 如果当前运行的线程数小于
corePoolSize
,那么马上创建线程运行该任务。 - 如果当前运行的线程数大于等于
corePoolSize
,那么该任务会被放入任务队列。 - 如果这时候任务队列满了且正在运行的线程数还小于
maximumPoolSize
,那么要创建非核心线程立刻运行这个任务扩容。 - 如果任务队列满了且正在运行的线程数等于
maximumPoolSize
,那么线程池会启动饱和拒绝策略来执行。 - 随着时间的推移,业务量越来越少,线程池中出现了空闲线程。当一个线程无事可做超过一定的时间时,线程池会进行判断,如果当前运行的线程数大于
corePoolSize
,那么这个线程就被停掉,所以线程池的所有任务完成后它最终会收缩到corePoolSize
的大小。
阻塞队列
阻塞队列,顾名思义,首先它是一个队列,一个阻塞队列在数据结构中所起的作用:
- 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。
- 当阻塞队列是满时,往队列里添加元素的操作将会被阻塞。
在多线程编程中,阻塞队列扮演着重要角色,特别适用于生产者-消费者模式,确保线程之间的同步和有序执行。阻塞队列的本质是一种数据结构,用于存储待执行的任务。 当任务提交给线程池时,如果核心线程已满或任务队列达到容量上限,新任务将被放入阻塞队列中,等待执行条件的满足。
阻塞在多线程领域指的是线程因某些条件而暂停执行,一旦条件满足,线程会被自动唤醒继续执行。这种机制保证了线程池的任务按照预期顺序执行,有效地管理并发任务的执行流程。
在Java中,常见的线程池阻塞队列包括:
ArrayBlockingQueue
: 由数组结构组成的有界阻塞队列。它按照 FIFO(先进先出)的顺序对元素进行排序。LinkedBlockingQueue
: 由链表结构组成的有界(默认大小为Integer.MAX_VALUE
,大约21亿)阻塞队列。同样按照 FIFO 的顺序对元素进行排序。PriorityBlockingQueue
: 支持优先级排序的无界阻塞队列。元素按照它们的优先级顺序被处理,具有最高优先级的元素总是被队列中的下一个要处理的元素。DelayQueue
: 使用优先级队列实现的延迟无界阻塞队列。队列中的元素只有在其指定的延迟时间到达时才能被取出。SynchronousQueue
: 不存储元素的阻塞队列,每个插入操作必须等待一个对应的移除操作。用于直接传递任务的场景。LinkedTransferQueue
: 由链表结构组成的无界阻塞队列,支持生产者-消费者的传输机制。与其他队列不同,它支持优先级传输。LinkedBlockingDeque
: 由链表结构组成的双向阻塞队列。它支持在队列的两端进行插入和移除操作,是一种双端队列。
当你自定义线程池时,选择合适的阻塞队列是非常重要的。阻塞队列就像一个存放任务的“箱子”,线程池中的任务先放到这里,然后线程池的线程再从这里取出来执行。
如果你的系统可能会有很多任务一起提交,可以考虑用能存很多任务的队列,比如LinkedBlockingQueue
。这样即使任务多了,也不会丢失。
如果你的任务有优先级,比如有些任务比其他的更重要,那就选PriorityBlockingQueue
。它会按照任务的优先级来决定哪个任务先执行。
如果应用程序需要限制内存使用,并希望在达到容量限制时阻塞新任务提交,可以选择ArrayBlockingQueue
。
线程池参数
在Java中,线程池的创建和管理通过java.util.concurrent.ThreadPoolExecutor
类完成。理解这个类构造函数的参数可以帮助我们更好地配置和优化线程池的运行效果。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// ...
}
corePoolSize
: 核心线程数。当提交一个新任务时,如果当前运行的线程少于corePoolSize
,则即使有空闲的工作线程,也会创建一个新线程来执行任务。 核心线程在ThreadPoolExecutor
的生命周期内始终存活,除非设置了allowCoreThreadTimeOut
。maximumPoolSize
:线程池能够容纳同时执行的最大线程数,此值必须大于等于1。当任务队列满时,如果当前运行的线程数少于maximumPoolSize
,则会创建新的线程来执行任务。threadFactory
:线程工厂,一般用默认的即可。用于创建新线程,通常用来给线程设置名称、设置为守护线程等。workQueue
:任务队列,用于保存等待执行任务的队列。随着业务量的增多,线程开始慢慢处理不过来,这时候需要放到任务队列中去等待线程处理。rejectedExecutionHandler
:拒绝策略。如果业务越来越多,线程池首先会扩容,扩容后发现还是处理不过来,任务队列已经满了,处理被拒绝任务的策略。AbortPolicy
: 默认拒绝策略;直接抛出java.util.concurrent.RejectedExecutionException
异常,阻止系统的正常运行;CallerRunsPolicy
:调用这运行,一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量;DiscardOldestPolicy
:抛弃队列中等待最久的任务,然后把当前任务加入到队列中;DiscardPolicy
:直接丢弃任务,不给予任何处理也不会抛出异常;如果允许任务丢失,这是一种最好的解决方案;
keepAliveTime
:多余的空闲线程的存活时间。如果线程池扩容后,能处理过来,而且数据量并没有那么大,用最初的线程数量就能处理过来,剩下的线程被叫做空闲线程。keepAliveTime
指的是当线程数超过corePoolSize
时,多余的空闲线程在等待新任务到来之前可以存活的最长时间。如果设置为0,则超出核心线程数的空闲线程会立即终止。unit
:keepAliveTime
参数的时间单位,可以是TimeUnit.SECONDS
、TimeUnit.MILLISECONDS
等。
合理配置线程池参数
合理配置线程池参数,可以分为以下两种情况:
- CPU密集型:CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行;CPU密集型任务配置尽可能少的线程数量:
参考公式:(CPU核数+1)
- IO密集型:即该任务需要大量的IO,即大量的阻塞;在IO密集型任务中使用多线程可以大大的加速程序运行,故需要多配置线程数:参考公式:
CPU核数/ (1-阻塞系数) 阻塞系数在0.8~0.9之间
public class MainTest {
public static void main(String[] args) {
ExecutorService executor1 = null;
try {
// 获取CPU核心数
int coreNum = Runtime.getRuntime().availableProcessors();
/*
* 1. IO密集型: CPU核数/ (1-阻塞系数) 阻塞系数在0.8~0.9之间
* 2. CPU密集型: CPU核数+1
*/
// int maximumPoolSize = coreNum + 1;
int maximumPoolSize = (int) (coreNum / (1 - 0.9));
System.out.println("当前线程池最大允许存放:" + maximumPoolSize + "个线程");
executor1 = new ThreadPoolExecutor(
2,
maximumPoolSize,
1L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 1; i <= 20; i++) {
executor1.execute(() -> {
System.out.println(Thread.currentThread().getName() + "执行了");
});
}
} finally {
executor1.shutdown();
}
}
}