// Generated by CoffeeScript 2.5.1 // # Stream Transformer // Pass all elements of an array or a stream to transform, filter and add. Features include: // * Extends the Node.js "stream.Transform" API. // * Both synchrounous and asynchronous support based and user callback // arguments signature. // * Ability to skip records. // * Sequential and concurrent execution using the "parallel" options. // Please look at the [README], the [samples] and the [tests] for additional // information. var Transformer, clone, stream, util; stream = require('stream'); util = require('util'); ({clone} = require('mixme')); // ## Usage // Callback approach, for ease of use: // `transform(records, [options], handler, callback)` // Stream API, for maximum of power: // `transform([records], [options], handler, [callback])` module.exports = function() { var argument, callback, error, handler, i, j, len, options, records, result, transform, type; options = {}; for (i = j = 0, len = arguments.length; j < len; i = ++j) { argument = arguments[i]; type = typeof argument; if (argument === null) { type = 'null'; } else if (type === 'object' && Array.isArray(argument)) { type = 'array'; } if (type === 'array') { records = argument; } else if (type === 'object') { options = clone(argument); } else if (type === 'function') { if (handler && i === arguments.length - 1) { callback = argument; } else { handler = argument; } } else if (type !== 'null') { throw new Error(`Invalid Arguments: got ${JSON.stringify(argument)} at position ${i}`); } } transform = new Transformer(options, handler); error = false; if (records) { setImmediate(function() { var k, len1, record; for (k = 0, len1 = records.length; k < len1; k++) { record = records[k]; if (error) { break; } transform.write(record); } return transform.end(); }); } if (callback || options.consume) { result = []; transform.on('readable', function() { var record, results; results = []; while ((record = transform.read())) { if (callback) { results.push(result.push(record)); } else { results.push(void 0); } } return results; }); transform.on('error', function(err) { error = true; if (callback) { return callback(err); } }); transform.on('end', function() { if (callback && !error) { return callback(null, result); } }); } return transform; }; // ## Transformer // Options are documented [here](http://csv.js.org/transform/options/). Transformer = function(options1 = {}, handler1) { var base, base1; this.options = options1; this.handler = handler1; if ((base = this.options).consume == null) { base.consume = false; } this.options.objectMode = true; if ((base1 = this.options).parallel == null) { base1.parallel = 100; } stream.Transform.call(this, this.options); this.state = { running: 0, started: 0, finished: 0 }; return this; }; util.inherits(Transformer, stream.Transform); module.exports.Transformer = Transformer; Transformer.prototype._transform = function(chunk, encoding, cb) { var callback, err, l; this.state.started++; this.state.running++; if (this.state.running < this.options.parallel) { cb(); cb = null; } try { l = this.handler.length; if (this.options.params != null) { l--; } if (l === 1) { // sync this.__done(null, [this.handler.call(this, chunk, this.options.params)], cb); } else if (l === 2) { // async callback = (err, ...chunks) => { return this.__done(err, chunks, cb); }; this.handler.call(this, chunk, callback, this.options.params); } else { throw Error("Invalid handler arguments"); } return false; } catch (error1) { err = error1; return this.__done(err); } }; Transformer.prototype._flush = function(cb) { this._ending = function() { if (this.state.running === 0) { this._ending = undefined return cb(); } }; return this._ending(); }; Transformer.prototype.__done = function(err, chunks, cb) { var chunk, j, len; this.state.running--; if (err) { return this.emit('error', err); } this.state.finished++; for (j = 0, len = chunks.length; j < len; j++) { chunk = chunks[j]; if (typeof chunk === 'number') { chunk = `${chunk}`; } if ((chunk != null) && chunk !== '') { // We dont push empty string // See https://nodejs.org/api/stream.html#stream_readable_push this.push(chunk); } } if (cb) { cb(); } if (this._ending) { return this._ending(); } }; // [readme]: https://github.com/wdavidw/node-stream-transform // [samples]: https://github.com/wdavidw/node-stream-transform/tree/master/samples // [tests]: https://github.com/wdavidw/node-stream-transform/tree/master/test