You cannot select more than 25 topics.
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					192 lines
				
				5.0 KiB
			
		
		
			
		
	
	
					192 lines
				
				5.0 KiB
			| 
											3 years ago
										 | 'use strict'; | ||
|  | 
 | ||
|  | import stream from 'stream'; | ||
|  | import utils from '../utils.js'; | ||
|  | import throttle from './throttle.js'; | ||
|  | import speedometer from './speedometer.js'; | ||
|  | 
 | ||
|  | const kInternals = Symbol('internals'); | ||
|  | 
 | ||
/**
 * Pass-through Transform stream that can cap throughput and report progress.
 *
 * Data is forwarded unchanged. When `maxRate` is non-zero, at most
 * `maxRate * timeWindow / 1000` bytes are pushed per `timeWindow` milliseconds;
 * larger chunks are split and the remainder is delayed until the next window.
 * Once a 'progress' listener has been attached, a 'progress' event with the
 * fields `loaded`, `total`, `progress`, `bytes`, `rate` and `estimated` is
 * emitted as data flows through.
 */
class AxiosTransformStream extends stream.Transform {
  /**
   * @param {Object} [options]
   * @param {number} [options.length] total number of bytes expected; reported as `total`
   * @param {number} [options.maxRate=0] byte budget per 1000 ms; 0 disables throttling
   * @param {number} [options.chunkSize=65536] readable high-water mark, also the largest pushed chunk
   * @param {number|false} [options.minChunkSize=100] a chunk is split only when the remainder
   *   would exceed this many bytes; `false` removes the lower bound
   * @param {number} [options.timeWindow=500] length of one throttling window in milliseconds
   * @param {number} [options.ticksRate=2] passed to `throttle` for progress notifications
   *   (presumably notifications per second — confirm against ./throttle.js)
   * @param {number} [options.samplesCount=15] multiplied by `ticksRate` and passed to
   *   `speedometer` (presumably its sample buffer size — confirm against ./speedometer.js)
   */
  constructor(options) {
    // Presumably copies the caller's defined (non-undefined) options over the
    // defaults below — confirm against utils.toFlatObject.
    options = utils.toFlatObject(options, {
      maxRate: 0,
      chunkSize: 64 * 1024,
      minChunkSize: 100,
      timeWindow: 500,
      ticksRate: 2,
      samplesCount: 15
    }, null, (prop, source) => {
      return !utils.isUndefined(source[prop]);
    });

    super({
      readableHighWaterMark: options.chunkSize
    });

    const internals = this[kInternals] = {
      length: options.length,
      timeWindow: options.timeWindow,
      ticksRate: options.ticksRate,
      chunkSize: options.chunkSize,
      maxRate: options.maxRate,
      minChunkSize: options.minChunkSize,
      bytesSeen: 0,           // total bytes pushed downstream so far
      isCaptured: false,      // becomes true once a 'progress' listener is attached
      notifiedBytesLoaded: 0,
      ts: Date.now(),         // start of the current throttling window
      bytes: 0,               // bytes charged against the current window's budget
      onReadCallback: null    // resumes a transform paused by back-pressure
    };

    const _speedometer = speedometer(internals.ticksRate * options.samplesCount, internals.timeWindow);

    // Progress bookkeeping is only triggered per chunk once somebody listens for it.
    this.on('newListener', event => {
      if (event === 'progress') {
        internals.isCaptured = true;
      }
    });

    let bytesNotified = 0;

    internals.updateProgress = throttle(() => {
      const totalBytes = internals.length;
      const bytesTransferred = internals.bytesSeen;
      const progressBytes = bytesTransferred - bytesNotified;

      // Nothing new to report, or the stream is already torn down.
      if (!progressBytes || this.destroyed) return;

      const rate = _speedometer(progressBytes);

      bytesNotified = bytesTransferred;

      process.nextTick(() => {
        this.emit('progress', {
          'loaded': bytesTransferred,
          'total': totalBytes,
          'progress': totalBytes ? (bytesTransferred / totalBytes) : undefined,
          'bytes': progressBytes,
          'rate': rate ? rate : undefined,
          'estimated': rate && totalBytes && bytesTransferred <= totalBytes ?
            (totalBytes - bytesTransferred) / rate : undefined
        });
      });
    }, internals.ticksRate);

    // The `true` argument presumably forces an immediate, un-throttled final
    // notification — confirm against ./throttle.js.
    const onFinish = () => {
      internals.updateProgress(true);
    };

    this.once('end', onFinish);
    this.once('error', onFinish);
  }

