参考资料:

1. 什么是stream

stream(流)是Node中一个非常重要的概念,几乎所有的I/O操作都与stream有关。官方文档的解释如下:

A stream is an abstract interface implemented by various objects in Node. For example a request to an HTTP server is a stream, as is stdout. Streams are readable, writable, or both. All streams are instances of EventEmitter.

也就是说,stream是一个抽象的接口,其它的类对它进行了实现。而从最初的本质来说,stream就是一个EventEmitter

流的理念源于Unix中的Pipeline,简单地说,就是上一个程序的输出可以作为下一个程序的输入。数据在不同的程序之间流动,就像水流在水管中一样,这是一个十分形象的比喻。通过管道符|可以十分方便地将各个程序连接起来,为数据的流动提供管道,例如:

ls -l | grep key | less

在Node中,stream的之间的连接通过pipe方法,例如:

readStream.pipe(writeStream);

在Node中,stream分为多种,包括ReadableWritableDuplexTransform

通常,可以通过如下方式来创建stream:

var Readable = require('stream').Readable;
var rs = Readable();

通过stream.js的源码,也可以对该模块的结构有一个整体上的了解:

// https://github.com/joyent/node/blob/master/lib/stream.js

module.exports = Stream;

var EE = require('events').EventEmitter;
var util = require('util');

util.inherits(Stream, EE);
Stream.Readable = require('_stream_readable');
Stream.Writable = require('_stream_writable');
Stream.Duplex = require('_stream_duplex');
Stream.Transform = require('_stream_transform');
Stream.PassThrough = require('_stream_passthrough');

function Stream() {
	EE.call(this);
}

可以看到,Stream继承自EventEmitter,而ReadableWritable等具体的stream都挂载在Stream下面。

2. Readable stream

Readable是可以流出数据的流,简单来说,它可以作为我们的数据源。常见的Readable stream有如下几种:

Readable stream有两种模式:

  • flowing:在该模式下,会尽快获取数据向外输出。因此如果没有事件监听,也没有pipe()来引导数据流向,数据可能会丢失。
  • paused:默认模式。在该模式下,需要手动调用stream.read()来获取数据。

可以通过以下几种方法切换到flowing模式:

  • data事件添加监听器
  • 调用resume()
  • 调用pipe()

可以通过以下几种方法切换到paused模式:

  • 如果没有pipe,则调用pause()即可
  • 如果有pipe,那么需要移除data事件的所有监听器,并通过unpipe()移除所有的pipe

下面,来看一个简单的例子:

var Readable = require('stream').Readable;
var rs = new Readable();

rs.on('readable', function() {
	var chunk = rs.read();
	console.log('get data:', chunk ? chunk.toString() : null);
});

rs.on('end', function() {
	console.log('stream end');
});

rs.push('hello stream');
rs.push('hello alex');
rs.push(null);

输出为:

get data: hello streamhello alex
stream end

在这里,我们创建了一个Readable stream,并监听其readableend事件,然后通过push手动地向其中写入数据。在这里,stream为paused模式,当调用push(null)后,才会触发readable事件,之前的push(chunk)操作会将数据先放在内部的缓存中。当监听到readable事件后,需要手动调用rs.read()读取数据,同时该操作会触发end事件;如果在监听到readable事件后不进行read()操作,那么数据就会丢失。

如果我们监听data事件,那么就会自动切换为flowing模式。代码如下:

var Readable = require('stream').Readable;
var rs = new Readable();

rs.on('data', function(chunk) {
	console.log('get data:', chunk.toString());
});

rs.on('end', function() {
	console.log('stream end');
});

rs.push('hello stream');
rs.push('hello alex');
rs.push(null);

此时输出为:

get data: hello stream
get data: hello alex
stream end

因为是flowing模式,因此每次push(chunk)操作都会触发data事件。当调用push(null)时,同样会触发readable事件,只不过此时通过read()操作读取不到任何数据而已。而在每次触发data事件的时候,会在内部调用read()方法,因此最终end事件依然会触发。

