RxJava学习笔记

RxJava的常用操作符

  • just(T...):将传入的参数依次发送出来,快捷创建创建事件队列的方法

    1
    Observable.just("hello","world”);

    等同于

    1
    2
    3
    4
    5
    6
    7
    8
    Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
    subscriber.onNext("hello");
    subscriber.onNext("world");
    subscriber.onCompleted();
    }
    })
  • from(T[]) / from(Iterable) : 将传入的数组或 Iterable 拆分成具体对象后,依次发送出

    1
    2
    String[] words = new String[]{"hello","world"};
    Observable.from(words);

    等同于

    1
    2
    3
    4
    5
    6
    7
    8
    9
    Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
    subscriber.onNext("hello");
    subscriber.onNext("world");
    subscriber.onCompleted();
    }
    })

Rxjava的知识点

RxJava的基本实现方式:

Observable即被观察者,它决定了什么时候触发事件以及触发怎样的事件。

Observer即观察者,它决定了事件触发的时候将有怎样的行为。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello");
subscriber.onNext("world");
subscriber.onCompleted();
}
})
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG,"onCompleted")
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(String s) {
Log.d(TAG,s);
}
});

除了Observer接口之外,RxJava还内置了一个实现了Observer的抽象类:Subscriber.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello");
subscriber.onNext("world");
subscriber.onCompleted();
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(String s) {
Log.d(TAG, s);
}
});

两种使用方式是一样的。它们的区别对于使用者来说主要有两点:

  1. Subscriber新增加了onStart()方法,它会在subscribe刚开始事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或者重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作有线程要求(如弹出一个转菊花的ProgressBar,就必须在主线程执行),onStart()方法就不适用了,因为它总是在subscribe所发生的线程被调用,而不能指定线程。需要在指定线程来做准备工作,可以使用doOnSubscribe()方法。
  2. Subscriber实现了另一个方法unsubscribe(),这个方法被调用后,Subscriber将不再接收事件。一般这个方法调用前,可以使用isUnsubscribed()先判断一下状态。unsubscribe()主要用于解除引用关系,以避免内存泄露的发生。
  • Action

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    Action1<String> onNextAction = new Action1<String>() {
    @Override
    public void call(String s) {
    // next()
    Log.d(TAG,s);
    }
    };
    Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    @Override
    public void call(Throwable throwable) {
    // ERROR
    }
    };
    Action0 onCompletedAction = new Action0() {
    @Override
    public void call() {
    // complete
    Log.d(TAG,"onCompleted");
    }
    };
    // 自动创建Subscriber,并使用onNextAction来定义onNext();
    observable.subscribe(onNextAction);
    // 自动创建Subscriber,并使用onNextAction onErrorAction 来定义onNext() onError()
    observable.subscribe(onNextAction,onErrorAction);
    // 自动创建Subscriber,并使用onNextAction onErrorAction onCompletedAction来定义onNext() onError() onCompleted()
    observable.subscribe(onNextAction,onErrorAction,onCompletedAction);

    Action0是RxJava的一个接口,它只有一个方法call(),这个方法是无参数无返回值的;由于onCompleted()方法也是无参数无返回值得,因此Action0可以被当成一个包装对象,将onCompleted()的内容打包起来将自己作为一个参数传入subscribe()以实现不完整定义的回调。Action1也是一个接口,它同样只有一个方法call(T t),这个方法也无返回值,但是有一个参数;与Action0同理,由于onNext(T t)onError(Throwable error)也是单参数无返回值的,因此Action1可以将onNext(t)onError(error)打包。 RxJava提供了多个ActionX形式的接口,他们可以用以包装不同的无返回值的方法。ActionX的方法是无返回值的

    例如,将字符串数组names中的所有字符串依次打印出来:

    1
    2
    3
    4
    5
    6
    7
    8
    String[] names = new String[]{"John","Jim","Tom","Alexander"};
    Observable.from(names)
    .subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
    Log.d(TAG,s);
    }
    });
  • 线程控制 —— Scheduler

    在不指定线程的情况下,RxJava遵循的是线程不变的原则,即:在哪个线程调用subscribe(),就在哪个线程产生事件;在哪个线程产生事件就在哪个线程消费事件。如果需要切换线程,就需要用到Scheduler(调度器)。RxJava通过它来指定每一段代码应该运行在什么样的线程。RxJava已经内置了几个Scheduler:

    • Schedulers.immediate():直接在当前线程中运行,相当于不指定线程。这是默认的Scheduler。
    • Schedulers.newThread():总是启用新线程,并在新线程执行操作。
    • Schedulers.io():I/O操作(读写文件,读写数据库,联网等)所使用的Scheduler。行为模式跟newThread()的差不多,区别在于io()的内部实现是一个无数量上线的线程池,可以重用空闲的线程,因此多数情况下io()比newThread()更有效率。不要把计算工作放在io()中,可以避免创建不必要的线程。
    • Schedulers.computation():计算所使用的Scheduler。这个计算指的是cpu密集型计算,即不会被I/O等操作限制性能的操作,例如图形的计算。这个Scheduler使用的是固定的线程池,大小为CPU核心数。不要把I/O操作放在computation()中,否则I/O操作的等待时间会浪费cpu。
    • 另外,Android还有一个专用的AndroidSchedulers.mainThread(),它制定的操作将在Android主线程运行。

    有了这几个Scheduler,就可以使用subscribeOn()observeOn()两个方法来对线程进行控制。subscribeOn()指定subscribe()所发生的线程,即Observable.OnSubscribe被激活时所处的线程,或者叫做事件产生的线程。observeOn()指定Subscriber所运行在的线程。或者叫做事件的消费的线程。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    String[] names = new String[]{"John","Jim","Tom","Alexander"};
    Observable.from(names)
    // 指定names发生的线程在io()线程,被创建的事件的内容names会在io线程发出
    .subscribeOn(Schedulers.io())
    // 指定Subscriber的回调发生在主线程,故打印将发生在主线程。
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
    Log.d(TAG,s);
    }
    });

    事实上,这种在subscribe()之前写上两句subscribeOn(Scheduler.io())observeOn(AndroidSchedulers.mainThread())的使用方式非常常见,它适用于多数的“后台线程取数据,主线程显示”的程序策略。

  • 变换

    所谓变化,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。

    observeOn()指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次observeOn()即可。不过,不同于observeOn(),subscribeOn()的位置放在哪里都可以,但它是只能调用一次的。当使用了多个subscribeOn()的时候,只有第一个subscribeOn()起作用。

    然而,虽然超过一个的subscribeOn()对事件处理的流程没有影响,但在流程之前确是可以利用的。那就是与Subscriber.onStart()方法相对应的Observable.doOnSubscribe()方法,它和Subscriber.onStart()同样是在subscribe()调用后且在事件发送前执行,但区别在于它可以指定线程。默认情况下,doOnSubscribe()执行在subscribe()发生的线程。如果在doOnSubscribe()之后有subscribeOn()方法,则它将执行离他最近的subScribeOn()所指定的线程。如下,在doOnSubscribe()后面跟一个subscribeOn(),就能指定准备工作的线程了。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    Observable.just("hello","world")
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
    @Override
    public void call() {
    // 需要在主线程执行
    showProgressBar();
    }
    })
    // 指定主线程
    .subscribeOn(AndroidSchedulers.mainThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
    Log.d(TAG,s);
    }
    });
  • Func

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    Observable.just("images/sky.png")
    .map(new Func1<String, Bitmap>() {
    @Override
    public Bitmap call(String s) {
    return getBitmapFromFile(s);
    }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
    @Override
    public void call(Bitmap bitmap) {
    iv.setImageBitmap(bitmap);
    }
    });

    这里出现了一个叫做Func1的类。它和Action1非常相似,也是RxJava的一个接口,用于包装含有一个参数的方法。Func跟Action的区别在于,Func包装的是有返回值的方法。另外,和ActionX一样,FuncX也有多个,用于不同参数个数的方法。

    • map():事件对象的直接变换。它是RxJava最常用的变换。
    • flatMap():它也是把传入的参数转化之后返回另一个对象。但是跟map()不同的是,flatMap()返回的是个Observable对象,并且这个Observable对象并不是被直接发送到了Subscriber的回调方法中。
    • throttleFirst():在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤,例如按钮的点击监听事件。

