You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							192 lines
						
					
					
						
							5.0 KiB
						
					
					
				
			
		
		
	
	
							192 lines
						
					
					
						
							5.0 KiB
						
					
					
				| 'use strict';
 | |
| 
 | |
| import stream from 'stream';
 | |
| import utils from '../utils.js';
 | |
| import throttle from './throttle.js';
 | |
| import speedometer from './speedometer.js';
 | |
| 
 | |
| const kInternals = Symbol('internals');
 | |
| 
 | |
| class AxiosTransformStream extends stream.Transform{
 | |
|   constructor(options) {
 | |
|     options = utils.toFlatObject(options, {
 | |
|       maxRate: 0,
 | |
|       chunkSize: 64 * 1024,
 | |
|       minChunkSize: 100,
 | |
|       timeWindow: 500,
 | |
|       ticksRate: 2,
 | |
|       samplesCount: 15
 | |
|     }, null, (prop, source) => {
 | |
|       return !utils.isUndefined(source[prop]);
 | |
|     });
 | |
| 
 | |
|     super({
 | |
|       readableHighWaterMark: options.chunkSize
 | |
|     });
 | |
| 
 | |
|     const self = this;
 | |
| 
 | |
|     const internals = this[kInternals] = {
 | |
|       length: options.length,
 | |
|       timeWindow: options.timeWindow,
 | |
|       ticksRate: options.ticksRate,
 | |
|       chunkSize: options.chunkSize,
 | |
|       maxRate: options.maxRate,
 | |
|       minChunkSize: options.minChunkSize,
 | |
|       bytesSeen: 0,
 | |
|       isCaptured: false,
 | |
|       notifiedBytesLoaded: 0,
 | |
|       ts: Date.now(),
 | |
|       bytes: 0,
 | |
|       onReadCallback: null
 | |
|     };
 | |
| 
 | |
|     const _speedometer = speedometer(internals.ticksRate * options.samplesCount, internals.timeWindow);
 | |
| 
 | |
|     this.on('newListener', event => {
 | |
|       if (event === 'progress') {
 | |
|         if (!internals.isCaptured) {
 | |
|           internals.isCaptured = true;
 | |
|         }
 | |
|       }
 | |
|     });
 | |
| 
 | |
|     let bytesNotified = 0;
 | |
| 
 | |
|     internals.updateProgress = throttle(function throttledHandler() {
 | |
|       const totalBytes = internals.length;
 | |
|       const bytesTransferred = internals.bytesSeen;
 | |
|       const progressBytes = bytesTransferred - bytesNotified;
 | |
|       if (!progressBytes || self.destroyed) return;
 | |
| 
 | |
|       const rate = _speedometer(progressBytes);
 | |
| 
 | |
|       bytesNotified = bytesTransferred;
 | |
| 
 | |
|       process.nextTick(() => {
 | |
|         self.emit('progress', {
 | |
|           'loaded': bytesTransferred,
 | |
|           'total': totalBytes,
 | |
|           'progress': totalBytes ? (bytesTransferred / totalBytes) : undefined,
 | |
|           'bytes': progressBytes,
 | |
|           'rate': rate ? rate : undefined,
 | |
|           'estimated': rate && totalBytes && bytesTransferred <= totalBytes ?
 | |
|             (totalBytes - bytesTransferred) / rate : undefined
 | |
|         });
 | |
|       });
 | |
|     }, internals.ticksRate);
 | |
| 
 | |
|     const onFinish = () => {
 | |
|       internals.updateProgress(true);
 | |
|     };
 | |
| 
 | |
|     this.once('end', onFinish);
 | |
|     this.once('error', onFinish);
 | |
|   }
 | |
| 
 | |
|   _read(size) {
 | |
|     const internals = this[kInternals];
 | |
| 
 | |
|     if (internals.onReadCallback) {
 | |
|       internals.onReadCallback();
 | |
|     }
 | |
| 
 | |
|     return super._read(size);
 | |
|   }
 | |
| 
 | |
|   _transform(chunk, encoding, callback) {
 | |
|     const self = this;
 | |
|     const internals = this[kInternals];
 | |
|     const maxRate = internals.maxRate;
 | |
| 
 | |
|     const readableHighWaterMark = this.readableHighWaterMark;
 | |
| 
 | |
|     const timeWindow = internals.timeWindow;
 | |
| 
 | |
|     const divider = 1000 / timeWindow;
 | |
|     const bytesThreshold = (maxRate / divider);
 | |
|     const minChunkSize = internals.minChunkSize !== false ? Math.max(internals.minChunkSize, bytesThreshold * 0.01) : 0;
 | |
| 
 | |
|     function pushChunk(_chunk, _callback) {
 | |
|       const bytes = Buffer.byteLength(_chunk);
 | |
|       internals.bytesSeen += bytes;
 | |
|       internals.bytes += bytes;
 | |
| 
 | |
|       if (internals.isCaptured) {
 | |
|         internals.updateProgress();
 | |
|       }
 | |
| 
 | |
|       if (self.push(_chunk)) {
 | |
|         process.nextTick(_callback);
 | |
|       } else {
 | |
|         internals.onReadCallback = () => {
 | |
|           internals.onReadCallback = null;
 | |
|           process.nextTick(_callback);
 | |
|         };
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     const transformChunk = (_chunk, _callback) => {
 | |
|       const chunkSize = Buffer.byteLength(_chunk);
 | |
|       let chunkRemainder = null;
 | |
|       let maxChunkSize = readableHighWaterMark;
 | |
|       let bytesLeft;
 | |
|       let passed = 0;
 | |
| 
 | |
|       if (maxRate) {
 | |
|         const now = Date.now();
 | |
| 
 | |
|         if (!internals.ts || (passed = (now - internals.ts)) >= timeWindow) {
 | |
|           internals.ts = now;
 | |
|           bytesLeft = bytesThreshold - internals.bytes;
 | |
|           internals.bytes = bytesLeft < 0 ? -bytesLeft : 0;
 | |
|           passed = 0;
 | |
|         }
 | |
| 
 | |
|         bytesLeft = bytesThreshold - internals.bytes;
 | |
|       }
 | |
| 
 | |
|       if (maxRate) {
 | |
|         if (bytesLeft <= 0) {
 | |
|           // next time window
 | |
|           return setTimeout(() => {
 | |
|             _callback(null, _chunk);
 | |
|           }, timeWindow - passed);
 | |
|         }
 | |
| 
 | |
|         if (bytesLeft < maxChunkSize) {
 | |
|           maxChunkSize = bytesLeft;
 | |
|         }
 | |
|       }
 | |
| 
 | |
|       if (maxChunkSize && chunkSize > maxChunkSize && (chunkSize - maxChunkSize) > minChunkSize) {
 | |
|         chunkRemainder = _chunk.subarray(maxChunkSize);
 | |
|         _chunk = _chunk.subarray(0, maxChunkSize);
 | |
|       }
 | |
| 
 | |
|       pushChunk(_chunk, chunkRemainder ? () => {
 | |
|         process.nextTick(_callback, null, chunkRemainder);
 | |
|       } : _callback);
 | |
|     };
 | |
| 
 | |
|     transformChunk(chunk, function transformNextChunk(err, _chunk) {
 | |
|       if (err) {
 | |
|         return callback(err);
 | |
|       }
 | |
| 
 | |
|       if (_chunk) {
 | |
|         transformChunk(_chunk, transformNextChunk);
 | |
|       } else {
 | |
|         callback(null);
 | |
|       }
 | |
|     });
 | |
|   }
 | |
| 
 | |
|   setLength(length) {
 | |
|     this[kInternals].length = +length;
 | |
|     return this;
 | |
|   }
 | |
| }
 | |
| 
 | |
| export default AxiosTransformStream;
 |