而且会介绍其重要API的利用方法betvictor1946,然后介绍了其使用时的小心点

       本文主要分为三个部分,第二有的首先会对ScheduledThreadPoolExecutor进行简短的牵线,并且会介绍其首要API的利用方法,然后介绍了其选取时的注意点,第2部分则根本对ScheduledThreadPoolExecutor的达成细节举行介绍。

       本文重要分为八个部分,第三有的首先会对ScheduledThreadPoolExecutor进行简要的介绍,并且会介绍其重庆大学API的运用办法,然后介绍了其应用时的注意点,第壹部分则首要对ScheduledThreadPoolExecutor的落到实处细节实行介绍。

1. 使用简介

       ScheduledThreadPoolExecutor是三个使用线程池执行定时职分的类,相较于Java中提供的另三个实践定时职分的类Timer,其关键有如下三个优点:

  • 运用十六线程执行职务,不用操心职务执行时间过长而招致职责相互阻塞的动静,Timer是单线程执行的,由此会产出这些标题;
  • 不要顾虑任务执行进度中,若是线程失活,其会新建线程执行职分,提姆er类的单线程挂掉之后是不会再度创制线程执行后续义务的。

       除去上述两个亮点外,ScheduledThreadPoolExecutor还提供了非凡灵活的API,用于实践义务。其任务的施行策略首要分为两大类:①在早晚延迟之后只进行一次有些任务;②在任其自然延迟之晋代期性的实践有些职分。如下是其重庆大学API:

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay, long period, TimeUnit unit);

       上述多少个措施中,第三个和第三个措施属于第叁类,即在delay钦点的延迟之后执行第③个参数所钦定的任务,差异在于,第3个法子执行之后会有重回值,而首先个办法执行之后是未曾重返值的。首个和第多个点子则属于第贰类,即在其次个参数(initialDelay)钦定的时日过后开端周期性的实践任务,执行周时期隔为第多个参数钦命的时光,可是那多个艺术的分别在于第多少个情势执行职责的区间是固定的,无论上三个职务是不是实施到位,而第8个法子的实施时间距离是不定点的,其会在周期任务的上二个职务履行到位以往才开端计时,并在内定时间间隔之后才起来履行职务。如下是利用scheduleWithFixedDelay()和scheduleAtFixedRate()方法编写的测试用例:

public class ScheduledThreadPoolExecutorTest {
  private ScheduledThreadPoolExecutor executor;
  private Runnable task;

  @Before
  public void before() {
    executor = initExecutor();
    task = initTask();
  }

  private ScheduledThreadPoolExecutor initExecutor() {
    return new ScheduledThreadPoolExecutor(2);;
  }

  private Runnable initTask() {
    long start = System.currentTimeMillis();
    return () -> {
      print("start task: " + getPeriod(start, System.currentTimeMillis()));
      sleep(SECONDS, 10);
      print("end task: " + getPeriod(start, System.currentTimeMillis()));
    };
  }

  @Test
  public void testFixedTask() {
    print("start main thread");
    executor.scheduleAtFixedRate(task, 15, 30, SECONDS);
    sleep(SECONDS, 120);
    print("end main thread");
  }

  @Test
  public void testDelayedTask() {
    print("start main thread");
    executor.scheduleWithFixedDelay(task, 15, 30, SECONDS);
    sleep(SECONDS, 120);
    print("end main thread");
  }

  private void sleep(TimeUnit unit, long time) {
    try {
      unit.sleep(time);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  private int getPeriod(long start, long end) {
    return (int)(end - start) / 1000;
  }

  private void print(String msg) {
    System.out.println(msg);
  }
}

       能够看看,上述多个测试用例代码块基本是一致的,区别在于第三个用例调用的是scheduleAtFixedRate()方法,而第1个用例调用的是scheduleWithFixedDelay()。那里八个用例都以安装的在延迟15s后每一个30s执行1次钦定的职分,而该职务履行时长为10s。如下分别是那多少个测试用例的实施结果:

start main thread
start task: 15
end task: 25
start task: 45
end task: 55
start task: 75
end task: 85
start task: 105
end task: 115
end main thread

start main thread
start task: 15
end task: 25
start task: 55
end task: 65
start task: 95
end task: 105
end main thread

      相比较上述执行结果能够见见,对于scheduleAtFixedRate()方法,其每一趟执行任务的伊始时间距离都为固定不变的30s,与职务执行时间长度无关,而对于scheduleWithFixedDelay()方法,其每回执行职分的开始时间间隔都为上次任务履行时间拉长钦定的刻钟间隔。

       那里关于ScheduledThreadPoolExecutor的应用有三点必要验证如下:

  • ScheduledThreadPoolExecutor继承自ThreadPoolExecutor(ThreadPoolExecutor详解betvictor1946,),由此也有持续而来的execute()和submit()方法,不过ScheduledThreadPoolExecutor重写了这五个点子,重写的方法是向来开立五个立即施行并且只进行2遍的职务;
  • ScheduledThreadPoolExecutor使用ScheduledFutureTask封装每一个供给实践的职分,而职分都以放入DelayedWorkQueue队列中的,该队列是2个施用数组实现的先期队列,在调用ScheduledFutureTask::cancel()方法时,其会基于removeOnCancel变量的安装来确认是还是不是必要将当前职责真正的从队列中移除,而不只是标识其为已去除状态;
  • ScheduledThreadPoolExecutor提供了多个钩子方法decorateTask(Runnable,
    RunnableScheduledFuture)用于对实施的天职进展装点,该办法第③个参数是调用方传入的职分实例,第③个参数则是应用ScheduledFutureTask对用户传入义务实例进行打包之后的实例。那里要求留意的是,在ScheduledFutureTask对象中有3个heapIndex变量,该变量用于记录当前实例处于队列数组中的下标地点,该变量能够将诸如contains(),remove()等艺术的小时复杂度从O(N)下跌到O(logN),因此作用升高是相比较高的,但是假若那里用户重写decorateTask()方法封装了队列中的职责实例,那么heapIndex的优化就不存在了,由此这里强烈建议是不择手段不要重写该措施,或然重写时也依旧复用ScheduledFutureTask类。

1. 利用简介

       ScheduledThreadPoolExecutor是3个使用线程池执行定时职分的类,相较于Java中提供的另3个进行定时任务的类提姆er,其主要性有如下四个优点:

  • 动用多线程执行职分,不用担心任务执行时间过长而造成任务互相阻塞的意况,Timer是单线程执行的,因此会并发这几个题材;
  • 无须操心职责执行进度中,假使线程失活,其会新建线程执行职责,Timer类的单线程挂掉之后是不会再也创制线程执行后续职分的。

       除去上述多个优点外,ScheduledThreadPoolExecutor还提供了相当灵活的API,用于实践任务。其任务的施行政策重要分为两大类:①在必然延迟之后只进行2次某些职务;②在肯定延迟之西魏期性的进行某些职责。如下是其关键API:

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay, long period, TimeUnit unit);

