Skip to content
This repository has been archived by the owner on Feb 26, 2024. It is now read-only.

fix(rxjs): fix #863, fix asap scheduler issue, add testcases #848

Merged
merged 18 commits into from
Aug 8, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor
  • Loading branch information
JiaLiPassion committed Aug 7, 2017
commit 34f20db8afad9634c066fc64ecd7b4f5434456a4
166 changes: 79 additions & 87 deletions lib/rxjs/rxjs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {Observable} from 'rxjs/Observable';
import {asap} from 'rxjs/scheduler/asap';
import {Subscriber} from 'rxjs/Subscriber';
import {Subscription} from 'rxjs/Subscription';
import {rxSubscriber} from 'rxjs/symbol/rxSubscriber';

(Zone as any).__load_patch('rxjs', (global: any, Zone: ZoneType, api: any) => {
const symbol: (symbolString: string) => string = (Zone as any).__symbol__;
Expand All @@ -27,78 +28,43 @@ import {Subscription} from 'rxjs/Subscription';
const unsubscribeSource = 'rxjs.Subscriber.unsubscribe';
const teardownSource = 'rxjs.Subscriber.teardownLogic';

const _patchedSubscribe = function() {
const currentZone = Zone.current;
const _zone = this._zone;
const empty = {
closed: true,
next(value: any): void{},
error(err: any): void{throw err;},
complete(): void{}
};

const args = Array.prototype.slice.call(arguments);
// _subscribe should run in ConstructorZone
// but for performance concern, we should check
// whether ConsturctorZone === Zone.current here
const tearDownLogic = (_zone && _zone !== Zone.current) ?
_zone.run(this._originalSubscribe, this, args, subscribeSource) :
this._originalSubscribe.apply(this, args);
if (tearDownLogic && typeof tearDownLogic === 'function') {
const patchedTearDownLogic = function() {
// tearDownLogic should also run in ConstructorZone
// but for performance concern, we should check
// whether ConsturctorZone === Zone.current here
if (_zone && _zone !== Zone.current) {
return _zone.run(tearDownLogic, this, arguments, teardownSource);
} else {
return tearDownLogic.apply(this, arguments);
}
};
return patchedTearDownLogic;
function toSubscriber<T>(
nextOrObserver?: any, error?: (error: any) => void, complete?: () => void): Subscriber<T> {
if (nextOrObserver) {
if (nextOrObserver instanceof Subscriber) {
return (<Subscriber<T>>nextOrObserver);
}

if (nextOrObserver[rxSubscriber]) {
return nextOrObserver[rxSubscriber]();
}
}
return tearDownLogic;
};

if (!nextOrObserver && !error && !complete) {
return new Subscriber(empty);
}

return new Subscriber(nextOrObserver, error, complete);
}

const patchObservable = function() {
const ObservablePrototype: any = Observable.prototype;
const symbolSubscribe = symbol('subscribe');
const _symbolSubscribe = symbol('_subscribe');
const _subscribe = ObservablePrototype[_symbolSubscribe] = ObservablePrototype._subscribe;
const subscribe = ObservablePrototype[symbolSubscribe] = ObservablePrototype.subscribe;

if (!ObservablePrototype[symbolSubscribe]) {
const subscribe = ObservablePrototype[symbolSubscribe] = ObservablePrototype.subscribe;
// patch Observable.prototype.subscribe
// if SubscripitionZone is different with ConstructorZone
// we should run _subscribe in ConstructorZone and
// create sinke in SubscriptionZone,
// and tearDown should also run into ConstructorZone
ObservablePrototype.subscribe = function() {
const _zone = this._zone;
const currentZone = Zone.current;

// if operator is involved, we should also
// patch the call method to save the Subscription zone
if (this.operator && _zone && _zone !== currentZone) {
const call = this.operator.call;
this.operator.call = function() {
const args = Array.prototype.slice.call(arguments);
const subscriber = args.length > 0 ? args[0] : undefined;
if (!subscriber._zone) {
subscriber._zone = currentZone;
}
return _zone.run(call, this, args, subscribeSource);
};
}
if (this._subscribe && !this._originalSubscribe) {
const desc: any = Object.getOwnPropertyDescriptor(this, '_subscribe');
if (desc && desc.writable === false) {
return subscribe.apply(this, arguments);
}
this._originalSubscribe = this._subscribe;
const beforeZone = this._zone;
this._subscribe = _patchedSubscribe;
this._zone = beforeZone;
}
return subscribe.apply(this, arguments);
};
}

Object.defineProperties(ObservablePrototype, {
Object.defineProperties(Observable.prototype, {
_zone: {value: null, writable: true, configurable: true},
_zoneSource: {value: null, writable: true, configurable: true},
_zoneSubscribe: {value: null, writable: true, configurable: true},
source: {
configurable: true,
get: function(this: Observable<any>) {
Expand All @@ -117,30 +83,68 @@ import {Subscription} from 'rxjs/Subscription';
} else if (this.constructor === Observable) {
return _subscribe;
}
const proto = Object.getPrototypeOf(this);
return proto && proto._subscribe;
return (<any>this).__proto__._subscribe;
},
set: function(this: Observable<any>, subscribe: any) {
(this as any)._zoneSubscribe = subscribe;
(this as any)._zone = Zone.current;
(this as any)._zoneSubscribe = subscribe;
}
},
subscribe: {
writable: true,
configurable: true,
value: function(this: Observable<any>, observerOrNext: any, error: any, complete: any) {
// Only grab a zone if we Zone exists and it is different from the current zone.
const _zone = (this as any)._zone;
if (_zone && _zone !== Zone.current) {
// Current Zone is different from the intended zone.
// Restore the zone before invoking the subscribe callback.
return _zone.run(subscribe, this, [toSubscriber(observerOrNext, error, complete)]);
}
return subscribe.call(this, observerOrNext, error, complete);
}
}
});
};

const createSymbol = symbol('create');
const create = (Observable as any)[createSymbol] = Observable.create;
Observable.create = function() {
const observable = create.apply(this, arguments);
observable._zone = Zone.current;
return observable;
};
const patchSubscription = function() {
const unsubscribeSymbol = symbol('unsubscribe');
const unsubscribe = (Subscription.prototype as any)[unsubscribeSymbol] =
Subscription.prototype.unsubscribe;
Object.defineProperties(Subscription.prototype, {
_zone: {value: null, writable: true, configurable: true},
_zoneUnsubscribe: {value: null, writable: true, configurable: true},
_unsubscribe: {
get: function(this: Subscription) {
return (this as any)._zoneUnsubscribe || (<any>this).__proto__._unsubscribe;
},
set: function(this: Subscription, unsubscribe: any) {
(this as any)._zone = Zone.current;
(this as any)._zoneUnsubscribe = unsubscribe;
}
},
unsubscribe: {
writable: true,
configurable: true,
value: function(this: Subscription) {
// Only grab a zone if we Zone exists and it is different from the current zone.
const _zone: Zone = (this as any)._zone;
if (_zone && _zone !== Zone.current) {
// Current Zone is different from the intended zone.
// Restore the zone before invoking the subscribe callback.
_zone.run(unsubscribe, this);
} else {
unsubscribe.apply(this);
}
}
}
});
};

const patchSubscriber = function() {
const next = Subscriber.prototype.next;
const error = Subscriber.prototype.error;
const complete = Subscriber.prototype.complete;
const unsubscribe = Subscription.prototype.unsubscribe;

Object.defineProperty(Subscriber.prototype, 'destination', {
configurable: true,
Expand Down Expand Up @@ -193,19 +197,6 @@ import {Subscription} from 'rxjs/Subscription';
return complete.apply(this, arguments);
}
};

Subscription.prototype.unsubscribe = function() {
const currentZone = Zone.current;
const subscriptionZone = this._zone;

// for performance concern, check Zone.current
// equal with this._zone(SubscriptionZone) or not
if (subscriptionZone && subscriptionZone !== currentZone) {
return subscriptionZone.run(unsubscribe, this, arguments, unsubscribeSource);
} else {
return unsubscribe.apply(this, arguments);
}
};
};

const patchObservableInstance = function(observable: any) {
Expand Down Expand Up @@ -342,6 +333,7 @@ import {Subscription} from 'rxjs/Subscription';
};

patchObservable();
patchSubscription();
patchSubscriber();
patchObservableFactoryCreator(Observable, 'bindCallback');
patchObservableFactoryCreator(Observable, 'bindNodeCallback');
Expand Down
3 changes: 2 additions & 1 deletion test/global-rxjs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ exports.bindNodeCallback = globalRx.Observable.bindNodeCallback;
exports.defer = globalRx.Observable.defer;
exports.forkJoin = globalRx.Observable.forkJoin;
exports.fromEventPattern = globalRx.Observable.fromEventPattern;
exports.multicast = globalRx.Observable.multicast;
exports.multicast = globalRx.Observable.multicast;
exports.rxSubscriber = globalRx.Symbol.rxSubscriber;
3 changes: 2 additions & 1 deletion test/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ System.config({
'rxjs/scheduler/asap': 'base/build/test/global-rxjs.js',
'rxjs/Subject': 'base/build/test/global-rxjs.js',
'rxjs/Subscriber': 'base/build/test/global-rxjs.js',
'rxjs/Subscription': 'base/build/test/global-rxjs.js'
'rxjs/Subscription': 'base/build/test/global-rxjs.js',
'rxjs/symbol/rxSubscriber': 'base/build/test/global-rxjs.js',
},
});

Expand Down