Android响应式编程之RxJava

RxJava

本篇文章将概述 Android 响应式编程 RxJava,会从设计模式、使用到原理结合案例,由浅到深、由表到里、循序渐进的概述。

本篇文章的示例代码放在 Github 上。

观察者模式(Observer pattern)

在使用 RxJava 之前,我们需要先理解观察者设计模式,因为,RxJava 就使用了观察者设计模式。

定义

观察者模式(又被称为发布-订阅 Publish/Subscribe 模式),属于行为型模式的一种,它定义了一对多的依赖关系,让多个观察者对象同时监听某一个被观察者对象,被观察者对象在状态变化时,会通知所有的观察者对象,使他们能够自动更新。

结构

观察者模式结构,如图:

no-shadow

一般我们写一个观察者模式都需要定义如下角色:

  • Observable(被观察者):一般为抽象类,用于保存观察者对象和新增、删除、通知观察者的方法。
  • ConcreteObservable(具体的被观察者):继承 Observable 类,实现 notifyObservers() 方法,当被观察者发生变化时,通知所有的观察者。
  • Observer(观察者):一般为接口,包含 update() 方法,当收到具体的被观察者通知时被调用。
  • ConcreteObserver(具体的观察者):实现 Observer 接口,重写 update() 方法,以便更新自身状态。

案例

接下来,我们来实现一个简单的观察者模式的案例,案例情景,如下:

小明的水果店里的 🍊 非常甜,所以很快就卖光了,但是,接二连三的有顾客过来买 🍊,于是,小明告诉顾客:🍊已经卖完了,你们可以扫下这个公众号,订阅之后,等 🍊有货了会自动通知你们!

这种场景我们就可以使用观察者模式,我们先创建一个 Observer 接口(观察者),如下:

public interface Observer {
    void update();
}

再创建一个 Customer 客户类(具体的观察者),如下:

// 实现 Observer 接口
public class Customer implements Observer {

    private String name;

    public Customer(String name) {
        this.name = name;
    }

    // 重新 Observer 的 update() 方法
    @Override
    public void update() {
        System.out.println(name + “ 购买了 🍊”);
    }
}

之后创建一个 Observable 抽象类(被观察者),如下:

public abstract class Observable {
    // 观察者
    protected List<Observer> observers = new ArrayList<>();
    // 新增观察者
    public void add(Observer observer) {
        observers.add(observer);
    }
    // 移除观察者
    public void remove(Observer observer) {
        observers.remove(observer);
    }
    // 通知观察者
    public abstract void notifyObservers();
}

最后创建一个 FruitStore 水果店类(具体的被观察者),如下:

// 继承 Observable 类
public class FruitStore extends Observable {
    // 重写 notifyObservers() 通知所有的观察者
    @Override
    public void notifyObservers() {
        for (Observer observer: observers) {
            observer.update();
        }
    }

    public void run() {
        this.notifyObservers();
    }
}

我们再创建一个 Client 进行测试,如下:

public class Client {
    public static void main(String[] args) {
        // 创建一个水果店实例(被观察者)
        FruitStore fruitStore = new FruitStore();
        fruitStore.add(new Customer(“lsy”)); // 新增观察者
        fruitStore.add(new Customer(“per”)); // 新增观察者
        fruitStore.add(new Customer(“zimu”)); // 新增观察者
        fruitStore.run(); // 通知所有观察者
    }
}

运行结果如下:

lsy 购买了 🍊
per 购买了 🍊
zimu 购买了 🍊

Process finished with exit code 0

可见,使用观察者模式可以降低观察者和被观察者之间的耦合性,可以建立一套触发机制;当然,Java JDK 已经提供了 ObserverObservable,我们使用它们同样可以实现功能。

RxJava的使用

通过上文的对观察者模式的理解之后,再来看看 RxJava 是如何使用的,它同样有如下几个角色,如下:

