盒子
盒子
文章目录
  1. 责任链基本介绍
  2. Volley中的责任链
    1. 队列与线程的配合
    2. 责任链形成
  3. 责任链的使用场景
  4. 封装责任链配合线程队列
    1. 基本的队列与线程配合实现
    2. 处理线程
    3. 主线程分发器
    4. 拦截器
    5. 优先级
    6. 多队列配合切换
  5. 更新时间:2016.12.9

优雅的责任链模式

24种设计模式6大原则中的责任链平时开发中接触到的并不多,如果你读过Volley的源码应该会有体会,Android同学们都说Volley代码优雅,我想责任链的运用应该是它优雅的部分原因。

责任链基本介绍

使多个对象都有机会处理请求,从而避免了请求的发送者和接受者之间的耦合关系。将这些对象连成一条链,并沿着这条链传递该请求,直到有对象处理它为止。

说白一点就是一个任务,进入一条链,这条链上存在多个处理器,从第一个处理器开始,一直向后传递,使得每个处理器都有机会处理这个任务,谁可以处理谁就处理,处理完不再向后传递。此任务被处理完毕。

简单的抽象处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public abstract class Handler {
private Handler nextHandler;
//每个处理者都必须对请求做出处理
public final Response handleMessage(Request request){
Response response = null;
//判断是否是自己的处理级别
if(this.getHandlerLevel().equals(request.getRequestLevel())){
response = this.echo(request);
}else{ //不属于自己的处理级别
//判断是否有下一个处理者
if(this.nextHandler != null){
response = this.nextHandler.handleMessage(request);
}else{
//没有适当的处理者,业务自行处理
}
}
return response;
}
//设置下一个处理者是谁
public void setNext(Handler _handler){
this.nextHandler = _handler;
}
//每个处理者都有一个处理级别
protected abstract Level getHandlerLevel();
//每个处理者都必须实现处理任务
protected abstract Response echo(Request request);
}

抽象的处理者实现三个职责:一是定义一个请求的处理方法handleMessage,唯一对外开放的方法;二是定义一个链的编排方法setNext,设置下一个处理者;三是定义了具体的请求者必须实现的两个方法:定义自己能够处理的级别getHandlerLevel和具体的处理任务echo

们定义三个具体的处理者,以便可以组成一个链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class ConcreteHandler1 extends Handler {
//定义自己的处理逻辑
protected Response echo(Request request) {
//完成处理逻辑
return null;
}
//设置自己的处理级别
protected Level getHandlerLevel() {
//设置自己的处理级别
return null;
}
}
public class ConcreteHandler2 extends Handler {
//定义自己的处理逻辑
protected Response echo(Request request) {
//完成处理逻辑
return null;
}
//设置自己的处理级别
protected Level getHandlerLevel() {
//设置自己的处理级别
return null;
}
}
public class ConcreteHandler3 extends Handler {
//定义自己的处理逻辑
protected Response echo(Request request) {
//完成处理逻辑
return null;
}
//设置自己的处理级别
protected Level getHandlerLevel() {
//设置自己的处理级别
return null;
}
}

在场景类或高层模块中对链进行组装,并传递请求,返回结果

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Client {
public static void main(String[] args) {
//声明所有的处理节点
Handler handler1 = new ConcreteHandler1();
Handler handler2 = new ConcreteHandler2();
Handler handler3 = new ConcreteHandler3();
//设置链中的阶段顺序1-->2-->3
handler1.setNext(handler2);
handler2.setNext(handler3);
//提交请求,返回结果
Response response = handler1.handlerMessage(new Request());
}
}

在实际应用中,一般会有一个封装类对责任模式进行封装,也就是替代Client类,直接返回链中的第一个处理者,具体链的设置不需要高层次模块关系,这样,更简化了高层次模块的调用,减少模块间的耦合,提高系统的灵活性。

Volley中的责任链