———————————♥︎举个栗子♥︎————————————

假如我现在需要联网加载百度的页面,然后将返回的内容显示在一个TextView上,就可以这么写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
public class MainActivity extends AppCompatActivity {
private TextView tvFirst;
private TextView tvResult;
private ProgressDialog dialog;
private static final String TAG = "zhangym";
private Observable<String> mObservable;
private Subscriber<String> mSubscriber;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
tvFirst = (TextView) findViewById(R.id.tv_fist);
tvResult = (TextView) findViewById(R.id.tv_result);
tvFirst.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View view) {
showResponseResult();
}
});
}
private void showResponseResult() {
// 创建被观察者
mObservable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
HttpURLConnection connection = null;
try {
URL url = new URL("https://www.baidu.com/");
connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
connection.setConnectTimeout(8000);
connection.setReadTimeout(8000);
InputStream is = connection.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
StringBuilder sb = new StringBuilder();
String line = null;
while ((line = reader.readLine()) != null) {
sb.append(line);
}
// 发送事件
subscriber.onNext(sb.toString());
subscriber.onCompleted();
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 创建观察者
mSubscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted");
dialog.dismiss();
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(String s) {
tvResult.setText(s);
}
};
// 指定联网操作在io线程
mObservable.subscribeOn(Schedulers.io())
// 事件发送前的准备工作
.doOnSubscribe(new Action0() {
@Override
public void call() {
dialog = new ProgressDialog(MainActivity.this, ProgressDialog.THEME_DEVICE_DEFAULT_LIGHT);
dialog.setProgressStyle(ProgressDialog.STYLE_SPINNER);
dialog.setIndeterminate(true);
dialog.show();
}
})
// 指定doOnSubscribe()的线程为UI线程
.subscribeOn(AndroidSchedulers.mainThread())
// 指定Subscriber的线程为UI线程,即在UI线程显示结果
.observeOn(AndroidSchedulers.mainThread())
// 订阅事件
.subscribe(mSubscriber);
}
@Override
protected void onDestroy() {
super.onDestroy();
// 取消订阅,防止内存泄露
if (!mSubscriber.isUnsubscribed()){
mSubscriber.unsubscribe();
}
}
}

运行效果:

由于网速较快,所以ProgressDialog一闪而过,gif图上面看不出来。

如果觉得本文对你有帮助,请支持我!