You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							581 lines
						
					
					
						
							16 KiB
						
					
					
				
			
		
		
	
	
							581 lines
						
					
					
						
							16 KiB
						
					
					
				| var util = require("util");
 | |
| var Transform = require("stream").Transform;
 | |
| var os = require("os");
 | |
| var stripBom = require('strip-bom');
 | |
| var eol = os.EOL;
 | |
| // var Processor = require("./Processor.js");
 | |
| var defParam = require("./defParam");
 | |
| var fileline = require("./fileline");
 | |
| var fileLineToCSVLine = require("./fileLineToCSVLine");
 | |
| var linesToJson = require("./linesToJson");
 | |
| var CSVError = require("./CSVError");
 | |
| var workerMgr = null;
 | |
| var _ = require('lodash');
 | |
| var rowSplit = require("./rowSplit");
 | |
/**
 * Streaming CSV-to-JSON converter built on stream.Transform.
 *
 * @param {Object} params  user parsing parameters; normalised through defParam().
 * @param {Object} [options]  options forwarded to the Transform constructor
 *   (for example objectMode); kept on this._options and on this.param._options.
 */
function Converter(params, options) {
  Transform.call(this, options);
  var opts = options || {};
  this._options = opts;
  this.param = defParam(params);
  this.param._options = opts;

  // Parsing / bookkeeping state.
  this.started = false; // flips to true once the first chunk is seen
  this.recordNum = 0;
  this.lineNumber = 0; // file line number
  this._csvLineBuffer = ""; // unconsumed input carried over between chunks
  this.lastIndex = 0; // index in the result json array

  if (this.param.forked) {
    this.param.forked = false;
    // NOTE(review): this sets this.workerNum, not this.param.workerNum
    // (which is what initWorker reads) — confirm this is intended.
    this.workerNum = 2;
  }

  this.flushCb = null;
  this.processEnd = false;
  this.sequenceBuffer = [];

  // null means "not decided yet"; each flag is resolved on the next tick
  // below, after the caller has had a chance to attach listeners.
  this._needJson = null;
  this._needEmitResult = null;
  this._needEmitFinalResult = null;
  this._needEmitHeader = null;
  this._needEmitJson = null;
  this._needPush = null;
  this._needEmitCsv = null;
  this._csvTransf = null;
  this.finalResult = [];

  this.on("error", emitDone(this));
  this.on("end", emitDone(this));
  this.initWorker();

  var self = this;
  process.nextTick(function () {
    function hasListeners(eventName) {
      return self.listeners(eventName).length > 0;
    }
    if (self._needEmitFinalResult === null) {
      self._needEmitFinalResult = hasListeners("end_parsed");
    }
    if (self._needEmitResult === null) {
      self._needEmitResult = hasListeners("record_parsed");
    }
    if (self._needEmitJson === null) {
      self._needEmitJson = hasListeners("json");
    }
    if (self._needEmitHeader === null) {
      self._needEmitHeader = hasListeners("header");
    }
    if (self._needEmitCsv === null) {
      self._needEmitCsv = hasListeners("csv");
    }
    if (self._needJson === null) {
      // Parsed JSON objects are needed by any of these consumers.
      self._needJson = self._needEmitJson || self._needEmitFinalResult || self._needEmitResult || self.transform || self._options.objectMode;
    }
    if (self._needPush === null) {
      self._needPush = hasListeners("data") || hasListeners("readable");
    }
    self.param._needParseJson = self._needJson || self._needPush;
  });

  return this;
}

util.inherits(Converter, Transform);
 | |
/**
 * Build a listener that emits a single, deferred "done" event on `conv`.
 *
 * The returned function may be attached to several events (the constructor
 * uses it for both "error" and "end"). The shared `conv._hasDone` flag makes
 * sure only the first call triggers "done". The emit happens on the next
 * process tick and carries the argument of that first call.
 *
 * @param {EventEmitter} conv  emitter to signal
 * @returns {function(*)} listener accepting an optional error
 */
function emitDone(conv) {
  return function (err) {
    if (conv._hasDone) {
      return;
    }
    conv._hasDone = true;
    process.nextTick(function () {
      conv.emit('done', err);
    });
  };
}
 | |
| 
 | |
| 
 | |