       上述八个格局中,第3个和首个方法属于第壹类,即在delay钦点的推迟之后执行第多个参数所钦点的任务,分裂在于,第3个点子执行之后会有再次回到值,而首先个章程执行之后是从未有过再次回到值的。第5个和第三个措施则属于第②类,即在其次个参数(initialDelay)钦赐的年华之后起头周期性的实践任务,执行周时期隔为第三个参数钦点的年月,不过那八个法子的分歧在于第十二个法子执行任务的间距是一定的,无论上三个任务是不是实行到位,而第多个主意的施行时间距离是不定点的,其会在周期职分的上2个职务履行到位未来才起来计时,并在钦赐时间距离之后才开头推行任务。如下是利用scheduleWithFixedDelay()和scheduleAtFixedRate()方法编写的测试用例:

public class ScheduledThreadPoolExecutorTest {
  private ScheduledThreadPoolExecutor executor;
  private Runnable task;

  @Before
  public void before() {
    executor = initExecutor();
    task = initTask();
  }

  private ScheduledThreadPoolExecutor initExecutor() {
    return new ScheduledThreadPoolExecutor(2);;
  }

  private Runnable initTask() {
    long start = System.currentTimeMillis();
    return () -> {
      print("start task: " + getPeriod(start, System.currentTimeMillis()));
      sleep(SECONDS, 10);
      print("end task: " + getPeriod(start, System.currentTimeMillis()));
    };
  }

  @Test
  public void testFixedTask() {
    print("start main thread");
    executor.scheduleAtFixedRate(task, 15, 30, SECONDS);
    sleep(SECONDS, 120);
    print("end main thread");
  }

  @Test
  public void testDelayedTask() {
    print("start main thread");
    executor.scheduleWithFixedDelay(task, 15, 30, SECONDS);
    sleep(SECONDS, 120);
    print("end main thread");
  }

