pbootcms网站模板|日韩1区2区|织梦模板||网站源码|日韩1区2区|jquery建站特效-html5模板网

RxJava2 線程調度的方法

這篇文章主要介紹了RxJava2 線程調度的方法,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

subscribeOn和observeOn負責線程切換,同時某些操作符也默認指定了線程.

我們這里不分析在線程中怎么執行的.只看如何切換到某個指定線程.

subscribeOn

Observable.subscribeOn()在方法內部生成了一個ObservableSubscribeOn對象.

主要看一下ObservableSubscribeOn的subscribeActual方法.


 @Override
  public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
    //調用下游的Observer的onSubscribe方法
    observer.onSubscribe(parent);
    //通過SubscribeTask執行了上游Observable的subscribeActual方法
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
  }

scheduler.scheduleDirect(Runnable)用于執行SubscribeTask這個任務.SubscribeTask本身是Runnable的實現類.看一下其run方法.


    @Override
    public void run() {
      //上游的Observable.subscribe方法被切換到了新的線程
      source.subscribe(parent);
    }

首先可以得出結論:subscribeOn將上游的Observable的subscribe方法切換到了新的線程.

如果多次調用subscribeOn切換線程,會有什么效果?

由下往上,每次調用subscribeOn,都會導致上游的Observable的subscribeActual切換到指定的線程.那么最后一次調用的切換最上游的創建型操作符的subscribeActual的執行線程.如果操作符有默認執行線程怎么辦?

操作符默認線程

如果是創建型操作符,處于最上游,那么subscribeOn的線程切換對它不起作用.天高皇帝遠,縣官不如現管.就是這個道理.
如果是其它操作符,會是怎樣的?

以操作符timeout為例:它對應ObservableTimeoutTimed和TimeoutObserver


 @Override
    public void onNext(T t) {
      downstream.onNext(t);
      //超時計時
      startTimeout(idx + 1);
    }

    void startTimeout(long nextIndex) {
      //交給操作符默認的線程執行
      task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit));
    }

    @Override
    public void onError(Throwable t) {
        downstream.onError(t); 
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
      }
    }

    @Override
    public void onTimeout(long idx) {
        downstream.onError(new TimeoutException(timeoutMessage(timeout, unit)));
    }


//TimeoutTask.java
static final class TimeoutTask implements Runnable {

    @Override
    public void run() {
      parent.onTimeout(idx);
    }
  }

可以看到操作符默認的執行線程只用來做超時計時任務,如果超時了,會在操作符的默認線程執行onError方法..操作符默認線程對下游的observer造成什么影響要做具體對待.

observeOn

observeOn對應ObservableObserveOnObserveOnObserver.


 //ObservableObserveOn.java
 @Override
  protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
      source.subscribe(observer);
    } else {
      Scheduler.Worker w = scheduler.createWorker();
      source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
  }

 //ObserveOnObserver.java 
  @Override
    public void onSubscribe(Disposable d) {
      if (DisposableHelper.validate(this.upstream, d)) {
        if (d instanceof QueueDisposable) {
          if (m == QueueDisposable.SYNC) {
          //執行下游Observer的onSubscribe方法
            downstream.onSubscribe(this);
            schedule();
            return;
          }
          if (m == QueueDisposable.ASYNC) {
           //執行下游Observer的onSubscribe方法
            downstream.onSubscribe(this);
            return;
          }
        }
         //執行下游Observer的onSubscribe方法
        downstream.onSubscribe(this);
      }
    }
    @Override
    public void onNext(T t) {
     //省略
      schedule();
    }
    @Override
    public void onError(Throwable t) {
     //省略
      schedule();
    }
     void schedule() {
      if (getAndIncrement() == 0) {
      /*
      ObserveOnObserver是Runnable的實現類.交給線程池執行
      */
        worker.schedule(this);
      }
    }
    
    
    void drainNormal() {
      final Observer<? super T> a = downstream;
      for (;;) {
        for (;;) {
          T v;
          try {
            v = q.poll();
          } catch (Throwable ex) {
            a.onError(ex);
            return;
          }
          //執行下游Observer的onNext方法
          a.onNext(v);
        }
      }
    }

    void drainFused() {
      for (;;) {
        if (!delayError && d && ex != null) {
          //執行下游Observer的onError方法
          downstream.onError(error);
          return;
        }
        downstream.onNext(null);
        if (d) {
          ex = error;
          if (ex != null) {
            //執行下游Observer的onError方法
            downstream.onError(ex);
          } else {
            //執行下游Observer的onComplete方法
            downstream.onComplete();
          }
          return;
        }
      }
    }
    //執行線程任務
    @Override
    public void run() {
      if (outputFused) {
        drainFused();
      } else {
        drainNormal();
      }
    }