/**
 * Encode a string into a newly allocated Buffer.
 *
 * @param {string|Buffer} str  text to encode. A Buffer is returned unchanged
 *   (previously it made buf.write() throw).
 * @param {string} [encoding="utf8"]  character encoding used for both the
 *   byte-length computation and the write. Callers already pass "utf8"; the
 *   argument used to be silently ignored.
 * @returns {Buffer} buffer holding exactly the encoded bytes of `str`
 */
function bufFromString(str, encoding) {
  if (Buffer.isBuffer(str)) {
    return str;
  }
  var enc = encoding || "utf8";
  var length = Buffer.byteLength(str, enc);
  // Prefer Buffer.allocUnsafe; `new Buffer(size)` is the fallback for Node
  // versions that predate it.
  var buffer = Buffer.allocUnsafe
    ? Buffer.allocUnsafe(length)
    : new Buffer(length);
  // Every allocated byte is overwritten here, so no uninitialised memory
  // leaks out of the unsafe allocation.
  buffer.write(str, 0, length, enc);
  return buffer;
}
 | |
| 
 | |
/**
 * Transform-stream hook: handles one raw input chunk.
 *
 * The chunk is joined to any carried-over data (prepareData), a possibly
 * split trailing UTF-8 sequence is held back for the next chunk, the rest is
 * decoded as UTF-8 and passed through preProcessRaw() to processData().
 *
 * @param {Buffer} data  raw chunk (assumed to be a Buffer — prepareData joins
 *   it with Buffer.concat; TODO confirm string chunks never reach here)
 * @param {string} encoding  not used
 * @param {function} cb  Transform callback, called when the chunk is handled
 */
Converter.prototype._transform = function (data, encoding, cb) {
  data=this.prepareData(data);
  var idx =data.length-1;
  var left=null;
  /**
   * From Keyang:
   * The code below checks whether a single UTF-8 character (which can span
   * several bytes) has been split across two chunks. If it has, the bytes
   * from both chunks must be concatenated before decoding. See how UTF-8
   * encodes characters to follow the code below.
   * If anyone has a better way to do this, please let me know.
   */
  // A set high bit means the last byte belongs to a multi-byte sequence:
  // step back over continuation bytes (10xxxxxx), then past the lead byte,
  // so the whole trailing sequence is held back.
  // NOTE(review): a trailing sequence that is already complete is held back
  // as well; it is simply decoded with the next chunk (or at flush).
  if ((data[idx] & 1<<7) !=0){
    while ((data[idx] & 3<<6) === 128){
      idx--;
    }
    idx--;
  }
  if (idx !=data.length-1){
    // Split off the held-back bytes and wrap cb so they are appended to the
    // partial-line buffer only after this chunk has been processed (which
    // may itself replace _csvLineBuffer via setPartialData).
    left=data.slice(idx+1);
    data=data.slice(0,idx+1)
    var _cb=cb;
    var self=this;
    cb=function(){
      // NOTE(review): if preProcessRaw yields nothing, processData is skipped
      // and _csvLineBuffer may still be the Buffer consumed by prepareData.
      if (self._csvLineBuffer){
        self._csvLineBuffer=Buffer.concat([bufFromString(self._csvLineBuffer,"utf8"),left]);
      }else{
        self._csvLineBuffer=left;
      }
      _cb();
    }
  }
  data = data.toString("utf8");
  if (this.started === false) {
    // First chunk only: drop a byte-order mark and, in array-string mode,
    // open the JSON array.
    this.started = true;
    data = stripBom(data);
    if (this.param.toArrayString) {
      if (this._needPush) {
        this.push("[" + eol, "utf8");
      }
    }
  }
  var self = this;
  this.preProcessRaw(data, function (d) {
    // An empty result from the raw hook skips parsing for this chunk.
    if (d && d.length > 0) {
      self.processData(d, cb);
    } else {
      cb();
    }
  });
};
 | |
| 
 | |
/**
 * Prepend any carried-over input to a freshly received chunk.
 *
 * A carried-over string is first encoded to a Buffer, and that Buffer is
 * stored back in _csvLineBuffer, so it can be joined with the incoming bytes.
 *
 * @param {Buffer} data  incoming raw chunk
 * @returns {Buffer} the chunk, prefixed with the carried-over bytes if any
 */
Converter.prototype.prepareData = function (data) {
  var pending = this._csvLineBuffer;
  if (!pending || !(pending.length > 0)) {
    return data;
  }
  if (typeof pending === "string") {
    pending = bufFromString(pending);
    this._csvLineBuffer = pending;
  }
  return Buffer.concat([pending, data]);
};
 | |