  private void sleep(TimeUnit unit, long time) {
    try {
      unit.sleep(time);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  private int getPeriod(long start, long end) {
    return (int)(end - start) / 1000;
  }

  private void print(String msg) {
    System.out.println(msg);
  }
}

       能够看看,上述三个测试用例代码块基本是平等的,分歧在于第二个用例调用的是scheduleAtFixedRate()方法,而第四个用例调用的是scheduleWithFixedDelay()。那里三个用例都是安装的在延迟15s后每一种30s执行壹遍钦定的职分,而该任务履行时间长度为10s。如下分别是那四个测试用例的施行结果:

start main thread
start task: 15
end task: 25
start task: 45
end task: 55
start task: 75
end task: 85
start task: 105
end task: 115
end main thread

start main thread
start task: 15
end task: 25
start task: 55
end task: 65
start task: 95
end task: 105
end main thread

      比较上述执行结果能够看来,对于scheduleAtFixedRate()方法,其每便执行职分的晌卯时间距离都为一定不变的30s,与职分执行时间长度无关,而对于scheduleWithFixedDelay()方法,其每一回执行职务的始发时间间隔都为上次职分履行时间增加内定的时日间隔。

       那里关于ScheduledThreadPoolExecutor的选取有三点必要申明如下:

  • ScheduledThreadPoolExecutor继承自ThreadPoolExecutor(ThreadPoolExecutor详解),由此也有接二连三而来的execute()和submit()方法,不过ScheduledThreadPoolExecutor重写了那八个方法,重写的艺术是一贯开立多个立刻实施并且只实行壹次的职务;
  • ScheduledThreadPoolExecutor使用ScheduledFutureTask封装每种需求实践的职务,而任务都是放入DelayedWorkQueue队列中的,该队列是七个应用数组完毕的先期队列,在调用ScheduledFutureTask::cancel()方法时,其会基于removeOnCancel变量的安装来确认是还是不是必要将当前职务真正的从队列中移除,而不只是标识其为已删除状态;
  • ScheduledThreadPoolExecutor提供了3个钩子方法decorateTask(Runnable,
    RunnableScheduledFuture)用于对履行的天职拓展装裱,该格局第2个参数是调用方传入的职分实例,第一个参数则是运用ScheduledFutureTask对用户传入任务实例进行李包裹装之后的实例。那里必要专注的是,在ScheduledFutureTask对象中有贰个heapIndex变量,该变量用于记录当前实例处于队列数组中的下标地点,该变量能够将诸如contains(),remove()等艺术的年华复杂度从O(N)下跌到O(logN),由此效率进步是相比高的,不过假设那里用户重写decorateTask()方法封装了队列中的职责实例,那么heapIndex的优化就不存在了,因此那里强烈建议是不择手段不要重写该方法,只怕重写时也依然复用ScheduledFutureTask类。

2. 源码详解

2. 源码详解

2.1 主要质量

       ScheduledThreadPoolExecutor首要有八个属性,分别如下:

private volatile boolean continueExistingPeriodicTasksAfterShutdown;

private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

private volatile boolean removeOnCancel = false;

private static final AtomicLong sequencer = new AtomicLong();
  • continueExistingPeriodicTasksAfterShutdown:用于标识当前Executor对象shutdown时,是不是继续执行已经存在于任务队列中的定时任务(调用scheduleAtFixedRate()方法生成的天职);
  • executeExistingDelayedTasksAfterShutdown:用于标识当前Executor对象shutdown时,是或不是继续执行已经存在于任务队列中的定时职分(调用scheduleWithFixedDelay()方法生成的天职);
  • removeOnCancel:用于标识假若当前职务已经撤回了,是还是不是将其从任务队列中的确的移除,而不只是标识其为除去状态;
  • sequencer:其为一个AtomicLong类型的变量,该变量记录了当前任务被创设时是第多少个职责的3个序号,那么些序号的重点用来确认当八个职分开端履行时间相同时具体哪些职务先实施,比如五个职责的发端施行时间都为1515847881158,那么序号小的职分将先实施。

2.1 重要质量

       ScheduledThreadPoolExecutor主要有八个属性,分别如下:

private volatile boolean continueExistingPeriodicTasksAfterShutdown;

private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

private volatile boolean removeOnCancel = false;

private static final AtomicLong sequencer = new AtomicLong();
  • continueExistingPeriodicTasksAfterShutdown:用于标识当前Executor对象shutdown时,是还是不是继续执行已经存在于职务队列中的定时任务(调用scheduleAtFixedRate()方法生成的天职);
  • executeExistingDelayedTasksAfterShutdown:用于标识当前Executor对象shutdown时,是还是不是继续执行已经存在于职务队列中的定时任务(调用scheduleWithFixedDelay()方法生成的天职);
  • removeOnCancel:用于标识假如当前职责已经撤回了,是还是不是将其从职分队列中确实的移除,而不只是标识其为除去状态;
  • sequencer:其为二个AtomicLong类型的变量,该变量记录了当前职务被创设时是第多少个职责的贰个序号,那么些序号的重点用以确认当七个任务起先推行时间一模一样时具体哪些职务先实施,比如多少个职分的上马执行时间都为1515847881158,那么序号小的任务将先举办。

2.2 ScheduledFutureTask

       在ScheduledThreadPoolExecutor中,主要运用ScheduledFutureTask封装须要实施的任务,该类的显要阐明如下:

private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {

  private final long sequenceNumber;    // 记录当前实例的序列号
  private long time;    // 记录当前任务下次开始执行的时间

  // 记录当前任务执行时间间隔,等于0则表示当前任务只执行一次,大于0表示当前任务为fixedRate类型的任务,
  // 小于0则表示其为fixedDelay类型的任务
  private final long period;

  RunnableScheduledFuture<V> outerTask = this;  // 记录需要周期性执行的任务的实例
  int heapIndex;    // 记录当前任务在队列数组中位置的下标

  ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();  // 序号在创建任务实例时指定,且后续不会变化
  }

  public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), NANOSECONDS);
  }

  // 各个任务在队列中的存储方式是一个基于时间和序号进行比较的优先队列,当前方法定义了优先队列中两个
  // 任务执行的先后顺序。这里先对两个任务开始执行时间进行比较,时间较小者优先执行,若开始时间相同,
  // 则比较两个任务的序号,序号小的任务先执行
  public int compareTo(Delayed other) {
    if (other == this)
      return 0;
    if (other instanceof ScheduledFutureTask) {
      ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
      long diff = time - x.time;
      if (diff < 0)
        return -1;
      else if (diff > 0)
        return 1;
      else if (sequenceNumber < x.sequenceNumber)
        return -1;
      else
        return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
  }

  public boolean isPeriodic() { // 判断是否为周期性任务
    return period != 0;
  }

  // 当前任务执行之后,会判断当前任务是否为周期性任务,如果为周期性任务,那么就调用当前方法计算
  // 当前任务下次开始执行的时间。这里如果当前任务是fixedRate类型的任务(p > 0),那么下次执行时间
  // 就是此次执行的开始时间加上时间间隔,如果当前任务是fixedDelay类型的任务(p < 0),那么下次执行
  // 时间就是当前时间(triggerTime()方法会获取系统当前时间)加上任务执行时间间隔。可以看到,定频率
  // 和定延迟的任务的执行时间区别就在当前方法中进行了指定,因为调用当前方法时任务已经执行完成了,
  // 因而triggerTime()方法中获取的时间就是任务执行完成之后的时间点
  private void setNextRunTime() {
    long p = period;
    if (p > 0)
      time += p;
    else
      time = triggerTime(-p);
  }

  // 取消当前任务的执行,super.cancel(boolean)方法也即FutureTask.cancel(boolean)方法。该方法传入
  // true表示如果当前任务正在执行,那么立即终止其执行;传入false表示如果当前方法正在执行,那么等待其
  // 执行完成之后再取消当前任务。
  public boolean cancel(boolean mayInterruptIfRunning) {
    boolean cancelled = super.cancel(mayInterruptIfRunning);
    // 判断是否设置了取消后移除队列中当前任务,是则移除当前任务
    if (cancelled && removeOnCancel && heapIndex >= 0)  
      remove(this);
    return cancelled;
  }

  public void run() {
    boolean periodic = isPeriodic();    // 判断是否为周期性任务
    if (!canRunInCurrentRunState(periodic)) // 判断是否能够在当前状态下执行该任务
      cancel(false);
    else if (!periodic) // 如果能执行当前任务,但是任务不是周期性的,那么就立即执行该任务一次
      ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) { // 是周期性任务,则立即执行当前任务并且重置
      setNextRunTime(); // 在当前任务执行完成后调用该方法计算当前任务下次执行的时间
      reExecutePeriodic(outerTask); // 将当前任务放入任务队列中以便下次执行
    }
  }
}

       在ScheduledFutureTask中,首要有四个点需求强调:

  • 对此run()方法的率先个分支,canRunInCurrentRunState()方法的表明如下所示,能够见到,该格局是用来判断当前职分若是为周期性职分,那么其是不是同意在shutdown状态下继续执行已经存在的周期性任务,是则代表如今气象下是足以实施当前职责的,那里isRunningOrShutdown()方法继承自ThreadPoolExecutor;

    boolean canRunInCurrentRunState(boolean periodic) {
    return isRunningOrShutdown(periodic ?

                             continueExistingPeriodicTasksAfterShutdown :
                             executeExistingDelayedTasksAfterShutdown);
    

    }

  • 在run()方法的最后二个if分支中,其首先会执行当前任务,在进行到位时才会调用setNextRun提姆e()方法设置下次职务执行时间,相当于说对于fixedRate和fixedDelay类型的任务都以在那些时刻点才设置的,因此即便fixedRate类型的天职,即便该任务下次执行时间比当下岁月要早,其也只会在当前任务执行到位后立即施行,而不会与当前职责还未履行完时就执行;对于fixedDelay职分则不会存在该难题,因为其是以任务完结后的大运点为根基测算下次执行的时间点;

  • 对此run()方法的末段七个支行中的reExecutePeriodic()方法,其会将当前任务参与到职务队列中,并且调用父类的ensurePrestart()方法确认保证有可用的线程来举办当前任务,如下是该措施的求实完成:

    void reExecutePeriodic(RunnableScheduledFuture task) {
    if (canRunInCurrentRunState(true)) { // 判断当前任务是还是不是能够继续执行

    super.getQueue().add(task); // 将当前任务加入到任务队列中
    if (!canRunInCurrentRunState(true) && remove(task)) // 双检查法判断任务在加入过程中是否取消了
      task.cancel(false);
    else
      ensurePrestart(); // 初始化核心线程等确保任务可以被执行
    

    }
    }

       从ScheduledFutureTask的落到实处计算来看,当每成立一个此类实例时,会早先化该类的部分关键质量,如下次始于履行的时辰和执行的周期。当有个别线程调用该职分,即执行该任务的run()方法时,假设该义务不为周期性任务,那么执行该职分之后就不会有其它的动作,要是该职分为周期性职务,那么在将当前任务执行实现之后,还会重置当前职责的景况,并且计算下次履行当前任务的大运,然后将其放入队列中以便下次执行。