從上面可以看出ObservableObserveOn在其subscribeActual方法中并沒有切換上游Observable的subscribe方法的執行線程.但是ObserveOnObserver在其onNext,onError和onComplete中通過schedule()方法將下游Observer的各個方法切換到了新的線程.

得出結論: observeOn負責切換的是下游Observer的各個方法的執行線程

如果下游多次通過observeOn切換線程,會有什么效果?

每次切換都會對其下游造成影響,直到遇到下一個observeOn為止.

Observer(onSubscribe,onNext,onError,onComplete)

onNext,onError,onComplete與上游最近的observeOn所切換的線程保持一致.onSubscribe則不同.
遇到線程切換的時候,會首先在對應的Observable的subscribeActual方法內,先調用observer.onSubscribe方法.而observer.onSubscribe會逐級向上傳遞直到最上游,而最上游的observer.onSubscribe是在subscribeActual方法內調用,這是在主線程執行的.所以onSubscribe方法無論如何都是在主線程執行.

doOnSubscribe


.doOnSubscribe(new Consumer<Disposable>() {
          @Override
          public void accept(Disposable disposable) throws Exception {
           
          }
        })

我們要看的是方法accept的執行線程.

通過源碼找到對應的DisposableLambdaObserver.


 @Override
  public void onSubscribe(Disposable d) {
  //在這里調用了accept方法.
      onSubscribe.accept(d);
  }

這就要看上游在哪個線程執行了Observer.onSubscribe(disposable)方法.

在創建型操作符的subscribeActual方法和subscribeOn對應的Observable的subscribeActual方法內調用了Observer.onSubscribe(disposable)方法.那么這兩處的執行線程就決定了onSubscribe.accept(d);的執行線程.

doFinally

對應ObservableDoFinally和DoFinallyObserver


 //DoFinallyObserver.java
 @Override
    public void onError(Throwable t) {
      runFinally();
    }

    @Override
    public void onComplete() {
      runFinally();
    }

    @Override
    public void dispose() {
      runFinally();
    }
    
     void runFinally() {
       onFinally.run();
    }

可以看到與它所對應的DoFinallyObserver的onError,onComplete,dispose方法的執行線程有關,這三個方法的執行線程又受到上游的observeOn的影響.如果沒有observeOn,則會受到最上游的observable.subscribeActual方法影響.

doOnError

對應ObservableDoOnEach和DoOnEachObserver


//DoOnEachObserver.java
 @Override
    public void onError(Throwable t) {
        onError.accept(t);
    }

和自身對應的observer.onError所在線程保持一致.

doOnNext

對應ObservableDoOnEach和DoOnEachObserver


//DoOnEachObserver.java
 @Override
    public void onNext(T t) {
        onNext.accept(t);
    }

和自身對應的observer.onNext所在線程保持一致.

操作符對應方法參數的執行線程

包io.reactivex.functions下的接口類一般用于處理上游數據然后往下傳遞.這些接口類的方法一般在對應的observer.onNext中調用.所以他們的線程保持一致.

總結:

subscribeOn由下往上逐級切換Observable.subscribe的執行線程,不受observeOn影響,也不受具有默認指定線程的非創建型操作符影響,但是會被更上游的subscribeOn奪取線程切換的權利,直到最上游.如果最上游的創建型操作符也有默認執行線程,那么任何一個subscribeOn的線程切換不起作用.subscribeOn由下向上到達最上游后,然后由上往下影響下游的observer的執行線程.遇到observeOn會被奪取線程切換的權利.observeOn影響的是下游的observer的執行線程,由上往下,遇到另一個observeOn會移交線程控制權力,遇到指定默認線程非創建型的操作符,要視具體情況對待.

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持html5模板網。

【網站聲明】本站部分內容來源于互聯網,旨在幫助大家更快的解決問題,如果有圖片或者內容侵犯了您的權益,請聯系我們刪除處理,感謝您的支持!

相關文檔推薦