| 
 | |
/**
 * Store the unconsumed tail of the input; prepareData() prepends it to the
 * next chunk.
 *
 * @param {string|Buffer} d  leftover data
 */
Converter.prototype.setPartialData = function (d) {
  var leftover = d;
  this._csvLineBuffer = leftover;
};
 | |
| 
 | |
/**
 * Split decoded text into lines and route them to header handling, in-process
 * parsing, or the worker pool.
 *
 * Emits "eol" when fileline() changes this.param.eol (line-ending detection).
 * Text after the last complete line is stored as partial data.
 *
 * @param {string} data  decoded CSV text
 * @param {function} cb  called once the text has been handled
 */
Converter.prototype.processData = function (data, cb) {
  var params = this.param;
  var text = data;
  if (params.ignoreEmpty && !params._headers) {
    // Before the header is known, drop leading whitespace.
    text = text.replace(/^\s+/, "");
  }

  var eolBefore = params.eol;
  var fileLines = fileline(text, params);
  if (params.eol !== eolBefore) {
    this.emit("eol", params.eol);
  }

  if (fileLines.lines.length === 0) {
    // No complete line yet: keep everything for the next chunk.
    this.setPartialData(fileLines.partial);
    cb();
    return;
  }

  if (typeof this.preProcessLine === "function") {
    fileLines.lines = this._preProcessLines(fileLines.lines, this.lastIndex);
  }

  if (!params._headers) {
    // Header not initialised yet: header handling also parses the rest.
    this.processHead(fileLines, cb);
    return;
  }

  if (params.workerNum <= 1) {
    var csvLines = fileLineToCSVLine(fileLines, params);
    this.setPartialData(csvLines.partial);
    var records = linesToJson(csvLines.lines, params, this.recordNum);
    this.processResult(records);
    this.lastIndex += records.length;
    this.recordNum += records.length;
    cb();
    return;
  }

  this.workerProcess(fileLines, cb);
};
 | |
| 
 | |
/**
 * Run the preProcessLine hook over a batch of lines.
 *
 * When the hook returns something other than a string, the original line is
 * kept and an "error" event describing the bad value is emitted.
 *
 * @param {string[]} lines  lines to transform
 * @param {number} startIdx  offset added to each line's 1-based position in
 *   the batch before it is passed to the hook
 * @returns {string[]} transformed lines, same length as `lines`
 */
Converter.prototype._preProcessLines = function (lines, startIdx) {
  var self = this;
  return lines.map(function (line, i) {
    var processed = self.preProcessLine(line, startIdx + i + 1);
    if (typeof processed === "string") {
      return processed;
    }
    self.emit("error", new Error("preProcessLine should return a string but got: " + JSON.stringify(processed)));
    return line;
  });
};
 | |
| 
 | |
/**
 * Start background workers when param.workerNum is greater than 1.
 *
 * workerNum - 1 workers are requested (presumably the current process counts
 * as one — TODO confirm). The ./workerMgr module is loaded lazily and also
 * stored in the module-level `workerMgr` variable.
 */
Converter.prototype.initWorker = function () {
  var extraWorkers = this.param.workerNum - 1;
  if (!(extraWorkers > 0)) {
    return;
  }
  workerMgr = require("./workerMgr");
  this.workerMgr = workerMgr();
  this.workerMgr.initWorker(extraWorkers, this.param);
};
 | |
| 
 | |
/**
 * Install the raw-data hook. _transform calls it as func(text, done) with
 * each decoded chunk; `done` must be called with the text to parse.
 *
 * @param {function(string, function(string))} func  replacement hook
 * @returns {Converter} this, for chaining
 */
Converter.prototype.preRawData = function (func) {
  var converter = this;
  converter.preProcessRaw = func;
  return converter;
};
 | |
| 
 | |
/**
 * Install the per-line hook. It is called as func(line, position) and must
 * return the (possibly modified) line as a string.
 *
 * @param {function(string, number): string} func  replacement hook
 * @returns {Converter} this, for chaining
 */
Converter.prototype.preFileLine = function (func) {
  var converter = this;
  converter.preProcessLine = func;
  return converter;
};
 | |
| 
 | |