2.2 ScheduledFutureTask

       在ScheduledThreadPoolExecutor中,首要利用ScheduledFutureTask封装需求举行的职分,该类的关键注脚如下:

private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {

  private final long sequenceNumber;    // 记录当前实例的序列号
  private long time;    // 记录当前任务下次开始执行的时间

  // 记录当前任务执行时间间隔,等于0则表示当前任务只执行一次,大于0表示当前任务为fixedRate类型的任务,
  // 小于0则表示其为fixedDelay类型的任务
  private final long period;

  RunnableScheduledFuture<V> outerTask = this;  // 记录需要周期性执行的任务的实例
  int heapIndex;    // 记录当前任务在队列数组中位置的下标

  ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();  // 序号在创建任务实例时指定,且后续不会变化
  }

  public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), NANOSECONDS);
  }

  // 各个任务在队列中的存储方式是一个基于时间和序号进行比较的优先队列,当前方法定义了优先队列中两个
  // 任务执行的先后顺序。这里先对两个任务开始执行时间进行比较,时间较小者优先执行,若开始时间相同,
  // 则比较两个任务的序号,序号小的任务先执行
  public int compareTo(Delayed other) {
    if (other == this)
      return 0;
    if (other instanceof ScheduledFutureTask) {
      ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
      long diff = time - x.time;
      if (diff < 0)
        return -1;
      else if (diff > 0)
        return 1;
      else if (sequenceNumber < x.sequenceNumber)
        return -1;
      else
        return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
  }

  public boolean isPeriodic() { // 判断是否为周期性任务
    return period != 0;
  }

  // 当前任务执行之后,会判断当前任务是否为周期性任务,如果为周期性任务,那么就调用当前方法计算
  // 当前任务下次开始执行的时间。这里如果当前任务是fixedRate类型的任务(p > 0),那么下次执行时间
  // 就是此次执行的开始时间加上时间间隔,如果当前任务是fixedDelay类型的任务(p < 0),那么下次执行
  // 时间就是当前时间(triggerTime()方法会获取系统当前时间)加上任务执行时间间隔。可以看到,定频率
  // 和定延迟的任务的执行时间区别就在当前方法中进行了指定,因为调用当前方法时任务已经执行完成了,
  // 因而triggerTime()方法中获取的时间就是任务执行完成之后的时间点
  private void setNextRunTime() {
    long p = period;
    if (p > 0)
      time += p;
    else
      time = triggerTime(-p);
  }

  // 取消当前任务的执行,super.cancel(boolean)方法也即FutureTask.cancel(boolean)方法。该方法传入
  // true表示如果当前任务正在执行,那么立即终止其执行;传入false表示如果当前方法正在执行,那么等待其
  // 执行完成之后再取消当前任务。
  public boolean cancel(boolean mayInterruptIfRunning) {
    boolean cancelled = super.cancel(mayInterruptIfRunning);
    // 判断是否设置了取消后移除队列中当前任务,是则移除当前任务
    if (cancelled && removeOnCancel && heapIndex >= 0)  
      remove(this);
    return cancelled;
  }

  public void run() {
    boolean periodic = isPeriodic();    // 判断是否为周期性任务
    if (!canRunInCurrentRunState(periodic)) // 判断是否能够在当前状态下执行该任务
      cancel(false);
    else if (!periodic) // 如果能执行当前任务,但是任务不是周期性的,那么就立即执行该任务一次
      ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) { // 是周期性任务,则立即执行当前任务并且重置
      setNextRunTime(); // 在当前任务执行完成后调用该方法计算当前任务下次执行的时间
      reExecutePeriodic(outerTask); // 将当前任务放入任务队列中以便下次执行
    }
  }
}

       在ScheduledFutureTask中,重要有七个点须要强调:

  • 对于run()方法的首先个支行,canRunInCurrentRunState()方法的扬言如下所示,可以见到,该办法是用于判断当前职责如果为周期性任务,那么其是还是不是允许在shutdown状态下继续执行已经存在的周期性职务,是则意味近期情形下是能够进行当前职分的,这里isRunningOrShutdown()方法继承自ThreadPoolExecutor;

