You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
212 lines
7.4 KiB
212 lines
7.4 KiB
"use strict";
|
|
Object.defineProperty(exports, "__esModule", { value: true });
|
|
exports.CsvParserStream = void 0;
|
|
const string_decoder_1 = require("string_decoder");
|
|
const stream_1 = require("stream");
|
|
const transforms_1 = require("./transforms");
|
|
const parser_1 = require("./parser");
|
|
/**
 * Transform stream that turns incoming CSV text/bytes into parsed rows.
 *
 * Incoming chunks are decoded with a `StringDecoder`, handed to the `Parser`,
 * run through the `HeaderTransformer` and then through the optional
 * user-supplied row transform / validate functions before being pushed
 * downstream.
 *
 * Events emitted by this class (in addition to the regular stream events):
 *  - `headers`      -> (headers) once, as soon as the header transformer exposes them
 *  - `data-invalid` -> (row, rowNumber, reason) for rows that fail validation
 *  - `end`          -> (rowCount) exactly once
 */
class CsvParserStream extends stream_1.Transform {
    /**
     * @param parserOptions parser configuration; this class reads `objectMode`,
     *   `encoding`, `limitRows`, `maxRows`, `skipRows` and `skipLines` from it and
     *   forwards the whole object to the `Parser` and the `HeaderTransformer`.
     */
    constructor(parserOptions) {
        super({ objectMode: parserOptions.objectMode });
        // Unparsed remainder of the previous chunk (a partial trailing line).
        this.lines = '';
        // Rows counted toward the `end` payload and the maxRows limit.
        this.rowCount = 0;
        // Rows seen after line skipping; compared against `skipRows`.
        this.parsedRowCount = 0;
        // Every parsed line, including skipped ones; compared against `skipLines`.
        this.parsedLineCount = 0;
        this.endEmitted = false;
        this.headersEmitted = false;
        this.parserOptions = parserOptions;
        this.parser = new parser_1.Parser(parserOptions);
        this.headerTransformer = new transforms_1.HeaderTransformer(parserOptions);
        this.decoder = new string_decoder_1.StringDecoder(parserOptions.encoding);
        this.rowTransformerValidator = new transforms_1.RowTransformerValidator();
    }

    /** Truthy once `limitRows` is enabled and `rowCount` has reached `maxRows`. */
    get hasHitRowLimit() {
        return this.parserOptions.limitRows && this.rowCount >= this.parserOptions.maxRows;
    }

    /** True once more rows than `skipRows` have been parsed. */
    get shouldEmitRows() {
        return this.parsedRowCount > this.parserOptions.skipRows;
    }

    /** True while the current line still falls inside the `skipLines` prefix. */
    get shouldSkipLine() {
        return this.parsedLineCount <= this.parserOptions.skipLines;
    }

    /**
     * Registers the row transform function.
     * @param transformFunction function stored on the row transformer/validator
     * @returns this stream, for chaining
     */
    transform(transformFunction) {
        this.rowTransformerValidator.rowTransform = transformFunction;
        return this;
    }

    /**
     * Registers the row validation function.
     * @param validateFunction function stored on the row transformer/validator
     * @returns this stream, for chaining
     */
    validate(validateFunction) {
        this.rowTransformerValidator.rowValidator = validateFunction;
        return this;
    }

    /**
     * Overrides `emit` so that `end` is delivered at most once and carries the
     * final `rowCount` as its argument. All other events pass straight through.
     * @returns `false` for `end`; otherwise whatever the base `emit` returns
     */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    emit(event, ...rest) {
        if (event === 'end') {
            if (!this.endEmitted) {
                this.endEmitted = true;
                super.emit('end', this.rowCount);
            }
            return false;
        }
        return super.emit(event, ...rest);
    }

    /**
     * Stream hook: decodes one chunk, parses it together with the leftover from
     * the previous chunk and processes the resulting rows.
     * @param data incoming chunk
     * @param encoding unused; decoding is driven by `parserOptions.encoding`
     * @param done stream callback
     */
    _transform(data, encoding, done) {
        // if we have hit our maxRows parsing limit then skip parsing
        if (this.hasHitRowLimit) {
            return done();
        }
        const wrappedCallback = CsvParserStream.wrapDoneCallback(done);
        try {
            const { lines } = this;
            const newLine = lines + this.decoder.write(data);
            // `true`: more data may follow, so a trailing partial line is kept back.
            const rows = this.parse(newLine, true);
            return this.processRows(rows, wrappedCallback);
        }
        catch (e) {
            return wrappedCallback(e);
        }
    }

    /**
     * Stream hook: parses whatever is still buffered once the input has ended.
     * @param done stream callback
     */
    _flush(done) {
        const wrappedCallback = CsvParserStream.wrapDoneCallback(done);
        // if we have hit our maxRows parsing limit then skip parsing
        if (this.hasHitRowLimit) {
            return wrappedCallback();
        }
        try {
            const newLine = this.lines + this.decoder.end();
            // `false`: no more data, so the parser is told this is the final input.
            const rows = this.parse(newLine, false);
            return this.processRows(rows, wrappedCallback);
        }
        catch (e) {
            return wrappedCallback(e);
        }
    }

