You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							207 lines
						
					
					
						
							5.8 KiB
						
					
					
				
			
		
		
	
	
							207 lines
						
					
					
						
							5.8 KiB
						
					
					
				'use strict';
 | 
						|
 | 
						|
// Temporary that holds the object literal while the Symbol.asyncIterator and
// "return" members are attached to it, before the finished object is handed to
// Object.setPrototypeOf (see ReadableStreamAsyncIteratorPrototype below).
var _Object$setPrototypeO;
 | 
						|
 | 
						|
/**
 * Stores `value` under `key` on `obj` and hands `obj` back.
 *
 * When the key is not yet reachable on the object (own or inherited) a plain
 * assignment is used. Otherwise the property is (re)defined as an own,
 * enumerable, configurable, writable data property so that inherited
 * accessors or read-only properties cannot intercept the write.
 *
 * @param {Object} obj - target object, mutated in place
 * @param {string|symbol} key - property key
 * @param {*} value - value to store
 * @returns {Object} the same `obj`
 */
function _defineProperty(obj, key, value) {
  if (!(key in obj)) {
    obj[key] = value;
    return obj;
  }

  Object.defineProperty(obj, key, {
    value: value,
    enumerable: true,
    configurable: true,
    writable: true
  });
  return obj;
}
 | 
						|
 | 
						|
var finished = require('./end-of-stream');
 | 
						|
 | 
						|
// Private symbol keys used to keep iterator state on the iterator object
// itself without exposing string-named properties.

// resolve callback of the next() promise that is still waiting for data
var kLastResolve = Symbol('lastResolve');
// reject callback of the next() promise that is still waiting for data
var kLastReject = Symbol('lastReject');
// error reported when the stream finished abnormally (null while none)
var kError = Symbol('error');
// true once the stream has finished without a reported error
var kEnded = Symbol('ended');
// most recent promise returned by next() that has not settled yet
var kLastPromise = Symbol('lastPromise');
// promise executor that reads a chunk or parks resolve/reject for later
var kHandlePromise = Symbol('handlePromise');
// the underlying readable stream
var kStream = Symbol('stream');
 | 
						|
 | 
						|
/**
 * Builds an iterator-result record.
 *
 * @param {*} value - the yielded chunk (undefined when iteration is over)
 * @param {boolean} done - whether iteration has finished
 * @returns {{value: *, done: boolean}}
 */
function createIterResult(value, done) {
  return {
    value: value,
    done: done
  };
}
 | 
						|
 | 
						|
/**
 * If a next() call is waiting for data, tries to read one chunk from the
 * stream and fulfils that pending promise with it.
 *
 * Nothing is read when no promise is pending. When read() yields null the
 * pending promise is left untouched so that it can still be settled later
 * (by more data, or by the stream ending or failing).
 *
 * @param {Object} iter - the async iterator holding the symbol-keyed state
 */
function readAndResolve(iter) {
  var resolve = iter[kLastResolve];

  if (resolve !== null) {
    // we defer if data is null:
    // we can be expecting either 'end' or 'error'
    var data = iter[kStream].read();

    if (data !== null) {
      // clear the pending slots before resolving so the iterator is
      // already back in its idle state when the consumer continues
      iter[kLastPromise] = null;
      iter[kLastResolve] = null;
      iter[kLastReject] = null;
      resolve(createIterResult(data, false));
    }
  }
}
 | 
						|
 | 
						|
/**
 * 'readable' handler: schedules readAndResolve(iter) on the next tick.
 *
 * @param {Object} iter - the async iterator bound to the stream
 */
function onReadable(iter) {
  // we wait for the next tick, because it might
  // emit an error with process.nextTick
  process.nextTick(readAndResolve, iter);
}
 | 
						|
 | 
						|
/**
 * Builds a promise executor for a next() call that was issued while an
 * earlier next() promise is still pending.
 *
 * The executor waits for `lastPromise` to fulfil. If the iterator has ended
 * by then it resolves with a done result; otherwise it delegates to the
 * iterator's kHandlePromise executor. A rejection of `lastPromise` is
 * forwarded to `reject`.
 *
 * @param {Promise} lastPromise - the previously returned, still pending promise
 * @param {Object} iter - the async iterator holding the symbol-keyed state
 * @returns {function(Function, Function): void} executor for `new Promise`
 */
function wrapForNext(lastPromise, iter) {
  return function (resolve, reject) {
    lastPromise.then(function () {
      if (iter[kEnded]) {
        resolve(createIterResult(undefined, true));
        return;
      }

      iter[kHandlePromise](resolve, reject);
    }, reject);
  };
}
 | 
						|
 | 
						|
// Prototype that ReadableStreamAsyncIteratorPrototype is chained to.
// As written, Object.getPrototypeOf of a plain function expression evaluates
// to Function.prototype, so despite its name this is not the intrinsic
// %AsyncIteratorPrototype%.
// NOTE(review): presumably a down-levelling artifact of an
// async-generator-based lookup — confirm before relying on the name.
var AsyncIteratorPrototype = Object.getPrototypeOf(function () {});
 | 
						|
/**
 * Shared prototype of the iterators produced for readable streams.
 *
 * Members:
 *  - `stream` (getter): the underlying stream.
 *  - `next()`: promise for the next `{ value, done }` result.
 *  - `[Symbol.asyncIterator]()`: returns the iterator itself.
 *  - `return()`: destroys the stream and resolves with a done result.
 *
 * All state is read from symbol-keyed properties on the instance
 * (kStream, kError, kEnded, kLastPromise, kHandlePromise).
 */
var ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf((_Object$setPrototypeO = {
  get stream() {
    return this[kStream];
  },

  next: function next() {
    var _this = this;

    // if we have detected an error in the meanwhile
    // reject straight away
    var error = this[kError];

    if (error !== null) {
      return Promise.reject(error);
    }

    if (this[kEnded]) {
      return Promise.resolve(createIterResult(undefined, true));
    }

    if (this[kStream].destroyed) {
      // We need to defer via nextTick because if .destroy(err) is
      // called, the error will be emitted via nextTick, and
      // we cannot guarantee that there is no error lingering around
      // waiting to be emitted.
      return new Promise(function (resolve, reject) {
        process.nextTick(function () {
          if (_this[kError]) {
            reject(_this[kError]);
          } else {
            resolve(createIterResult(undefined, true));
          }
        });
      });
    }

    // if we have multiple next() calls
    // we will wait for the previous Promise to finish
    // this logic is optimized to support for await loops,
    // where next() is only called once at a time
    var lastPromise = this[kLastPromise];
    var promise;

    if (lastPromise) {
      promise = new Promise(wrapForNext(lastPromise, this));
    } else {
      // fast path needed to support multiple this.push()
      // without triggering the next() queue
      var data = this[kStream].read();

      if (data !== null) {
        return Promise.resolve(createIterResult(data, false));
      }

      promise = new Promise(this[kHandlePromise]);
    }

    // remember the pending promise so a concurrent next() can queue behind it
    this[kLastPromise] = promise;
    return promise;
  }
}, _defineProperty(_Object$setPrototypeO, Symbol.asyncIterator, function () {
  return this;
}), _defineProperty(_Object$setPrototypeO, "return", function _return() {
  var _this2 = this;

  // destroy(err, cb) is a private API
  // we can guarantee we have that here, because we control the
  // Readable class this is attached to
  return new Promise(function (resolve, reject) {
    _this2[kStream].destroy(null, function (err) {
      if (err) {
        reject(err);
        return;
      }

      resolve(createIterResult(undefined, true));
    });
  });
}), _Object$setPrototypeO), AsyncIteratorPrototype);
 | 
						|
 | 
						|
/**
 * Creates an async iterator over `stream`.
 *
 * The iterator inherits from ReadableStreamAsyncIteratorPrototype and keeps
 * its state in symbol-keyed, writable, non-enumerable properties. It
 * subscribes to the stream's completion via `finished` and to its 'readable'
 * event so that a pending next() promise is settled when data arrives, the
 * stream ends, or the stream fails.
 *
 * @param {Object} stream - readable stream; `_readableState.endEmitted`,
 *   `read()` and `on()` are used here
 * @returns {Object} the async iterator
 */
var createReadableStreamAsyncIterator = function createReadableStreamAsyncIterator(stream) {
  var _Object$create;

  var iterator = Object.create(ReadableStreamAsyncIteratorPrototype, (_Object$create = {}, _defineProperty(_Object$create, kStream, {
    value: stream,
    writable: true
  }), _defineProperty(_Object$create, kLastResolve, {
    value: null,
    writable: true
  }), _defineProperty(_Object$create, kLastReject, {
    value: null,
    writable: true
  }), _defineProperty(_Object$create, kError, {
    value: null,
    writable: true
  }), _defineProperty(_Object$create, kEnded, {
    // a stream that already emitted 'end' starts out as ended
    value: stream._readableState.endEmitted,
    writable: true
  }), _defineProperty(_Object$create, kHandlePromise, {
    // the function is what makes the for await loop work,
    // it is used as the executor of the promise returned by next()
    value: function value(resolve, reject) {
      var data = iterator[kStream].read();

      // Compare against null (as every other read site in this module does):
      // a truthiness test would consume and silently drop falsy object-mode
      // chunks such as 0, '' or false and leave the promise pending.
      if (data !== null) {
        iterator[kLastPromise] = null;
        iterator[kLastResolve] = null;
        iterator[kLastReject] = null;
        resolve(createIterResult(data, false));
      } else {
        // park the callbacks until data, end or error shows up
        iterator[kLastResolve] = resolve;
        iterator[kLastReject] = reject;
      }
    },
    writable: true
  }), _Object$create));
  iterator[kLastPromise] = null;

  finished(stream, function (err) {
    if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
      // reject if we are waiting for data in the Promise
      // returned by next() and store the error
      var reject = iterator[kLastReject];

      if (reject !== null) {
        iterator[kLastPromise] = null;
        iterator[kLastResolve] = null;
        iterator[kLastReject] = null;
        reject(err);
      }

      iterator[kError] = err;
      return;
    }

    // no error (or only a premature close): finish a waiting next() call
    var resolve = iterator[kLastResolve];

    if (resolve !== null) {
      iterator[kLastPromise] = null;
      iterator[kLastResolve] = null;
      iterator[kLastReject] = null;
      resolve(createIterResult(undefined, true));
    }

    iterator[kEnded] = true;
  });

  stream.on('readable', onReadable.bind(null, iterator));
  return iterator;
};
 | 
						|
 | 
						|
module.exports = createReadableStreamAsyncIterator; |