boolean canRunInCurrentRunState(boolean periodic) {
  return isRunningOrShutdown(periodic ?
                             continueExistingPeriodicTasksAfterShutdown :
                             executeExistingDelayedTasksAfterShutdown);
}
  • 在run()方法的结尾二个if分支中,其首先会实施当前任务,在实践到位时才会调用setNextRunTime()方法设置下次职分履行时间,相当于说对于fixedRate和fixedDelay类型的天职都以在这几个日子点才设置的,由此固然fixedRate类型的任务,即便该职责下次执行时间比近来时光要早,其也只会在当前职务执行到位后随即实施,而不会与当前职分还未实行完时就执行;对于fixedDelay任务则不会存在该难题,因为其是以职责到位后的年华点为根基测算下次执行的时间点;
  • 对此run()方法的最终一个分层中的reExecutePeriodic()方法,其会将当前职务参预到任务队列中,并且调用父类的ensurePrestart()方法确定保证有可用的线程来施行当前任务,如下是该办法的实际达成:

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
  if (canRunInCurrentRunState(true)) {  // 判断当前任务是否可以继续执行
    super.getQueue().add(task); // 将当前任务加入到任务队列中
    if (!canRunInCurrentRunState(true) && remove(task)) // 双检查法判断任务在加入过程中是否取消了
      task.cancel(false);
    else
      ensurePrestart(); // 初始化核心线程等确保任务可以被执行
  }
}

       从ScheduledFutureTask的兑现总括来看,当每创立2个此类实例时,会伊始化该类的局部重中之重质量,如下次开头实行的年月和进行的周期。当有个别线程调用该职责,即执行该任务的run()方法时,假诺该任务不为周期性职务,那么执行该职分之后就不会有其余的动作,借使该职分为周期性职分,那么在将当前职分执行达成之后,还会重置当前职务的状态,并且统计下次进行当前义务的光阴,然后将其放入队列中以便下次执行。

2.3 DelayedWorkQueue

       DelayedWorkQueue的完结与DelayQueue以及PriorityQueue的完毕主旨相似,格局都为2个先期队列,并且底层是应用堆结构来贯彻优先队列的效应,在多少存款和储蓄格局上,其利用的是数组来促成。那里DelayedWorkQueue与DelayQueue以及PriorityQueue差异的点在于DelayedWorkQueue中首要囤积ScheduledFutureTask类型的天职,该任务中有三个heapIndex属性保存了当前职务在现阶段队列数组中的地点下标,其重庆大学进步的是对队列的诸如contains()和remove()等急需一定当前职责地点的主意的频率,时间复杂度能够从O(N)提高到O(logN)。如下是DelayedWorkQueue的落到实处代码(那里只列出了此类的要紧质量和与落实ScheduledThreadPoolExecutor作用有关的艺术,关于怎么样选取数组实现优先队列请读者查阅有关文档):