Observable :被观察者,也就是事件的发生者
Observer :观察者,也就是事件的接受者
subscribe :两者产生订阅关系

具体的使用如下:

public class UseRxJava {

    public static void main(String[] args) {
        // 创建 Observable 被观察者
        Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                emitter.onNext(“🍊 到货了!”);
                emitter.onNext(“大家可以来买 🍊 了!”);
                emitter.onError(new Throwable(“🍊 又卖完了!”));
                emitter.onNext(“WOW!🍊 卖光了”);
                emitter.onComplete();
                emitter.onComplete();
                emitter.onNext(“🍊 加急进货中…”);
            }
        });

        // 创建 Observer 观察者
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println(“onSubscribe:” + d.isDisposed());
            }

            @Override
            public void onNext(@NonNull String s) {
                System.out.println(“onNext:” + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println(“onError:” + e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println(“onComplete”);
            }
        };
        // 关联订阅关系
        observable.subscribe(observer);
    }

}

运行结果,如下:

onSubscribe:false
onNext:🍊 到货了!
onNext:大家可以来买 🍊 了!
onError:🍊 又卖完了!

BUILD SUCCESSFUL in 328ms

通过简单的使用案例和运行结果,可知:

onNext(): 可以多次发送事件
onComplete(): 可以多次调用不会报错,但只执行一次
onError(): 只能发送一次,多次调用会报错,不可和 onComplete() 共存
onComplete(): 和 onError() 都存在时,只执行 onError()
onComplete(): 和 onError() 之后,观察者无法接收到发送事件
onSubscribe(): 是在订阅之后,发送事件之前执行

RxJava编程思想

RxJava 是在 Java 上的响应式扩展,通过使用可观察序列,用于组成异步和基于事件编程的类库,也就是以响应式编程思维来进行编程的Java类库。

响应式编程是面向数据流的编程思想,在响应式编程思想下,一切皆数据流。响应式编程所侧重的是数据流的流动。

响应式编程案例

我们在理解响应式编程之前,先来使用我们传统的思想实现一个加载网络图片的案例:

public class MainActivity extends AppCompatActivity {

    private static final String TAG = “MainActivity”;
    private ImageView imageView;
    private final static String URL = https://cdn.lishaoy.net/serializable/serializable.png";
    private final Handler handler = new Handler(new Handler.Callback() {
        @Override
        public boolean handleMessage(@NonNull Message msg) {
            Log.i(TAG, “handleMessage: aaa”);
            Bitmap bitmap = (Bitmap) msg.obj;
            imageView.setImageBitmap(bitmap);
            if (loading != null) loading.dismiss();
            return false;
        }
    });
    private ProgressDialog loading;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        imageView = findViewById(R.id.image_view);
    }

    public void loadImage(View view) {
        loading = ProgressDialog.show(this“”“loading”);
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    URL url = new URL(URL);
                    HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
                    urlConnection.setConnectTimeout(6000);
                    int responseCode = urlConnection.getResponseCode();
                    if(responseCode == HttpURLConnection.HTTP_OK) {
                        InputStream inputStream = urlConnection.getInputStream();
                        Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
                        Message message = handler.obtainMessage();
                        message.obj = bitmap;
                        handler.sendMessage(message);
                    }
                } catch (MalformedURLException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

如上,我们可能会用 new Thread()Handler 来实现或者其他方法,每个人的实现方法可能都不同。如果,使用 RxJava 思想实现呢,必然实现方式要按照响应式编程数据流的编程思想,实现方式也就一致了。下面,我们再用 RxJava 来实现它,如下:

public class MainActivity extends AppCompatActivity {

    private static final String TAG = “MainActivity”;
    private ImageView imageView;
    private final static String URL = https://cdn.lishaoy.net/image/112131.jpg";
    private ProgressDialog loading;
    private Disposable disposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        imageView = findViewById(R.id.image_view);
    }

    public void rxJavaLoadImage(View view) {
        // Observable.just(URL) 创建被观察者
        Observable.just(URL)
                .map(new Function<String, Bitmap>() {
                    @Override
                    public Bitmap apply(String s) throws IOException {
                        URL url = new URL(URL);
                        HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
                        urlConnection.setConnectTimeout(6000);
                        int responseCode = urlConnection.getResponseCode();
                        if (responseCode == HttpURLConnection.HTTP_OK) {
                            InputStream inputStream = urlConnection.getInputStream();
                            Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
                            return bitmap;
                        }
                        return null;
                    }
                })
                .subscribeOn(Schedulers.io()) // 上面的代码分配工作线程
                .observeOn(AndroidSchedulers.mainThread()) // 下面的代码分别UI线程
                // 链式调用 subscribe 绑定观察者
                .subscribe(new Observer<Bitmap>() {
                    // onSubscribe() 方法在发送事件之前执行
                    @Override
                    public void onSubscribe(Disposable d) {
                        loading = ProgressDialog.show(MainActivity.this“”“loading”);
                        disposable = d;
                    }

                    // onNext() 在发送事件之后执行
                    @Override
                    public void onNext(Bitmap bitmap) {
                        imageView.setImageBitmap(bitmap);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, “onError: ” + e.getMessage(), e);
                    }

                    @Override
                    public void onComplete() {
                        if (loading != null) loading.dismiss();
                    }
                });
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposable.dispose();
    }
}