/**
 * Send a batch of complete lines to the worker pool.
 *
 * workerProcess does not support embedded multi-line fields (quoted values
 * containing line breaks): lines are simply re-joined with the line ending.
 *
 * Each batch gets a slot in this.sequenceBuffer keyed by the lastIndex it was
 * sent with. When a result arrives for the slot at the front of the queue,
 * all consecutive finished slots are drained in order and passed to
 * processResult(). A result for any other slot is stored and waits.
 *
 * @param {Object} fileLine  output of fileline(): {lines, partial}
 * @param {function} cb  passed through to workerMgr.sendWorker (presumably
 *   invoked once the batch is accepted — TODO confirm)
 */
Converter.prototype.workerProcess = function (fileLine, cb) {
  var self = this;
  var line = fileLine;
  var eol = this.getEol();
  this.setPartialData(line.partial);
  // NOTE(review): the result callback reads sequenceBuffer[0], so it assumes
  // sendWorker calls it asynchronously, after the push below.
  this.workerMgr.sendWorker(line.lines.join(eol) + eol, this.lastIndex, cb, function (results, lastIndex) {
    var buf;
    var cur = self.sequenceBuffer[0];
    if (cur.idx === lastIndex) {
      // Front of the queue finished: flush it and every finished slot behind it.
      cur.result = results;
      var records = [];
      while (self.sequenceBuffer[0] && self.sequenceBuffer[0].result) {
        buf = self.sequenceBuffer.shift();
        records = records.concat(buf.result);
      }
      self.processResult(records);
      self.recordNum += records.length;
    } else {
      // Out-of-order result: park it in its slot until earlier batches finish.
      for (var i = 0, len = self.sequenceBuffer.length; i < len; i++) {
        buf = self.sequenceBuffer[i];
        if (buf.idx === lastIndex) {
          buf.result = results;
          break;
        }
      }
    }
  });
  this.sequenceBuffer.push({
    idx: this.lastIndex,
    result: null
  });
  // NOTE(review): advanced by the number of lines here, whereas the
  // in-process path advances lastIndex by the number of records.
  this.lastIndex += line.lines.length;
};
 | |
| 
 | |
/**
 * Initialise the header row, then parse the remaining lines of the chunk.
 *
 * Unless params.noheader is set, lines are shifted off fileLine.lines and
 * joined until rowSplit() reports a closed row; that row becomes the header
 * (unless params.headers overrides it). If the header row is still open at
 * the end of the chunk, the consumed text plus the chunk's trailing partial
 * text are kept as partial data, and parsing resumes with the next chunk.
 *
 * Emits "delimiter" whenever params.delimiter changes during splitting, and
 * "header" when header listeners were registered.
 *
 * @param {Object} fileLine  {lines: string[], partial: string} from fileline();
 *   fileLine.lines is mutated (header lines are removed from it).
 * @param {function} cb  called once this chunk has been handled
 */
Converter.prototype.processHead = function (fileLine, cb) {
  var params = this.param;
  if (params._headers) {
    return cb();
  }
  // Dirty hack: row filtering must be off while the header row itself is split.
  params._needFilterRow = false;
  var lines = fileLine.lines;
  var left = "";
  var headerRow = [];
  if (!params.noheader) {
    while (lines.length) {
      var line = left + lines.shift();
      var delimiter = params.delimiter;
      var row = rowSplit(line, params);
      if (params.delimiter !== delimiter) {
        this.emit("delimiter", params.delimiter);
      }
      if (row.closed) {
        headerRow = row.cols;
        left = "";
        break;
      }
      // The header row spans a quoted line break: glue on the next line.
      left = line + this.getEol();
    }
  }
  params._needFilterRow = true;
  if (!params.noheader && headerRow.length === 0) {
    // The header row is incomplete within this chunk. Keep the unclosed
    // header text AND the chunk's trailing partial text; dropping the latter
    // would silently lose input.
    this.setPartialData(left + (fileLine.partial || ""));
    return cb();
  }
  if (params.headers) {
    params._headers = params.headers;
  } else if (params.noheader) {
    params._headers = [];
  } else {
    params._headers = headerRow;
  }
  configIgnoreIncludeColumns(params);
  params._headers = require("./filterRow")(params._headers, params);
  if (this._needEmitHeader && this.param._headers) {
    this.emit("header", this.param._headers);
  }
  var delimiterBefore = params.delimiter;
  var csvLines = fileLineToCSVLine(fileLine, params);
  if (params.delimiter !== delimiterBefore) {
    this.emit("delimiter", params.delimiter);
  }
  this.setPartialData(csvLines.partial);
  if (this.param.workerNum > 1) {
    // Workers need the final header/column configuration.
    this.workerMgr.setParams(params);
  }
  var res = linesToJson(csvLines.lines, params, 0);
  this.processResult(res);
  this.lastIndex += res.length;
  this.recordNum += res.length;
  cb();
};
 | |
