You cannot select more than 25 topics.
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
199 lines
5.4 KiB
199 lines
5.4 KiB
'use strict';
// https://github.com/zenparsing/es-observable
var $export = require('./_export');
var global = require('./_global');
var core = require('./_core');
// The module's export is invoked once here; the result is called below as
// `microtask(fn)` to defer delivery in `Observable.from` / `Observable.of`.
var microtask = require('./_microtask')();
// Property key under which an object exposes its "convert to observable" method.
var OBSERVABLE = require('./_wks')('observable');
var aFunction = require('./_a-function');
var anObject = require('./_an-object');
var anInstance = require('./_an-instance');
var redefineAll = require('./_redefine-all');
var hide = require('./_hide');
var forOf = require('./_for-of');
// Sentinel compared against the result of `forOf` to detect an early exit.
var RETURN = forOf.RETURN;

/**
 * Normalises an optional method value.
 *
 * @param {*} fn - Candidate value, typically a property read off an observer.
 * @returns {Function|undefined} `undefined` when `fn` is `null` or `undefined`;
 *   otherwise whatever `aFunction(fn)` returns (presumably `fn` itself after a
 *   callable check - confirm in ./_a-function).
 */
var getMethod = function (fn) {
  return fn == null ? undefined : aFunction(fn);
};

/**
 * Runs the subscription's stored cleanup callback (`_c`) at most once.
 *
 * @param {Object} subscription - Object carrying the `_c` cleanup slot.
 */
var cleanupSubscription = function (subscription) {
  var cleanup = subscription._c;
  if (cleanup) {
    // Clear the slot before calling so a repeated or re-entrant call is a no-op.
    subscription._c = undefined;
    cleanup();
  }
};

/**
 * Tells whether a subscription has been closed.
 *
 * @param {Object} subscription - Object carrying the `_o` observer slot.
 * @returns {boolean} `true` when the `_o` slot is `undefined`.
 */
var subscriptionClosed = function (subscription) {
  return subscription._o === undefined;
};

/**
 * Closes an open subscription: clears its observer slot (`_o`) and then runs
 * its cleanup. Does nothing when the subscription is already closed.
 *
 * @param {Object} subscription - Subscription to close.
 */
var closeSubscription = function (subscription) {
  if (!subscriptionClosed(subscription)) {
    subscription._o = undefined;
    cleanupSubscription(subscription);
  }
};

/**
 * Subscription constructor: wires an observer to a subscriber function.
 *
 * Slots written on the instance:
 *   `_o` - the observer (`undefined` once the subscription is closed)
 *   `_c` - the cleanup callback (`undefined` when there is none or it has run)
 *
 * @param {Object} observer - Validated with `anObject`.
 * @param {Function} subscriber - Called with a `SubscriptionObserver` wrapping
 *   this subscription. It may return nothing, a cleanup function, or an object
 *   with an `unsubscribe` method.
 */
var Subscription = function (observer, subscriber) {
  anObject(observer);
  this._c = undefined;
  this._o = observer;
  // From here on `observer` refers to the wrapper handed to the subscriber.
  observer = new SubscriptionObserver(this);
  try {
    var cleanup = subscriber(observer);
    var subscription = cleanup;
    if (cleanup != null) {
      // Subscription-like return value: cleaning up means unsubscribing from it.
      if (typeof cleanup.unsubscribe === 'function') cleanup = function () { subscription.unsubscribe(); };
      else aFunction(cleanup);
      this._c = cleanup;
    }
  } catch (e) {
    // Anything thrown inside the try block is routed to the wrapper's `error`.
    observer.error(e);
    return;
  }
  // The subscriber may have closed the subscription synchronously, before the
  // cleanup was stored; run the cleanup now in that case.
  if (subscriptionClosed(this)) cleanupSubscription(this);
};

// Public surface of a Subscription, built on a fresh object via `redefineAll`.
Subscription.prototype = redefineAll({}, {
  /** Closes this subscription via `closeSubscription`. */
  unsubscribe: function unsubscribe() { closeSubscription(this); }
});

/**
 * Wrapper handed to subscriber functions in place of the raw observer.
 *
 * @param {Object} subscription - Owning subscription, stored in the `_s` slot.
 */
var SubscriptionObserver = function (subscription) {
  this._s = subscription;
};

// Methods of the observer wrapper. Each one looks up the matching method on
// the real observer (`subscription._o`) at call time and forwards to it.
SubscriptionObserver.prototype = redefineAll({}, {
  /**
   * Forwards a value to `observer.next`.
   * No-op (returns `undefined`) when the subscription is closed or the
   * observer has no `next` method. If the lookup or the call throws, the
   * subscription is closed and the original exception is rethrown.
   */
  next: function next(value) {
    var subscription = this._s;
    if (!subscriptionClosed(subscription)) {
      var observer = subscription._o;
      try {
        var m = getMethod(observer.next);
        if (m) return m.call(observer, value);
      } catch (e) {
        try {
          closeSubscription(subscription);
        } finally {
          // Rethrow the observer's exception even if closing threw as well.
          throw e;
        }
      }
    }
  },
  /**
   * Forwards an error to `observer.error` and ends the subscription.
   * Throws `value` itself when the subscription is already closed or the
   * observer has no `error` method. Cleanup runs on every path that gets past
   * the closed check.
   *
   * @returns {*} The return value of `observer.error`.
   */
  error: function error(value) {
    var subscription = this._s;
    if (subscriptionClosed(subscription)) throw value;
    var observer = subscription._o;
    // Mark closed before calling out, so re-entrant calls see a closed subscription.
    subscription._o = undefined;
    try {
      var m = getMethod(observer.error);
      if (!m) throw value;
      value = m.call(observer, value);
    } catch (e) {
      try {
        cleanupSubscription(subscription);
      } finally {
        throw e;
      }
    }
    cleanupSubscription(subscription);
    return value;
  },
  /**
   * Forwards completion to `observer.complete` and ends the subscription.
   * No-op (returns `undefined`) when the subscription is already closed.
   *
   * @returns {*} The return value of `observer.complete`, or `undefined`
   *   when the observer has no such method.
   */
  complete: function complete(value) {
    var subscription = this._s;
    if (!subscriptionClosed(subscription)) {
      var observer = subscription._o;
      // Mark closed before calling out, as in `error`.
      subscription._o = undefined;
      try {
        var m = getMethod(observer.complete);
        value = m ? m.call(observer, value) : undefined;
      } catch (e) {
        try {
          cleanupSubscription(subscription);
        } finally {
          throw e;
        }
      }
      cleanupSubscription(subscription);
      return value;
    }
  }
});