运行结果如图:

no-shadow

RxJava 以链式调用,Observable.just(URL) 分发事件,map() 来加工数据流(可以使用多个map处理不同的业务)、subscribeOn(Schedulers.io()) 切换工作线程、observeOn(AndroidSchedulers.mainThread()) 切换主线程,数据流流向 subscribe 进行处理(下游的处理根据上游的数据流变化而变化)。

RxJava结合Retrofit使用(案例)

通过 RxJava 使用,现在我们已经了解它的流式的响应式编程的思想,下面我们通过和 Retrofit 结合使用,看看是什么体验?

我们使用 玩Android 开放API 来完成本次案例。首先我们使用 Retrofit 定义一个 Api 的接口,如下:

public interface Api {
    // 获取项目分类数据
    @GET(“project/tree/json”)
    Observable<ProjectBean> getProject();
    // 获取项目列表数据(项目列表数据依赖于项目分类数据的id)
    @GET(“project/list/{pageIndex}/json”)
    Observable<ProjectItem> getProjectItem(@Path(“pageIndex”) int pageIndex, @Query(“cid”) int cid);

}

ProjectBeanProjectItem JavaBean 我们根据接口返回的数据利用工具 GsonFormat 生成。

然后,新建一个 HttpClient 生成 Retrofit,如下:

public class HttpClient {
    // api 的 base url
    public static String BASE_URL = https://www.wanandroid.com/";

    public static void setBaseUrl(String baseUrl) {
        BASE_URL = baseUrl;
    }

    // 创建 Retrofit
    public static Retrofit getRetrofit() {
        // 创建 OkHttp 客户端
        OkHttpClient.Builder builder = new OkHttpClient.Builder();
        // 配置参数
        OkHttpClient httpClient = builder.addNetworkInterceptor(new StethoInterceptor())
                .readTimeout(6666, TimeUnit.SECONDS)
                .connectTimeout(6666, TimeUnit.SECONDS)
                .writeTimeout(6666, TimeUnit.SECONDS)
                .build();

        return new Retrofit.Builder().baseUrl(BASE_URL)
                .client(httpClient) // 使用 OkHttp 访问网络
                .addConverterFactory(GsonConverterFactory.create(new Gson())) // 设置 json 解析工具
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 设置 rxjava
                .build();
    }

}

再新建一个 RetrofitActivity 来演示,如下:

public class RetrofitActivity extends AppCompatActivity {

