前言
随着移动互联网的发展, 程序复杂度越来越高, 很多的PC端和Web端的开发技术被引入到移动端, ReactiveX并不是什么新技术, 但在移动开发上的使用却是近来兴起的, 就像热补丁技术一样.
本文旨在分析ReactiveX的长处, 并分析ReactiveX可以解决哪些移动开发中遇到的问题.
什么是ReactiveX
Reactive Extensions 原始描述: The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators. Using Rx, developers represent asynchronous data streams with Observables, query asynchronous data streams using LINQ operators, and parameterize the concurrency in the asynchronous data streams using Schedulers. Simply put, Rx = Observables + LINQ + Schedulers.
ReactiveX基础知识
1. Observable和Observer
在Rx里, 两个最主要的角色就是Observable和Observer, 跟Java的观察者模式非常类似, 虽然会有细微的区别, 但作为初学的话, 将他们当成一个东西更方便理解.
Observable代表一个事件源(也可以叫被观察者), 可以被订阅.
Observer代表一个订阅者(也可以叫观察者), 订阅Observable, 获取数据.
Observable是一个类, 有个subscribe方法, 接收一个Observer类型的参数, 用于订阅事件.
Observer是一个接口(在iOS里是protocol, 一个意思), 有个onEvent接口, 当Observable发出事件时被调用, 使用者实现此接口来处理事件.
示例: �事件源和订阅者
Java
//从数组创建一个Observable
Integer[] arr = {1, 2, 3};
Observable<Integer> observable = Observable.from(arr);
//订阅前面创建在Observable
observable.subscribe(new Subscriber<Integer>() {
//Subscriber是抽象类, 继承了Observer接口, 但是未实现其中的方法
@Override
public void onNext(Integer item) {
//处理接收到的事件
System.out.println(item);
}
@Override public void onCompleted() { }
@Override public void onError(Throwable e) { }
});
Swift:
//从数组创建一个Observable
let observable = [1, 2, 3].toObservable()
//订阅前面创建在Observable
observable.subscribe(onNext: { item in
print(item)
}, onError: { error in
print(error)
}, onCompleted: {
//Observable is complete
})
在Rx里, 几乎所有的方法都返回一个Observable, 在RxJava和RxSwift里, Observable都是泛型. Observable发出有三种事件: 一种是Next事件, 可能有一个或多个; 事件的数据类型就是绑定的泛型, 一种是Error事件, 最多1个该事件, 表示发生了错误, 在RxJava里是Throwable, 在RxSwift里是ErrorType; 还有一种是Complete事件, 最多1个该事件, 表示该Observable完成了. Error事件和Complete事件是二选一, 二者有且只有一个.
如果只关心Next事件, 有重载方法可以单独订阅Next事件.
示例: 只订阅Next事件
Java
Integer[] arr = {1, 2, 3};
Observable<Integer> observable = Observable.from(arr);
observable.subscribe(new Action1<Integer>() {
@Override
public void call(Integer item) {
System.out.println(item);
}
});
Swift:
let observable = [1, 2, 3].toObservable()
observable.subscribe(onNext: { item in
print(item)
})
2. 几个常用的操作符
在Rx里, 事件流经常需要转换和综合, 比如转换类型(将整形转成字符串型), 又比如合并两个事件源成一个. Rx里有很多方法和函数用操作事件流, 这些方法叫做操作符.
所有的操作符都是将原Observable转换成另一个新的Observable, 每个操作符方法的返回值都是Observable类型, 但是新的Observable绑定的泛型可能不同于源Observable的泛型, 例如源Observable的具体类型可能是Observab<Integer>, 转换后的具体类型可能是Observable<String>.
为了方便描述, Observable发生Next事件时, 我们就说Observable发射(emit)了一个元素(item).
map
map操作符可以将Observable里元素转换成另一种元素. 例如, 我们可以将刚刚创建数字一一映射成字符串.
示例: 将数字一一映射成字符串
Java:
Integer[] arr = {1, 2, 3};
Observable.from(arr);
.map(new Func1<Integer, String>() {
@Override
public String call(Integer item) {
return "number is: " + item; //将每个元素映射成一句话
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer item) {
System.out.println(item); // will print: the number is x
}
});
Swift:
[1, 2, 3].toObservable()
map { item in
"number is \(item)" //将每个元素映射车一句话
}
.subscribe(onNext: { item in
print(item) // will print: the number is x
})
flatMap
flatMap操作符稍微复杂一点, 为了便于理解, 我们将其逻辑拆成两步:
第一步相当于map, 但是每个元素都被映射成了一个子Observable, 如果没有第二步, 那么每个Next事件都会是一个子Observable, 订阅者将会收到很多个子Observable.
第二步就是让订阅者收到的不是子Observable, 而是子Observable里的元素, 是由于flatMap创建了一个外层Observable代替其内部的子Observable发射其元素, 这个操作我们称为拉平, 这个外层Observable最终将会返回给调用者.
示例: 将每个数字item转换成能发射对应个数元素的Observable
Java:
Integer[] arr = {1, 2, 3};
Observable.from(arr);
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer item) {
//调用另外方法创建
return createRangeObservable(item);
}
});
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer item) {
System.out.println(item+",");
}
});
...
// 创建一个Observable, 会发射出number个元素, 从0开始
private Observable<Integer> createRangeObservable(Integer number) {
List<Integer> list = new ArrayList<>();
for (int i = 0; i < number; ++i) {
list.add(i);
}
return Observable.from(list);
}
Swift:
[1, 2, 3].toObservable()
.flatMap { item in
从Range创建Observable
(0..<item).toObservable()
}
.subscribe(onNext: { item in
print(item, terminator: ",")
})
运行结果:
0,0,1,0,1,2,
merge
merge很简单, 就是将多个Observable合成一个, 要求这些Observable的泛型相同
示例: 合并多个Observable
Java:
Integer[] arr1 = {1, 2, 3};
Observable<Integer> observable1 = Observable.from(arr1);
Integer[] arr2 = {4, 5, 6};
Observable<Integer> observable2 = Observable.from(arr2);
Integer[] arr3 = {7, 8, 9};
Observable<Integer> observable3 = Observable.from(arr3);
Observable.merge(observable1, observable2, observable3)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer item) {
System.out.println(item+",");
}
});
Swift:
let observable1 = [1, 2, 3].toObservable()
let observable2 = [4, 5, 6].toObservable()
let observable3 = [7, 8, 9].toObservable()
Observable.of(observable1, observable2, observable3).merge()
.subscribe(onNext: { item in
print(item)
})
运行结果:
1,2,3,4,5,6,7,8,9,
使用ReactiveX解决几个实际问题
1. 优雅的解决 "回调地狱"
在实际项目开发中, 我们应该经常遇到互相依赖的异步操作, 即A操作完成后要执行B操作, B操作完成后又要执行C操作, 但是ABC操作都是异步的, 这个时候就会出现内嵌很多层的回调, 就是著名的 "回调地狱" 问题, 最典型的就是我们经常遇到一个列表的数据要多条协议才能拉回来的问题, 使用Rx可以很优雅的解决这类问题.
示例: 简化多个数据合并, 避免回调地狱.
不用Rx是这样写的:
Java:
private void updateDataList() {
DataAccess.requestData1(myUserId, new CallBack1(){
@Override
public void onData(final Struct1[] list1) {
DataAccess.requestData2(list1, new CallBack2(){
@Override
public void onData(final Struct2[] list2) {
DataAccess.requestData3(list2, new CallBack3(){
@Override
public void onData(final Struct3[] list3) {
updateUI(list3)
}
})
}
})
}
})
}
Swift:
private func updateDataList() {
DataAccess.requestData1(myUserId) { list1 in
DataAccess.requestData2(list1) { list2 in
DataAccess.requestData3(list2) { [weak self] list3 in
self?.updateUI(list3)
}
}
}
}
用了Rx之后是这样写的:
Java:
private void updateDataList() {
RxDataAccess.requestData1(myUserId)
.flatMap(new Func1<List<Struct1>, Observable<List<Struct2>>>() {
@Override
public Observable<List<Struct2>> call(final List<Struct1> list) {
return RxDataAccess.requestData2(list)
}
})
.flatMap(new Func1<List<Struct2>, Observable<List<Struct3>>>() {
@Override
public Observable<List<Struct3]> call(final List<Struct2> list) {
return RxDataAccess.requestData3(list)
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer item) {
updateUI(list)
}
})
}
Swift:
private func updateDataList() {
RxDataAcess.requestData1(myUserId)
.flatMap { list in
RxDataAcess.requestData2(list)
}
.flatMap { list in
RxDataAcess.requestData3(list)
}
.subscribe(onNext: { [weak self] list in
self?.updateUI(list)
})
}
2. 快速实现简单需求
示例: 即时搜索天气
WeatherExampeImage
功能说明: 在文本框输入城市名字, 一边输入一边搜索, 输完立马显示该城市的天气, (为了简单这里先不做关键字联想的逻辑, 要加也是很简单的事). 这里使用了openweathermap作为数据来源.
Java:
searchText //searchText是输入框里变化的文本, 类型为Observable<String>
//debounce这里作用是用户停手0.3秒后才启动搜索
.debounce(300, TimeUnit.MILLISECONDS)
//distinctUntilChanged这里的作用是当文本内容发生变化时才启动搜索
.distinctUntilChanged()
.flatMap(new Func1<String, Observable<Weather>>() {
@Override
public Observable<Weather> call(String searchString) {
//weatherService是一个封装了拉取天气数据的类, 返回Observable<Weather>是搜索结果, Weather是一个数据类
return weatherService.search(searchString);
}
})
.subscribe(new Action1<Weather>() {
@Override
public void call(Weather weather) {
//从Weather数据类里提取字段
String cityName = weather.cityName //城市名称
float temp = weather.currentWeather.temp //温度
String description = weather.currentWeather.description //天气描述
String weatherImageUrl = "http://api.openweathermap.org/img/w/" + weather.currentWeather.imageID + ".png" //天气图标
String backgroundImageUrl = "http://api.openweathermap.org/data/2.5/forecast/" + weather.currentWeather.imageID + ".png" //背景图片
//更新界面
updateUI(cityName, temp, description, weatherImageUrl, backgroundImageUrl)
}
});
Swift:
searchText //searchText是输入框里变化的文本, 类型为Observable<String>
//debounce这里作用是用户停手0.3秒后才启动搜索
.debounce(0.3, scheduler: MainScheduler.instance)
//distinctUntilChanged这里的作用是当文本内容发生变化时才启动搜索
.distinctUntilChanged()
.flatMap { searchString -> Observable<Weather> in
//weatherService是一个封装了拉取天气数据的类, 返回Observable<Weather>是搜索结果, Weather是一个数据类
return weatherService.search(withCity: searchString)
}
//订阅搜索结果
.subscribe(onNext: { [weak self] weather in
//从Weather数据类里提取字段
let cityName = weather.cityName //城市名称
let temp = weather.currentWeather.temp //温度
let description = weather.currentWeather.description //天气描述
let weatherImageUrl = "http://api.openweathermap.org/img/w/" + weather.currentWeather.imageID + ".png" //天气图标
let backgroundImageUrl = "http://api.openweathermap.org/data/2.5/forecast/" + weather.currentWeather.imageID + ".png" //背景图片
//更新界面
self?.updateUI(cityName, temp, description, weatherImageUrl, backgroundImageUrl)
})
3. 轻松拆分综合异步事件流
Rx不仅可以合并多路异步事件流, 还可以拆分一路异步事件流为多路, 多路并行发展之后又可以重新综合起来, 这里举一个更接近实际项目的稍复杂一点的例子.
示例: 拆分过后再综合异步事件流
功能说明: 这个需求首先要从服务器拉去一个符号特定条件的商品的id列表, 比如所有打折的商品, 之后再用这个id列表去服务器拉这些商品的详情, 不过有一个限制, 这些商品详情服务器可能不支持一次性拉这么多, 有一个最大条数限制(比如6个), 可能是为了防止包过大, 但是由于产品需要, 客户端要求全部拉回这些商品详情才能展示.
思路: 我们的做法是将拉回来的列表先拆成多组, 最多6个一组, 然后按组去服务器拉详情, 所有的组拉回来之后, 我们在对这些结果综合重组.
Java:
GoodsManager.requestGoodsId
.buffer(6)
.flatMap(new Func1<List<Integer>, Observable<List<GoodsInfo>>>() {
@Override
public Observable<List<GoodsInfo>> call(List<Integer> splitIds) {
return GoodsManager.requestGoodsDetail();
}
})
.reduce(new ArrayList<GoodsInfo>(), new Func2<List<GoodsInfo>, List<GoodsInfo>, List<GoodsInfo>>() {
@Override
public List<GoodsInfo> call(List<GoodsInfo> result, List<GoodsInfo> item) {
result.addAll(item);
return result;
}
})
.subscribe(new Action1<List<GoodsInfo>>() {
@Override
public void call(List<GoodsInfo> list) {
updateUI(list)
}
});
Swift:
let pageSize = 6
GoodsManager.requestGoodsId(type: Discount)
.buffer(timeSpan: 0, count: pageSize, scheduler: MainScheduler.instance)
.flatMap { splitIds in
GoodsManager.requestGoodsDetail(ids: splitIds)
}
.reduce([]) { (result, item) -> [GoodsInfo] in
result + item
}
.subscribe(onNext: { [weak self] goodsInfo in
self?.updateUI(goodsInfo)
})
用Rx这么简短就解决了, 假如不用Rx你会怎么做? 想想就知道很麻烦.
总结:
以上只是随便列举了几个用Rx解决的实际, Rx能解决的实际问题远不止于此, 希望大家多多尝试, 如果有更好的案例, 欢迎分享给我, 谢谢大家 !
下面我们回顾总结一下Rx的特点.
Rx的长处是处理异步事件流, 虽然其接口简单, 但较之于普通编程, 在思维上却是一次极大的跳跃, 在Rx里, 原有的程序逻辑有了一套全新的编程方式.
- 所有变化都抽象成事件, 例如从回来的数据网络, 一次按钮点击, 输入框文字发生变化, 或一个系统广播.
- 数据由拉变为推, 普遍编程的数据一般是从函数返回的, 相当于调用者主动拉回来的, 而在ReactiveX里, 调用函数时不会直接得到数据, 得到的是一个信号源, 调用者订阅这个信号源, 被动的等待数据推过来, 在ReactiveX里, 所有的程序逻辑都是这样的.
- 没有回调, 统一使用Observable, 在Rx里, 所有的方法和函数都返回一个Observable, 调用者订阅这个Observable就可以得到操作结果.
- 统一的错误处理方法, 不在需要定义自己的错误处理回调, 有一个统一的onError方法, 所有错误通过onError传回, 包括抛出的异常.
后语:
学Rx有一段时间了, 此文一方面作为学习的总结整理, 另一方面作为知识分享, 希望对大家的日常开发有所帮助. 由于作者水平有限, 文章的可能存在瑕疵和纰漏, 欢迎大家批评指正.