江阴高新区建设促进服务中心网站,哪些网站可以做团购,百度网站关键词优化在哪里做,外贸网站建设招聘#x1f4dd;个人主页#xff1a;五敷有你 #x1f525;系列专栏#xff1a;并发编程⛺️稳重求进#xff0c;晒太阳 ThreadPoolExecutor 1) 线程池状态
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态#xff0c;低 29 位表示线程数量 状态名 高三位 … 个人主页五敷有你 系列专栏并发编程⛺️稳重求进晒太阳 ThreadPoolExecutor 1) 线程池状态
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态低 29 位表示线程数量 状态名 高三位 接受新任务 处理阻塞队列任务 说明 RUNNING 111 Y Y SHUTDOWN 000 N Y 不会接受新任务但会处理阻塞队列剩余任务 STOP 001 N N 会中断正在执行的任务并抛弃阻塞队列任务 TIDYING 010 - - 任务全执行完毕任务线程为0即进入终结 TERMINATED 011 - - 终结状态
从数字上比较TERMINATED TIDYING STOP SHUTDOWN RUNNING
这些信息存储在一个原子变量 ctl 中目的是将线程池状态与线程个数合二为一赋值合二为一这样就可以用一次 cas 原子操作进行赋值
// c 为旧值 ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 为高 3 位代表线程池状态 wc 为低 29 位代表线程个数ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }
2) 构造方法
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
corePoolSize 核心线程数目 (最多保留的线程数)maximumPoolSize 最大线程数目keepAliveTime 生存时间 - 针对救急线程unit 时间单位 - 针对救急线程workQueue 阻塞队列threadFactory 线程工厂 - 可以为线程创建时起个好名字handler 拒绝策略
工作方式 流程
线程池中刚开始没有线程当一个任务提交给线程池后线程池会创建一个新线程来执行任务。当线程数达到 corePoolSize 并没有线程空闲这时再加入任务新加的任务会被加入workQueue 队列排队直到有空闲的线程。如果队列选择了有界队列那么任务超过了队列大小时会创建 maximumPoolSize - corePoolSize 数目的救急线程来救急。如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现其它著名框架也提供了实现 AbortPolicy 让调用者抛出 RejectedExecutionException 异常这是默认策略CallerRunsPolicy 让调用者运行任务DiscardPolicy 放弃本次任务DiscardOldestPolicy 放弃队列中最早的任务本任务取而代之Dubbo 的实现在抛出 RejectedExecutionException 异常之前会记录日志并 dump 线程栈信息方便定位问题Netty 的实现是创建一个新线程来执行任务ActiveMQ 的实现带超时等待60s尝试放入队列类似我们之前自定义的拒绝策略PinPoint 的实现它使用了一个拒绝策略链会逐一尝试策略链中每种拒绝策略当高峰过去后超过corePoolSize 的救急线程如果一段时间没有任务做需要结束节省资源这个时间由keepAliveTime 和 unit 来控制。 根据这个构造方法JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池
3) newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());} 特点
核心线程数 最大线程数没有救急线程被创建因此也无需超时时间
阻塞队列是无界的可以放任意数量的任务
评价
适用于任务量已知相对耗时的任务。
线程工厂的使用
ExecutorService executorService Executors.newFixedThreadPool(2, new ThreadFactory() {private AtomicInteger tnew AtomicInteger(1);Overridepublic Thread newThread(Runnable r) {return new Thread(r,tt.getAndIncrement());}
});
4) newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueueRunnable());}
特点
核心线程数是 0 最大线程数是 Integer.MAX_VALUE救急线程的空闲生存时间是 60s意味着 全部都是救急线程60s 后可以回收救急线程可以无限创建队列采用了 SynchronousQueue 实现特点是它没有容量没有线程来取是放不进去的一手交钱、一手交货
SynchronousQueueInteger integers new SynchronousQueue();
new Thread(() - {try {log.debug(putting {} , 1);integers.put(1);log.debug({} putted..., 1);log.debug(putting...{} , 2);integers.put(2);log.debug({} putted..., 2);} catch (InterruptedException e) {e.printStackTrace();}
},t1).start();
sleep(1);
new Thread(() - {try {log.debug(taking {}, 1);integers.take();} catch (InterruptedException e) {e.printStackTrace();}
},t2).start();
sleep(1);
new Thread(() - {try {log.debug(taking {}, 2);integers.take();} catch (InterruptedException e) {e.printStackTrace();}
},t3).start();
输出 评价
整个线程池表现为线程数会根据任务量不断增加没有上限当任务执行完毕空闲一分钟后释放线程。
适合任务数比较密集但每个任务执行时间比较短的情况
5newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(){return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueueRunnable()));
}
使用场景
希望多个任务排队执行。线程数固定为 1任务数多于 1 时会放入无界队列排队。任务执行完毕这唯一的线程也不会被释放。
区别
自己创建一个单线程串行执行任务如果任务执行失败而终止那么没有任何补救措施而线程池还会新建一个线程保证池的正常工作Executors.newSingleThreadExecutor() 线程个数始终为1不能修改 FinalizableDelegatedExecutorService 应用的是装饰器模式只对外暴露了 ExecutorService 接口因此不能调用 ThreadPoolExecutor 中特有的方法Executors.newFixedThreadPool(1) 初始时为1以后还可以修改 对外暴露的是 ThreadPoolExecutor 对象可以强转后调用 setCorePoolSize 等方法进行修改
6提交任务
// 执行任务
void execute(Runnable command);// 提交任务 task用返回值 Future 获得任务执行结果
T Future submit(Callable task);// 提交 tasks 中所有任务
T ListFutureT invokeAll(Collection? extends CallableT tasks) throws InterruptedException;// 提交 tasks 中所有任务带超时时间
T ListFutureT invokeAll(Collection? extends CallableT tasks, long timeout, TimeUnit unit) throws InterruptedException;// 提交 tasks 中所有任务哪个任务先成功执行完毕返回此任务执行结果其它任务取消T T invokeAny(Collection? extends CallableT tasks) throws InterruptedException, ExecutionException;// 提交 tasks 中所有任务哪个任务先成功执行完毕返回此任务执行结果其它任务取消带超时时间T T invokeAny(Collection? extends CallableT tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
使用
ListFutureObject futures executorService.invokeAll(Arrays.asList(() - {return 徐烁大美女;},() - {return 赵菁大美女;}));
for (FutureObject future : futures) {System.out.println(future.get());
}
/*********************/
Object o executorService.invokeAny(Arrays.asList(() - {return 徐烁大美女;},() - {return 赵菁大美女;}));
System.out.println(o);
7) 关闭线程池
shutdown
/*线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行就是主线程调用后会继续向下执行不会等待其他线程执行完*/void shutdown();
public void shutdown() { final ReentrantLock mainLock this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 修改线程池状态 advanceRunState(SHUTDOWN); // 仅会打断空闲线程 interruptIdleWorkers(); onShutdown(); // 扩展点 ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 尝试终结(没有运行的线程可以立刻终结如果还有运行的线程也不会等) tryTerminate();}
shutdownNow
/*线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务*/
List shutdownNow();
public List shutdownNow() {List tasks; final ReentrantLock mainLock this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 修改线程池状态 advanceRunState(STOP); // 打断所有线程 interruptWorkers(); // 获取队列中剩余任务 tasks drainQueue(); } finally {mainLock.unlock(); } // 尝试终结 tryTerminate();return tasks;}
其它方法
// 不在 RUNNING 状态的线程池此方法就返回
trueboolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用 shutdown 后由于调用线程并不会等待所有任务运行结束因此如果它想在线程池 TERMINATED 后做些事情可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;