java自带的定时任务执行器为ScheduledThreadPoolExecutor
,继承于线程池管理器ThreadPoolExecutor
,常用方法如下,程序将每2秒输出一次系统时间。
1 2 3 4 5 6 7 8 9
| ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2); executor.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println(DateFormatUtils.format(System.currentTimeMillis(), "HH:mm:ss SSS")); } }, 0, 2, TimeUnit.SECONDS); Thread.sleep(10_000); executor.shutdown();
|
java.util.concurrent.ScheduledThreadPoolExecutor#scheduleAtFixedRate
方法把参数Runnable
对象封装成自定义的定时任务java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
|
ScheduledThreadPoolExecutor
的阻塞队列为其自实现的DelayedWorkQueue
,实际上是一个按定时任务RunnableScheduledFuture
的下次执行时间排序的有序队列。
在ScheduledThreadPoolExecutor
中每次添加一个定时任务时,调用方法java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue#offer(java.lang.Runnable)
向DelayedWorkQueue
添加元素。DelayedWorkQueue
的具体实现如下,如果队列为空,则元素直接置入为队列首位;如果队列不为空,则队列以任务的下一次执行时间顺序排序。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e); } if (queue[0] == e) { leader = null; available.signal(); } } finally { lock.unlock(); } return true; }
private void siftUp(int k, RunnableScheduledFuture<?> key) { while (k > 0) { int parent = (k - 1) >>> 1; RunnableScheduledFuture<?> e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); }
|
ScheduledThreadPoolExecutor
添加Worker创建线程调用的父类的方法java.util.concurrent.ThreadPoolExecutor#addWorker
,然后新建的线程程调用方法java.util.concurrent.ThreadPoolExecutor#runWorker
读取阻塞队列获取待执行的任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
|
这块代码的关键代码为while (task != null || (task = getTask()) != null)
,作用为读取阻塞队列中的待执行任务。getTask()
中Worker线程从阻塞获取元素,而ScheduledThreadPoolExecutor
的DelayedWorkQueue
在获取元素时,如果有任务的下一次执行时间到了则立即返回,否则通过互斥锁阻塞直到有新元素添加或者有元素的下一次执行时间到了。
因此Worker线程在while循环中不断阻塞-执行任务-阻塞,从而实现了循环间隔固定时间执行任务的效果。