    private static final String TAG = “RetrofitActivity”;
    private Api api;
    private TextView textView;
    private String itemData;
    private Disposable disposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_retrofit);
        textView = findViewById(R.id.text_view);

        api = HttpClient.getRetrofit().create(Api.class); // 生成 api
    }

    // 查询项目分类数据
    @SuppressLint(“CheckResult”)
    public void getProject(View view) {
        disposable = api.getProject() // 查询项目分类数据(返回的是 Observable 被观察者)
                .subscribeOn(Schedulers.io()) // 给上面的代码分配工作线程
                .observeOn(AndroidSchedulers.mainThread()) // 给下面的代码分配主线程
                .subscribe(new Consumer<ProjectBean>() { // 订阅并创建观察者
                    @Override
                    public void accept(ProjectBean projectBean) throws Exception {
                        textView.setText(projectBean.toString()); // 进行 UI 操作
                    }
                });
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposable.dispose();
    }

}

运行结果如图:

no-shadow

可见 RetrofitRxJava 结合使用,代码量减少的同时,整个流程思路更为清晰,且可以为所欲为的切换线程。

防抖

我们再使用防抖(防止用户操作带来的频繁发起请求问题)的方式来查询项目列表数据,如下:

public class RetrofitActivity extends AppCompatActivity {

    private static final String TAG = “RetrofitActivity”;
    private Api api;
    private TextView textView;
    private String itemData;
    private Disposable disposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_retrofit);
        textView = findViewById(R.id.text_view);

        api = HttpClient.getRetrofit().create(Api.class); // 生成 api
        getProjectItemData();
    }

    …

    // 查询项目列表数据,项目列表数据需要根据项目分类数据的 id 进行查询
    // 且使用 rxbinding 增加防抖功能
    public void getProjectItemData() {
        Button button = findViewById(R.id.get_item_button_fd);
        disposable = RxView.clicks(button) // 设置防抖的 view
                .throttleFirst(2600, TimeUnit.MILLISECONDS) // 设置在 2.6 秒内只响应一次点击事件
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        disposable = api.getProject() // 查询项目分类数据(返回的是 Observable 被观察者)
                                .subscribeOn(Schedulers.io()) // 给上面的代码分配工作线程
                                .observeOn(AndroidSchedulers.mainThread()) // 给下面的代码分配主线程
                                .subscribe(new Consumer<ProjectBean>() { // 订阅并创建观察者
                                    @Override
                                    public void accept(final ProjectBean projectBean) throws Exception {
                                        for (ProjectBean.DataBean bean: projectBean.getData()) {
                                            disposable = api.getProjectItem(1, bean.getId()) // 根据项目分类数据的 id 查询项目列表数据(返回的是 Observable 被观察者)
                                                    .subscribeOn(Schedulers.io()) // 给上面的代码分配工作线程
                                                    .observeOn(AndroidSchedulers.mainThread()) // 给下面的代码分配主线程
                                                    .subscribe(new Consumer<ProjectItem>() { // 订阅并创建观察者
                                                        @Override
                                                        public void accept(ProjectItem projectItem) throws Exception {
                                                            Log.d(TAG, “accept: ” + projectItem);
                                                            textView.setText(projectItem.toString()); // 进行 UI 操作
                                                        }
                                                    });
                                        }
                                    }
                                });
                    }
                });
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposable.dispose();
    }

}

运行结果如图:

no-shadow

可见以上代码虽然实现了防抖和嵌套查询的功能,但是,代码嵌套过多,难以维护。

解决网络嵌套问题

上面的代码虽没什么问题,但是嵌套太多,所以我们使用 flatMap 操作符来解决此问题,如下:

public class RetrofitActivity extends AppCompatActivity {