static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {

  private static final int INITIAL_CAPACITY = 16;   // 数组初始化大小
  private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
  private final ReentrantLock lock = new ReentrantLock();   // 对添加和删除元素所使用的锁
  private int size = 0; // 当前队列中有效任务的个数

  private Thread leader = null; // 执行队列头部任务的线程
  private final Condition available = lock.newCondition();  // 除leader线程外其余线程的等待队列

  // 在对任务进行移动时,判断其是否为ScheduledFutureTask实例,如果是则维护其heapIndex属性
  private void setIndex(RunnableScheduledFuture<?> f, int idx) {
    if (f instanceof ScheduledFutureTask)
      ((ScheduledFutureTask)f).heapIndex = idx;
  }

  private void siftUp(int k, RunnableScheduledFuture<?> key) {/* 省略 */}

  private void siftDown(int k, RunnableScheduledFuture<?> key) {/* 省略 */}

  private int indexOf(Object x) {
    if (x != null) {
      if (x instanceof ScheduledFutureTask) {   // 如果为ScheduledFutureTask则可返回其heapIndex属性
        int i = ((ScheduledFutureTask) x).heapIndex;
        if (i >= 0 && i < size && queue[i] == x)
          return i;
      } else {  // 如果不为ScheduledFutureTask实例,则需要遍历队列查询当前元素的位置
        for (int i = 0; i < size; i++)
          if (x.equals(queue[i]))
            return i;
      }
    }
    return -1;
  }

  public boolean offer(Runnable x) {
    if (x == null)
      throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      int i = size;
      if (i >= queue.length)
        grow(); // 队列容量不足,对其进行扩容
      size = i + 1;
      if (i == 0) { // 如果其为队列第一个元素,则将其放入队列头部
        queue[0] = e;
        setIndex(e, 0);
      } else {  //如果不为第一个元素,则通过堆的上移元素操作移动当前元素至合适的位置
        siftUp(i, e);
      }
      if (queue[0] == e) {  // 如果被更新的是队列头部元素,则更新记录的执行头部任务的线程
        leader = null;
        available.signal();
      }
    } finally {
      lock.unlock();
    }
    return true;
  }

  // 完成从队列拉取元素操作,并且将其从队列中移除
  private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    int s = --size;
    RunnableScheduledFuture<?> x = queue[s];
    queue[s] = null;    // 将队列最尾部的元素置空
    if (s != 0) // 将最后一个元素放入第一个位置,并且将其下推至合适的位置
      siftDown(0, x);   // 这里idx置为0是因为当前方法的入参f都为队列的第一个元素
    setIndex(f, -1);
    return f;
  }

  // 尝试从队列(堆)中获取元素,如果没有元素或者元素的延迟时间还未到则返回空
  public RunnableScheduledFuture<?> poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      RunnableScheduledFuture<?> first = queue[0];
      // 在此处代码控制了当从堆顶拉取元素时,如果元素的延迟时间还未达到,则不返回当前元素
      if (first == null || first.getDelay(NANOSECONDS) > 0)
        return null;
      else
        return finishPoll(first);   // 返回堆顶元素
    } finally {
      lock.unlock();
    }
  }

  // 通过无限for循环获取堆顶的元素,这里take()方法会阻塞当前线程,直至获取到了可执行的任务。
  // 可以看到,在第一次for循环中,如果堆顶不存在任务,则其会加入阻塞队列中,如果存在任务,但是
  // 其延迟时间还未到,那么当前线程会等待该延迟时间长的时间,然后查看任务是否可用,当获取到任务
  // 之后,其会将其从队列中移除,并且唤醒等待队列中其余等待的线程执行下一个任务
  public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
      for (;;) {
        RunnableScheduledFuture<?> first = queue[0];
        if (first == null)
          available.await();    // 堆内没有元素,当前线程进入等待队列中
        else {
          long delay = first.getDelay(NANOSECONDS);
          if (delay <= 0)   // 堆顶元素延迟时间小于0,可立即获取任务
            return finishPoll(first);
          first = null;
          if (leader != null)
            available.await();  // 已经有线程在等待堆顶元素,则当前线程进入等待队列中
          else {
            Thread thisThread = Thread.currentThread();
            leader = thisThread;
            try {
              available.awaitNanos(delay);  // 当前线程等待一定时长后获取任务并执行
            } finally {
              if (leader == thisThread)
                leader = null;
            }
          }
        }
      }
    } finally {
      if (leader == null && queue[0] != null)
        available.signal(); // 当前线程获取完任务之后唤醒等待队列中的下一个线程执行下一个任务
      lock.unlock();
    }
  }
}

       从DelayedWorkQueue的take()和poll()方法能够看出来,对于队列中职责的守候时间的限制重点是在那五个点子中达成的,若是职务的等候时间还未到,那么该方式就会阻塞线程池中的线程,直至职责可以实行。