    /**
     * Runs the parser over `data` and stores the unparsed remainder in `this.lines`.
     * @param data text to parse; empty/falsy input yields no rows and leaves
     *   `this.lines` untouched
     * @param hasMoreData forwarded to the parser
     * @returns the rows returned by the parser
     */
    parse(data, hasMoreData) {
        if (!data) {
            return [];
        }
        const { line, rows } = this.parser.parse(data, hasMoreData);
        this.lines = line;
        return rows;
    }

    /**
     * Processes `rows` one at a time: skips leading lines, applies the header and
     * row transforms, emits `data-invalid` for invalid rows and pushes valid ones.
     * @param rows parsed rows
     * @param cb called once with an error, or with no arguments when all rows
     *   were handled or the row limit was reached
     */
    processRows(rows, cb) {
        const rowsLength = rows.length;
        const iterate = (i) => {
            const callNext = (err) => {
                if (err) {
                    return cb(err);
                }
                if (i % 100 === 0) {
                    // in case the transforms are sync, defer to the event loop
                    // periodically to prevent a stack overflow
                    setImmediate(() => iterate(i + 1));
                    return undefined;
                }
                return iterate(i + 1);
            };
            this.checkAndEmitHeaders();
            // if we have emitted all rows or we have hit the maxRows limit option
            // then end
            if (i >= rowsLength || this.hasHitRowLimit) {
                return cb();
            }
            this.parsedLineCount += 1;
            if (this.shouldSkipLine) {
                return callNext();
            }
            const row = rows[i];
            // Counted optimistically; rolled back below/in transformRow when the
            // row turns out to be a header, a skipped row or an error.
            this.rowCount += 1;
            this.parsedRowCount += 1;
            // Captured now because rowCount may change before the callback runs.
            const nextRowCount = this.rowCount;
            return this.transformRow(row, (err, transformResult) => {
                if (err) {
                    this.rowCount -= 1;
                    return callNext(err);
                }
                if (!transformResult) {
                    return callNext(new Error('expected transform result'));
                }
                if (!transformResult.isValid) {
                    this.emit('data-invalid', transformResult.row, nextRowCount, transformResult.reason);
                }
                else if (transformResult.row) {
                    return this.pushRow(transformResult.row, callNext);
                }
                return callNext();
            });
        };
        iterate(0);
    }

    /**
     * Applies the header transform and, for rows that should be emitted, the
     * user transform/validation.
     * @param parsedRow raw row from the parser
     * @param cb node-style callback receiving `{ row, isValid, reason? }`
     */
    transformRow(parsedRow, cb) {
        try {
            this.headerTransformer.transform(parsedRow, (err, withHeaders) => {
                if (err) {
                    return cb(err);
                }
                if (!withHeaders) {
                    return cb(new Error('Expected result from header transform'));
                }
                if (!withHeaders.isValid) {
                    if (this.shouldEmitRows) {
                        return cb(null, { isValid: false, row: parsedRow });
                    }
                    // skipped because of skipRows option remove from total row count
                    return this.skipRow(cb);
                }
                if (withHeaders.row) {
                    if (this.shouldEmitRows) {
                        return this.rowTransformerValidator.transformAndValidate(withHeaders.row, cb);
                    }
                    // skipped because of skipRows option remove from total row count
                    return this.skipRow(cb);
                }
                // this is a header row dont include in the rowCount or parsedRowCount
                this.rowCount -= 1;
                this.parsedRowCount -= 1;
                return cb(null, { row: null, isValid: true });
            });
        }
        catch (e) {
            cb(e);
        }
    }

    /** Emits `headers` once, as soon as the header transformer has headers. */
    checkAndEmitHeaders() {
        if (!this.headersEmitted && this.headerTransformer.headers) {
            this.headersEmitted = true;
            this.emit('headers', this.headerTransformer.headers);
        }
    }

    /**
     * Reports a row dropped by the `skipRows` option.
     * @param cb receives `(null, { row: null, isValid: true })`
     */
    skipRow(cb) {
        // skipped because of skipRows option remove from total row count
        this.rowCount -= 1;
        return cb(null, { row: null, isValid: true });
    }

    /**
     * Pushes a row downstream: as-is in object mode, JSON-stringified otherwise.
     * @param row row to push
     * @param cb called with no arguments on success, or with the thrown error
     */
    pushRow(row, cb) {
        try {
            if (!this.parserOptions.objectMode) {
                this.push(JSON.stringify(row));
            }
            else {
                this.push(row);
            }
            cb();
        }
        catch (e) {
            cb(e);
        }
    }

    /**
     * Wraps a stream callback so that only the first error is forwarded to it;
     * any later error is thrown instead of being reported a second time.
     * On success the remaining arguments (without the error slot) are forwarded.
     * @param done callback to wrap
     * @returns the wrapped callback
     */
    static wrapDoneCallback(done) {
        let errorCalled = false;
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        return (err, ...args) => {
            if (err) {
                if (errorCalled) {
                    throw err;
                }
                errorCalled = true;
                done(err);
                return;
            }
            done(...args);
        };
    }
}
|
|
exports.CsvParserStream = CsvParserStream;
|
|
//# sourceMappingURL=CsvParserStream.js.map
|