    private static final String TAG = “RetrofitActivity”;
    private Api api;
    private TextView textView;
    private String itemData;
    private Disposable disposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_retrofit);
        textView = findViewById(R.id.text_view);

        api = HttpClient.getRetrofit().create(Api.class); // 生成 api
        getProjectItemData();
        getItemData();
    }

    …

    // 查询项目列表数据,使用 flatMap 操作符,解决网络嵌套问题
    public void getItemData(){
        Button button = findViewById(R.id.get_item_button);
       disposable = RxView.clicks(button) // 设置防抖的 view
                .throttleFirst(2600, TimeUnit.MILLISECONDS) // 设置在 2.6 秒内只响应一次点击事件
                .observeOn(Schedulers.io()) // 给下面的代码分配工作线程
                .flatMap(new Function<Object, ObservableSource<ProjectBean>>() {
                    @Override
                    public ObservableSource<ProjectBean> apply(Object o) throws Exception {
                        return api.getProject(); // 查询项目分类数据,并且把数据流向下游
                    }
                })
                .flatMap(new Function<ProjectBean, ObservableSource<ProjectBean.DataBean>>() {
                    @Override
                    public ObservableSource<ProjectBean.DataBean> apply(ProjectBean projectBean) throws Exception {
                        return Observable.fromIterable(projectBean.getData()); // 根据上游流过来的数据,迭代出每个 ProjectItem 项目列表数据,并且流向下游
                    }
                })
                .flatMap(new Function<ProjectBean.DataBean, ObservableSource<ProjectItem>>() {
                    @Override
                    public ObservableSource<ProjectItem> apply(ProjectBean.DataBean dataBean) throws Exception {
                        return api.getProjectItem(1, dataBean.getId()); // 根据上游流过来的数据,查询每个列表数据,并且流向下游
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<ProjectItem>() {
                    @Override
                    public void accept(ProjectItem projectItem) throws Exception {
                        itemData += projectItem.toString() + “\n ================================================ \n”;
                        textView.setText(itemData); // 根据上游流过来的数据,进行 UI 操作
                    }
                });
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposable.dispose();
    }
}

运行结果如图:

no-shadow

可见,使用 flatMap 操作符后,思路更为清晰,代码平铺下来更易理解,以一种流式的方式不断的向下游流去数据,下游根据上游的数据可以决定是否继续向下游流或者做UI更新操作等,flatMap 操作符可以重复使用,且线程的切换可以随意切换,这个就是 RxJava 数据流式的响应式编程思想。

Hook钩子函数

我们已经了解了 RxJava 的思想和使用,现在我们来看看它的源码,我们从创建一个被观察者(Observable)开始 Observable.create,点击 create,如下:

public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
    Objects.requireNonNull(source, “source is null”);
    return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}

很简单,就这么一句 RxJavaPlugins.onAssembly() 代码,我们点进 onAssembly 如下:

/
   Calls the associated hook function.
  
 @param <T> the value type
   @param source the hook’s input value
  
 @return the value returned by the hook
  */

@SuppressWarnings({ “rawtypes”“unchecked” })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

可以看到注释:Calls the associated hook function(调用关联的钩子函数),把 onObservableAssembly 赋值给了 f 函数,我们通过查找 onObservableAssembly 发现他并没有赋值,也就是说它始终是 null,所以,在没有给 onObservableAssembly 赋值的情况下,这个函数什么也不会做;所以,我们需要给 onObservableAssembly 函数赋值就可以先 if 语句执行 onObservableAssembly 函数,那么怎么赋值呢?我们对 onObservableAssembly 进行搜索发现,如下:

@SuppressWarnings(“rawtypes”)
@Nullable
static volatile Function<? super Observable, ? extends Observable> onObservableAssembly;

RxJavaPlugins.onObservableAssembly = onObservableAssembly;

onObservableAssemblyRxJavaPlugins 类的一个静态变量,于是我们就知道如何赋值了,我们用之前例子来测试,如下:

public class UseRxJava {