  /**
   * Resumes a transform that was paused because `push()` signalled
   * back-pressure, then defers to the default Transform read logic.
   *
   * @param {number} size
   */
  _read(size) {
    const internals = this[kInternals];

    if (internals.onReadCallback) {
      internals.onReadCallback();
    }

    return super._read(size);
  }

  /**
   * Forwards `chunk` downstream, splitting it so that no pushed piece exceeds
   * the readable high-water mark or the bytes left in the current rate window.
   *
   * @param {Buffer} chunk
   * @param {string} encoding unused
   * @param {Function} callback invoked once the whole chunk has been pushed
   */
  _transform(chunk, encoding, callback) {
    const internals = this[kInternals];
    const maxRate = internals.maxRate;
    const timeWindow = internals.timeWindow;
    const readableHighWaterMark = this.readableHighWaterMark;

    // Byte budget of a single time window (may be fractional).
    const divider = 1000 / timeWindow;
    const bytesThreshold = (maxRate / divider);
    const minChunkSize = internals.minChunkSize !== false ? Math.max(internals.minChunkSize, bytesThreshold * 0.01) : 0;

    // Pushes one piece downstream and continues either on the next tick or,
    // under back-pressure, on the next _read().
    const pushChunk = (_chunk, _callback) => {
      const bytes = Buffer.byteLength(_chunk);
      internals.bytesSeen += bytes;
      internals.bytes += bytes;

      if (internals.isCaptured) {
        internals.updateProgress();
      }

      if (this.push(_chunk)) {
        process.nextTick(_callback);
      } else {
        internals.onReadCallback = () => {
          internals.onReadCallback = null;
          process.nextTick(_callback);
        };
      }
    };

    // Pushes as much of `_chunk` as currently allowed; `_callback(null, rest)`
    // receives the part that still has to be sent, if any.
    const transformChunk = (_chunk, _callback) => {
      const chunkSize = Buffer.byteLength(_chunk);
      let chunkRemainder = null;
      let maxChunkSize = readableHighWaterMark;

      if (maxRate) {
        const now = Date.now();
        let passed = 0;
        let bytesLeft;

        if (!internals.ts || (passed = (now - internals.ts)) >= timeWindow) {
          // New window: carry over whatever the previous one overspent.
          internals.ts = now;
          bytesLeft = bytesThreshold - internals.bytes;
          internals.bytes = bytesLeft < 0 ? -bytesLeft : 0;
          passed = 0;
        }

        bytesLeft = bytesThreshold - internals.bytes;

        if (bytesLeft <= 0) {
          // Budget exhausted: retry the same chunk in the next time window.
          return setTimeout(() => {
            _callback(null, _chunk);
          }, timeWindow - passed);
        }

        if (bytesLeft < maxChunkSize) {
          // The budget can be fractional. subarray() truncates its bounds, so a
          // budget below one byte would produce an empty piece and re-queue the
          // whole chunk on every tick. Always send a whole number of bytes, at
          // least one; the overspend is carried into the next window above.
          maxChunkSize = Math.max(1, Math.floor(bytesLeft));
        }
      }

      if (maxChunkSize && chunkSize > maxChunkSize && (chunkSize - maxChunkSize) > minChunkSize) {
        chunkRemainder = _chunk.subarray(maxChunkSize);
        _chunk = _chunk.subarray(0, maxChunkSize);
      }

      pushChunk(_chunk, chunkRemainder ? () => {
        process.nextTick(_callback, null, chunkRemainder);
      } : _callback);
    };

    transformChunk(chunk, function transformNextChunk(err, _chunk) {
      if (err) {
        return callback(err);
      }

      if (_chunk) {
        transformChunk(_chunk, transformNextChunk);
      } else {
        callback(null);
      }
    });
  }

  /**
   * Sets the total number of bytes expected, used for the `total`, `progress`
   * and `estimated` fields of 'progress' events.
   *
   * @param {number|string} length coerced to a number
   * @returns {AxiosTransformStream} this stream, for chaining
   */
  setLength(length) {
    this[kInternals].length = +length;
    return this;
  }
}
|  | 
 | ||
|  | export default AxiosTransformStream; |