千家信息网

使用 Rx 中预定义的 Subject

发表于:2025-01-22 作者:千家信息网编辑
千家信息网最后更新 2025年01月22日,看到一幅有趣的关于 Rx 学习的图,想知道学习 Rx 的学习曲线?不,是峭壁!我们可以直接通过 Rx 的 Observer 来创建 Observable 对象。但是,使用这种方式往往比较复杂,在特定的
千家信息网最后更新 2025年01月22日使用 Rx 中预定义的 Subject

看到一幅有趣的关于 Rx 学习的图,想知道学习 Rx 的学习曲线?不,是峭壁!

我们可以直接通过 Rx 的 Observer 来创建 Observable 对象。

但是,使用这种方式往往比较复杂,在特定的场景下,我们可以直接使用 Rx 提供的特定 Subject 来实现 Observable。这些特定的 Subject 是主题和订阅者的混合体,我们可以直接使用这样的一个对象来实现信息的发布和数据流的订阅。

1. Subject

通用的 Subject,既可以被订阅,从名字也可以看到它本身就是一个主题,所以可以直接用来发布信息。如果需要实现一个普通的主题,它就是最理想的选择。

使用方式:

发布信息的方法:

onNext( value )

发布一个新的值到数据流中。

onCompleted()

数据流终止。

onError( error )

发布异常。

使用示例:

var subject = new Rx.Subject();var subscription = subject.subscribe(    function (x) {        console.log('Next: ' + x.toString());    },    function (err) {        console.log('Error: ' + err);    },    function () {        console.log('Completed');    });subject.next(42);// => Next: 42subject.next(56);// => Next: 56subject.completed();// => Completed

See Also:

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/subject.md

2. AsyncSubject

缓存直到 completed() 的最后一个值。所有的订阅者都会收到同样的最后一个值。

注意只能有一个值,在 completed() 之后,将不能再发布新的值。而所有的订阅者也只能得到最后一个值。

使用方式:

必须使用 completed() 完成流,订阅者将会在 completed() 之后才能得到最后一个值。

使用示例:

var subject = new Rx.AsyncSubject();var i = 0;var handle = setInterval(function () {    subject.onNext(i);    if (++i > 3) {        subject.onCompleted();        clearInterval(handle);    }}, 500);var subscription = subject.subscribe(    function (x) {        console.log('Next: ' + x.toString());    },    function (err) {        console.log('Error: ' + err);    },    function () {        console.log('Completed');    });// => Next: 3// => Completed

图例

See also: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/asyncsubject.md

3. BehaviorSubject

如果是希望订阅者获得当前的最后一个值,但是后面可能还会提供新的值,可以考虑这个。

缓存已经发布的最后数据,新的订阅者可以接收到最后一个已经发布的值,和以后发布的新的值。

它可以直接设置一个初始值。如果不需要初始值,可以考虑使用 ReplaySubject.

使用方式:

BehaviorSubject(initialValue)

在构造函数中提供初始的值。

getValue()

获取当前的值,或者抛出异常,在调用了 completed() 之后,最后的值被保留。在 error() 之后,总是抛出一个特定的异常。

使用示例

/* Initialize with initial value of 42 */var subject = new Rx.BehaviorSubject(42);var subscription = subject.subscribe(    function (x) {        console.log('Next: ' + x.toString());    },    function (err) {        console.log('Error: ' + err);    },    function () {        console.log('Completed');    });// => Next: 42subject.next(56);// => Next: 56subject.completed();// => Completed

图例

See also:

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/behaviorsubject.md

4. ReplaySubject

可以用来缓存流中最后 n 个数据,在新的观察者登记之后,这些缓存的数据直接发布给新的观察者。

使用说明:

在构造 ReplaySubject 对象的时候,配置缓存的数据元素数量以及时间窗口。

ReplaySubject([bufferSize], [windowSize], [scheduler])

使用特定的缓存大小,时间窗口和调度器来创建 ReplaySubject 对象实例.

Arguments
  1. [bufferSize = Number.MAX_VALUE] (Number): Maximum element count of the replay buffer.

  2. [windowSize = NUMBER.MAX_VALUE] (Number): Maximum time length of the replay buffer.

  3. [scheduler = Rx.Scheduler.currentThread] (Scheduler): Scheduler the observers are invoked on.

使用示例

var subject = new Rx.ReplaySubject(2 /* buffer size */);subject.next('a');subject.next('b');subject.next('c');var subscription = subject.subscribe(    function (x) {        console.log('Next: ' + x.toString());    },    function (err) {        console.log('Error: ' + err);    },    function () {        console.log('Completed');    });// => Next: b// => Next: csubject.next('d');// => Next: d


0