'use strict' const EE = require('events') const Yallist = require('yallist') const EOF = Symbol('EOF') const MAYBE_EMIT_END = Symbol('maybeEmitEnd') const EMITTED_END = Symbol('emittedEnd') const CLOSED = Symbol('closed') const READ = Symbol('read') const FLUSH = Symbol('flush') const FLUSHCHUNK = Symbol('flushChunk') const SD = require('string_decoder').StringDecoder const ENCODING = Symbol('encoding') const DECODER = Symbol('decoder') const FLOWING = Symbol('flowing') const RESUME = Symbol('resume') const BUFFERLENGTH = Symbol('bufferLength') const BUFFERPUSH = Symbol('bufferPush') const BUFFERSHIFT = Symbol('bufferShift') const OBJECTMODE = Symbol('objectMode') class MiniPass extends EE { constructor (options) { super() this[FLOWING] = false this.pipes = new Yallist() this.buffer = new Yallist() this[OBJECTMODE] = options && options.objectMode || false if (this[OBJECTMODE]) this[ENCODING] = null else this[ENCODING] = options && options.encoding || null if (this[ENCODING] === 'buffer') this[ENCODING] = null this[DECODER] = this[ENCODING] ? new SD(this[ENCODING]) : null this[EOF] = false this[EMITTED_END] = false this[CLOSED] = false this.writable = true this.readable = true this[BUFFERLENGTH] = 0 } get bufferLength () { return this[BUFFERLENGTH] } get encoding () { return this[ENCODING] } set encoding (enc) { if (this[OBJECTMODE]) throw new Error('cannot set encoding in objectMode') if (this[ENCODING] && enc !== this[ENCODING] && (this[DECODER] && this[DECODER].lastNeed || this[BUFFERLENGTH])) throw new Error('cannot change encoding') if (this[ENCODING] !== enc) { this[DECODER] = enc ? new SD(enc) : null if (this.buffer.length) this.buffer = this.buffer.map(chunk => this[DECODER].write(chunk)) } this[ENCODING] = enc } setEncoding (enc) { this.encoding = enc } write (chunk, encoding, cb) { if (this[EOF]) throw new Error('write after end') if (typeof encoding === 'function') cb = encoding, encoding = 'utf8' if (!encoding) encoding = 'utf8' // fast-path writing strings of same encoding to a stream with // an empty buffer, skipping the buffer/decoder dance if (typeof chunk === 'string' && !this[OBJECTMODE] && // unless it is a string already ready for us to use !(encoding === this[ENCODING] && !this[DECODER].lastNeed)) { chunk = new Buffer(chunk, encoding) } if (Buffer.isBuffer(chunk) && this[ENCODING]) chunk = this[DECODER].write(chunk) try { return this.flowing ? (this.emit('data', chunk), this.flowing) : (this[BUFFERPUSH](chunk), false) } finally { this.emit('readable') if (cb) cb() } } read (n) { try { if (this[BUFFERLENGTH] === 0 || n === 0 || n > this[BUFFERLENGTH]) return null if (this[OBJECTMODE]) n = null if (this.buffer.length > 1 && !this[OBJECTMODE]) { if (this.encoding) this.buffer = new Yallist([ Array.from(this.buffer).join('') ]) else this.buffer = new Yallist([ Buffer.concat(Array.from(this.buffer), this[BUFFERLENGTH]) ]) } return this[READ](n || null, this.buffer.head.value) } finally { this[MAYBE_EMIT_END]() } } [READ] (n, chunk) { if (n === chunk.length || n === null) this[BUFFERSHIFT]() else { this.buffer.head.value = chunk.slice(n) chunk = chunk.slice(0, n) this[BUFFERLENGTH] -= n } this.emit('data', chunk) if (!this.buffer.length && !this[EOF]) this.emit('drain') return chunk } end (chunk, encoding, cb) { if (typeof chunk === 'function') cb = chunk, chunk = null if (typeof encoding === 'function') cb = encoding, encoding = 'utf8' if (chunk) this.write(chunk, encoding) if (cb) this.once('end', cb) this[EOF] = true this.writable = false if (this.flowing) this[MAYBE_EMIT_END]() } // don't let the internal resume be overwritten [RESUME] () { this[FLOWING] = true this.emit('resume') if (this.buffer.length) this[FLUSH]() else if (this[EOF]) this[MAYBE_EMIT_END]() else this.emit('drain') } resume () { return this[RESUME]() } pause () { this[FLOWING] = false } get flowing () { return this[FLOWING] } [BUFFERPUSH] (chunk) { if (this[OBJECTMODE]) this[BUFFERLENGTH] += 1 else this[BUFFERLENGTH] += chunk.length return this.buffer.push(chunk) } [BUFFERSHIFT] () { if (this.buffer.length) { if (this[OBJECTMODE]) this[BUFFERLENGTH] -= 1 else this[BUFFERLENGTH] -= this.buffer.head.value.length } return this.buffer.shift() } [FLUSH] () { do {} while (this[FLUSHCHUNK](this[BUFFERSHIFT]())) if (!this.buffer.length && !this[EOF]) this.emit('drain') } [FLUSHCHUNK] (chunk) { return chunk ? (this.emit('data', chunk), this.flowing) : false } pipe (dest, opts) { if (dest === process.stdout || dest === process.stderr) (opts = opts || {}).end = false const p = { dest: dest, opts: opts, ondrain: _ => this[RESUME]() } this.pipes.push(p) dest.on('drain', p.ondrain) this[RESUME]() return dest } addEventHandler (ev, fn) { return this.on(ev, fn) } on (ev, fn) { try { return super.on(ev, fn) } finally { if (ev === 'data' && !this.pipes.length && !this.flowing) { this[RESUME]() } } } get emittedEnd () { return this[EMITTED_END] } [MAYBE_EMIT_END] () { if (!this[EMITTED_END] && this.buffer.length === 0 && this[EOF]) { this.emit('end') this.emit('prefinish') this.emit('finish') if (this[CLOSED]) this.emit('close') } } emit (ev, data) { if (ev === 'data') { if (!data) return if (this.pipes.length) this.pipes.forEach(p => p.dest.write(data) || this.pause()) } else if (ev === 'end') { if (this[DECODER]) { data = this[DECODER].end() if (data) { this.pipes.forEach(p => p.dest.write(data)) super.emit('data', data) } } this.pipes.forEach(p => { p.dest.removeListener('drain', p.ondrain) if (!p.opts || p.opts.end !== false) p.dest.end() }) this[EMITTED_END] = true this.readable = false } else if (ev === 'close') { this[CLOSED] = true // don't emit close before 'end' and 'finish' if (!this[EMITTED_END]) return } const args = new Array(arguments.length) args[0] = ev args[1] = data if (arguments.length > 2) { for (let i = 2; i < arguments.length; i++) { args[i] = arguments[i] } } try { return super.emit.apply(this, args) } finally { if (ev !== 'end') this[MAYBE_EMIT_END]() } } } module.exports = MiniPass