// A thing that emits "entry" events with Reader objects
// Pausing it causes it to stop emitting entry events, and also
// pauses the current entry if there is one.

module.exports = DirReader

var fs = require('graceful-fs')
var inherits = require('inherits')
var path = require('path')
var Reader = require('./reader.js')
var assert = require('assert').ok

inherits(DirReader, Reader)

/**
 * Reader subclass that walks a single directory, one child at a time.
 *
 * Must be invoked with `new`. The props are expected to have already been
 * classified as a directory: `props.type === 'Directory'` and a truthy
 * `props.Directory`.
 *
 * @param {Object} props - reader properties, forwarded to `Reader`.
 * @param {Function} [props.sort] - stored as `this.sort`; when it is a
 *   function it is used as the comparator for the directory listing.
 * @throws {Error} when called without `new`, or when `props` is missing or
 *   does not describe a directory.
 */
function DirReader (props) {
  var self = this
  if (!(self instanceof DirReader)) {
    throw new Error('DirReader must be called as constructor.')
  }

  // should already be established as a Directory type
  // (a missing props object is reported the same way instead of surfacing
  // as a TypeError on `props.type`)
  if (!props || props.type !== 'Directory' || !props.Directory) {
    throw new Error('Non-directory type ' + (props && props.type))
  }

  self.entries = null
  self._index = -1
  self._paused = false
  self._length = -1

  if (props.sort) {
    self.sort = props.sort
  }

  Reader.call(self, props)
}

/**
 * Lists the directory at `self._path` (at most once per reader), emits
 * "entries" with the raw listing, then sorts it if `self.sort` is a function
 * and starts reading. If the reader is paused when the listing arrives, the
 * sort-and-read step is deferred until the next "resume" event.
 */
DirReader.prototype._getEntries = function () {
  var self = this

  // race condition.  might pause() before calling _getEntries,
  // and then resume, and try to get them a second time.
  if (self._gotEntries) return
  self._gotEntries = true

  fs.readdir(self._path, function (er, entries) {
    if (er) return self.error(er)

    self.entries = entries

    self.emit('entries', entries)
    if (self._paused) self.once('resume', processEntries)
    else processEntries()

    function processEntries () {
      self._length = self.entries.length
      if (typeof self.sort === 'function') {
        // the comparator is invoked with the DirReader as `this`
        self.entries = self.entries.sort(self.sort.bind(self))
      }
      self._read()
    }
  })
}

/**
 * Advances to the next directory entry and emits it.
 *
 * start walking the dir, and emit an "entry" event for each one.
 *
 * Does nothing while paused, aborted, or while another entry is still being
 * processed. When the listing is exhausted, emits "end" and "close" once.
 */