/**
 * Resolve column names in params.ignoreColumns / params.includeColumns to
 * indexes into params._headers, then de-duplicate both lists.
 *
 * Names are resolved only when the matching _postIgnoreColumns /
 * _postIncludeColumns flag is set; an unknown name becomes -1. The resolved
 * ignore list is sorted in descending order. Both lists are then replaced by
 * de-duplicated copies (lodash _.uniq, which yields [] for a missing list).
 *
 * @param {Object} params  converter parameters; mutated in place
 */
function configIgnoreIncludeColumns(params) {
  if (params._postIgnoreColumns) {
    resolveColumnNamesToIndexes(params.ignoreColumns, params._headers);
    params.ignoreColumns.sort(function (a, b) { return b - a; });
  }
  if (params._postIncludeColumns) {
    resolveColumnNamesToIndexes(params.includeColumns, params._headers);
  }
  params.ignoreColumns = _.uniq(params.ignoreColumns);
  params.includeColumns = _.uniq(params.includeColumns);
}

/**
 * In place, replace every string entry of `columns` with its index in
 * `headers` (-1 when absent). Non-string entries are left untouched.
 *
 * @param {Array<string|number>} columns
 * @param {string[]} headers
 */
function resolveColumnNamesToIndexes(columns, headers) {
  for (var n = 0; n < columns.length; n++) {
    if (typeof columns[n] === "string") {
      columns[n] = headers.indexOf(columns[n]);
    }
  }
}
 | |
| 
 | |
/**
 * Dispatch parsed records in order: records with a truthy `err` are emitted
 * as "error" events; all others go through emitResult().
 *
 * @param {Array<Object>} result  records from linesToJson or the workers
 */
Converter.prototype.processResult = function (result) {
  var count = result.length;
  var pos = 0;
  while (pos < count) {
    var record = result[pos];
    if (record.err) {
      this.emit("error", record.err);
    } else {
      this.emitResult(record);
    }
    pos += 1;
  }
};
 | |
| 
 | |
/**
 * Deliver one parsed record to every interested consumer.
 *
 * Depending on the flags resolved in the constructor's next-tick handler, the
 * record is passed to the user transform and emitted as "json", "csv" and
 * "record_parsed". It is also collected for "end_parsed" and pushed
 * downstream: as an object in objectMode, otherwise as JSON text.
 *
 * @param {Object} r  record {index, row, json}. `json` and `row` may arrive as
 *   JSON strings (presumably from worker processes — TODO confirm); they are
 *   parsed only when a consumer needs the object form.
 */
Converter.prototype.emitResult = function (r) {
  var index = r.index;
  var row = r.row;
  var result = r.json;
  var resultJson = null;
  var resultStr = null;
  if (typeof result === "string") {
    resultStr = result;
  } else {
    resultJson = result;
  }
  if (resultJson === null && this._needJson) {
    resultJson = JSON.parse(resultStr);
    if (typeof row === "string") {
      row = JSON.parse(row);
    }
  }
  if (this.transform && typeof this.transform === "function") {
    this.transform(resultJson, row, index);
    // The transform may have changed resultJson, so the cached string is stale.
    resultStr = null;
  }
  if (this._needEmitJson) {
    this.emit("json", resultJson, index);
  }
  if (this._needEmitCsv) {
    if (typeof row === "string") {
      row = JSON.parse(row);
    }
    this.emit("csv", row, index);
  }
  if (this.param.constructResult && this._needEmitFinalResult) {
    this.finalResult.push(resultJson);
  }
  if (this._needEmitResult) {
    this.emit("record_parsed", resultJson, row, index);
  }
  if (this.param.toArrayString && index > 0 && this._needPush) {
    // Separator between JSON array elements (no comma before the first one).
    this.push("," + eol);
  }
  if (this._options && this._options.objectMode) {
    this.push(resultJson);
  } else {
    if (this._needPush) {
      if (resultStr === null) {
        resultStr = JSON.stringify(resultJson);
      }
      this.push(!this.param.toArrayString ? resultStr + eol : resultStr, "utf8");
    }
  }
};
 | |