2.3 DelayedWorkQueue

       DelayedWorkQueue的贯彻与DelayQueue以及PriorityQueue的落实核心相似,格局都为3个预先队列,并且底层是运用堆结构来兑现优先队列的功效,在数码存款和储蓄方式上,其行使的是数组来贯彻。那里DelayedWorkQueue与DelayQueue以及PriorityQueue分歧的点在于DelayedWorkQueue中一言九鼎囤积ScheduledFutureTask类型的任务,该职务中有三个heapIndex属性保存了当前职分在时下队列数组中的位置下标,其根本升高的是对队列的诸如contains()和remove()等要求一定当前任务地点的方式的效率,时间复杂度能够从O(N)提高到O(logN)。如下是DelayedWorkQueue的贯彻代码(那里只列出了此类的要害质量和与落到实处ScheduledThreadPoolExecutor功用相关的主意,关于如何行使数组达成优先队列请读者查阅相关文书档案):

static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {

  private static final int INITIAL_CAPACITY = 16;   // 数组初始化大小
  private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
  private final ReentrantLock lock = new ReentrantLock();   // 对添加和删除元素所使用的锁
  private int size = 0; // 当前队列中有效任务的个数

  private Thread leader = null; // 执行队列头部任务的线程
  private final Condition available = lock.newCondition();  // 除leader线程外其余线程的等待队列

  // 在对任务进行移动时,判断其是否为ScheduledFutureTask实例,如果是则维护其heapIndex属性
  private void setIndex(RunnableScheduledFuture<?> f, int idx) {
    if (f instanceof ScheduledFutureTask)
      ((ScheduledFutureTask)f).heapIndex = idx;
  }

  private void siftUp(int k, RunnableScheduledFuture<?> key) {/* 省略 */}

  private void siftDown(int k, RunnableScheduledFuture<?> key) {/* 省略 */}

  private int indexOf(Object x) {
    if (x != null) {
      if (x instanceof ScheduledFutureTask) {   // 如果为ScheduledFutureTask则可返回其heapIndex属性
        int i = ((ScheduledFutureTask) x).heapIndex;
        if (i >= 0 && i < size && queue[i] == x)
          return i;
      } else {  // 如果不为ScheduledFutureTask实例,则需要遍历队列查询当前元素的位置
        for (int i = 0; i < size; i++)
          if (x.equals(queue[i]))
            return i;
      }
    }
    return -1;
  }

  public boolean offer(Runnable x) {
    if (x == null)
      throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      int i = size;
      if (i >= queue.length)
        grow(); // 队列容量不足,对其进行扩容
      size = i + 1;
      if (i == 0) { // 如果其为队列第一个元素,则将其放入队列头部
        queue[0] = e;
        setIndex(e, 0);
      } else {  //如果不为第一个元素,则通过堆的上移元素操作移动当前元素至合适的位置
        siftUp(i, e);
      }
      if (queue[0] == e) {  // 如果被更新的是队列头部元素,则更新记录的执行头部任务的线程
        leader = null;
        available.signal();
      }
    } finally {
      lock.unlock();
    }
    return true;
  }

  // 完成从队列拉取元素操作,并且将其从队列中移除
  private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    int s = --size;
    RunnableScheduledFuture<?> x = queue[s];
    queue[s] = null;    // 将队列最尾部的元素置空
    if (s != 0) // 将最后一个元素放入第一个位置,并且将其下推至合适的位置
      siftDown(0, x);   // 这里idx置为0是因为当前方法的入参f都为队列的第一个元素
    setIndex(f, -1);
    return f;
  }

  // 尝试从队列(堆)中获取元素,如果没有元素或者元素的延迟时间还未到则返回空
  public RunnableScheduledFuture<?> poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      RunnableScheduledFuture<?> first = queue[0];
      // 在此处代码控制了当从堆顶拉取元素时,如果元素的延迟时间还未达到,则不返回当前元素
      if (first == null || first.getDelay(NANOSECONDS) > 0)
        return null;
      else
        return finishPoll(first);   // 返回堆顶元素
    } finally {
      lock.unlock();
    }
  }

  // 通过无限for循环获取堆顶的元素,这里take()方法会阻塞当前线程,直至获取到了可执行的任务。
  // 可以看到,在第一次for循环中,如果堆顶不存在任务,则其会加入阻塞队列中,如果存在任务,但是
  // 其延迟时间还未到,那么当前线程会等待该延迟时间长的时间,然后查看任务是否可用,当获取到任务
  // 之后,其会将其从队列中移除,并且唤醒等待队列中其余等待的线程执行下一个任务
  public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
      for (;;) {
        RunnableScheduledFuture<?> first = queue[0];
        if (first == null)
          available.await();    // 堆内没有元素,当前线程进入等待队列中
        else {
          long delay = first.getDelay(NANOSECONDS);
          if (delay <= 0)   // 堆顶元素延迟时间小于0,可立即获取任务
            return finishPoll(first);
          first = null;
          if (leader != null)
            available.await();  // 已经有线程在等待堆顶元素,则当前线程进入等待队列中
          else {
            Thread thisThread = Thread.currentThread();
            leader = thisThread;
            try {
              available.awaitNanos(delay);  // 当前线程等待一定时长后获取任务并执行
            } finally {
              if (leader == thisThread)
                leader = null;
            }
          }
        }
      }
    } finally {
      if (leader == null && queue[0] != null)
        available.signal(); // 当前线程获取完任务之后唤醒等待队列中的下一个线程执行下一个任务
      lock.unlock();
    }
  }
}

       从DelayedWorkQueue的take()和poll()方法能够看出来,对于队列中任务的等候时间的界定重点是在那多个措施中完毕的,若是职务的守候时间还未到,那么该措施就会阻塞线程池中的线程,直至职责能够实施。

