You cannot select more than 25 topics.
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					62 lines
				
				1.1 KiB
			
		
		
			
		
	
	
					62 lines
				
				1.1 KiB
			| 
								 
											3 years ago
										 
									 | 
							
								'use strict';
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								/*!
							 | 
						||
| 
								 | 
							
								 * Module dependencies.
							 | 
						||
| 
								 | 
							
								 */
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								const EventEmitter = require('events').EventEmitter;
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								/*!
							 | 
						||
| 
								 | 
							
								 * ignore
							 | 
						||
| 
								 | 
							
								 */
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								/**
								 * Event-emitting wrapper around a driver change stream that is handed over
								 * later through a callback. Until that hand-over `driverChangeStream` is
								 * `null`; afterwards the driver stream's `close`, `change`, `end` and
								 * `error` events are re-emitted on this object.
								 *
								 * Emits `ready` once the driver stream has been attached.
								 */
								class ChangeStream extends EventEmitter {
								  /**
								   * @param {Function} changeStreamThunk invoked once with a Node-style
								   *   callback `(err, driverChangeStream)`
								   * @param {*} pipeline stored unchanged on `this.pipeline`
								   * @param {*} options stored unchanged on `this.options`
								   */
								  constructor(changeStreamThunk, pipeline, options) {
								    super();

								    this.driverChangeStream = null;
								    this.closed = false;
								    this.pipeline = pipeline;
								    this.options = options;

								    // This wrapper is necessary because of buffering.
								    changeStreamThunk((err, driverChangeStream) => {
								      if (err != null) {
								        this.emit('error', err);
								        return;
								      }

								      // `close()` may have been called while the driver stream did not
								      // exist yet; remember that so the late-arriving stream is not leaked.
								      const closedBeforeReady = this.closed;

								      this.driverChangeStream = driverChangeStream;
								      this._bindEvents();
								      if (closedBeforeReady) {
								        driverChangeStream.close();
								      }
								      this.emit('ready');
								    });
								  }

								  /**
								   * Wires the attached driver stream to this emitter: tracks `closed` and
								   * re-emits `close`, `change`, `end` and `error` with their payload.
								   * Expects `this.driverChangeStream` to be set.
								   */
								  _bindEvents() {
								    this.driverChangeStream.on('close', () => {
								      this.closed = true;
								    });

								    ['close', 'change', 'end', 'error'].forEach(ev => {
								      this.driverChangeStream.on(ev, data => this.emit(ev, data));
								    });
								  }

								  /**
								   * Runs `cb` (with no arguments) once the driver stream is attached.
								   * If it is already attached, `cb` runs immediately; otherwise it runs
								   * on the next `ready` event.
								   *
								   * @param {Function} cb
								   */
								  _queue(cb) {
								    // `ready` fires only once, so a listener added afterwards would
								    // never be called.
								    if (this.driverChangeStream != null) {
								      cb();
								      return;
								    }
								    this.once('ready', () => cb());
								  }

								  /**
								   * Marks this stream as closed and closes the driver stream if one is
								   * attached. When none is attached yet, the driver stream is closed as
								   * soon as it arrives.
								   */
								  close() {
								    this.closed = true;
								    if (this.driverChangeStream) {
								      this.driverChangeStream.close();
								    }
								  }
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								/*!
							 | 
						||
| 
								 | 
							
								 * ignore
							 | 
						||
| 
								 | 
							
								 */
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// The ChangeStream class is this module's sole export.
								module.exports = ChangeStream;
							 |