    public static void main(String[] args) {
        // 给 hook 钩子函数赋值
        RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
            @Override
            public Observable apply(Observable observable) throws Throwable {
                System.out.println(observable + “ 你想买🍊 ?”);
                return observable;
            }
        });
        // 新增一个测试被观察者
        Observable.just(“🍊”)
                .map(new Function<String, Object>() {
                    @Override
                    public Object apply(String s) throws Throwable {
                        return “lsy 买了 ” + s;
                    }
                })
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Throwable {
                        System.out.println(o);
                    }
                });
        // 创建 Observable 被观察者
        Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                emitter.onNext(“🍊 到货了!”);
                emitter.onNext(“大家可以来买 🍊 了!”);
                emitter.onError(new Throwable(“🍊 又卖完了!”));
                emitter.onNext(“WOW!🍊 卖光了”);
                emitter.onComplete();
                emitter.onComplete();
                emitter.onNext(“🍊 加急进货中…”);
            }
        });

        // 创建 Observer 观察者
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println(“onSubscribe:” + d.isDisposed());
            }

            @Override
            public void onNext(@NonNull String s) {
                System.out.println(“onNext:” + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println(“onError:” + e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println(“onComplete”);
            }
        };
        // 关联订阅关系
        observable.subscribe(observer);
    }

}

运行结果,如下:

io.reactivex.rxjava3.internal.operators.observable.ObservableJust@694f9431 你想买🍊 ?
io.reactivex.rxjava3.internal.operators.observable.ObservableMap@f2a0b8e 你想买🍊 ?
lsy 买了 🍊
io.reactivex.rxjava3.internal.operators.observable.ObservableCreate@515f550a 你想买🍊 ?
onSubscribe:false
onNext:🍊 到货了!
onNext:大家可以来买 🍊 了!
onError:🍊 又卖完了!

BUILD SUCCESSFUL in 868ms

我们 hook 钩子函数执行了3次,分别是:ObservableJustObservableMapObservableCreate,可知,hook 钩子函数是一个全局监听函数,所以我们可以利用它做很多事情。

RxJava观察者模式和标准观察者模式

我们来继续解读源码,通过标准观察者模式和 RxJava 观察者模式的比较已区别来更加深刻的理解 RxJava

上面我们通过源码了解到 hook 钩子函数,Observable.create 创建一个被观察者时如果我们给钩子函数赋值,就会先执行钩子函数;那么,我们的准观察者模式和 RxJava 观察者模式有什么不同呢?

标准观察者模式有4个角色:Observable(被观察者)、ConcreteObservable(具体的被观察者)、Observer(观察者)、ConcreteObserver(具体的观察者),RxJava 观察者模式的这4个角色分别是什么呢?对应关系如下:

标准观察者模式 RxJava观察者模式
Observable (被观察者) Observable 接口
ConcreteObservable(具体的被观察者) Observable.create()创建出来的,最终是一个 ObservableCreate 对象
Observer(观察者) Observer 接口
ConcreteObserver(具体的观察者) new Observer() { } 创建出来的观察者对象

在标准的观察者模式中 Observable (被观察者) 是持有 Observer(观察者) 列表的,那么 RxJava 观察者模式呢?

我们来继续看源码,上文我们已经看过 Observable.create,点击 create,如下:

public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
    Objects.requireNonNull(source, “source is null”);
    return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}

RxJavaPlugins.onAssembly() 我们已经看过,我们来看看 new ObservableCreate<>(source), 这个 source 就是我们传进来的 new ObservableOnSubscribe(){ … },我们点进 ObservableCreate 如下:

public final class ObservableCreate<Textends Observable<T{
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    …

}

可知,return RxJavaPlugins.onAssembly(new ObservableCreate<>(source)); 返回的是 ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; },而 source 是我们自己传进去的;

