本篇文章将概述 「Android」 响应式编程 「RxJava」,会从设计模式、使用到原理结合案例,由浅到深、由表到里、循序渐进的概述。
本篇文章的示例代码放在 Github 上。
观察者模式(Observer pattern)
在使用 「RxJava」 之前,我们需要先理解观察者设计模式,因为,「RxJava」 就使用了观察者设计模式。
定义
观察者模式(又被称为发布-订阅 Publish/Subscribe 模式),属于行为型模式的一种,它定义了一对多的依赖关系,让多个观察者对象同时监听某一个被观察者对象,被观察者对象在状态变化时,会通知所有的观察者对象,使他们能够自动更新。
结构
观察者模式结构,如图:
一般我们写一个观察者模式都需要定义如下角色:
Observable(被观察者):一般为抽象类,用于保存观察者对象和新增、删除、通知观察者的方法。 ConcreteObservable(具体的被观察者):继承 Observable 类,实现 notifyObservers()
方法,当被观察者发生变化时,通知所有的观察者。Observer(观察者):一般为接口,包含 update()
方法,当收到具体的被观察者通知时被调用。ConcreteObserver(具体的观察者):实现 Observer 接口,重写 update()
方法,以便更新自身状态。
案例
接下来,我们来实现一个简单的观察者模式的案例,案例情景,如下:
❝小明的水果店里的 🍊 非常甜,所以很快就卖光了,但是,接二连三的有顾客过来买 🍊,于是,小明告诉顾客:🍊已经卖完了,你们可以扫下这个公众号,订阅之后,等 🍊有货了会自动通知你们!
❞
这种场景我们就可以使用观察者模式,我们先创建一个 Observer
接口(观察者),如下:
public interface Observer {
void update();
}
再创建一个 Customer
客户类(具体的观察者),如下:
// 实现 Observer 接口
public class Customer implements Observer {
private String name;
public Customer(String name) {
this.name = name;
}
// 重新 Observer 的 update() 方法
@Override
public void update() {
System.out.println(name + “ 购买了 🍊”);
}
}
之后创建一个 Observable
抽象类(被观察者),如下:
public abstract class Observable {
// 观察者
protected List<Observer> observers = new ArrayList<>();
// 新增观察者
public void add(Observer observer) {
observers.add(observer);
}
// 移除观察者
public void remove(Observer observer) {
observers.remove(observer);
}
// 通知观察者
public abstract void notifyObservers();
}
最后创建一个 FruitStore
水果店类(具体的被观察者),如下:
// 继承 Observable 类
public class FruitStore extends Observable {
// 重写 notifyObservers() 通知所有的观察者
@Override
public void notifyObservers() {
for (Observer observer: observers) {
observer.update();
}
}
public void run() {
this.notifyObservers();
}
}
我们再创建一个 Client
进行测试,如下:
public class Client {
public static void main(String[] args) {
// 创建一个水果店实例(被观察者)
FruitStore fruitStore = new FruitStore();
fruitStore.add(new Customer(“lsy”)); // 新增观察者
fruitStore.add(new Customer(“per”)); // 新增观察者
fruitStore.add(new Customer(“zimu”)); // 新增观察者
fruitStore.run(); // 通知所有观察者
}
}
运行结果如下:
lsy 购买了 🍊
per 购买了 🍊
zimu 购买了 🍊
Process finished with exit code 0
可见,使用观察者模式可以降低观察者和被观察者之间的耦合性,可以建立一套触发机制;当然,Java JDK 已经提供了 Observer
、Observable
,我们使用它们同样可以实现功能。
RxJava的使用
通过上文的对观察者模式的理解之后,再来看看 「RxJava」 是如何使用的,它同样有如下几个角色,如下:
「Observable」 :被观察者,也就是事件的发生者
「Observer」 :观察者,也就是事件的接受者
「subscribe」 :两者产生订阅关系
具体的使用如下:
public class UseRxJava {
public static void main(String[] args) {
// 创建 Observable 被观察者
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext(“🍊 到货了!”);
emitter.onNext(“大家可以来买 🍊 了!”);
emitter.onError(new Throwable(“🍊 又卖完了!”));
emitter.onNext(“WOW!🍊 卖光了”);
emitter.onComplete();
emitter.onComplete();
emitter.onNext(“🍊 加急进货中…”);
}
});
// 创建 Observer 观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println(“onSubscribe:” + d.isDisposed());
}
@Override
public void onNext(@NonNull String s) {
System.out.println(“onNext:” + s);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println(“onError:” + e.getMessage());
}
@Override
public void onComplete() {
System.out.println(“onComplete”);
}
};
// 关联订阅关系
observable.subscribe(observer);
}
}
运行结果,如下:
onSubscribe:false
onNext:🍊 到货了!
onNext:大家可以来买 🍊 了!
onError:🍊 又卖完了!
BUILD SUCCESSFUL in 328ms
通过简单的使用案例和运行结果,可知:
「onNext()」: 可以多次发送事件
「onComplete()」: 可以多次调用不会报错,但只执行一次
「onError()」: 只能发送一次,多次调用会报错,不可和 onComplete()
共存
「onComplete()」: 和 「onError()」 都存在时,只执行 onError()
「onComplete()」: 和 「onError()」 之后,观察者无法接收到发送事件
「onSubscribe()」: 是在订阅之后,发送事件之前执行
RxJava编程思想
「RxJava」 是在 Java 上的响应式扩展,通过使用可观察序列,用于组成异步和基于事件编程的类库,也就是以响应式编程思维来进行编程的Java类库。
响应式编程是面向数据流的编程思想,在响应式编程思想下,一切皆数据流。响应式编程所侧重的是数据流的流动。
响应式编程案例
我们在理解响应式编程之前,先来使用我们传统的思想实现一个加载网络图片的案例:
public class MainActivity extends AppCompatActivity {
private static final String TAG = “MainActivity”;
private ImageView imageView;
private final static String URL = “https://cdn.lishaoy.net/serializable/serializable.png";
private final Handler handler = new Handler(new Handler.Callback() {
@Override
public boolean handleMessage(@NonNull Message msg) {
Log.i(TAG, “handleMessage: aaa”);
Bitmap bitmap = (Bitmap) msg.obj;
imageView.setImageBitmap(bitmap);
if (loading != null) loading.dismiss();
return false;
}
});
private ProgressDialog loading;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
imageView = findViewById(R.id.image_view);
}
public void loadImage(View view) {
loading = ProgressDialog.show(this, “”, “loading”);
new Thread(new Runnable() {
@Override
public void run() {
try {
URL url = new URL(URL);
HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
urlConnection.setConnectTimeout(6000);
int responseCode = urlConnection.getResponseCode();
if(responseCode == HttpURLConnection.HTTP_OK) {
InputStream inputStream = urlConnection.getInputStream();
Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
Message message = handler.obtainMessage();
message.obj = bitmap;
handler.sendMessage(message);
}
} catch (MalformedURLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
}
如上,我们可能会用 new Thread()
和 Handler
来实现或者其他方法,每个人的实现方法可能都不同。如果,使用 「RxJava」 思想实现呢,必然实现方式要按照响应式编程数据流的编程思想,实现方式也就一致了。下面,我们再用 「RxJava」 来实现它,如下:
public class MainActivity extends AppCompatActivity {
private static final String TAG = “MainActivity”;
private ImageView imageView;
private final static String URL = “https://cdn.lishaoy.net/image/112131.jpg";
private ProgressDialog loading;
private Disposable disposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
imageView = findViewById(R.id.image_view);
}
public void rxJavaLoadImage(View view) {
// Observable.just(URL) 创建被观察者
Observable.just(URL)
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String s) throws IOException {
URL url = new URL(URL);
HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
urlConnection.setConnectTimeout(6000);
int responseCode = urlConnection.getResponseCode();
if (responseCode == HttpURLConnection.HTTP_OK) {
InputStream inputStream = urlConnection.getInputStream();
Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
return bitmap;
}
return null;
}
})
.subscribeOn(Schedulers.io()) // 上面的代码分配工作线程
.observeOn(AndroidSchedulers.mainThread()) // 下面的代码分别UI线程
// 链式调用 subscribe 绑定观察者
.subscribe(new Observer<Bitmap>() {
// onSubscribe() 方法在发送事件之前执行
@Override
public void onSubscribe(Disposable d) {
loading = ProgressDialog.show(MainActivity.this, “”, “loading”);
disposable = d;
}
// onNext() 在发送事件之后执行
@Override
public void onNext(Bitmap bitmap) {
imageView.setImageBitmap(bitmap);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, “onError: ” + e.getMessage(), e);
}
@Override
public void onComplete() {
if (loading != null) loading.dismiss();
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
disposable.dispose();
}
}
运行结果如图:
「RxJava」 以链式调用,Observable.just(URL)
分发事件,map()
来加工数据流(可以使用多个map处理不同的业务)、subscribeOn(Schedulers.io())
切换工作线程、observeOn(AndroidSchedulers.mainThread())
切换主线程,数据流流向 subscribe
进行处理(下游的处理根据上游的数据流变化而变化)。
RxJava结合Retrofit使用(案例)
通过 「RxJava」 使用,现在我们已经了解它的流式的响应式编程的思想,下面我们通过和 Retrofit
结合使用,看看是什么体验?
我们使用 玩Android 开放API 来完成本次案例。首先我们使用 Retrofit
定义一个 Api
的接口,如下:
public interface Api {
// 获取项目分类数据
@GET(“project/tree/json”)
Observable<ProjectBean> getProject();
// 获取项目列表数据(项目列表数据依赖于项目分类数据的id)
@GET(“project/list/{pageIndex}/json”)
Observable<ProjectItem> getProjectItem(@Path(“pageIndex”) int pageIndex, @Query(“cid”) int cid);
}
ProjectBean
、ProjectItem
JavaBean 我们根据接口返回的数据利用工具 GsonFormat
生成。
然后,新建一个 HttpClient
生成 Retrofit
,如下:
public class HttpClient {
// api 的 base url
public static String BASE_URL = “https://www.wanandroid.com/";
public static void setBaseUrl(String baseUrl) {
BASE_URL = baseUrl;
}
// 创建 Retrofit
public static Retrofit getRetrofit() {
// 创建 OkHttp 客户端
OkHttpClient.Builder builder = new OkHttpClient.Builder();
// 配置参数
OkHttpClient httpClient = builder.addNetworkInterceptor(new StethoInterceptor())
.readTimeout(6666, TimeUnit.SECONDS)
.connectTimeout(6666, TimeUnit.SECONDS)
.writeTimeout(6666, TimeUnit.SECONDS)
.build();
return new Retrofit.Builder().baseUrl(BASE_URL)
.client(httpClient) // 使用 OkHttp 访问网络
.addConverterFactory(GsonConverterFactory.create(new Gson())) // 设置 json 解析工具
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 设置 rxjava
.build();
}
}
再新建一个 RetrofitActivity
来演示,如下:
public class RetrofitActivity extends AppCompatActivity {
private static final String TAG = “RetrofitActivity”;
private Api api;
private TextView textView;
private String itemData;
private Disposable disposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_retrofit);
textView = findViewById(R.id.text_view);
api = HttpClient.getRetrofit().create(Api.class); // 生成 api
}
// 查询项目分类数据
@SuppressLint(“CheckResult”)
public void getProject(View view) {
disposable = api.getProject() // 查询项目分类数据(返回的是 Observable 被观察者)
.subscribeOn(Schedulers.io()) // 给上面的代码分配工作线程
.observeOn(AndroidSchedulers.mainThread()) // 给下面的代码分配主线程
.subscribe(new Consumer<ProjectBean>() { // 订阅并创建观察者
@Override
public void accept(ProjectBean projectBean) throws Exception {
textView.setText(projectBean.toString()); // 进行 UI 操作
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
disposable.dispose();
}
}
运行结果如图:
可见 Retrofit
和 RxJava
结合使用,代码量减少的同时,整个流程思路更为清晰,且可以为所欲为的切换线程。
防抖
我们再使用防抖(防止用户操作带来的频繁发起请求问题)的方式来查询项目列表数据,如下:
public class RetrofitActivity extends AppCompatActivity {
private static final String TAG = “RetrofitActivity”;
private Api api;
private TextView textView;
private String itemData;
private Disposable disposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_retrofit);
textView = findViewById(R.id.text_view);
api = HttpClient.getRetrofit().create(Api.class); // 生成 api
getProjectItemData();
}
…
// 查询项目列表数据,项目列表数据需要根据项目分类数据的 id 进行查询
// 且使用 rxbinding 增加防抖功能
public void getProjectItemData() {
Button button = findViewById(R.id.get_item_button_fd);
disposable = RxView.clicks(button) // 设置防抖的 view
.throttleFirst(2600, TimeUnit.MILLISECONDS) // 设置在 2.6 秒内只响应一次点击事件
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
disposable = api.getProject() // 查询项目分类数据(返回的是 Observable 被观察者)
.subscribeOn(Schedulers.io()) // 给上面的代码分配工作线程
.observeOn(AndroidSchedulers.mainThread()) // 给下面的代码分配主线程
.subscribe(new Consumer<ProjectBean>() { // 订阅并创建观察者
@Override
public void accept(final ProjectBean projectBean) throws Exception {
for (ProjectBean.DataBean bean: projectBean.getData()) {
disposable = api.getProjectItem(1, bean.getId()) // 根据项目分类数据的 id 查询项目列表数据(返回的是 Observable 被观察者)
.subscribeOn(Schedulers.io()) // 给上面的代码分配工作线程
.observeOn(AndroidSchedulers.mainThread()) // 给下面的代码分配主线程
.subscribe(new Consumer<ProjectItem>() { // 订阅并创建观察者
@Override
public void accept(ProjectItem projectItem) throws Exception {
Log.d(TAG, “accept: ” + projectItem);
textView.setText(projectItem.toString()); // 进行 UI 操作
}
});
}
}
});
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
disposable.dispose();
}
}
运行结果如图:
可见以上代码虽然实现了防抖和嵌套查询的功能,但是,代码嵌套过多,难以维护。
解决网络嵌套问题
上面的代码虽没什么问题,但是嵌套太多,所以我们使用 flatMap
操作符来解决此问题,如下:
public class RetrofitActivity extends AppCompatActivity {
private static final String TAG = “RetrofitActivity”;
private Api api;
private TextView textView;
private String itemData;
private Disposable disposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_retrofit);
textView = findViewById(R.id.text_view);
api = HttpClient.getRetrofit().create(Api.class); // 生成 api
getProjectItemData();
getItemData();
}
…
// 查询项目列表数据,使用 flatMap 操作符,解决网络嵌套问题
public void getItemData(){
Button button = findViewById(R.id.get_item_button);
disposable = RxView.clicks(button) // 设置防抖的 view
.throttleFirst(2600, TimeUnit.MILLISECONDS) // 设置在 2.6 秒内只响应一次点击事件
.observeOn(Schedulers.io()) // 给下面的代码分配工作线程
.flatMap(new Function<Object, ObservableSource<ProjectBean>>() {
@Override
public ObservableSource<ProjectBean> apply(Object o) throws Exception {
return api.getProject(); // 查询项目分类数据,并且把数据流向下游
}
})
.flatMap(new Function<ProjectBean, ObservableSource<ProjectBean.DataBean>>() {
@Override
public ObservableSource<ProjectBean.DataBean> apply(ProjectBean projectBean) throws Exception {
return Observable.fromIterable(projectBean.getData()); // 根据上游流过来的数据,迭代出每个 ProjectItem 项目列表数据,并且流向下游
}
})
.flatMap(new Function<ProjectBean.DataBean, ObservableSource<ProjectItem>>() {
@Override
public ObservableSource<ProjectItem> apply(ProjectBean.DataBean dataBean) throws Exception {
return api.getProjectItem(1, dataBean.getId()); // 根据上游流过来的数据,查询每个列表数据,并且流向下游
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<ProjectItem>() {
@Override
public void accept(ProjectItem projectItem) throws Exception {
itemData += projectItem.toString() + “\n ================================================ \n”;
textView.setText(itemData); // 根据上游流过来的数据,进行 UI 操作
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
disposable.dispose();
}
}
运行结果如图:
可见,使用 flatMap
操作符后,思路更为清晰,代码平铺下来更易理解,以一种流式的方式不断的向下游流去数据,下游根据上游的数据可以决定是否继续向下游流或者做UI更新操作等,flatMap
操作符可以重复使用,且线程的切换可以随意切换,这个就是 RxJava
数据流式的响应式编程思想。
Hook钩子函数
我们已经了解了 RxJava
的思想和使用,现在我们来看看它的源码,我们从创建一个被观察者(Observable)开始 Observable.create
,点击 create
,如下:
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
Objects.requireNonNull(source, “source is null”);
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
很简单,就这么一句 RxJavaPlugins.onAssembly()
代码,我们点进 onAssembly
如下:
/
Calls the associated hook function.
@param <T> the value type
@param source the hook’s input value
@return the value returned by the hook
*/
@SuppressWarnings({ “rawtypes”, “unchecked” })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
可以看到注释:Calls the associated hook function(调用关联的钩子函数),把 onObservableAssembly
赋值给了 f
函数,我们通过查找 onObservableAssembly
发现他并没有赋值,也就是说它始终是 null
,所以,在没有给 onObservableAssembly
赋值的情况下,这个函数什么也不会做;所以,我们需要给 onObservableAssembly
函数赋值就可以先 if
语句执行 onObservableAssembly
函数,那么怎么赋值呢?我们对 onObservableAssembly
进行搜索发现,如下:
@SuppressWarnings(“rawtypes”)
@Nullable
static volatile Function<? super Observable, ? extends Observable> onObservableAssembly;
RxJavaPlugins.onObservableAssembly = onObservableAssembly;
onObservableAssembly
是 RxJavaPlugins
类的一个静态变量,于是我们就知道如何赋值了,我们用之前例子来测试,如下:
public class UseRxJava {
public static void main(String[] args) {
// 给 hook 钩子函数赋值
RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
@Override
public Observable apply(Observable observable) throws Throwable {
System.out.println(observable + “ 你想买🍊 ?”);
return observable;
}
});
// 新增一个测试被观察者
Observable.just(“🍊”)
.map(new Function<String, Object>() {
@Override
public Object apply(String s) throws Throwable {
return “lsy 买了 ” + s;
}
})
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Throwable {
System.out.println(o);
}
});
// 创建 Observable 被观察者
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext(“🍊 到货了!”);
emitter.onNext(“大家可以来买 🍊 了!”);
emitter.onError(new Throwable(“🍊 又卖完了!”));
emitter.onNext(“WOW!🍊 卖光了”);
emitter.onComplete();
emitter.onComplete();
emitter.onNext(“🍊 加急进货中…”);
}
});
// 创建 Observer 观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println(“onSubscribe:” + d.isDisposed());
}
@Override
public void onNext(@NonNull String s) {
System.out.println(“onNext:” + s);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println(“onError:” + e.getMessage());
}
@Override
public void onComplete() {
System.out.println(“onComplete”);
}
};
// 关联订阅关系
observable.subscribe(observer);
}
}
运行结果,如下:
io.reactivex.rxjava3.internal.operators.observable.ObservableJust@694f9431 你想买🍊 ?
io.reactivex.rxjava3.internal.operators.observable.ObservableMap@f2a0b8e 你想买🍊 ?
lsy 买了 🍊
io.reactivex.rxjava3.internal.operators.observable.ObservableCreate@515f550a 你想买🍊 ?
onSubscribe:false
onNext:🍊 到货了!
onNext:大家可以来买 🍊 了!
onError:🍊 又卖完了!
BUILD SUCCESSFUL in 868ms
我们 hook
钩子函数执行了3次,分别是:ObservableJust
、ObservableMap
、ObservableCreate
,可知,hook
钩子函数是一个全局监听函数,所以我们可以利用它做很多事情。
RxJava观察者模式和标准观察者模式
我们来继续解读源码,通过标准观察者模式和 RxJava
观察者模式的比较已区别来更加深刻的理解 RxJava
。
上面我们通过源码了解到 hook
钩子函数,Observable.create
创建一个被观察者时如果我们给钩子函数赋值,就会先执行钩子函数;那么,我们的准观察者模式和 「RxJava」 观察者模式有什么不同呢?
标准观察者模式有4个角色:Observable(被观察者)、ConcreteObservable(具体的被观察者)、Observer(观察者)、ConcreteObserver(具体的观察者),「RxJava」 观察者模式的这4个角色分别是什么呢?对应关系如下:
标准观察者模式 | RxJava观察者模式 |
---|---|
Observable (被观察者) | Observable 接口 |
ConcreteObservable(具体的被观察者) | Observable.create()创建出来的,最终是一个 ObservableCreate 对象 |
Observer(观察者) | Observer 接口 |
ConcreteObserver(具体的观察者) | new Observer |
在标准的观察者模式中 Observable (被观察者) 是持有 Observer(观察者) 列表的,那么 「RxJava」 观察者模式呢?
我们来继续看源码,上文我们已经看过 Observable.create
,点击 create
,如下:
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
Objects.requireNonNull(source, “source is null”);
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
RxJavaPlugins.onAssembly()
我们已经看过,我们来看看 new ObservableCreate<>(source)
, 这个 source
就是我们传进来的 new ObservableOnSubscribe(){ … }
,我们点进 ObservableCreate
如下:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
…
}
可知,return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
返回的是 ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; }
,而 source
是我们自己传进去的;
我们再来看看订阅 observable.subscribe(observer)
代码,点进 subscribe
,如下:
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(@NonNull Observer<? super T> observer) {
Objects.requireNonNull(observer, “observer is null”);
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
Objects.requireNonNull(observer, “The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can’t call onError because no way to know if a Disposable has been set or not
// can’t call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException(“Actually not, but can’t throw other exceptions due to RS”);
npe.initCause(e);
throw npe;
}
}
我们看到重点代码 subscribeActual(observer);
点进 subscribeActual
如下:
protected abstract void subscribeActual(@NonNull Observer<? super T> observer);
是一个抽象函数,也就说它的实现是在 ObservableCreate
类里面的,因为,是 ObservableCreate
调用了 subscribe
方法,那么,我们就回到 subscribeActual
类,如下:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
// subscribeActual 抽象方法的实现
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 创建了一个 CreateEmitter 发射器,且传入了目标观察者
CreateEmitter<T> parent = new CreateEmitter<>(observer);
// 执行了目标观察者的 onSubscribe 方法
observer.onSubscribe(parent);
try {
// 执行目标被观察者(ObservableCreate) 的 subscribe 方法且传入了 发射器 CreateEmitter
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
}
可知,subscribeActual
抽象方法的实现:创建了一个 CreateEmitter
发射器,且传入了目标观察者,且执行了目标观察者的 onSubscribe()
方法;这就是为什么 onSubscribe()
方法会在订阅之后,发送事件之前执行的原因。
之后,又执行目标被观察者(ObservableCreate) 的 subscribe
方法且传入了发射器 CreateEmitter
,在看下我们的 CreateEmitter
发射器源码,如下:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
// 持有目标观察者 observer
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
// 执行 onNext() 会调用 observer.onNext(t) 执行目标观察者的 onNext()
@Override
public void onNext(T t) {
if (t == null) {
onError(ExceptionHelper.createNullPointerException(“onNext called with a null value.”));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
…
CreateEmitter
发射器持有目标观察者 observer
目标观察者,执行 onNext()
会调用 observer.onNext(t)
执行目标观察者的 onNext()
。
「RxJava」 的观察者模式整个流程,如图:
可见,「RxJava」 观察者模式和标准的观察者模式完全不同,「RxJava」 观察者模式的被观察者并没有持有观察者的列表,而是通过一个中间层 CreateEmitter
发射器来完成事件的传递,它更像是一个发布订阅者模式,如图:
Map操作符原理
上文加载图片的案例里我们已经使用过 map
操作符,用来把 String
数据加工成 Bitmap
数据,从而流向下游,我们来回顾一下上文的案例代码,如下:
public void rxJavaLoadImage(View view) {
// Observable.just(URL) 创建被观察者
Observable.just(URL)
// 使用 map 操作符加工数据,从 String 转换为 Bitmap
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String s) throws IOException {
URL url = new URL(URL);
HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
urlConnection.setConnectTimeout(6000);
int responseCode = urlConnection.getResponseCode();
if (responseCode == HttpURLConnection.HTTP_OK) {
InputStream inputStream = urlConnection.getInputStream();
Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
return bitmap; // 将 Bitmap 数据流向下游
}
return null;
}
})
.subscribeOn(Schedulers.io()) // 上面的代码分配工作线程
.observeOn(AndroidSchedulers.mainThread()) // 下面的代码分别UI线程
// 链式调用 subscribe 绑定观察者
.subscribe(new Observer<Bitmap>() {
// onSubscribe() 方法在发送事件之前执行
@Override
public void onSubscribe(Disposable d) {
loading = ProgressDialog.show(MainActivity.this, “”, “loading”);
disposable = d;
}
// onNext() 在发送事件之后执行
@Override
public void onNext(Bitmap bitmap) {
imageView.setImageBitmap(bitmap);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, “onError: ” + e.getMessage(), e);
}
@Override
public void onComplete() {
if (loading != null) loading.dismiss();
}
});
}
我们点进 map
查看源码,如下
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, “mapper is null”);
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
可知,最终返回的是一个 ObservableMap
,点进 ObservableMap
如下:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
// new MapObserver<T, U>(t, function) 把我们传进来的函数包上了一次 MapObserver
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
…
}
关键代码 source.subscribe(new MapObserver<T, U>(t, function))
把我们传进来的函数包上了一次 MapObserver
,source.subscribe()
这个 source
就是 Observable
,就相当于 Observable.subscribe()
,而 MapObserver
继承与 BasicFuseableObserver
,BasicFuseableObserver
实现了 Observer
,最终 source.subscribe()
会执行到:
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(@NonNull Observer<? super T> observer) {
Objects.requireNonNull(observer, “observer is null”);
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
Objects.requireNonNull(observer, “The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
// 是一个静态抽象方法,最终由实现类完成
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can’t call onError because no way to know if a Disposable has been set or not
// can’t call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException(“Actually not, but can’t throw other exceptions due to RS”);
npe.initCause(e);
throw npe;
}
}
subscribeActual(observer)
是一个静态抽象方法,最终由实现类完成,也就是 ObservableJust
的 subscribeActual
方法,如下
public final class ObservableJust<T> extends Observable<T> implements ScalarSupplier<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
// source.subscribe(new MapObserver<T, U>(t, function)); 又对 observer 包裹了一层
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
observer.onSubscribe(sd);
sd.run();
}
@Override
public T get() {
return value;
}
}
observer
是我们从 source.subscribe
里 传进来的 MapObserver
,而此段代码又对 observer
包裹了一层。
observer.onSubscribe(sd)
就会执行我们自己 new
出来的目标观察者的 onSubscribe
里的逻辑。
sd.run()
进去看看源码,如下:
public static final class ScalarDisposable<T>
extends AtomicInteger
implements QueueDisposable<T>, Runnable {
…
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
// observer 就是我们传进来的 MapObserver
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
}
observer.onNext(value)
的 observer
就是我们传进来的 MapObserver
,也就是执行 MapObserver.onNext()
,如下:
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = Objects.requireNonNull(mapper.apply(t), “The mapper function returned a null value.”);
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
…
}
onNext()
通过 mapper.apply(t)
对我们的数据进行转换,如下
public interface Function<@NonNull T, @NonNull R> {
/
Apply some calculation to the input value and return some other value.
@param t the input value
@return the output value
@throws Throwable if the implementation wishes to throw any type of exception
*/
R apply(T t) throws Throwable;
}
传入 T
返回 R
,而这个 apply
会执行我们实现的重写 apply
方法的里面逻辑。
「RxJava」 的 map
操作符使用了装饰器模式,在不影响主数据流的情况下,对需要加工的数据进行包装,在自己的包装类里完成数据的加工。
「RxJava」 里的操作符非常多,只要你理解其中的几个的原理,其它的操作符原理都差不多,下面列出了基本所有的操作符,如图: