// write data to it, and it'll emit data in 512 byte blocks.
// if you .end() or .flush(), it'll emit whatever it's got,
// padded with nulls to 512 bytes.

module.exports = BlockStream

var Stream = require("stream").Stream
  , inherits = require("inherits")
  , assertOk = require("assert").ok
  , debug = process.env.DEBUG ? console.error : function () {}

// `new Buffer()` is deprecated (DEP0005).  Use the Buffer.alloc / Buffer.from
// family where the runtime really has it, and fall back to the constructor
// on older runtimes.  The `Uint8Array.from` comparison guards against
// runtimes where Buffer.from is only the inherited typed-array method.
var modernBuffer = typeof Buffer.alloc === "function"
  && typeof Buffer.allocUnsafe === "function"
  && typeof Buffer.from === "function"
  && Buffer.from !== Uint8Array.from

// Returns a buffer of `size` bytes, all set to 0.
function zeroedBuffer (size) {
  if (modernBuffer) return Buffer.alloc(size)
  var buf = new Buffer(size)
  for (var i = 0; i < size; i ++) {
    buf[i] = 0
  }
  return buf
}

// Returns a buffer of `size` bytes with unspecified contents.
// Callers must overwrite every byte before emitting it.
function scratchBuffer (size) {
  return modernBuffer ? Buffer.allocUnsafe(size) : new Buffer(size)
}

// Returns a buffer holding the (default-encoded) bytes of `str`.
function stringBuffer (str) {
  return modernBuffer ? Buffer.from(str) : new Buffer(str)
}

// BlockStream(size, opt)
//
// size       - block size in bytes; defaults to 512 when falsy.
// opt.nopad  - when truthy, the final partial block is emitted as-is on
//              flush/end instead of being padded with zero bytes.
function BlockStream (size, opt) {
  this.writable = this.readable = true
  this._opt = opt || {}
  this._chunkSize = size || 512
  // read position inside this._buffer[0]
  this._offset = 0
  // queue of pending buffers, and the number of unread bytes they hold
  this._buffer = []
  this._bufferLength = 0
  // source of padding bytes, or false when padding is disabled
  this._zeroes = this._opt.nopad ? false : zeroedBuffer(this._chunkSize)
}

inherits(BlockStream, Stream)

// Queue `c` for output, emitting "data" for every full block available.
//
// c - a Buffer, or any truthy value (it is stringified and converted to a
//     Buffer).  Falsy values and empty buffers queue nothing.
//
// Returns false when at least one full block is queued but the stream is
// paused (a "drain" event follows once it has been flushed out), true
// otherwise.  Throws if called after end().
BlockStream.prototype.write = function (c) {
  if (this._ended) throw new Error("BlockStream: write after end")

  if (c && !Buffer.isBuffer(c)) c = stringBuffer(c + "")

  // guard on `c` as well, so that write(null) / write() is a no-op
  // rather than a TypeError.
  if (c && c.length) {
    this._buffer.push(c)
    this._bufferLength += c.length
  }

  if (this._bufferLength >= this._chunkSize) {
    if (this._paused) {
      this._needDrain = true
      return false
    }
    this._emitChunk()
  }
  return true
}

// Stop emitting "data" until resume() is called.
BlockStream.prototype.pause = function () {
  this._paused = true
}

// Resume emitting, and push out whatever full blocks are queued.
BlockStream.prototype.resume = function () {
  this._paused = false
  return this._emitChunk()
}

// Mark the stream as ended and flush what is left.
//
// chunk - optional final chunk to write, or a callback function.  A
//         callback is invoked once "end" has been emitted.
BlockStream.prototype.end = function (chunk) {
  // keep the callback in a local; assigning to an undeclared `cb` would
  // leak a global (and throw in strict mode).
  var cb = null
  if (typeof chunk === "function") {
    cb = chunk
    chunk = null
  }
  if (chunk) this.write(chunk)
  if (cb) {
    if (this._endEmitted) cb()
    else this.once("end", cb)
  }
  this._ended = true
  this.flush()
}

// Emit everything that is queued, padding (or, with nopad, truncating)
// the final block.
BlockStream.prototype.flush = function () {
  this._emitChunk(true)
}

// Emit as many full blocks as possible.
//
// flush - when truthy, the queue is first padded up to a whole number of
//         blocks (unless nopad), pause() calls made while emitting are
//         ignored, and with nopad the trailing partial block is emitted.
//
// Does nothing beyond the padding step when paused or when already
// emitting (re-entrant call from a "data" listener).
BlockStream.prototype._emitChunk = function (flush) {
  // On flush in padding mode, top the queue up with zeroes so that its
  // total length is a whole number of blocks.
  if (flush && this._zeroes) {
    var padBytes = (this._bufferLength % this._chunkSize)
    if (padBytes !== 0) padBytes = this._chunkSize - padBytes
    if (padBytes > 0) {
      this._buffer.push(this._zeroes.slice(0, padBytes))
      this._bufferLength += padBytes
    }
  }

  if (this._emitting || this._paused) return
  this._emitting = true

  var bufferIndex = 0
  while (this._bufferLength >= this._chunkSize &&
         (flush || !this._paused)) {

    var out
      , outOffset = 0
      , outHas = this._chunkSize

    while (outHas > 0 && (flush || !this._paused)) {
      var cur = this._buffer[bufferIndex]
        , curHas = cur.length - this._offset

      // If it's not big enough to fill the whole thing, then we'll need
      // to copy multiple buffers into one.  However, if it is big enough,
      // then just slice out the part we want, to save unnecessary copying.
      // Also, need to copy if we've already done some copying, since
      // buffers can't be joined like cons strings.
      if (out || curHas < outHas) {
        out = out || scratchBuffer(this._chunkSize)
        cur.copy(out, outOffset,
                 this._offset, this._offset + Math.min(curHas, outHas))
      } else if (cur.length === outHas && this._offset === 0) {
        // shortcut -- cur is exactly long enough, and no offset.
        out = cur
      } else {
        // slice out the piece of cur that we need.
        out = cur.slice(this._offset, this._offset + outHas)
      }

      if (curHas > outHas) {
        // the current buffer couldn't be completely output;
        // update this._offset to reflect how much WAS written
        this._offset += outHas
        outHas = 0
      } else {
        // the entire current buffer was output; move on to the next one
        outHas -= curHas
        outOffset += curHas
        bufferIndex ++
        this._offset = 0
      }
    }

    this._bufferLength -= this._chunkSize
    assertOk(out.length === this._chunkSize)
    this.emit("data", out)
    out = null
  }

  // whatever is left, it's not enough to fill up a block, or we're paused
  this._buffer = this._buffer.slice(bufferIndex)
  if (this._paused) {
    // a listener paused us while we were emitting; remember to announce
    // "drain" once the rest has been flushed out after resume().
    this._needDrain = true
    this._emitting = false
    return
  }

  // if flushing (or resuming after end()), and not using null-padding,
  // then need to emit the last chunk(s) sitting in the queue.  We know
  // that it's not enough to fill up a whole block, because otherwise it
  // would have been emitted above, but there may be some offset.
  // `this._ended` is checked too so that a resume() following an end()
  // that happened while paused still emits the tail and reaches "end".
  var l = this._buffer.length
  if ((flush || this._ended) && !this._zeroes && l) {
    if (l === 1) {
      if (this._offset) {
        this.emit("data", this._buffer[0].slice(this._offset))
      } else {
        this.emit("data", this._buffer[0])
      }
    } else {
      // join the leftovers into one buffer; every byte of it is
      // overwritten by the copies below.
      var tail = scratchBuffer(this._bufferLength)
        , tailOffset = 0
      for (var i = 0; i < l; i ++) {
        var piece = this._buffer[i]
          , pieceHas = piece.length - this._offset
        piece.copy(tail, tailOffset, this._offset)
        // only the first leftover buffer can have a read offset
        this._offset = 0
        tailOffset += pieceHas
        this._bufferLength -= pieceHas
      }
      this.emit("data", tail)
    }
    // truncate
    this._buffer.length = 0
    this._bufferLength = 0
    this._offset = 0
  }

  // now either drained or ended:
  // we've flushed out all that we can so far.
  if (this._needDrain) {
    this._needDrain = false
    this.emit("drain")
  }

  if ((this._bufferLength === 0) && this._ended && !this._endEmitted) {
    this._endEmitted = true
    this.emit("end")
  }

  this._emitting = false
}