| 
 | |
/**
 * Default raw-data hook: hands the decoded chunk back unchanged.
 * Replace it with preRawData().
 *
 * @param {string} data  decoded chunk text
 * @param {function(string)} cb  receives the text to parse
 */
Converter.prototype.preProcessRaw = function (data, cb) {
  var passthrough = data;
  cb(passthrough);
};
 | |
| 
 | |
// FIXME: this default implementation ignores lineNumber.
/**
 * Default per-line hook: returns the line unchanged. Replace it with
 * preFileLine().
 *
 * @param {string} line  one input line
 * @param {number} lineNumber  position supplied by _preProcessLines (unused here)
 * @returns {string} the same line
 */
Converter.prototype.preProcessLine = function (line, lineNumber) {
  var unchanged = line;
  return unchanged;
};
 | |
| 
 | |
/**
 * Transform-stream hook called after all input has been written.
 *
 * Any text left in the partial-line buffer is terminated with the line ending
 * (unless it already ends with it) and parsed, then checkAndFlush() runs.
 * The completion step stored in this.flushCb emits "end_parsed", destroys
 * workers, completes the stream and, when nothing is pushed downstream,
 * emits "end" itself.
 *
 * @param {function} cb  Transform flush callback
 */
Converter.prototype._flush = function (cb) {
  var self = this;
  this.flushCb = function () {
    self.emit("end_parsed", self.finalResult);
    if (self.workerMgr) {
      self.workerMgr.destroyWorker();
    }
    cb();
    if (!self._needPush) {
      // Nothing consumes the readable side, so emit "end" explicitly
      // (presumably it would otherwise never fire).
      self.emit("end");
    }
  };
  if (this._csvLineBuffer.length > 0) {
    var eol = this.getEol();
    var tail = this._csvLineBuffer;
    if (Buffer.isBuffer(tail)) {
      // Bytes held back by _transform (e.g. a trailing multi-byte character).
      tail = tail.toString("utf8");
    }
    // Compare the whole terminator: eol may be two characters ("\r\n"), and
    // a single-character comparison would always append a second one.
    if (tail.slice(-eol.length) !== eol) {
      tail += eol;
    }
    this._csvLineBuffer = tail;
    this.processData(this._csvLineBuffer, function () {
      this.checkAndFlush();
    }.bind(this));
  } else {
    this.checkAndFlush();
  }
};
 | |
| 
 | |
/**
 * Final checks once the last data has been parsed.
 *
 * Text still left in the partial-line buffer is reported through
 * CSVError.unclosed_quote as an "error" event. In array-string mode the JSON
 * array is closed. If workers are still running, completion (flushCb) is
 * deferred to the worker manager's drain callback; otherwise it runs now.
 */
Converter.prototype.checkAndFlush = function () {
  var leftover = this._csvLineBuffer;
  if (leftover.length !== 0) {
    this.emit("error", CSVError.unclosed_quote(this.recordNum, leftover), leftover);
  }
  if (this.param.toArrayString && this._needPush) {
    this.push(eol + "]", "utf8");
  }
  var self = this;
  var workersBusy = this.workerMgr && this.workerMgr.isRunning();
  if (!workersBusy) {
    this.flushCb();
    return;
  }
  this.workerMgr.drain = function () {
    self.flushCb();
  };
};
 | |
| 
 | |
/**
 * Return the line terminator in use, detecting it from `data` if needed.
 *
 * When this.param.eol is unset and `data` is given, the first "\r" or "\n"
 * in `data` decides: "\r\n", "\r" or "\n" is stored on this.param.eol. If
 * `data` contains neither, the OS default (os.EOL) is stored.
 *
 * @param {string} [data]  sample text used for detection
 * @returns {string} this.param.eol, or os.EOL when it is still unset
 */
Converter.prototype.getEol = function (data) {
  var params = this.param;
  if (params.eol || !data) {
    return params.eol || eol;
  }
  var pos = 0;
  var total = data.length;
  while (pos < total) {
    var ch = data[pos];
    if (ch === "\n") {
      params.eol = "\n";
      return params.eol;
    }
    if (ch === "\r") {
      params.eol = data[pos + 1] === "\n" ? "\r\n" : "\r";
      return params.eol;
    }
    pos += 1;
  }
  params.eol = eol;
  return params.eol || eol;
};
 | |