Volley中,把责任链与线程队列结合了起来,形成一条循环的流水处理线,这个责任链中存在了两个处理器Handler,也叫调度者Dispatcher有关Volley的分析,你可以阅读源码,也可以看这篇分析文章了解下Volley 源码解析

队列与线程的配合

Java为我们提供了很多队列类,这里不详细介绍,感兴趣的可以去谷歌下,Volley中实际创建了两个队列,一个缓存队列,一个网络请求队列

1
2
3
4
5
6
7
/** The cache triage queue. */
private final PriorityBlockingQueue<Request<?>> mCacheQueue =
new PriorityBlockingQueue<Request<?>>();
/** The queue of requests that are actually going out to the network. */
private final PriorityBlockingQueue<Request<?>> mNetworkQueue =
new PriorityBlockingQueue<Request<?>>();

然后分别开启了1个缓存线程循环从缓存队列中取任务,4个网络请求线程循环从网络请求队列中取任务。在取的时候通过队列的take()方法取出任务,这个方法是阻塞的方法,当队列中无任务的时候,线程会阻塞在这里,直到有新的任务到来

1
2
// Take a request from the queue.
request = mQueue.take();

责任链形成

一个新的请求到来的时候会先进入缓存队列,在缓存处理线程中,有缓存则直接抛出缓存数据到主线程,当发现此Request没有缓存的话,直接进入网络请求队列,由网络请求线程进行请求,获取到数据之后抛出数据到主线程。这就形成了两个处理者构成的责任链。也是Volley的核心。

1
2
3
4
5
6
7
8
// Attempt to retrieve this item from cache.
Cache.Entry entry = mCache.get(request.getCacheKey());
if (entry == null) {
request.addMarker("cache-miss");
// Cache miss; send off to the network dispatcher.
mNetworkQueue.put(request);
continue;
}

责任链的使用场景

相信我们都有过读过的相关知识不知道应用场景,即使知道应用场景,到了那个时候也想不起来用的情况。责任链我个人觉得虽然用的场景不是特别频繁,但是它能解决一些核心的东西,一些复杂的东西在它面前会变得十分简单。

曾经我帮人做过一个相机连拍的APP,需求是一秒钟至少拍摄5张以上的照片,并且存储到本地,我的实现是通过获取相机每一帧转换成图片,并保存到本地,这其中有不少其他问题,其中一个就是存储的问题,如此高速下,存储的图片小还好说,稍微大点,放主线程肯定是处理不了的,还有就是前面的图片还没存储完,后面又不定时的来新的图片需要存储。存储过程中还要实时展示存储进度。当时碰到这个问题,第一反应是队列来搞,第二就是责任链,原理与Volley处理请求大同小异。责任链与线程队列配合让这个问题变的so easy,再次佩服下谷歌的工程师。

封装责任链配合线程队列

有了那次经验之后,意识到了责任链在处理一些问题方面的强大能力,但是每次都要根据具体的场景完整的搭建一个责任链模型,这显然不是我这种封装狂魔的风格。封装起来用,后面遇到任何需要处理队列的地方,直接拿来用,爽爆了。那么封装前,我们需要考虑好支持以下特性

  • 基本的队列与线程配合,形成流水线
  • 优先级,任务处理有先后,需要支持设置优先级
  • 多队列配合,把任务从一个队列抛出到另一个队列进行处理
  • 拦截器,拦截处理任务,自定义拦截条件
  • 分发器,负责把处理结果分发到主线程
  • 处理进度的实时监听
  • 任务取消,需要自由取消已经已经放入队列中的任务与正在执行的任务
  • 重复任务过滤,等待