当然,我们也可以不监听data事件,而是通过pipe()来进入flowing模式。代码如下:

var Readable = require('stream').Readable;
var rs = new Readable();

rs.on('end', function() {
	console.log('stream end');
});

rs.push('hello stream\n');
rs.push('hello alex\n');
rs.push(null);

rs.pipe(process.stdout);

输出为:

hello stream
hello alex
stream end

其本质上,是在pipe()函数中进行了data事件的监听,以及read()等一系列操作。

上面的几个例子中,我们都是通过push()方法手动地向Readable stream中写入数据。然而在实际情况下,如果要实现一个自定义的Readable stream类,往往是通过定义其_read方法来进行数据的处理。看如下例子:

var Readable = require('stream').Readable;

function MyReadable(data, options) {
	if (!(this instanceof MyReadable)) {
		return new MyReadable(data, options);
	}
	Readable.call(this, options);
	this.data = data || [];
	this.index = 0;
}

MyReadable.prototype.__proto__ = Readable.prototype;

MyReadable.prototype._read = function() {
	if (this.index >= this.data.length) {
		this.push(null);
	} else {
		setTimeout(function() {
			this.push(this.data[this.index++]);
		}.bind(this), 1000);
	}
};

var data = ['California Dreaming', 'Hotel California', 'Californication'];
var rs = MyReadable(data);

rs.on('data', function(chunk) {
	console.log('get data:', chunk.toString());
});

在这里我们实现了一个MyReadable的类,并让其继承自ReadableMyReadable的参数data是一个字符串数组。然后定义了其_read()方法,每次从数组data中取出一项进行push()操作,如果遍历完了数组,则执行push(null)

但是可以发现这样一个问题,也就是push(chunk)中的参数必须为字符串或者Buffer对象,否则就会报错。在上面的例子中,可以看到,MyReadable实际上是接受两个参数的,第二个参数为options,这个参数实际上被Readable所使用。通过在options中设置objectMode: true,就可以使push()操作支持对象、数字等其它类型了。例如:

var Readable = require('stream').Readable;

function MyReadable(data, options) {
	if (!(this instanceof MyReadable)) {
		return new MyReadable(data, options);
	}
	Readable.call(this, options);
	this.data = data || [];
	this.index = 0;
}

MyReadable.prototype.__proto__ = Readable.prototype;

MyReadable.prototype._read = function() {
	if (this.index >= this.data.length) {
		this.push(null);
	} else {
		setTimeout(function() {
			this.push(this.data[this.index++]);
		}.bind(this), 1000);
	}
};

var data = [{
	music: 'California Dreaming',
	artist: 'The Mamas & The Papas'
}, {
	music: 'Hotel California',
	artist: 'Eagles'
}, {
	music: 'Californication',
	artist: 'Red Hot Chili Peppers'
}];
var rs = MyReadable(data, {
	objectMode: true
});

rs.on('data', function(chunk) {
	console.log('%s - %s', chunk.artist, chunk.music);
});

在上面的例子中,我们是通过在_read()函数中使用setTimeout来保证数据的产生速率。其实,也可以通过pause()resume()来控制数据流,例如:

var Readable = require('stream').Readable;

function MyReadable(data, options) {
	if (!(this instanceof MyReadable)) {
		return new MyReadable(data, options);
	}
	Readable.call(this, options);
	this.data = data || [];
	this.index = 0;
}

MyReadable.prototype.__proto__ = Readable.prototype;

MyReadable.prototype._read = function() {
	if (this.index >= this.data.length) {
		this.push(null);
	} else {
		this.push(this.data[this.index++]);
	}
};

var data = ['California Dreaming', 'Hotel California', 'Californication'];
var rs = MyReadable(data);

rs.on('data', function(chunk) {
	console.log('get data:', chunk.toString());
	rs.pause();
	setTimeout(function() {
		rs.resume();
	}, 1000);
});