| 
 | |
/**
 * Stream a CSV file from disk into this converter.
 *
 * A missing file produces an "error" event with a descriptive message. Errors
 * from the underlying read stream are forwarded as "error" events too, since
 * pipe() does not forward them.
 *
 * @param {string} filePath  path of the CSV file
 * @param {function(Error, Array)|Object} [cb]  completion callback
 *   (err, finalResult). An object here is treated as `options`.
 * @param {Object} [options]  forwarded to fs.createReadStream
 * @returns {Converter} this, for chaining
 */
Converter.prototype.fromFile = function (filePath, cb, options) {
  var fs = require('fs');
  var rs = null;
  if (typeof cb ==="object" && typeof options === "undefined"){
    options=cb;
    cb=null;
  }
  // On the first error, stop reading the file.
  this.wrapCallback(cb, function () {
    if (rs && rs.destroy) {
      rs.destroy();
    }
  });
  // fs.exists is deprecated; fs.access with its default (F_OK) mode performs
  // the same existence check.
  fs.access(filePath, function (err) {
    if (err) {
      this.emit('error', new Error("File does not exist. Check to make sure the file path to your csv is correct."));
      return;
    }
    rs = fs.createReadStream(filePath,options);
    // Without this listener an I/O error (e.g. EACCES, EISDIR) would be an
    // unhandled "error" event on rs and crash the process.
    rs.on("error", function (readErr) {
      this.emit("error", readErr);
    }.bind(this));
    rs.pipe(this);
  }.bind(this));
  return this;
};
 | |
| 
 | |
/**
 * Parse CSV text read from an existing readable stream.
 *
 * @param {stream.Readable} readStream  source stream, piped into this converter
 * @param {function(Error, Array)} [cb]  optional completion callback
 * @returns {Converter} this, for chaining
 */
Converter.prototype.fromStream = function (readStream, cb) {
  var hasCallback = typeof cb === "function";
  if (hasCallback) {
    this.wrapCallback(cb);
  }
  readStream.pipe(this);
  return this;
};
 | |
| 
 | |
/**
 * Register a per-record function; emitResult calls it as
 * func(json, row, index) for every record.
 *
 * @param {function(Object, *, number)} func  record transform
 * @returns {Converter} this, for chaining
 */
Converter.prototype.transf = function (func) {
  var converter = this;
  converter.transform = func;
  return converter;
};
 | |
| 
 | |
/**
 * Parse a complete CSV string.
 *
 * With a callback, a non-string argument is rejected immediately: the result
 * of cb(error) is returned instead of the converter. Otherwise the text is
 * written and the stream ended on the next tick, so listeners attached right
 * after this call still see every event.
 *
 * @param {string} csvString  CSV text
 * @param {function(Error, Array)} [cb]  optional completion callback
 * @returns {Converter} this, for chaining (see above for the error case)
 */
Converter.prototype.fromString = function (csvString, cb) {
  var hasCallback = typeof cb === "function";
  if (hasCallback && typeof csvString !== "string") {
    return cb(new Error("Passed CSV Data is not a string."));
  }
  if (hasCallback) {
    this.wrapCallback(cb, function () {
    });
  }
  var self = this;
  process.nextTick(function () {
    self.end(csvString);
  });
  return this;
};
 | |
| 
 | |
/**
 * Attach a Node-style completion callback to this converter.
 *
 * On "end_parsed" the callback receives (null, result) unless an error has
 * already been seen. On the first "error", hasError is set, the callback (if
 * any) receives the error, and `clean` runs. The "error" listener is attached
 * even when no callback is given.
 *
 * @param {function(Error, Array)} [cb]  completion callback
 * @param {function} [clean]  cleanup run after the first error (no-op by default)
 */
Converter.prototype.wrapCallback = function (cb, clean) {
  var self = this;
  var cleanup = clean === undefined ? function () { } : clean;
  var hasCallback = typeof cb === "function";
  if (hasCallback) {
    self.once("end_parsed", function (res) {
      if (!self.hasError) {
        cb(null, res);
      }
    });
  }
  self.once("error", function (err) {
    self.hasError = true;
    if (hasCallback) {
      cb(err);
    }
    cleanup();
  });
};

module.exports = Converter;
 |