/**
 * Observable constructor.
 *
 * `anInstance` validates the receiver (arguments: receiver, constructor,
 * the name 'Observable', and the slot name '_f') and its return value gets
 * the subscriber stored in `_f`.
 *
 * @param {Function} subscriber - Passed through `aFunction` before storing.
 */
var $Observable = function Observable(subscriber) {
  anInstance(this, $Observable, 'Observable', '_f')._f = aFunction(subscriber);
};

// Instance methods of Observable.
redefineAll($Observable.prototype, {
  /**
   * Starts delivery to `observer` by constructing a `Subscription` from it
   * and the stored subscriber function (`_f`).
   *
   * @param {Object} observer
   * @returns {Subscription}
   */
  subscribe: function subscribe(observer) {
    return new Subscription(observer, this._f);
  },
  /**
   * Subscribes and calls `fn` for every value.
   *
   * @param {Function} fn - Per-value callback; validated inside the promise
   *   executor, so an invalid `fn` rejects the promise instead of throwing.
   * @returns {Promise} Built with `core.Promise || global.Promise`; resolved
   *   by the observable's `complete`, rejected by its `error` or by an
   *   exception thrown from `fn`.
   */
  forEach: function forEach(fn) {
    var that = this;
    return new (core.Promise || global.Promise)(function (resolve, reject) {
      aFunction(fn);
      var subscription = that.subscribe({
        next: function (value) {
          try {
            return fn(value);
          } catch (e) {
            reject(e);
            // NOTE(review): if `fn` throws while the subscriber is still running
            // synchronously inside `subscribe`, `subscription` is not assigned
            // yet and this call itself throws. Behaviour left unchanged.
            subscription.unsubscribe();
          }
        },
        error: reject,
        complete: resolve
      });
    });
  }
});

// Static factory methods of Observable.
redefineAll($Observable, {
  /**
   * Converts `x` into an observable.
   *
   * If `x` has a method under the `OBSERVABLE` key, that method's result is
   * returned as-is when its `constructor` is the target constructor, and is
   * otherwise wrapped in a new instance that delegates `subscribe`.
   * Without such a method, `x` is iterated with `forOf` inside a `microtask`
   * callback and each item is sent to the observer.
   *
   * @param {Object} x - Validated with `anObject`.
   * @returns {Object} Instance of `this` when `this` is a function, else of `$Observable`.
   */
  from: function from(x) {
    var C = typeof this === 'function' ? this : $Observable;
    var method = getMethod(anObject(x)[OBSERVABLE]);
    if (method) {
      var observable = anObject(method.call(x));
      return observable.constructor === C ? observable : new C(function (observer) {
        return observable.subscribe(observer);
      });
    }
    return new C(function (observer) {
      // Set by the returned cleanup function; checked before and during delivery.
      var done = false;
      microtask(function () {
        if (!done) {
          try {
            if (forOf(x, false, function (it) {
              observer.next(it);
              if (done) return RETURN;
            }) === RETURN) return;
          } catch (e) {
            // After cancellation the error is rethrown rather than delivered.
            if (done) throw e;
            observer.error(e);
            return;
          }
          observer.complete();
        }
      });
      return function () { done = true; };
    });
  },
  /**
   * Creates an observable that emits the given arguments in order and then
   * completes. Delivery happens inside a `microtask` callback.
   *
   * @param {...*} items - Values to emit.
   * @returns {Object} Instance of `this` when `this` is a function, else of `$Observable`.
   */
  of: function of() {
    // Copy `arguments` into a real array up front.
    for (var i = 0, l = arguments.length, items = new Array(l); i < l;) items[i] = arguments[i++];
    return new (typeof this === 'function' ? this : $Observable)(function (observer) {
      var done = false;
      microtask(function () {
        if (!done) {
          for (var j = 0; j < items.length; ++j) {
            observer.next(items[j]);
            // Stop without completing once the consumer has cancelled.
            if (done) return;
          }
          observer.complete();
        }
      });
      return function () { done = true; };
    });
  }
});

// Install the OBSERVABLE-keyed method on the prototype; it returns the receiver itself.
hide($Observable.prototype, OBSERVABLE, function () { return this; });

// Publish the constructor under the name `Observable`
// (`$export.G` is presumably the global-export flag - confirm in ./_export).
$export($export.G, { Observable: $Observable });

require('./_set-species')('Observable');