一,Thread和Runnable
1.Thread类是如何开启一个新的线程的
public synchronized void start() {
if (threadStatus
!= 0)
throw new IllegalThreadStateException();
group
.add(this);
boolean started
= false;
try {
start0();
started
= true;
} finally {
try {
if (!started
) {
group
.threadStartFailed(this);
}
} catch (Throwable ignore
) {
}
}
}
可以看到Thread类里面的start()方法调用了start0()方法,
private native void start0();
而这个start0()是一个本地方法,这个方法会调用底层c的方法来根据操作系统执行操作,windows下会创建一个新的线程,linux下会创建一个轻量级的进程。
2.实现runnable接口为什么可以开启一个新的线程?
首先来看Thread类
class Thread implements Runnable
Thread类实现了Runnable接口,那说明他一定实现了run()方法。
private Runnable target
;
public Thread(Runnable target
) {
init(null
, target
, "Thread-" + nextThreadNum(), 0);
}
其次,Thread类里面声明了一个Runnable类型的成员变量target。
@Override
public void run() {
if (target
!= null
) {
target
.run();
}
}
当我们在执行run()方法的时候,他会先判断我们是否传入了一个Runnable接口,如果我们传入进来了一个Runnable接口,也就是target!=null ,此时我们会执行target的run(),也就是我们通过实现Runnable接口的方式创建了一个多线程。
二,Future和FutureTask的关系
1.Future
首先来看一段代码:
public class Main {
public static void main(String
[] args
) {
ExecutorService pool
= Executors
.newCachedThreadPool();
Future
<?> submit
= pool
.submit(new RunnableTask());
Future submit1
= pool
.submit(new CallableTask());
}
private static class RunnableTask implements Runnable{
@Override
public void run() {
}
}
private static class CallableTask implements Callable{
@Override
public Object
call() throws Exception
{
return null
;
}
}
}
当我们调用线程池的submit()方法的时候,会给我们返回一个Future类型的对象。那么这个Future又是什么?
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning
);
boolean isCancelled();
boolean isDone();
V
get() throws InterruptedException
, ExecutionException
;
V
get(long timeout
, TimeUnit unit
)
throws InterruptedException
, ExecutionException
, TimeoutException
;
}
可以说,这个接口主要的功能就是可以获取到线程执行完成后的结果。
2.两者的关系
三,FutureTask源码
1.属性
private volatile int state
;
private static final int NEW
= 0;
private static final int COMPLETING
= 1;
private static final int NORMAL
= 2;
private static final int EXCEPTIONAL
= 3;
private static final int CANCELLED
= 4;
private static final int INTERRUPTING
= 5;
private static final int INTERRUPTED
= 6;
private Callable
<V> callable
;
private Object outcome
;
private volatile Thread runner
;
private volatile WaitNode waiters
;
2.WaitNode内部类
static final class WaitNode {
volatile Thread thread
;
volatile WaitNode next
;
WaitNode() { thread
= Thread
.currentThread(); }
}
这个类里面封装着执行任务的线程和指向下一个线程的指针。
3.构造器
当我们传入一个runnable接口的时候:
public FutureTask(Runnable runnable
, V result
) {
this.callable
= Executors
.callable(runnable
, result
);
this.state
= NEW
;
}
调用了Executors.callable(runnable, result);
public static <T> Callable
<T> callable(Runnable task
, T result
) {
if (task
== null
)
throw new NullPointerException();
return new RunnableAdapter<T>(task
, result
);
}
当任务不为空,实际上返回的是一个RunnableAdapter对象。
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task
;
final T result
;
RunnableAdapter(Runnable task
, T result
) {
this.task
= task
;
this.result
= result
;
}
public T
call() {
task
.run();
return result
;
}
}
在这里,将runnable装饰成了callable。
4.run()
public void run() {
if (state
!= NEW
||
!UNSAFE
.compareAndSwapObject(this, runnerOffset
,
null
, Thread
.currentThread()))
return;
try {
Callable
<V> c
= callable
;
if (c
!= null
&& state
== NEW
) {
V result
;
boolean ran
;
try {
result
= c
.call();
ran
= true;
} catch (Throwable ex
) {
result
= null
;
ran
= false;
setException(ex
);
}
if (ran
)
set(result
);
}
} finally {
runner
= null
;
int s
= state
;
if (s
>= INTERRUPTING
)
handlePossibleCancellationInterrupt(s
);
}
}
5.setException()
protected void setException(Throwable t
) {
if (UNSAFE
.compareAndSwapInt(this, stateOffset
, NEW
, COMPLETING
)) {
outcome
= t
;
UNSAFE
.putOrderedInt(this, stateOffset
, EXCEPTIONAL
);
finishCompletion();
}
}
6.set()
protected void set(V v
) {
if (UNSAFE
.compareAndSwapInt(this, stateOffset
, NEW
, COMPLETING
)) {
outcome
= v
;
UNSAFE
.putOrderedInt(this, stateOffset
, NORMAL
);
finishCompletion();
}
}
7.handlePossibleCancellationInterrupt()
private void handlePossibleCancellationInterrupt(int s
) {
if (s
== INTERRUPTING
)
while (state
== INTERRUPTING
)
Thread
.yield();
}
8.get()
public V
get() throws InterruptedException
, ExecutionException
{
int s
= state
;
if (s
<= COMPLETING
)
s
= awaitDone(false, 0L
);
return report(s
);
}
9.awaitDone()
private int awaitDone(boolean timed
, long nanos
)
throws InterruptedException
{
final long deadline
= timed
? System
.nanoTime() + nanos
: 0L
;
WaitNode q
= null
;
boolean queued
= false;
for (;;) {
if (Thread
.interrupted()) {
removeWaiter(q
);
throw new InterruptedException();
}
int s
= state
;
if (s
> COMPLETING
) {
if (q
!= null
)
q
.thread
= null
;
return s
;
}
else if (s
== COMPLETING
)
Thread
.yield();
else if (q
== null
)
q
= new WaitNode();
else if (!queued
)
queued
= UNSAFE
.compareAndSwapObject(this, waitersOffset
,
else if (timed
) {
nanos
= deadline
- System
.nanoTime();
if (nanos
<= 0L
) {
removeWaiter(q
);
return state
;
}
LockSupport
.parkNanos(this, nanos
);
}
else
LockSupport
.park(this);
}
}
10.report()
private V
report(int s
) throws ExecutionException
{
Object x
= outcome
;
if (s
== NORMAL
)
return (V
)x
;
if (s
>= CANCELLED
)
throw new CancellationException();
throw new ExecutionException((Throwable
)x
);
}
11.finishCompletion()
private void finishCompletion() {
for (WaitNode q
; (q
= waiters
) != null
;) {
if (UNSAFE
.compareAndSwapObject(this, waitersOffset
, q
, null
)) {
for (;;) {
Thread t
= q
.thread
;
if (t
!= null
) {
q
.thread
= null
;
LockSupport
.unpark(t
);
}
WaitNode next
= q
.next
;
if (next
== null
)
break;
q
.next
= null
;
q
= next
;
}
break;
}
}
done();
callable
= null
;
}
11.cancel()
public boolean cancel(boolean mayInterruptIfRunning
) {
if (!(state
== NEW
&&
UNSAFE
.compareAndSwapInt(this, stateOffset
, NEW
,
mayInterruptIfRunning
? INTERRUPTING
: CANCELLED
)))
return false;
try {
if (mayInterruptIfRunning
) {
try {
Thread t
= runner
;
if (t
!= null
)
t
.interrupt();
} finally {
UNSAFE
.putOrderedInt(this, stateOffset
, INTERRUPTED
);
}
}
} finally {
finishCompletion();
}
return true;
}
四,思路整理
1.首先明确一点,不管我们传入的是Runnable还是Callable接口,他最终用的都是Callable,Runnable接口会被他通过装饰者模式封装为Callable。
2.一个任务执行的入口其实就是run方法。
3.进入run方法,首先他会判断当前任务是否已经被执行过或者当前线程通过cas的方式并没有抢到执行当前任务的机会。如果是的话,说明已经有线程正在执行或者执行完了当前的任务,直接返回即可。
4.接下来他会判断当前任务是否为空或者当前任务的状态,其实判断状态就是为了判断当前任务是不是被取消了。如果任务不为空并且没有被取消,他会执行call方法(实际上就是我们自己的业务代码)并将结果设置到result将任务的是否被执行状态改成true表示顺利执行完。
5.call方法执行过程中如果发生异常了,会将结果设置为null,是否被顺利执行的状态为设置为false,表示执行过程发生了异常。
6.接下来,它会将异常信息封装到outcome,然后设置但该你任务的状态为异常结束,并自旋的方式唤醒所有等待队列中等待获取结果的线程。
7.如果call正常执行完了,他会用cas的方式设置当前任务的完成状态为完成中,如果设置成功,outcome来接收任务的结果,设置当前任务的状态为正常完成状态,并且唤醒所有等待结果的线程。
8.最终它会将执行当前任务的线程设置为null,并判断,如果当前任务的状态是被打断中或者已经被打断,他就会自旋判断如果当前线程的状态为被打断,让执行当前任务的线程释放cpu。
9.get方法是获取当前任务的结果。首先他会获取当前任务的状态,如果状态小于等于未完成首先他会通过自旋的方式,第一次自旋尚未给当前线程创建waitNode对象,此时就需要位当前线程创建waitNode对象。第二次自旋,创建好了对象还没有入队,cas的方式入队。第三次自旋,判断是否设置超时时间,如果没设置超时时间,当前get操作的线程就会被park了。 线程状态会变为 WAITING状态,相当于休眠了…除非有其它线程将你唤醒 或者 将当前线程 中断。他会获取当前线程的任务状态,如果任务还没有完成,释放cpu接着等。如果任务已经完成,此时需要将node设置位null help GC,直接返回当前的状态。除此之外还要判断,如果当前线程被其他线程用中断的方式唤醒,这种唤醒方式会将Thread的中断标记位设置为false,当前线程出队,get方法抛出中断异常。
10.如果状态表示已经有结果,会执行report方法。
11.report方法,如果任务正常执行结束,返回结果,如果任务被取消或者中断了,抛出异常,如果任务执行过程中发生异常结束了,返回异常。
12.最后就是任务的取消方法 cancel。他会先判断state == NEW 成立 表示当前任务处于运行中 或者 处于线程池 任务队列中…并且cas修改状态成功,他就会尝试取打断。
13.如果尝试打断成功,给runner线程一个中断信号…如果你的程序是响应中断 会走中断逻辑…假设你程序不是响应中断的…啥也不会发生。最后,设置线程状态为中断,唤醒获取结果的线程。