publicvoidrun(){ // 避免多个线程同时执行任务 // 当第一个线程抢占任务成功后,后面的线程就什么也不做 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 设置异常值 setException(ex); } if (ran) // 设置正常返回值 set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() // 重置runner runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; // 任务处于中断中的状态,则进行中断操作 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
privatevoidfinishCompletion(){ // assert state > COMPLETING; // 断言此时state > COMPLETING for (WaitNode q; (q = waiters) != null;) { // 尝试将waiters全部置为null if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; // 唤醒线程 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } }
done();
// 减少内存占用 callable = null; }
privatevoidhandlePossibleCancellationInterrupt(int s){ // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. if (s == INTERRUPTING) while (state == INTERRUPTING) // 让出cpu时间片,等待cancel(true)执行完成,此时INTERRUPTING必然能更成INTERRUPTED Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from // cancel(true). However, it is permissible to use interrupts // as an independent mechanism for a task to communicate with // its caller, and there is no way to clear only the // cancellation interrupt. // // Thread.interrupted(); }
public V get()throws InterruptedException, ExecutionException { int s = state; // 如果是新建或完成的中间状态,则等待任务执行完 if (s <= COMPLETING) s = awaitDone(false, 0L); // 返回已完成任务的结果或抛出异常 return report(s); }