JUC系列学习(一):线程池Executor框架及其实现ThreadPoolExecutor-灵析社区

IT大鲨鱼

Executor框架

Executor 框架将任务的提交与任务的执行解耦了。

· Executor 顶层接口,Executor中只有一个execute方法,用于执行任务。线程的创建、调度等细节均由其子类实现

· ExecutorService 继承并扩展了Executor,在ExecutorService内部提供了更全面的提交机制以及线程池关闭等方法。

· ThreadPoolExecutor:实现了ExecutorService接口,线程池机制都封装在这个类中。

· ScheduledExecutorService:实现了ExecutorService接口,增加了定时任务的相关方法。

· ScheduledThreadPoolExecutor:继承自ThreadPoolExecutor并实现了ScheduledExecutorService接口

· ForkJoinPool:是一种支持任务分解的线程池,一般配合接口ForkJoinTask使用。

ThreadPoolExecutor

配置ThreadPoolExecutor线程池时,要避免线程池大小出现过大或过小的情况。如果线程池过大,那么大量的线程将会在cpu及内存资源上发生竞争,从而导致更高的内存使用量,严重情况下会导致资源耗尽;如果线程池过小,会导致空闲的cpu处理器无法工作,从而降低了吞吐率。要设置合适的线程池大小,需要考虑下面的几个因素:

· CPU个数( 可以通过Runtime.getRuntime().availableProcessors()获取)

· 内存大小

· 任务类型:计算密集型I/O密集型还是两者皆可。如:任务为计算密集型,当CPU处理器个数为N时,线程池大小为N+1比较合适。

public class ThreadPoolExecutor extends AbstractExecutorService {    
     public ThreadPoolExecutor(int corePoolSize,                          
                             int maximumPoolSize,
                             long keepAliveTime,                          
                             TimeUnit unit,                          
                             BlockingQueue<Runnable> workQueue,                          
                             ThreadFactory threadFactory,                          
                             RejectedExecutionHandler handler) {    
      if (corePoolSize < 0 ||
          maximumPoolSize <= 0 ||
          maximumPoolSize < corePoolSize ||        
          keepAliveTime < 0)        
          throw new IllegalArgumentException();    
      if (workQueue == null || threadFactory == null || handler == null)        
          throw new NullPointerException();    
          this.acc = System.getSecurityManager() == null ?           
          null :            
          AccessController.getContext();    
      this.corePoolSize = corePoolSize;    
      this.maximumPoolSize = maximumPoolSize;    
      this.workQueue = workQueue;    
      this.keepAliveTime = unit.toNanos(keepAliveTime);    
      this.threadFactory = threadFactory;    
      this.handler = handler;
}
}

ThreadPoolExecutor继承自AbstractExecutorService类(实现了ExecutorService接口),ThreadPoolExecutor中有四个构造器,但最后都会调用到上面代码中的这个,来看构造器中的这些参数代表的意义:

· corePoolSize(int类型): 核心线程数,默认一直存活(即使线程处于空闲状态)。

· maximumPoolSize(int类型): 线程池允许的最大线程数,其值大小>=corePoolSize。

· keepAliveTime(long类型): 线程的存活时间,默认是超过corePoolSize大小之后启动的非核心线程的存活时间,当线程池设置allowCoreThreadTimeOut=true时,对核心线程也会起作用。

· unit(TimeUnit类型): 时间单位

NANOSECONDS   //纳秒MICROSECONDS  //微秒MILLISECONDS  //毫秒SECONDS       //秒MINUTES       //分HOURS         //小时DAYS          //天

· workQueue(BlockingQueue<Runnable>) : 阻塞队列(实现了BlockingQueue接口),当线程数超过核心线程数corePoolSize大小时,会将任务放入阻塞队列中,ThreadPoolExecutor中使用了下面几种队列:

ArrayBlockingQueue :数组实现的有界阻塞队列,队列满时,后续提交的任务通过handler中的拒绝策略去处理。

LinkedBlockingQueue:链表实现的阻塞队列,默认大小是Integer.MAX_VALUE(无界队列),也可以通过传入指定队列大小capacity

SynchronousQueue:内部并没有缓存数据,缓存的是线程。当生产者线程进行添加操作(put)时必须等待消费者线程的移除操作(take)才会返回。SynchronousQueue可以用于实现生产者与消费者的同步。

PriorityBlockingQueue:二叉堆实现的优先级阻塞队列,传入队列的元素不能为null并且必须实现Comparable接口。

DelayQueue: 延时阻塞队列,队列元素需要实现Delayed接口。

· threadFactory(ThreadFactory): 线程工厂,用于创建线程。Executors中使用了默认的DefaultThreadFactory

 private static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);   
    private final ThreadGroup group;   private final AtomicInteger threadNumber = new AtomicInteger(1);   
    private final String namePrefix;   
    
    DefaultThreadFactory() {       
       SecurityManager s = System.getSecurityManager();       
       group = (s != null) ? s.getThreadGroup() :
                                    Thread.currentThread().getThreadGroup();
       namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +                    
                      "-thread-";   
}   

public Thread newThread(Runnable r) {       
    Thread t = new Thread(group, r,
                          namePrefix + threadNumber.getAndIncrement(),                            
                          0);       
    if (t.isDaemon())       
        t.setDaemon(false);       
    if (t.getPriority() != Thread.NORM_PRIORITY)           
        t.setPriority(Thread.NORM_PRIORITY);       
    return t;  
} }

也可以自定义ThreadFactory,在newThread中自行配置Thread(如:配置线程名、是否是守护线程、线程优先级等)。