在这里,每次监听到data事件后,执行pause(),然后过1秒后再执行resume()。在该例子中,在此过程中_read依然在产生数据,只不过此时的push()操作并不会触发data事件,数据暂时存放在内部的缓存中。当执行了resume()后,才会继续在内部调用read()读取数据。

3. Writable stream

Writable是可以流入数据的流,简单来说,我们可以将数据输入给它。常见的Writable stream有如下几种:

当创建一个Writable stream的时候,我们需要实现其_write()方法。看下面例子:

var Writable = require('stream').Writable;

var ws = Writable();

ws._write = function(chunk, encoding, cb) {
	console.log(chunk.toString());
	cb();
}

ws.on('finish', function() {
	console.log('on finish');
});

ws.write('hello world');
ws.write('hello alex');
ws.end();

当然,我们也可以通过继承Writable来创建一个新的stream类,例如:

var Writable = require('stream').Writable;

function MyWritable(options) {
	if (!(this instanceof MyWritable)) {
		return new MyWritable(options);
	}
	Writable.call(this, options);
}

MyWritable.prototype.__proto__ = Writable.prototype;

MyWritable.prototype._write = function(chunk, encoding, cb) {
	console.log(chunk.toString());
	cb();
};

var ws = MyWritable();

ws.write('hello world');
ws.end('the end');

在上面的例子中,write()操作依然是只能够写字符串或者Buffer对象。同样地,可以通过设置objectMode: true来实现其它数据类型的写入。例如;

var Writable = require('stream').Writable;

function MyWritable(options) {
	if (!(this instanceof MyWritable)) {
		return new MyWritable(options);
	}
	Writable.call(this, options);
}

MyWritable.prototype.__proto__ = Writable.prototype;

MyWritable.prototype._write = function(chunk, encoding, cb) {
	console.log(chunk);
	cb();
};

var ws = MyWritable({
	objectMode: true
});

ws.write({
	music: 'California Dreaming',
	artist: 'The Mamas & The Papas'
});
ws.end(100);

4. pipe

下面来了解一下pipe()操作的基本原理。首先我们定义MyReadableMyWritable两个类:

var util = require('util'),
	stream = require('stream'),
	Readable = stream.Readable,
	Writable = stream.Writable;

function MyReadable(options) {
	if (!(this instanceof MyReadable)) {
		return new MyReadable(options);
	}
	Readable.call(this, options);
	this._cur = 1;
	this._max = 200;
}

util.inherits(MyReadable, Readable);

MyReadable.prototype._read = function() {
	if (this._cur > this._max) {
		this.push(null);
	} else {
		this.push('' + this._cur++);
	}
}

function MyWritable(options) {
	if (!(this instanceof MyWritable)) {
		return new MyWritable(options);
	}
	Writable.call(this, options);
}

util.inherits(MyWritable, Writable);

MyWritable.prototype._write = function(chunk, encoding, cb) {
	setTimeout(function() {
		console.log('write data:', chunk.toString());
		cb();
	}.bind(this), 100);
};

其中MyReadable_read()操作是依次生成1到200的数字;MyWritable_write()操作是将收到的数据打印在控制台上,这里通过setTimeout来限制MyWritable的写入速度。

如果我们希望MyReadable的数据流入MyWritable,代码可能是这样子:

var rs = MyReadable(),
	ws = MyWritable();

rs.on('data', function(chunk) {
	ws.write('chunk');
});

此时输出为:

read data: 1
read data: 2
... ...
read data: 199
read data: 200
write data: 1
write data: 2
... ...
write data: 199
write data: 200

此时监听了data事件,rs处于flowing模式,因此数据会源源不断地产生。又由于ws的写入速度远低于rs的读取速度,因此产生的数据会缓存在内存中,然后才会进行写操作。如果例子中_max的值不是200,而是一个非常大的数字的话,就会严重耗费内存。因此,我们需要对数据流进行一定的管理。如下:

var rs = MyReadable({
		highWaterMark: 10
	}),
	ws = MyWritable({
		highWaterMark: 10
	});

