You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					581 lines
				
				16 KiB
			
		
		
			
		
	
	
					581 lines
				
				16 KiB
			| 
								 
											3 years ago
										 
									 | 
							
								var util = require("util");
							 | 
						||
| 
								 | 
							
								var Transform = require("stream").Transform;
							 | 
						||
| 
								 | 
							
								var os = require("os");
							 | 
						||
| 
								 | 
							
								var stripBom = require('strip-bom');
							 | 
						||
| 
								 | 
							
								var eol = os.EOL;
							 | 
						||
| 
								 | 
							
								// var Processor = require("./Processor.js");
							 | 
						||
| 
								 | 
							
								var defParam = require("./defParam");
							 | 
						||
| 
								 | 
							
								var fileline = require("./fileline");
							 | 
						||
| 
								 | 
							
								var fileLineToCSVLine = require("./fileLineToCSVLine");
							 | 
						||
| 
								 | 
							
								var linesToJson = require("./linesToJson");
							 | 
						||
| 
								 | 
							
								var CSVError = require("./CSVError");
							 | 
						||
| 
								 | 
							
								var workerMgr = null;
							 | 
						||
| 
								 | 
							
								var _ = require('lodash');
							 | 
						||
| 
								 | 
							
								var rowSplit = require("./rowSplit");
							 | 
						||