DirReader.prototype._read = function () {
  var self = this

  if (!self.entries) return self._getEntries()

  if (self._paused || self._currentEntry || self._aborted) {
    return
  }

  self._index++
  if (self._index >= self.entries.length) {
    if (!self._ended) {
      self._ended = true
      self.emit('end')
      self.emit('close')
    }
    return
  }

  // ok, handle this one, then.

  // save creating a proxy, by stat'ing the thing now.
  // Validate the name before resolving it, so a bad listing entry fails with
  // a clear assertion rather than inside path.resolve.
  var name = self.entries[self._index]
  assert(name, 'directory entry name must be non-empty')
  var p = path.resolve(self._path, name)
  assert(p !== self._path, 'directory entry must not resolve to the directory itself')

  // set this to prevent trying to _read() again in the stat time.
  self._currentEntry = p
  fs[ self.props.follow ? 'stat' : 'lstat' ](p, function (er, stat) {
    if (er) return self.error(er)

    // getChildProps runs with the proxy as `this` when one is set.
    var who = self._proxy || self
    var base = path.basename(p)
    var dir = path.dirname(p)

    stat.path = p
    stat.basename = base
    stat.dirname = dir
    var childProps = self.getChildProps.call(who, stat)
    childProps.path = p
    childProps.basename = base
    childProps.dirname = dir

    var entry = Reader(childProps, stat)

    self._currentEntry = entry

    // "entry" events are for direct entries in a specific dir.
    // "child" events are for any and all children at all levels.
    // This nomenclature is not completely final.

    // Keep the pause state of this reader and its current (still owned)
    // entry in step with each other.
    entry.on('pause', function (who) {
      if (!self._paused && !entry._disowned) {
        self.pause(who)
      }
    })

    entry.on('resume', function (who) {
      if (self._paused && !entry._disowned) {
        self.resume(who)
      }
    })

    entry.on('stat', function (props) {
      self.emit('_entryStat', entry, props)
      if (entry._aborted) return
      if (entry._paused) {
        // hold the public "entryStat" event until the entry resumes
        entry.once('resume', function () {
          self.emit('entryStat', entry, props)
        })
      } else self.emit('entryStat', entry, props)
    })

    entry.on('ready', function EMITCHILD () {
      if (self._paused) {
        // pause the child, and emit the "entry" event once we drain.
        entry.pause(self)
        return self.once('resume', EMITCHILD)
      }

      // skip over sockets.  they can't be piped around properly,
      // so there's really no sense even acknowledging them.
      // if someone really wants to see them, they can listen to
      // the "socket" events.
      if (entry.type === 'Socket') {
        self.emit('socket', entry)
      } else {
        self.emitEntry(entry)
      }
    })

    // "close" and "disown" both finish this entry; only the first one counts.
    var ended = false
    entry.on('close', onend)
    entry.on('disown', onend)
    function onend () {
      if (ended) return
      ended = true
      self.emit('childEnd', entry)
      self.emit('entryEnd', entry)
      self._currentEntry = null
      if (!self._paused) {
        self._read()
      }
    }

    // XXX Remove this.  Works in node as of 0.6.2 or so.
    // Long filenames should not break stuff.
    entry.on('error', function (er) {
      if (entry._swallowErrors) {
        // downgrade to a warning and finish the entry so the walk continues
        self.warn(er)
        entry.emit('end')
        entry.emit('close')
      } else {
        self.emit('error', er)
      }
    })

    // proxy up some events.
    ;[
      'child',
      'childEnd',
      'warn'
    ].forEach(function (ev) {
      entry.on(ev, self.emit.bind(self, ev))
    })
  })
}

/**
 * Detaches `entry` from this reader: emits "beforeDisown" on it, marks it as
 * disowned, clears its `parent`/`root` links, releases it as the current
 * entry if it is the one being processed, then emits "disown" on it.
 *
 * @param {Object} entry - the child reader to release.
 */
DirReader.prototype.disown = function (entry) {
  entry.emit('beforeDisown')
  entry._disowned = true
  entry.parent = entry.root = null
  if (entry === this._currentEntry) {
    this._currentEntry = null
  }
  entry.emit('disown')
}

/**
 * Builds the base props object for a child reader of this directory.
 * `_read` fills in `path`, `basename` and `dirname` afterwards.
 *
 * @returns {Object} props with depth, root, parent, follow, filter, sort and
 *   hardlinks.
 */
DirReader.prototype.getChildProps = function () {
  return {
    depth: this.depth + 1,
    root: this.root || this,
    parent: this,
    // NOTE(review): _read consults `self.props.follow`, while this reads
    // `this.follow` -- confirm against Reader that both carry the same value.
    follow: this.follow,
    filter: this.filter,
    sort: this.props.sort,
    hardlinks: this.props.hardlinks
  }
}

/**
 * Pauses this reader and, if one is active, the current entry, then emits
 * "pause". No-op when already paused.
 *
 * @param {Object} [who] - originator of the pause; defaults to this reader.
 */
DirReader.prototype.pause = function (who) {
  var self = this
  if (self._paused) return
  who = who || self
  self._paused = true
  // _currentEntry may still be a plain path string while the stat is pending
  if (self._currentEntry && self._currentEntry.pause) {
    self._currentEntry.pause(who)
  }
  self.emit('pause', who)
}

/**
 * Resumes this reader and emits "resume". Afterwards either resumes the
 * current entry or, when there is none, reads the next one. No-op when not
 * paused.
 *
 * @param {Object} [who] - originator of the resume; defaults to this reader.
 */
DirReader.prototype.resume = function (who) {
  var self = this
  if (!self._paused) return
  who = who || self

  self._paused = false
  self.emit('resume', who)
  if (self._paused) {
    // a "resume" listener paused us again; leave everything as it is
    return
  }

  if (self._currentEntry) {
    if (self._currentEntry.resume) self._currentEntry.resume(who)
  } else self._read()
}

/**
 * Announces a direct child of this directory by emitting "entry" followed by
 * "child".
 *
 * @param {Object} entry - the child reader to announce.
 */
DirReader.prototype.emitEntry = function (entry) {
  this.emit('entry', entry)
  this.emit('child', entry)
}