rs.on('data', function(chunk) {
	if (!ws.write(chunk)) {
		rs.pause();
	}
});

ws.on('drain', function() {
	rs.resume();
});

由于例子中的数据量比较小,我们通过设置highWaterMark为一个比较小的值来模拟实现此效果。

因为write()操作会返回一个boolean值,用来说明是否可以继续写下去。如果内存中已经缓存了较多的数据,就会返回false。当ws.write(chunk)返回值是false的时候,执行rs.pause()暂停读取流;然后当接收到drain事件的时候,执行rs.resume()重新开启读取流。

此时输出可能为:

read data: 1
... ... 
read data: 15
write data: 1
... ...
write data: 10
read data: 16
... ...
read data: 20
write data: 11
... ...
write data: 15
... ...
... ...
read data: 197
... ...
read data: 200
write data: 193
... ...
write data: 200

我们也可以这样子写:

function pipe(rs, ws) {
	rs.on('data', function(chunk) {
		if (!ws.write(chunk)) {
			rs.pause();
		}
	});

	ws.on('drain', function() {
		rs.resume();
	});
}

pipe(rs, ws);

这其实就是pipe()的基本逻辑。更简单地,我们可以直接使用pipe()来自动管理数据流:

rs.pipe(ws);

5. Duplex

Duplex既是Readable stream同时又是Writable Stream,常见的Duplex stream有:

如果要实现一个Duplex stream,需要实现它的_read()_write()方法。

如下例子,实现了一个比较简单的双向流:

var util = require('util'),
	stream = require('stream'),
	Readable = stream.Readable,
	Duplex = stream.Duplex;

function MyReadable(options) {
	if (!(this instanceof MyReadable)) {
		return new MyReadable(options);
	}
	Readable.call(this, options);
	this._cur = 1;
	this._max = 20;
}

util.inherits(MyReadable, Readable);

MyReadable.prototype._read = function() {
	if (this._cur > this._max) {
		this.push(null);
	} else {
		this.push('' + this._cur++);
	}
}

function MyDuplex(options) {
	if (!(this instanceof MyDuplex)) {
		return new MyDuplex(options);
	}
	Duplex.call(this, options);
	this._data = [];
}

util.inherits(MyDuplex, Duplex);

MyDuplex.prototype._read = function() {
	if (this._data.length) {
		this.push(this._data.shift());
		this.push('\n');
	} else {
		this.push(null);
	}
}

MyDuplex.prototype._write = function(chunk, encoding, cb) {
	console.log('write data:', chunk.toString());
	this._data.push(chunk);
	cb();
};

var rs = MyReadable(),
	ds = MyDuplex();

rs.pipe(ds).pipe(process.stdout);

6. Transform

Transform是一种特殊的Duplex stream,它可以对数据进行转换,也就是说,它的输出是将输入根据某种规则计算而成的。常见的Transform stream有:

如果要实现一个Transform stream,需要实现它的_transform()方法。

如下例子,实现了一个比较简单的转换流:

var util = require('util'),
	stream = require('stream'),
	Readable = stream.Readable,
	Transform = stream.Transform;

function MyReadable(options) {
	if (!(this instanceof MyReadable)) {
		return new MyReadable(options);
	}
	Readable.call(this, options);
	this._cur = 1;
	this._max = 20;
}

util.inherits(MyReadable, Readable);

MyReadable.prototype._read = function() {
	if (this._cur > this._max) {
		this.push(null);
	} else {
		this.push('' + this._cur++);
	}
}

function MyTransform(options) {
	if (!(this instanceof MyTransform)) {
		return new MyTransform(options);
	}
	Transform.call(this, options);
}

util.inherits(MyTransform, Transform);

MyTransform.prototype._transform = function(chunk, encoding, cb) {
	var val = Number(chunk.toString());
	this.push('' + val * 2 + '\n');
	cb();
};

var rs = MyReadable(),
	ts = MyTransform();

rs.pipe(ts).pipe(process.stdout);