主站蜘蛛池模板: 北京中航时代-耐电压击穿试验仪厂家-电压击穿试验机 | 钢板仓,大型钢板仓,钢板库,大型钢板库,粉煤灰钢板仓,螺旋钢板仓,螺旋卷板仓,骨料钢板仓 | 水厂自动化|污水处理中控系统|水利信息化|智慧水务|智慧农业-山东德艾自动化科技有限公司 | 世纪豪门官网 世纪豪门集成吊顶加盟电话 世纪豪门售后电话 | 氟塑料磁力泵-不锈钢离心泵-耐腐蚀化工泵厂家「皖金泵阀」 | 汽车水泵_汽车水泵厂家-瑞安市骏迪汽车配件有限公司 | 盐水蒸发器,水洗盐设备,冷凝结晶切片机,转鼓切片机,絮凝剂加药系统-无锡瑞司恩机械有限公司 | Trimos测长机_测高仪_TESA_mahr,WYLER水平仪,PWB对刀仪-德瑞华测量技术(苏州)有限公司 | 珠海白蚁防治_珠海灭鼠_珠海杀虫灭鼠_珠海灭蟑螂_珠海酒店消杀_珠海工厂杀虫灭鼠_立净虫控防治服务有限公司 | 海峰资讯 - 专注装饰公司营销型网站建设和网络营销培训 | 路面机械厂家 | 飞行者联盟-飞机模拟机_无人机_低空经济_航空技术交流平台 | 上海小程序开发-上海小程序制作公司-上海网站建设-公众号开发运营-软件外包公司-咏熠科技 | 双相钢_双相不锈钢_双相钢圆钢棒_双相不锈钢报价「海新双相钢」 双能x射线骨密度检测仪_dxa骨密度仪_双能x线骨密度仪_品牌厂家【品源医疗】 | 云南成考网_云南成人高考报名网 粤丰硕水性环氧地坪漆-防静电自流平厂家-环保地坪涂料代理 | 橡胶接头|可曲挠橡胶接头|橡胶软接头安装使用教程-上海松夏官方网站 | 天津力值检测-天津管道检测-天津天诚工程检测技术有限公司 | 生态板-实木生态板-生态板厂家-源木原作生态板品牌-深圳市方舟木业有限公司 | 河北中仪伟创试验仪器有限公司是专业生产沥青,土工,水泥,混凝土等试验仪器的厂家,咨询电话:13373070969 | 电动垃圾车,垃圾清运车-江苏速利达机车有限公司 | 云南标线|昆明划线|道路标线|交通标线-就选云南云路施工公司-云南云路科技有限公司 | 涡街流量计_LUGB智能管道式高温防爆蒸汽温压补偿计量表-江苏凯铭仪表有限公司 | 河南卓美创业科技有限公司-河南卓美防雷公司-防雷接地-防雷工程-重庆避雷针-避雷器-防雷检测-避雷带-避雷针-避雷塔、机房防雷、古建筑防雷等-山西防雷公司 | 浙江寺庙设计-杭州寺院设计-宁波寺庙规划_汉匠 | 酸度计_PH计_特斯拉计-西安云仪 纯水电导率测定仪-万用气体检测仪-低钠测定仪-米沃奇科技(北京)有限公司www.milwaukeeinst.cn | 联系我们-腾龙公司上分客服微信19116098882 | 电梯乘运质量测试仪_电梯安全评估测试仪-武汉懿之刻 | 烘干设备-热泵烘干机_广东雄贵能源设备有限公司 | 除尘布袋_液体过滤袋_针刺毡滤料-杭州辉龙过滤技术有限公司 | IIS7站长之家-站长工具-爱网站请使用IIS7站长综合查询工具,中国站长【WWW.IIS7.COM】 | NM-02立式吸污机_ZHCS-02软轴刷_二合一吸刷软轴刷-厦门地坤科技有限公司 | 鹤壁创新仪器公司-全自动量热仪,定硫仪,煤炭测硫仪,灰熔点测定仪,快速自动测氢仪,工业分析仪,煤质化验仪器 | 气弹簧定制-气动杆-可控气弹簧-不锈钢阻尼器-工业气弹簧-可调节气弹簧厂家-常州巨腾气弹簧供应商 | 济南ISO9000认证咨询代理公司,ISO9001认证,CMA实验室认证,ISO/TS16949认证,服务体系认证,资产管理体系认证,SC食品生产许可证- 济南创远企业管理咨询有限公司 郑州电线电缆厂家-防火|低压|低烟无卤电缆-河南明星电缆 | 焊接烟尘净化器__焊烟除尘设备_打磨工作台_喷漆废气治理设备 -催化燃烧设备 _天津路博蓝天环保科技有限公司 | 烟气换热器_GGH烟气换热器_空气预热器_高温气气换热器-青岛康景辉 | 河南mpp电力管_mpp电力管生产厂家_mpp电力电缆保护管价格 - 河南晨翀实业 | 电磁流量计_智能防腐防爆管道式计量表-金湖凯铭仪表有限公司 | 东莞螺杆空压机_永磁变频空压机_节能空压机_空压机工厂批发_深圳螺杆空压机_广州螺杆空压机_东莞空压机_空压机批发_东莞空压机工厂批发_东莞市文颖设备科技有限公司 | 水上浮桥-游艇码头-浮动码头-游船码头-码瑞纳游艇码头工程 | 高防护蠕动泵-多通道灌装系统-高防护蠕动泵-www.bjhuiyufluid.com慧宇伟业(北京)流体设备有限公司 |