2.4 scheduleAtFixedRate()和scheduleWithFixedDelay()方法

       后边我们对ScheduledThreadPoolExecutor的严重性质量和重点内部类都进行了详细的上课,基本上已经得以观望其是如何促成定时执行任务的功力的,接下去大家重点对客户端能够调用的重要措施开始展览简短介绍,那里scheduleAtFixedRate()和scheduleWithFixedDelay()方法的贯彻中央是一样的,两个措施最微薄的区分在于ScheduledFutureTask的setNextRunTime()方法的兑现,该办法的兑现后面早已进展了讲课,我们那里则以scheduleAtFixedRate()方法的落到实处为例对该方式实行讲解。如下是该方法的求实达成:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, 
                                              long period, TimeUnit unit) {
  if (command == null || unit == null)
    throw new NullPointerException();
  if (period <= 0)
    throw new IllegalArgumentException();
  ScheduledFutureTask<Void> sft =   // 封装客户端的任务实例
    new ScheduledFutureTask<Void>(command, null, 
                                  triggerTime(initialDelay, unit),unit.toNanos(period));
  RunnableScheduledFuture<Void> t = decorateTask(command, sft); // 对客户端任务实例进行装饰
  sft.outerTask = t;    // 初始化周期任务属性outerTask
  delayedExecute(t);    // 执行该任务
  return t;
}

       从上述代码能够看出来,scheduleAtFixedRate()首先对客户端职务实例实行了包装,装饰,并且起始化了包装后的天职实例的outerTask属性,最终调用delayedExecute()方法执行职分。如下是delayedExecute()方法的兑现:

private void delayedExecute(RunnableScheduledFuture<?> task) {
  if (isShutdown())
    reject(task);
  else {
    super.getQueue().add(task); // 添加当前任务到任务队列中
    if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
      task.cancel(false);   // 双检查法再次判断当前线程池是否处于可用状态,不是则移除当前任务
    else
      ensurePrestart(); // 若线程池没有初始化,则进行一些初始化工作
  }
}

       上述办法为重中之重的执行任务的章程,该措施首先会将任务插手到任务队列中,假诺线程池已经开头化过,那么该职分就会有等待的线程执行该义务。在投入到职责队列之后经过双检查法检查线程池是还是不是业已shutdown了,假若是则将该职务从职分队列中移除。如若当前线程池没有shutdown,就调用继承自ThreadPoolExecutor的ensurePrestart()方法,该方法会对线程池举办局地初步化学工业作,如初始化主旨线程,然后挨家挨户线程会调用上述等待队列的take()方法赢得职务执行。

2.4 scheduleAtFixedRate()和scheduleWithFixedDelay()方法

       后边大家对ScheduledThreadPoolExecutor的基本点质量和要害内部类都开始展览了详实的授课,基本上已经能够看来其是怎么贯彻定时执行职责的意义的,接下去大家首要对客户端能够调用的首要措施开展简要介绍,那里scheduleAtFixedRate()和scheduleWithFixedDelay()方法的完结基本是一模一样的,两个措施最微薄的区别在于ScheduledFutureTask的setNextRunTime()方法的贯彻,该格局的落实前边早已拓展了讲学,大家那里则以scheduleAtFixedRate()方法的兑现为例对该办法开始展览教学。如下是该措施的切实落实:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, 
                                              long period, TimeUnit unit) {
  if (command == null || unit == null)
    throw new NullPointerException();
  if (period <= 0)
    throw new IllegalArgumentException();
  ScheduledFutureTask<Void> sft =   // 封装客户端的任务实例
    new ScheduledFutureTask<Void>(command, null, 
                                  triggerTime(initialDelay, unit),unit.toNanos(period));
  RunnableScheduledFuture<Void> t = decorateTask(command, sft); // 对客户端任务实例进行装饰
  sft.outerTask = t;    // 初始化周期任务属性outerTask
  delayedExecute(t);    // 执行该任务
  return t;
}

       从上述代码能够看出来,scheduleAtFixedRate()首先对客户端职务实例实行了打包,装饰,并且开头化了打包后的职分实例的outerTask属性,最终调用delayedExecute()方法执行职务。如下是delayedExecute()方法的贯彻:

private void delayedExecute(RunnableScheduledFuture<?> task) {
  if (isShutdown())
    reject(task);
  else {
    super.getQueue().add(task); // 添加当前任务到任务队列中
    if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
      task.cancel(false);   // 双检查法再次判断当前线程池是否处于可用状态,不是则移除当前任务
    else
      ensurePrestart(); // 若线程池没有初始化,则进行一些初始化工作
  }
}

       上述办法为主要的进行任务的点子,该方式首先会将任务参加到职务队列中,假设线程池已经初始化过,那么该义务就会有等待的线程执行该职分。在参预到职务队列之后通过双检查法检查线程池是还是不是早已shutdown了,假使是则将该职责从职分队列中移除。如若当前线程池没有shutdown,就调用继承自ThreadPoolExecutor的ensurePrestart()方法,该方法会对线程池举办部分伊始化工作,如初叶化大旨线程,然后逐一线程会调用上述等待队列的take()方法取得任务履行。

相关文章