添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
3
4

More than 5 years have passed since last update.

RxJS の Operators (8) - Observable Utility Operators (2)

Last updated at Posted at 2015-12-10

この記事は bouzuya's RxJS Advent Calendar 2015 の 10 日目かつ RxJS Advent Calendar 2015 の 10 日目です。

今日は昨日に続いて ReactiveX の Observable Utility Operators について RxJS の API ドキュメント やサンプルコードを書いていきます。

また RxJS 4.0.7 を対象にしています。

Observable.prototype.timeInterval
  • ReactiveX - TimeInterval operator
  • Observable.prototype.timeInterval API Document
  • Observable.prototype.timeInterval Source Code
  • 値 ( value ) とその前の値との間隔 ( interval ) とに変換します。後述の timestamp と似ています。

    import { Observable } from 'rx';
    Observable
      .timer(0, 1000)
      .timeInterval()
      .map(({ value, interval }) => `${value}:${interval}`)
      .take(4)
      .subscribe(
        value => console.log(`onNext: ${value}`),
        error => console.log(`onError: ${error}`),
        () => console.log('onCompleted')
    // onNext: 0:2
    // onNext: 1:1002
    // onNext: 2:999
    // onNext: 3:997
    // onCompleted
    Observable.prototype.timestamp
    
  • ReactiveX - Timestamp operator
  • Observable.prototype.timestamp API Document
  • Observable.prototype.timestamp Source Code
  • 値 (value) とそのタイムスタンプ (timestamp) とに変換します。前述の timestamp と似ています。

    import { Observable } from 'rx';
    Observable
      .timer(0, 1000)
      .timestamp()
      .map(({ value, timestamp }) => `${value}:${timestamp}`)
      .take(4)
      .subscribe(
        value => console.log(`onNext: ${value}`),
        error => console.log(`onError: ${error}`),
        () => console.log('onCompleted')
    // onNext: 0:1449755362020
    // onNext: 1:1449755363025
    // onNext: 2:1449755364023
    // onNext: 3:1449755365022
    // onCompleted
    Observable.prototype.timeout
    
  • ReactiveX - Timeout operator
  • Observable.prototype.timeout API Document
  • Observable.prototype.timeout Source Code
  • 指定した時間よりも interval があいた場合にタイムアウトとしてエラーにします。

    import { Observable } from 'rx';
    Observable
      .concat(
        Observable.timer(100),
        Observable.timer(200),
        Observable.timer(300),
        Observable.timer(400),
        Observable.timer(500)
      .timeInterval()
      .map(({ value, interval }) => `${value}:${interval}`)
      .timeout(350)
      .subscribe(
        value => console.log(`onNext: ${value}`),
        error => console.log(`onError: ${error}`),
        () => console.log('onCompleted')
    // onNext: 0:106
    // onNext: 0:224
    // onNext: 0:306
    // onError: TimeoutError: Timeout has occurred
    

    個人的には Rx のエラーハンドリングはそんなに便利だと思えないので、積極的に使いたいとは思えないのですが……。

    Observable.using
  • ReactiveX - Using operator
  • Observable.prototype.using API Document
  • Observable.prototype.using Source Code
  • まだ Disposable についてまったく触れていないので、あまり書きたくないのですけど……。

    .NET ではおなじみの using 構文に近いもの。Java だと try (...) {} が近いです。

    動作としては Disposable を必ず dispose してくれるはずです。 subscribe が返す Disposable (Rx 的には Subscription と呼ばれているはず……) を明示的に dispose すると良いはずです。

    import { Observable } from 'rx';
    class MyDisposable {
      dispose() {
        console.log('disposed');
    Observable
      .using(
        () => new MyDisposable(),
        (resource) => {
          // ... use disposable resource ...
          return Observable.empty();
      .subscribe(
        value => console.log(`onNext: ${value}`),
        error => console.log(`onError: ${error}`),
        () => console.log('onCompleted')
    // onCompleted
    // disposed
    

    この例では明示的に dispose はしていませんが、dispose が呼ばれているので、おそらく onCompleteddispose されていると思います (未確認) 。

    また Disposable 関連のコードを読むときに触れるつもりです。

    Observable.prototype.materialize / Observable.prototype.dematerialize
  • ReactiveX - Materialize/Dematerialize operator
  • Observable.prototype.materialize API Document
  • Observable.prototype.materialize Source Code
  • Observable.prototype.dematerialize API Document
  • Observable.prototype.dematerialize Source Code
  • onNextonCompletedonError 自体を操作できるようです。説明が難しいので例を挙げます。

    import { Observable } from 'rx';
    Observable
      .from([1, 2, 3])
      .materialize()
      .subscribe(
        value => console.log(`onNext: ${value}`),
        error => console.log(`onError: ${error}`),
        () => console.log('onCompleted')
    // onNext: OnNext(1)
    // onNext: OnNext(2)
    // onNext: OnNext(3)
    // onNext: OnCompleted()
    // onCompleted
    

    ここで流れてきているのは Rx.Notification クラスのインスタンスです。OnNext / OnCompleted / OnError があります。OnNextvalueOnErrorerror を持ちます。

    materializeObservalbe を流れるデータを Notification に変換します。 dematerialize はその逆を行います。

    今度は、明示的に NotificationObservabledematerialize してみましょう。

    import { Observable, Notification } from 'rx';
    Observable
      .from([
        Notification.createOnNext(4),
        Notification.createOnNext(5),
        Notification.createOnCompleted(),
        Notification.createOnNext(6)
      .dematerialize()
      .subscribe(
        value => console.log(`onNext: ${value}`),
        error => console.log(`onError: ${error}`),
        () => console.log('onCompleted')
    // onNext: 4
    // onNext: 5
    // onCompleted
    

    onCompleted のあとにはデータは流れないようです。当然そうですよね。

    メタな操作をしたくなったときに使えるのかもしれません。

    今日は昨日に続き Observable のユーティリティを見ました。知っておくと便利な場面があるかもしれませんね。

    3
    4
    0

    Register as a new user and use Qiita more conveniently

    1. You get articles that match your needs
    2. You can efficiently read back useful information
    3. You can use dark theme
    What you can do with signing up
    3
    4