我们再来看看订阅 observable.subscribe(observer) 代码,点进 subscribe,如下:

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(@NonNull Observer<? super T> observer) {
    Objects.requireNonNull(observer, “observer is null”);
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        Objects.requireNonNull(observer, “The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can’t call onError because no way to know if a Disposable has been set or not
        // can’t call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException(“Actually not, but can’t throw other exceptions due to RS”);
        npe.initCause(e);
        throw npe;
    }
}

我们看到重点代码 subscribeActual(observer); 点进 subscribeActual 如下:

protected abstract void subscribeActual(@NonNull Observer<? super T> observer);

是一个抽象函数,也就说它的实现是在 ObservableCreate 类里面的,因为,是 ObservableCreate 调用了 subscribe 方法,那么,我们就回到 subscribeActual 类,如下:

public final class ObservableCreate<Textends Observable<T{
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    // subscribeActual 抽象方法的实现
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 创建了一个 CreateEmitter 发射器,且传入了目标观察者
        CreateEmitter<T> parent = new CreateEmitter<>(observer);
        // 执行了目标观察者的 onSubscribe 方法
        observer.onSubscribe(parent);

        try {
            // 执行目标被观察者(ObservableCreate) 的 subscribe 方法且传入了 发射器 CreateEmitter
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

}

可知,subscribeActual 抽象方法的实现:创建了一个 CreateEmitter 发射器,且传入了目标观察者,且执行了目标观察者的 onSubscribe() 方法;这就是为什么 onSubscribe() 方法会在订阅之后,发送事件之前执行的原因。

之后,又执行目标被观察者(ObservableCreate) 的 subscribe 方法且传入了发射器 CreateEmitter,在看下我们的 CreateEmitter 发射器源码,如下:

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable 
{

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;
        // 持有目标观察者 observer
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        // 执行 onNext() 会调用 observer.onNext(t) 执行目标观察者的 onNext()
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(ExceptionHelper.createNullPointerException(“onNext called with a null value.”));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

       …

CreateEmitter 发射器持有目标观察者 observer 目标观察者,执行 onNext() 会调用 observer.onNext(t) 执行目标观察者的 onNext()

RxJava 的观察者模式整个流程,如图:

no-shadow

可见,RxJava 观察者模式和标准的观察者模式完全不同,RxJava 观察者模式的被观察者并没有持有观察者的列表,而是通过一个中间层 CreateEmitter 发射器来完成事件的传递,它更像是一个发布订阅者模式,如图:

no-shadow

Map操作符原理

上文加载图片的案例里我们已经使用过 map 操作符,用来把 String 数据加工成 Bitmap 数据,从而流向下游,我们来回顾一下上文的案例代码,如下:

public void rxJavaLoadImage(View view) {
        // Observable.just(URL) 创建被观察者
        Observable.just(URL)
                // 使用 map 操作符加工数据,从 String 转换为 Bitmap
                .map(new Function<String, Bitmap>() {
                    @Override
                    public Bitmap apply(String s) throws IOException {
                        URL url = new URL(URL);
                        HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
                        urlConnection.setConnectTimeout(6000);
                        int responseCode = urlConnection.getResponseCode();
                        if (responseCode == HttpURLConnection.HTTP_OK) {
                            InputStream inputStream = urlConnection.getInputStream();
                            Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
                            return bitmap; // 将 Bitmap 数据流向下游
                        }
                        return null;
                    }
                })
               .subscribeOn(Schedulers.io()) // 上面的代码分配工作线程
               .observeOn(AndroidSchedulers.mainThread()) // 下面的代码分别UI线程
                // 链式调用 subscribe 绑定观察者
                .subscribe(new Observer<Bitmap>() {
                    // onSubscribe() 方法在发送事件之前执行
                    @Override
                    public void onSubscribe(Disposable d) {
                        loading = ProgressDialog.show(MainActivity.this“”“loading”);
                        disposable = d;
                    }

                    // onNext() 在发送事件之后执行
                    @Override
                    public void onNext(Bitmap bitmap) {
                        imageView.setImageBitmap(bitmap);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, “onError: ” + e.getMessage(), e);
                    }

                    @Override
                    public void onComplete() {
                        if (loading != null) loading.dismiss();
                    }
                });
    }

我们点进 map 查看源码,如下

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, “mapper is null”);
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

