ConnectableObservable.js
1.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import { Observable } from '../Observable';
import { Subscription } from '../Subscription';
import { refCount as higherOrderRefCount } from '../operators/refCount';
import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
import { hasLift } from '../util/lift';
export class ConnectableObservable extends Observable {
constructor(source, subjectFactory) {
super();
this.source = source;
this.subjectFactory = subjectFactory;
this._subject = null;
this._refCount = 0;
this._connection = null;
if (hasLift(source)) {
this.lift = source.lift;
}
}
_subscribe(subscriber) {
return this.getSubject().subscribe(subscriber);
}
getSubject() {
const subject = this._subject;
if (!subject || subject.isStopped) {
this._subject = this.subjectFactory();
}
return this._subject;
}
_teardown() {
this._refCount = 0;
const { _connection } = this;
this._subject = this._connection = null;
_connection === null || _connection === void 0 ? void 0 : _connection.unsubscribe();
}
connect() {
let connection = this._connection;
if (!connection) {
connection = this._connection = new Subscription();
const subject = this.getSubject();
connection.add(this.source.subscribe(createOperatorSubscriber(subject, undefined, () => {
this._teardown();
subject.complete();
}, (err) => {
this._teardown();
subject.error(err);
}, () => this._teardown())));
if (connection.closed) {
this._connection = null;
connection = Subscription.EMPTY;
}
}
return connection;
}
refCount() {
return higherOrderRefCount()(this);
}
}
//# sourceMappingURL=ConnectableObservable.js.map