| 
								 | 
							
								function Converter(params, options) {
							 | 
						||
| 
								 | 
							
								  Transform.call(this, options);
							 | 
						||
| 
								 | 
							
								  this._options = options || {};
							 | 
						||
| 
								 | 
							
								  this.param = defParam(params);
							 | 
						||
| 
								 | 
							
								  this.param._options = this._options;
							 | 
						||
| 
								 | 
							
								  // this.resultObject = new Result(this);
							 | 
						||
| 
								 | 
							
								  // this.pipe(this.resultObject); // it is important to have downstream for a transform otherwise it will stuck
							 | 
						||
| 
								 | 
							
								  this.started = false;//indicate if parsing has started.
							 | 
						||
| 
								 | 
							
								  this.recordNum = 0;
							 | 
						||
| 
								 | 
							
								  this.lineNumber = 0; //file line number
							 | 
						||
| 
								 | 
							
								  this._csvLineBuffer = "";
							 | 
						||
| 
								 | 
							
								  this.lastIndex = 0; // index in result json array
							 | 
						||
| 
								 | 
							
								  //this._pipe(this.lineParser).pipe(this.processor);
							 | 
						||
| 
								 | 
							
								  // this.initNoFork();
							 | 
						||
| 
								 | 
							
								  if (this.param.forked) {
							 | 
						||
| 
								 | 
							
								    this.param.forked = false;
							 | 
						||
| 
								 | 
							
								    this.workerNum = 2;
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  this.flushCb = null;
							 | 
						||
| 
								 | 
							
								  this.processEnd = false;
							 | 
						||
| 
								 | 
							
								  this.sequenceBuffer = [];
							 | 
						||
| 
								 | 
							
								  this._needJson = null;
							 | 
						||
| 
								 | 
							
								  this._needEmitResult = null;
							 | 
						||
| 
								 | 
							
								  this._needEmitFinalResult = null;
							 | 
						||
| 
								 | 
							
								  this._needEmitHeader = null;
							 | 
						||
| 
								 | 
							
								  this._needEmitJson = null;
							 | 
						||
| 
								 | 
							
								  this._needPush = null;
							 | 
						||
| 
								 | 
							
								  this._needEmitCsv = null;
							 | 
						||
| 
								 | 
							
								  this._csvTransf = null;
							 | 
						||
| 
								 | 
							
								  this.finalResult = [];
							 | 
						||
| 
								 | 
							
								  // this.on("data", function() {});
							 | 
						||
| 
								 | 
							
								  this.on("error", emitDone(this));
							 | 
						||
| 
								 | 
							
								  this.on("end", emitDone(this));
							 | 
						||
| 
								 | 
							
								  this.initWorker();
							 | 
						||
| 
								 | 
							
								  process.nextTick(function () {
							 | 
						||
| 
								 | 
							
								    if (this._needEmitFinalResult === null) {
							 | 
						||
| 
								 | 
							
								      this._needEmitFinalResult = this.listeners("end_parsed").length > 0;
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								    if (this._needEmitResult === null) {
							 | 
						||
| 
								 | 
							
								      this._needEmitResult = this.listeners("record_parsed").length > 0;
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								    if (this._needEmitJson === null) {
							 | 
						||
| 
								 | 
							
								      this._needEmitJson = this.listeners("json").length > 0;
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								    if (this._needEmitHeader === null) {
							 | 
						||
| 
								 | 
							
								      this._needEmitHeader = this.listeners("header").length > 0;
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								    if (this._needEmitCsv === null) {
							 | 
						||
| 
								 | 
							
								      this._needEmitCsv = this.listeners("csv").length > 0;
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								    if (this._needJson === null) {
							 | 
						||
| 
								 | 
							
								      this._needJson = this._needEmitJson || this._needEmitFinalResult || this._needEmitResult || this.transform || this._options.objectMode;
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								    if (this._needPush === null) {
							 | 
						||
| 
								 | 
							
								      this._needPush = this.listeners("data").length > 0 || this.listeners("readable").length > 0;
							 | 
						||
| 
								 | 
							
								      // this._needPush=false;
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								    this.param._needParseJson = this._needJson || this._needPush;
							 | 
						||
| 
								 | 
							
								  }.bind(this));
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  return this;
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								util.inherits(Converter, Transform);
							 | 
						||
| 
								 | 
							
								function emitDone(conv) {
							 | 
						||
| 
								 | 
							
								  return function (err) {
							 | 
						||
| 
								 | 
							
								    if (!conv._hasDone) {
							 | 
						||
| 
								 | 
							
								      conv._hasDone = true;
							 | 
						||
| 
								 | 
							
								      process.nextTick(function () {
							 | 
						||
| 
								 | 
							
								        conv.emit('done', err);
							 | 
						||
| 
								 | 
							
								      });
							 | 
						||
| 
								 | 
							
								    };
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								function bufFromString(str) {
							 | 
						||
| 
								 | 
							
								  var length=Buffer.byteLength(str);
							 | 
						||
| 
								 | 
							
								  var buffer = Buffer.allocUnsafe
							 | 
						||
| 
								 | 
							
								    ? Buffer.allocUnsafe(length)
							 | 
						||
| 
								 | 
							
								    : new Buffer(length);
							 | 
						||
| 
								 | 
							
								  buffer.write(str);
							 | 
						||
| 
								 | 
							
								  return buffer;
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Converter.prototype._transform = function (data, encoding, cb) {
							 | 
						||
| 
								 | 
							
								  data=this.prepareData(data);
							 | 
						||
| 
								 | 
							
								  var idx =data.length-1;
							 | 
						||
| 
								 | 
							
								  var left=null;
							 | 
						||
| 
								 | 
							
								  /**
							 | 
						||
| 
								 | 
							
								   * From Keyang:
							 | 
						||
| 
								 | 
							
								   * The code below is to check if a single utf8 char (which could be multiple bytes) being split.
							 | 
						||
| 
								 | 
							
								   * If the char being split, the buffer from two chunk needs to be concat
							 | 
						||
| 
								 | 
							
								   * check how utf8 being encoded to understand the code below. 
							 | 
						||
| 
								 | 
							
								   * If anyone has any better way to do this, please let me know.
							 | 
						||
| 
								 | 
							
								   */
							 | 
						||
| 
								 | 
							
								  if ((data[idx] & 1<<7) !=0){
							 | 
						||
| 
								 | 
							
								    while ((data[idx] & 3<<6) === 128){
							 | 
						||
| 
								 | 
							
								      idx--;
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								    idx--;
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  if (idx !=data.length-1){
							 | 
						||
| 
								 | 
							
								    left=data.slice(idx+1);
							 | 
						||
| 
								 | 
							
								    data=data.slice(0,idx+1)
							 | 
						||
| 
								 | 
							
								    var _cb=cb;
							 | 
						||
| 
								 | 
							
								    var self=this;
							 | 
						||
| 
								 | 
							
								    cb=function(){
							 | 
						||
| 
								 | 
							
								      if (self._csvLineBuffer){
							 | 
						||
| 
								 | 
							
								        self._csvLineBuffer=Buffer.concat([bufFromString(self._csvLineBuffer,"utf8"),left]);
							 | 
						||
| 
								 | 
							
								      }else{
							 | 
						||
| 
								 | 
							
								        self._csvLineBuffer=left;
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								      _cb();
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  data = data.toString("utf8");
							 | 
						||
| 
								 | 
							
								  if (this.started === false) {
							 | 
						||
| 
								 | 
							
								    this.started = true;
							 | 
						||
| 
								 | 
							
								    data = stripBom(data);
							 | 
						||
| 
								 | 
							
								    if (this.param.toArrayString) {
							 | 
						||
| 
								 | 
							
								      if (this._needPush) {
							 | 
						||
| 
								 | 
							
								        this.push("[" + eol, "utf8");
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  var self = this;
							 | 
						||
| 
								 | 
							
								  this.preProcessRaw(data, function (d) {
							 | 
						||
| 
								 | 
							
								    if (d && d.length > 0) {
							 | 
						||
| 
								 | 
							
								      self.processData(d, cb);
							 | 
						||
| 
								 | 
							
								    } else {
							 | 
						||
| 
								 | 
							
								      cb();
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  });
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Converter.prototype.prepareData = function (data) {
							 | 
						||
| 
								 | 
							
								  if (this._csvLineBuffer && this._csvLineBuffer.length>0){
							 | 
						||
| 
								 | 
							
								    if (typeof this._csvLineBuffer === "string"){
							 | 
						||
| 
								 | 
							
								      this._csvLineBuffer=bufFromString(this._csvLineBuffer);
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								    return Buffer.concat([this._csvLineBuffer,data]);
							 | 
						||
| 
								 | 
							
								  }else{
							 | 
						||
| 
								 | 
							
								    return data;
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  // return this._csvLineBuffer + data;
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Converter.prototype.setPartialData = function (d) {
							 | 
						||
| 
								 | 
							
								  this._csvLineBuffer = d;
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Converter.prototype.processData = function (data, cb) {
							 | 
						||
| 
								 | 
							
								  var params = this.param;
							 | 
						||
| 
								 | 
							
								  if (params.ignoreEmpty && !params._headers) {
							 | 
						||
| 
								 | 
							
								    data = data.replace(/^\s+/, "");
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  var eol = this.param.eol;
							 | 
						||
| 
								 | 
							
								  var fileLines = fileline(data, this.param);
							 | 
						||
| 
								 | 
							
								  if (this.param.eol !== eol) {
							 | 
						||
| 
								 | 
							
								    this.emit("eol", this.param.eol);
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  if (fileLines.lines.length > 0) {
							 | 
						||
| 
								 | 
							
								    if (this.preProcessLine && typeof this.preProcessLine === "function") {
							 | 
						||
| 
								 | 
							
								      fileLines.lines = this._preProcessLines(fileLines.lines, this.lastIndex);
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								    if (!params._headers) { //header is not inited. init header
							 | 
						||
| 
								 | 
							
								      this.processHead(fileLines, cb);
							 | 
						||
| 
								 | 
							
								    } else {
							 | 
						||
| 
								 | 
							
								      if (params.workerNum <= 1) {
							 | 
						||
| 
								 | 
							
								        var lines = fileLineToCSVLine(fileLines, params);
							 | 
						||
| 
								 | 
							
								        this.setPartialData(lines.partial);
							 | 
						||
| 
								 | 
							
								        var jsonArr = linesToJson(lines.lines, params, this.recordNum);
							 | 
						||
| 
								 | 
							
								        this.processResult(jsonArr);
							 | 
						||
| 
								 | 
							
								        this.lastIndex += jsonArr.length;
							 | 
						||
| 
								 | 
							
								        this.recordNum += jsonArr.length;
							 | 
						||
| 
								 | 
							
								        cb();
							 | 
						||
| 
								 | 
							
								      } else {
							 | 
						||
| 
								 | 
							
								        this.workerProcess(fileLines, cb);
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  } else {
							 | 
						||
| 
								 | 
							
								    this.setPartialData(fileLines.partial);
							 | 
						||
| 
								 | 
							
								    cb();
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Converter.prototype._preProcessLines = function (lines, startIdx) {
							 | 
						||
| 
								 | 
							
								  var rtn = [];
							 | 
						||
| 
								 | 
							
								  for (var i = 0, len = lines.length; i < len; i++) {
							 | 
						||
| 
								 | 
							
								    var result = this.preProcessLine(lines[i], startIdx + i + 1);
							 | 
						||
| 
								 | 
							
								    if (typeof result === "string") {
							 | 
						||
| 
								 | 
							
								      rtn.push(result);
							 | 
						||
| 
								 | 
							
								    } else {
							 | 
						||
| 
								 | 
							
								      rtn.push(lines[i]);
							 | 
						||
| 
								 | 
							
								      this.emit("error", new Error("preProcessLine should return a string but got: " + JSON.stringify(result)));
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  return rtn;
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Converter.prototype.initWorker = function () {
							 | 
						||
| 
								 | 
							
								  var workerNum = this.param.workerNum - 1;
							 | 
						||
| 
								 | 
							
								  if (workerNum > 0) {
							 | 
						||
| 
								 | 
							
								    workerMgr = require("./workerMgr");
							 | 
						||
| 
								 | 
							
								    this.workerMgr = workerMgr();
							 | 
						||
| 
								 | 
							
								    this.workerMgr.initWorker(workerNum, this.param);
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Converter.prototype.preRawData = function (func) {
							 | 
						||
| 
								 | 
							
								  this.preProcessRaw = func;
							 | 
						||
| 
								 | 
							
								  return this;
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Converter.prototype.preFileLine = function (func) {
							 | 
						||
| 
								 | 
							
								  this.preProcessLine = func;
							 | 
						||
| 
								 | 
							
								  return this;
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								/**
							 | 
						||
| 
								 | 
							
								 * workerpRocess does not support embeded multiple lines.
							 | 
						||
| 
								 | 
							
								 */
							 | 
						||
| 
								 | 
							
								Converter.prototype.workerProcess = function (fileLine, cb) {
							 | 
						||
| 
								 | 
							
								  var self = this;
							 | 
						||
| 
								 | 
							
								  var line = fileLine;
							 | 
						||
| 
								 | 
							
								  var eol = this.getEol();
							 | 
						||
| 
								 | 
							
								  this.setPartialData(line.partial);
							 | 
						||
| 
								 | 
							
								  this.workerMgr.sendWorker(line.lines.join(eol) + eol, this.lastIndex, cb, function (results, lastIndex) {
							 | 
						||
| 
								 | 
							
								    var buf;
							 | 
						||
| 
								 | 
							
								    var cur = self.sequenceBuffer[0];
							 | 
						||
| 
								 | 
							
								    if (cur.idx === lastIndex) {
							 | 
						||
| 
								 | 
							
								      cur.result = results;
							 | 
						||
| 
								 | 
							
								      var records = [];
							 | 
						||
| 
								 | 
							
								      while (self.sequenceBuffer[0] && self.sequenceBuffer[0].result) {
							 | 
						||
| 
								 | 
							
								        buf = self.sequenceBuffer.shift();
							 | 
						||
| 
								 | 
							
								        records = records.concat(buf.result);
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								      self.processResult(records);
							 | 
						||
| 
								 | 
							
								      self.recordNum += records.length;
							 | 
						||
| 
								 | 
							
								    } else {
							 | 
						||
| 
								 | 
							
								      for (var i = 0, len = self.sequenceBuffer.length; i < len; i++) {
							 | 
						||
| 
								 | 
							
								        buf = self.sequenceBuffer[i];
							 | 
						||
| 
								 | 
							
								        if (buf.idx === lastIndex) {
							 | 
						||
| 
								 | 
							
								          buf.result = results;
							 | 
						||
| 
								 | 
							
								          break;
							 | 
						||
| 
								 | 
							
								        }
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  });
							 | 
						||
| 
								 | 
							
								  this.sequenceBuffer.push({
							 | 
						||
| 
								 | 
							
								    idx: this.lastIndex,
							 | 
						||
| 
								 | 
							
								    result: null
							 | 
						||
| 
								 | 
							
								  });
							 | 
						||
| 
								 | 
							
								  this.lastIndex += line.lines.length;
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Converter.prototype.processHead = function (fileLine, cb) {
							 | 
						||
| 
								 | 
							
								  var params = this.param;
							 | 
						||
| 
								 | 
							
								  if (params._headers) {
							 | 
						||
| 
								 | 
							
								    return cb();
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  //dirty hack
							 | 
						||
| 
								 | 
							
								  params._needFilterRow = false;
							 | 
						||
| 
								 | 
							
								  // if header is not inited. init header
							 | 
						||
| 
								 | 
							
								  var lines = fileLine.lines;
							 | 
						||
| 
								 | 
							
								  var left = "";
							 | 
						||
| 
								 | 
							
								  var headerRow = [];
							 | 
						||
| 
								 | 
							
								  if (!params.noheader) {
							 | 
						||
| 
								 | 
							
								    while (lines.length) {
							 | 
						||
| 
								 | 
							
								      var line = left + lines.shift();
							 | 
						||
| 
								 | 
							
								      var delimiter = params.delimiter;
							 | 
						||
| 
								 | 
							
								      var row = rowSplit(line, params);
							 | 
						||
| 
								 | 
							
								      if (params.delimiter !== delimiter) {
							 | 
						||
| 
								 | 
							
								        this.emit("delimiter", params.delimiter);
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								      if (row.closed) {
							 | 
						||
| 
								 | 
							
								        headerRow = row.cols;
							 | 
						||
| 
								 | 
							
								        left = "";
							 | 
						||
| 
								 | 
							
								        break;
							 | 
						||
| 
								 | 
							
								      } else {
							 | 
						||
| 
								 | 
							
								        left = line + this.getEol();
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  params._needFilterRow = true;
							 | 
						||
| 
								 | 
							
								  if (!params.noheader && headerRow.length === 0) { //if one chunk of data does not complete header row.
							 | 
						||
| 
								 | 
							
								    this.setPartialData(left);
							 | 
						||
| 
								 | 
							
								    return cb();
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  if (params.noheader) {
							 | 
						||
| 
								 | 
							
								    if (params.headers) {
							 | 
						||
| 
								 | 
							
								      params._headers = params.headers;
							 | 
						||
| 
								 | 
							
								    } else {
							 | 
						||
| 
								 | 
							
								      params._headers = [];
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  } else {
							 | 
						||
| 
								 | 
							
								    if (params.headers) {
							 | 
						||
| 
								 | 
							
								      params._headers = params.headers;
							 | 
						||
| 
								 | 
							
								    } else {
							 | 
						||
| 
								 | 
							
								      params._headers = headerRow;
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  configIgnoreIncludeColumns(params);
							 | 
						||
| 
								 | 
							
								  params._headers = require("./filterRow")(params._headers, params);
							 | 
						||
| 
								 | 
							
								  if (this._needEmitHeader && this.param._headers) {
							 | 
						||
| 
								 | 
							
								    this.emit("header", this.param._headers);
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  var delimiter = params.delimiter;
							 | 
						||
| 
								 | 
							
								  var lines = fileLineToCSVLine(fileLine, params);
							 | 
						||
| 
								 | 
							
								  if (params.delimiter !== delimiter) {
							 | 
						||
| 
								 | 
							
								    this.emit("delimiter", params.delimiter);
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  this.setPartialData(lines.partial);
							 | 
						||
| 
								 | 
							
								  if (this.param.workerNum > 1) {
							 | 
						||
| 
								 | 
							
								    this.workerMgr.setParams(params);
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  var res = linesToJson(lines.lines, params, 0);
							 | 
						||
| 
								 | 
							
								  // Put the header with the first row
							 | 
						||
| 
								 | 
							
								  // if(res.length > 0) res[0].header = params._headers;
							 | 
						||
| 
								 | 
							
								  this.processResult(res);
							 | 
						||
| 
								 | 
							
								  this.lastIndex += res.length;
							 | 
						||
| 
								 | 
							
								  this.recordNum += res.length;
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  cb();
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								function configIgnoreIncludeColumns(params) {
							 | 
						||
| 
								 | 
							
								  if (params._postIgnoreColumns) {
							 | 
						||
| 
								 | 
							
								    for (var i = 0; i < params.ignoreColumns.length; i++) {
							 | 
						||
| 
								 | 
							
								      var ignoreCol = params.ignoreColumns[i];
							 | 
						||
| 
								 | 
							
								      if (typeof ignoreCol === "string") {
							 | 
						||
| 
								 | 
							
								        var idx = params._headers.indexOf(ignoreCol);
							 | 
						||
| 
								 | 
							
								        if (idx > -1) {
							 | 
						||
| 
								 | 
							
								          params.ignoreColumns[i] = idx;
							 | 
						||
| 
								 | 
							
								        } else {
							 | 
						||
| 
								 | 
							
								          params.ignoreColumns[i] = -1;
							 | 
						||
| 
								 | 
							
								        }
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								    params.ignoreColumns.sort(function (a, b) { return b - a; });
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  if (params._postIncludeColumns) {
							 | 
						||
| 
								 | 
							
								    for (var i = 0; i < params.includeColumns.length; i++) {
							 | 
						||
| 
								 | 
							
								      var includeCol = params.includeColumns[i];
							 | 
						||
| 
								 | 
							
								      if (typeof includeCol === "string") {
							 | 
						||
| 
								 | 
							
								        var idx = params._headers.indexOf(includeCol);
							 | 
						||
| 
								 | 
							
								        if (idx > -1) {
							 | 
						||
| 
								 | 
							
								          params.includeColumns[i] = idx;
							 | 
						||
| 
								 | 
							
								        } else {
							 | 
						||
| 
								 | 
							
								          params.includeColumns[i] = -1;
							 | 
						||
| 
								 | 
							
								        }
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  params.ignoreColumns = _.uniq(params.ignoreColumns);
							 | 
						||
| 
								 | 
							
								  params.includeColumns = _.uniq(params.includeColumns);
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Converter.prototype.processResult = function (result) {
							 | 
						||
| 
								 | 
							
								  for (var i = 0, len = result.length; i < len; i++) {
							 | 
						||
| 
								 | 
							
								    var r = result[i];
							 | 
						||
| 
								 | 
							
								    if (r.err) {
							 | 
						||
| 
								 | 
							
								      this.emit("error", r.err);
							 | 
						||
| 
								 | 
							
								    } else {
							 | 
						||
| 
								 | 
							
								      this.emitResult(r);
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Converter.prototype.emitResult = function (r) {
							 | 
						||
| 
								 | 
							
								  var index = r.index;
							 | 
						||
| 
								 | 
							
								  var header = this.param;
							 | 
						||
| 
								 | 
							
								  var row = r.row;
							 | 
						||
| 
								 | 
							
								  var result = r.json;
							 | 
						||
| 
								 | 
							
								  var resultJson = null;
							 | 
						||
| 
								 | 
							
								  var resultStr = null;
							 | 
						||
| 
								 | 
							
								  if (typeof result === "string") {
							 | 
						||
| 
								 | 
							
								    resultStr = result;
							 | 
						||
| 
								 | 
							
								  } else {
							 | 
						||
| 
								 | 
							
								    resultJson = result;
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  if (resultJson === null && this._needJson) {
							 | 
						||
| 
								 | 
							
								    resultJson = JSON.parse(resultStr);
							 | 
						||
| 
								 | 
							
								    if (typeof row === "string") {
							 | 
						||
| 
								 | 
							
								      row = JSON.parse(row);
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  if (this.transform && typeof this.transform === "function") {
							 | 
						||
| 
								 | 
							
								    this.transform(resultJson, row, index);
							 | 
						||
| 
								 | 
							
								    resultStr = null;
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  if (this._needEmitJson) {
							 | 
						||
| 
								 | 
							
								    this.emit("json", resultJson, index);
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  if (this._needEmitCsv) {
							 | 
						||
| 
								 | 
							
								    if (typeof row === "string") {
							 | 
						||
| 
								 | 
							
								      row = JSON.parse(row);
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								    this.emit("csv", row, index);
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  if (this.param.constructResult && this._needEmitFinalResult) {
							 | 
						||
| 
								 | 
							
								    this.finalResult.push(resultJson);
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  if (this._needEmitResult) {
							 | 
						||
| 
								 | 
							
								    this.emit("record_parsed", resultJson, row, index);
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  if (this.param.toArrayString && index > 0 && this._needPush) {
							 | 
						||
| 
								 | 
							
								    this.push("," + eol);
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  if (this._options && this._options.objectMode) {
							 | 
						||
| 
								 | 
							
								    this.push(resultJson);
							 | 
						||
| 
								 | 
							
								  } else {
							 | 
						||
| 
								 | 
							
								    if (this._needPush) {
							 | 
						||
| 
								 | 
							
								      if (resultStr === null) {
							 | 
						||
| 
								 | 
							
								        resultStr = JSON.stringify(resultJson);
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								      this.push(!this.param.toArrayString ? resultStr + eol : resultStr, "utf8");
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Converter.prototype.preProcessRaw = function (data, cb) {
							 | 
						||
| 
								 | 
							
								  cb(data);
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// FIXME: lineNumber is not used.
							 | 
						||
| 
								 | 
							
								Converter.prototype.preProcessLine = function (line, lineNumber) {
							 | 
						||
| 
								 | 
							
								  return line;
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Converter.prototype._flush = function (cb) {
							 | 
						||
| 
								 | 
							
								  var self = this;
							 | 
						||
| 
								 | 
							
								  this.flushCb = function () {
							 | 
						||
| 
								 | 
							
								    self.emit("end_parsed", self.finalResult);
							 | 
						||
| 
								 | 
							
								    if (self.workerMgr) {
							 | 
						||
| 
								 | 
							
								      self.workerMgr.destroyWorker();
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								    cb();
							 | 
						||
| 
								 | 
							
								    if (!self._needPush) {
							 | 
						||
| 
								 | 
							
								      self.emit("end");
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  };
							 | 
						||
| 
								 | 
							
								  if (this._csvLineBuffer.length > 0) {
							 | 
						||
| 
								 | 
							
								    var eol = this.getEol();
							 | 
						||
| 
								 | 
							
								    if (this._csvLineBuffer[this._csvLineBuffer.length - 1] !== eol) {
							 | 
						||
| 
								 | 
							
								      this._csvLineBuffer += eol;
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								    this.processData(this._csvLineBuffer, function () {
							 | 
						||
| 
								 | 
							
								      this.checkAndFlush();
							 | 
						||
| 
								 | 
							
								    }.bind(this));
							 | 
						||
| 
								 | 
							
								  } else {
							 | 
						||
| 
								 | 
							
								    this.checkAndFlush();
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  return;
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Converter.prototype.checkAndFlush = function () {
							 | 
						||
| 
								 | 
							
								  if (this._csvLineBuffer.length !== 0) {
							 | 
						||
| 
								 | 
							
								    this.emit("error", CSVError.unclosed_quote(this.recordNum, this._csvLineBuffer), this._csvLineBuffer);
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  if (this.param.toArrayString && this._needPush) {
							 | 
						||
| 
								 | 
							
								    this.push(eol + "]", "utf8");
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  if (this.workerMgr && this.workerMgr.isRunning()) {
							 | 
						||
| 
								 | 
							
								    this.workerMgr.drain = function () {
							 | 
						||
| 
								 | 
							
								      this.flushCb();
							 | 
						||
| 
								 | 
							
								    }.bind(this);
							 | 
						||
| 
								 | 
							
								  } else {
							 | 
						||
| 
								 | 
							
								    this.flushCb();
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Converter.prototype.getEol = function (data) {
							 | 
						||
| 
								 | 
							
								  if (!this.param.eol && data) {
							 | 
						||
| 
								 | 
							
								    for (var i = 0, len = data.length; i < len; i++) {
							 | 
						||
| 
								 | 
							
								      if (data[i] === "\r") {
							 | 
						||
| 
								 | 
							
								        if (data[i + 1] === "\n") {
							 | 
						||
| 
								 | 
							
								          this.param.eol = "\r\n";
							 | 
						||
| 
								 | 
							
								        } else {
							 | 
						||
| 
								 | 
							
								          this.param.eol = "\r";
							 | 
						||
| 
								 | 
							
								        }
							 | 
						||
| 
								 | 
							
								        return this.param.eol;
							 | 
						||
| 
								 | 
							
								      } else if (data[i] === "\n") {
							 | 
						||
| 
								 | 
							
								        this.param.eol = "\n";
							 | 
						||
| 
								 | 
							
								        return this.param.eol;
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								    this.param.eol = eol;
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  return this.param.eol || eol;
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Converter.prototype.fromFile = function (filePath, cb, options) {
							 | 
						||
| 
								 | 
							
								  var fs = require('fs');
							 | 
						||
| 
								 | 
							
								  var rs = null;
							 | 
						||
| 
								 | 
							
								  if (typeof cb ==="object" && typeof options === "undefined"){
							 | 
						||
| 
								 | 
							
								    options=cb;
							 | 
						||
| 
								 | 
							
								    cb=null;
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  this.wrapCallback(cb, function () {
							 | 
						||
| 
								 | 
							
								    if (rs && rs.destroy) {
							 | 
						||
| 
								 | 
							
								      rs.destroy();
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  });
							 | 
						||
| 
								 | 
							
								  fs.exists(filePath, function (exist) {
							 | 
						||
| 
								 | 
							
								    if (exist) {
							 | 
						||
| 
								 | 
							
								      rs = fs.createReadStream(filePath,options);
							 | 
						||
| 
								 | 
							
								      rs.pipe(this);
							 | 
						||
| 
								 | 
							
								    } else {
							 | 
						||
| 
								 | 
							
								      this.emit('error', new Error("File does not exist. Check to make sure the file path to your csv is correct."));
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  }.bind(this));
							 | 
						||
| 
								 | 
							
								  return this;
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Converter.prototype.fromStream = function (readStream, cb) {
							 | 
						||
| 
								 | 
							
								  if (cb && typeof cb === "function") {
							 | 
						||
| 
								 | 
							
								    this.wrapCallback(cb);
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  readStream.pipe(this);
							 | 
						||
| 
								 | 
							
								  return this;
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Converter.prototype.transf = function (func) {
							 | 
						||
| 
								 | 
							
								  this.transform = func;
							 | 
						||
| 
								 | 
							
								  return this;
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Converter.prototype.fromString = function (csvString, cb) {
							 | 
						||
| 
								 | 
							
								  if (typeof csvString !== "string") {
							 | 
						||
| 
								 | 
							
								    if (cb && typeof cb ==="function"){
							 | 
						||
| 
								 | 
							
								      return cb(new Error("Passed CSV Data is not a string."));
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  if (cb && typeof cb === "function") {
							 | 
						||
| 
								 | 
							
								    this.wrapCallback(cb, function () {
							 | 
						||
| 
								 | 
							
								    });
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  process.nextTick(function () {
							 | 
						||
| 
								 | 
							
								    this.end(csvString);
							 | 
						||
| 
								 | 
							
								  }.bind(this));
							 | 
						||
| 
								 | 
							
								  return this;
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Converter.prototype.wrapCallback = function (cb, clean) {
							 | 
						||
| 
								 | 
							
								  if (clean === undefined) {
							 | 
						||
| 
								 | 
							
								    clean = function () { };
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  if (cb && typeof cb === "function") {
							 | 
						||
| 
								 | 
							
								    this.once("end_parsed", function (res) {
							 | 
						||
| 
								 | 
							
								      if (!this.hasError) {
							 | 
						||
| 
								 | 
							
								        cb(null, res);
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								    }.bind(this));
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  this.once("error", function (err) {
							 | 
						||
| 
								 | 
							
								    this.hasError = true;
							 | 
						||
| 
								 | 
							
								    if (cb && typeof cb === "function") {
							 | 
						||
| 
								 | 
							
								      cb(err);
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								    clean();
							 | 
						||
| 
								 | 
							
								  }.bind(this));
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								module.exports = Converter;
							 |