· handler(RejectedExecutionHandler): 提交的任务被拒绝执行的饱和策略,通常有下面几种形式:

AbortPolicy: 默认饱和策略,丢弃任务并抛出RejectedExecutionException异常。调用者可以捕获这个异常并自行处理。

CallerRunsPolicy:线程池中不再处理该任务,由调用线程处理该任务。

DiscardPolicy:当任务无法添加到队列中等待执行时,DiscardPolicy策略会丢弃任务,并且不抛异常。

DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试提交新的任务。

具体使用哪个饱和策略要根据具体的业务场景指定。当然也可以自定义RejectedExecutionHandler来自行决定拒绝任务的处理策略。

执行流程

线程池ThreadPoolExecutor的工作流程大致分为下面几步

· 当工作线程数小于核心线程数(corePoolSize)时,直接创建核心线程去执行任务

· 当线程数超过核心线程数(corePoolSize)时,将任务加入等待队列(BlockingQueue)中

· 队列满时,继续创建非核心线程去执行任务(注意:核心线程corePoolSize+非核心线程<=maximumPoolSize)

简单总结一下执行顺序:核心线程数满->队列满->最大线程数满->任务拒绝策略

用流程图大致表示为:

线程池状态

ThreadPoolExecutor有三种状态:运行、关闭、终止

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

· RUNNING: 运行状态,接收并处理队列中的任务

· SHUTDOWN: 不再接收新任务,队列中已提交的任务执行完成。

· STOP: 尝试取消所有正在运行中的任务,并且不再处理队列中尚未开始执行的任务。

· TIDYING: 所有任务已终止,线程池中线程数为变为0,此时线程池状态变为TIDYING

· TERMINATED: 线程池终止。

AsyncTask

AsyncTaskAndroid内置线程池,相当于封装了Thread+Handler,可以很方便地在后台执行耗时任务并将结果更新到UI线程AsyncTask内部的线程池是通过ThreadPoolExecutor实现的。

public abstract class AsyncTask<Params, Progress, Result> {
......
}

AsyncTask是一个抽象类,几个关键方法如下:

· onPreExecute():运行在UI线程,可以用来做一些初始化工作

· doInBackground(Params…):运行在子线程,用于执行一些耗时操作,执行过程中可以通过publishProgress(Progress…values)来通知UI线程任务的进度。

· onPostExecute(Result result):运行在UI线程,doInBackground执行完毕后返回Result,并将该值传递到onPostExecute(Result result)中。

· onProgressUpdate(Progress…):运行在UI线程,用于显示doInBackground的执行进度,在doInBackground()中执行publishProgress(Progress…values)后会回调到此方法。

· onCancelled():运行在UI线程,当调用cancel(true)后会回调此方法。此时onPostExecute()不会再执行。

注:AsyncTask有三种泛型类型,Params,Progress、Result(可以都指定为空,如AsyncTask<Void, Void, Void>),其中:

· Params:在execute(Params... params)传递,会传递到doInBackground(Params…)中。

· Progress:后台任务进度值,在onProgressUpdate(Progress…)使用

· Result:最终返回的结果类型,在onPostExecute(Result result)使用

AsyncTask中任务的串行&并行

private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();

// We want at least 2 threads and at most 4 threads in the core pool,
// preferring to have 1 less than the CPU count to avoid saturating
// the CPU with background work
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;

private static final ThreadFactory sThreadFactory = new ThreadFactory() {
    private final AtomicInteger mCount = new AtomicInteger(1);    
    
    public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());    
    }
};

private static final BlockingQueue<Runnable> sPoolWorkQueue = 
       new LinkedBlockingQueue<Runnable>(128);
//并行线程池
public static final Executor THREAD_POOL_EXECUTOR;

static {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,            
                sPoolWorkQueue, sThreadFactory);    
            threadPoolExecutor.allowCoreThreadTimeOut(true);    
            THREAD_POOL_EXECUTOR = threadPoolExecutor;
            }
//任务串行
@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
    return executeOnExecutor(sDefaultExecutor, params);}
//任务并行
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,        Params... params) {
    if (mStatus != Status.PENDING) {
            switch (mStatus) {
                        case RUNNING:        
                             throw new IllegalStateException("Cannot execute task:"                        + " the task is already running.");            
                        case FINISHED:   
                             throw new IllegalStateException("Cannot execute task:"                        + " the task has already been executed "                        + "(a task can be executed only once)");        }    }    
    mStatus = Status.RUNNING;    
    onPreExecute();    
    mWorker.mParams = params;    
    exec.execute(mFuture);    
    return this;
    }
@MainThread
public static void execute(Runnable runnable) {    
        sDefaultExecutor.execute(runnable);
}

AsynacTask中执行任务可以通过execute()或者executeOnExecutor()去提交并执行任务,其中execute()是串行执行任务,而executeOnExecutor是并行执行任务。其中并行线程池就是用的THREAD_POOL_EXECUTOR,来看串行是如何实现的:

private static class SerialExecutor implements Executor {
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    Runnable mActive;

    public synchronized void execute(final Runnable r) {
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }

    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}

串行并不是将任务直接交给THREAD_POOL_EXECUTOR去执行,而是内部自行维护了一个mTasks双向队列,每次有新Runnable任务来时,先添加到队列中,然后判断当前是否有正在执行的任务(mActive != null即表示有正在执行的任务),没有的话才会将任务提交给线程池执行;否则会等待前一个任务执行完成,然后从队列中取出新任务(按FIFO顺序)继续交给线程池执行。

阅读量:2011

点赞量:0

收藏量:0