可知,最终返回的是一个 ObservableMap,点进 ObservableMap 如下:

public final class ObservableMap<TUextends AbstractObservableWithUpstream<TU{
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    // new MapObserver<T, U>(t, function) 把我们传进来的函数包上了一次 MapObserver
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    …

}

关键代码 source.subscribe(new MapObserver<T, U>(t, function)) 把我们传进来的函数包上了一次 MapObserversource.subscribe() 这个 source 就是 Observable,就相当于 Observable.subscribe(),而 MapObserver 继承与 BasicFuseableObserverBasicFuseableObserver 实现了 Observer,最终 source.subscribe() 会执行到:

 @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(@NonNull Observer<? super T> observer) {
        Objects.requireNonNull(observer, “observer is null”);
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            Objects.requireNonNull(observer, “The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            // 是一个静态抽象方法,最终由实现类完成
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can’t call onError because no way to know if a Disposable has been set or not
            // can’t call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException(“Actually not, but can’t throw other exceptions due to RS”);
            npe.initCause(e);
            throw npe;
        }
    }

subscribeActual(observer) 是一个静态抽象方法,最终由实现类完成,也就是 ObservableJustsubscribeActual 方法,如下

public final class ObservableJust<Textends Observable<Timplements ScalarSupplier<T{

    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }
    // source.subscribe(new MapObserver<T, U>(t, function)); 又对 observer 包裹了一层
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }

    @Override
    public T get() {
        return value;
    }
}

observer 是我们从 source.subscribe 里 传进来的 MapObserver,而此段代码又对 observer 包裹了一层。

observer.onSubscribe(sd) 就会执行我们自己 new 出来的目标观察者的 onSubscribe 里的逻辑。

sd.run() 进去看看源码,如下:

public static final class ScalarDisposable<T>
    extends AtomicInteger
    implements QueueDisposable<T>, Runnable 
{

        …

        @Override
        public void run() {
            if (get() == START && compareAndSet(START, ON_NEXT)) {
              // observer 就是我们传进来的 MapObserver
                observer.onNext(value);
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    observer.onComplete();
                }
            }
        }
    }

observer.onNext(value)observer 就是我们传进来的 MapObserver,也就是执行 MapObserver.onNext(),如下:

static final class MapObserver<TUextends BasicFuseableObserver<TU{
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                v = Objects.requireNonNull(mapper.apply(t), “The mapper function returned a null value.”);
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }

        …
    }

onNext() 通过 mapper.apply(t) 对我们的数据进行转换,如下

public interface Function<@NonNull T, @NonNull R{
    /
      Apply some calculation to the input value and return some other value.
     
 @param t the input value
      @return the output value
     
 @throws Throwable if the implementation wishes to throw any type of exception
     */

    apply(T t) throws Throwable;
}

传入 T 返回 R,而这个 apply 会执行我们实现的重写 apply 方法的里面逻辑。

RxJavamap 操作符使用了装饰器模式,在不影响主数据流的情况下,对需要加工的数据进行包装,在自己的包装类里完成数据的加工。

RxJava 里的操作符非常多,只要你理解其中的几个的原理,其它的操作符原理都差不多,下面列出了基本所有的操作符,如图:

no-shadow
本文结束感谢您的阅读

本文标题:Android响应式编程之RxJava

文章作者:李少颖(persilee)

发布时间:2020年08月07日 - 15:08

最后更新:2020年08月07日 - 18:08

原始链接:https://h.lishaoy.net/rxjava.html

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

坚持原创技术分享,您的支持将鼓励我继续创作!