基本的队列与线程配合实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public class JobQueue<T extends Job<T>> {
public static final int DEFAULT_THREAD_POOL_SIZE = 4;
private final PriorityBlockingQueue<T> mJobQueue = new PriorityBlockingQueue<T>(); //处理队列
private JobDispatcher<T>[] mJobDispatchers; //处理线程
private Interceptor<T> mInterceptor; //拦截器
private Delivery<T> mDelivery; //分发器
private List<JobHandlerListener<T>> mJobHandlerListener = new ArrayList<>(); //处理进度监听
private AtomicInteger mSequenceGenerator = new AtomicInteger(); //对任务排序用,用于设置优先级
public JobQueue() {
this(new DefaultInterceptor<T>());
}
public JobQueue(int threadPoolSize) {
this(threadPoolSize, new DefaultInterceptor<T>());
}
public JobQueue(Interceptor interceptor) {
this(DEFAULT_THREAD_POOL_SIZE, interceptor);
}
public JobQueue(int threadPoolSize, Interceptor interceptor) {
mJobDispatchers = new JobDispatcher[threadPoolSize];
this.mInterceptor = interceptor;
mDelivery = new DefaultDelivery<T>(new android.os.Handler(Looper.getMainLooper()));
}
public void start() {
stop();
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(mJobDispatchers.length);
for (int i = 0; i < mJobDispatchers.length; i++) {
JobDispatcher<T> savedDispatcher = new JobDispatcher<T>(mJobQueue, mInterceptor, mDelivery, mJobHandlerListener);
mJobDispatchers[i] = savedDispatcher;
threadPoolExecutor.submit(savedDispatcher);
}
}
public void stop() {
for (int i = 0; i < mJobDispatchers.length; i++) {
if (mJobDispatchers[i] != null) {
mJobDispatchers[i].quit();
}
}
}
public JobQueue add(T element) {
element.setSequence(getSequenceNumber());
mJobQueue.add(element);
return this;
}
public JobQueue action(Action<T> action) {
for (int i = 0; i < mJobDispatchers.length; i++) {
mJobDispatchers[i].setAction(action);
}
return this;
}
public JobQueue addListener(JobHandlerListener listener) {
if (mJobHandlerListener.contains(listener)) {
return this;
}
mJobHandlerListener.add(listener);
return this;
}
private int getSequenceNumber() {
return mSequenceGenerator.incrementAndGet();
}
}

这相当于我们前面说的Client封装类,任务从此类add()函数进入mJobQueue队列,此队列对应默认4个JobDispatcher处理线程。可以理解为并发量为4,一次可同时并发处理四个任务,嫌小的可以设置大点。

处理线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public class JobDispatcher<T extends Job<T>> extends Thread {
private final BlockingQueue<T> mJobQueue; //处理队列
private Interceptor<T> mInterceptor; //拦截器
private Delivery<T> mDelivery; //分发器
private List<JobHandlerListener<T>> mJobHandlerListeners = new ArrayList<>(); //处理监听
private Action<T> mAction; //执行动作回调,具体的处理逻辑,由上层定义通过此类call方法回调到这里执行
private volatile boolean mQuit = false;
public JobDispatcher(BlockingQueue<T> jobQueue, Interceptor<T> interceptor, Delivery<T> delivery, List<JobHandlerListener<T>> jobHandlerListeners) {
this.mJobQueue = jobQueue;
this.mInterceptor = interceptor;
this.mDelivery = delivery;
if (jobHandlerListeners != null) {
this.mJobHandlerListeners = jobHandlerListeners;
}
}
@Override
public void run() {
CLog.v("start new dispatcher");
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
while (true) {
try {
T element = mJobQueue.take(); //获取任务
if (element == null) {
continue;
}
if (element.isCancel()) { //判断任务是否取消
mDelivery.deliveryCancel(mJobHandlerListeners,element); //分发到主线程调用取消监听
continue;
}
if (mInterceptor != null && mInterceptor.interceptCondition(element)) { //拦截器不为空且拦截条件为true,进行拦截
mInterceptor.onIntercept(element);
continue;
}
mDelivery.deliveryPrepare(mJobHandlerListeners,element); //分发到主线程调用准备监听
if (this.mAction == null) {
continue;
}
this.mAction.call(element); //执行动作
mDelivery.deliveryFinish(mJobHandlerListeners,element); //分发到主线程调用完成监听
} catch (InterruptedException e) {
if (mQuit) {
return;
}
continue;
}
}
}
public void setAction(Action<T> action) {
this.mAction = action;
}
public void quit() {
mQuit = true;
interrupt();
}
}

处理线程内部是个死循环,不断的从队列中获取任务,获取后进行一些列判定后执行相应动作,完成后分发到主线程回调

主线程分发器

下面看下分发器

1
2
3
4
5
6
7
public interface Delivery<T extends Job<T>> {
void deliveryPrepare(List<JobHandlerListener<T>> listeners, T job);
void deliveryCancel(List<JobHandlerListener<T>> listeners, T job);
void deliveryFinish(List<JobHandlerListener<T>> listeners, T job);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class DefaultDelivery<T extends Job<T>> implements Delivery<T> {
private final Executor mResponsePoster;
public DefaultDelivery(final Handler handler) {
mResponsePoster = new Executor() {
@Override
public void execute(Runnable command) {
handler.post(command);
}
};
}
public DefaultDelivery(Executor executor) {
mResponsePoster = executor;
}
@Override
public void deliveryPrepare(final List<JobHandlerListener<T>> listeners, final T job) {
mResponsePoster.execute(new Runnable() {
@Override
public void run() {
if (job.isCancel()) {
deliveryCancel(listeners,job);
return;
}
CLog.d("deliveryPrepare,current thread:%s", Thread.currentThread().getName());
for (int i = 0, size = listeners.size(); i < size; i++) {
listeners.get(i).onPrepare(job);
}
}
});
}
@Override
public void deliveryCancel(final List<JobHandlerListener<T>> listeners, final T job) {
mResponsePoster.execute(new Runnable() {
@Override
public void run() {
CLog.d("deliveryCancel,current thread:%s", Thread.currentThread().getName());
for (int i = 0, size = listeners.size(); i < size; i++) {
listeners.get(i).onCancel(job);
}
}
});
}
@Override
public void deliveryFinish(final List<JobHandlerListener<T>> listeners, final T job) {
if (job.isCancel()) {
deliveryCancel(listeners,job);
return;
}
mResponsePoster.execute(new Runnable() {
@Override
public void run() {
CLog.d("deliveryFinish,current thread:%s", Thread.currentThread().getName());
for (int i = 0, size = listeners.size(); i < size; i++) {
listeners.get(i).onFinish(job);
}
}
});
}
}

拦截器

1
2
3
4
public interface Interceptor<T extends Job<T>> {
boolean interceptCondition(T job); //拦截条件
void onIntercept(T job); //拦截后动作
}
1
2
3
4
5
6
7
8
9
10
11
public class DefaultInterceptor<T extends Job<T>> implements Interceptor<T> {
@Override
public boolean interceptCondition(T job) {
return false;
}
@Override
public void onIntercept(T job) {
}
}

默认我们不对拦截做什么动作,上层调用者需要拦截的时候可以传一个自定义的拦截器进行拦截操作

优先级

上面代码中出现的Job类其实是个抽象类的父类,调用者传入的任务必须继承Job

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public abstract class Job<T extends Job<T>> implements java.lang.Comparable<T> {
protected boolean isCancel;
protected boolean isComplete;
protected Integer mSequence;
protected Priority mPriority = Priority.NORMAL;
public Priority getPriority() {
return mPriority;
}
public Job setPriority(Priority priority) {
mPriority = priority;
return this;
}
public boolean isCancel() {
return isCancel;
}
public Job setCancel(boolean cancel) {
isCancel = cancel;
return this;
}
public boolean isComplete() {
return isComplete;
}
public Job setComplete(boolean complete) {
isComplete = complete;
return this;
}
public Integer getSequence() {
return mSequence;
}
public Job setSequence(Integer sequence) {
mSequence = sequence;
return this;
}
@Override
public int compareTo(T another) {
Priority left = this.getPriority();
Priority right = another.getPriority();
return left == right ? this.mSequence - another.mSequence : right.ordinal() - left.ordinal();
}
}

可以看到Job实现了Comparable接口,因为在JobQueue中的PriorityBlockingQueue<T> mJobQueue泛型的必须是Comparable的实现类,这样在
mJobQueue.take()的时候会根据优先级来取出任务,从而实现任务的优先级功能,具体的优先级判定逻辑,在Job类中实现的compareTo函数中

多队列配合切换

其实此功能依赖拦截器来实现,拦截器拦截后,再把任务抛给其他队列就好,看下具体的使用方法就知道了

先定义一个任务

1
2
3
4
5
6
7
8
9
10
11
12
13
public class TestJob extends Job<TestJob> {
private int mCount;
public int getCount() {
return mCount;
}
public TestJob setCount(int count) {
mCount = count;
return this;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
public class MainActivity extends AppCompatActivity {
private int mCount;
private JobHandler<TestJob> mCacheJobHandler;
private JobHandler<TestJob> mNetworkJobHandler;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
//创建缓存处理器,并指定为1个线程,同时设置自定义的拦截器
mCacheJobHandler = new JobHandler.Builder<TestJob>().threadPoolSize(1).interceptor(new CacheJobInterceptor()).build();
//创建网络请求处理器
mNetworkJobHandler = new JobHandler.Builder<TestJob>().build();
findViewById(R.id.btn).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
TestJob job = new TestJob();
if (mCount == 7) {
job.setPriority(Priority.HIGH); //设置任务优先级
}
if (mCount == 8) {
job.setPriority(Priority.IMMEDIATE); //设置任务优先级
}
job.setCount(mCount);
mCacheJobHandler.enqueue(job)
.action(new Action<TestJob>() {
@Override
public void call(TestJob element) { //具体的读缓存处理逻辑
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
CLog.i("读缓存"+element.getCount());
}
})
.addListener(mCacheJobHandlerListener);
mCount++;
}
});
}
//自定义拦截器
public class CacheJobInterceptor implements net.robinx.queue.Interceptor<TestJob>{
@Override
public boolean interceptCondition(TestJob job) { //拦截条件,当count为5时模拟达成条件,返回ture进行拦截
if (job.getCount() == 5) {
return true;
}
return false;
}
@Override
public void onIntercept(TestJob job) { //缓存队列拦截后,进入网络请求队列
mNetworkJobHandler.enqueue(job)
.action(new Action<TestJob>() {
@Override
public void call(TestJob element) { //具体的网络请求处理逻辑
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
CLog.i("请求网络"+element.getCount());
}
})
.addListener(mNetworkJobHandlerListener);
}
}
//缓存任务处理进度监听
private JobHandlerListener<TestJob> mCacheJobHandlerListener = new JobHandlerListener<TestJob>() {
@Override
public void onPrepare(TestJob job) {
CLog.i("读缓存准备"+job.getCount());
}
@Override
public void onCancel(TestJob job) {
CLog.i("读缓存取消"+job.getCount());
}
@Override
public void onFinish(TestJob job) {
CLog.i("读缓存完成"+job.getCount());
}
};
//网络请求任务处理进度监听
private JobHandlerListener<TestJob> mNetworkJobHandlerListener = new JobHandlerListener<TestJob>() {
@Override
public void onPrepare(TestJob job) {
CLog.i("请求网络准备"+job.getCount());
}
@Override
public void onCancel(TestJob job) {
CLog.i("请求网络取消"+job.getCount());
}
@Override
public void onFinish(TestJob job) {
CLog.i("请求网络完成"+job.getCount());
}
};
}

上面通过封装的责任链模拟了Volley的核心功能,缓存与网络请求处理线程的切换


更新时间:2016.12.9

在以上代码基础上增加了,重复任务判定,当有重复任务时,后续任务进入等待队列,当前任务执行完后,释放队列中的重复任务并执行
完善了任务取消

代码见https://github.com/robinxdroid/JobQueue

转载请指明出处RobinBlog:http://robinx.net/2016/12/06/优雅的责任链模式/