传统异步方法
ES6 诞生以前,异步编程的方法,大概有下面四种。
- 回调函数
- 事件监听
- 发布/订阅
- Promise 对象
Generator 函数将 JavaScript 异步编程带入了一个全新的阶段。
基本概念
异步
所谓"异步",简单说就是一个任务不是连续完成的,可以理解成该任务被人为分成两段,先执行第一段,然后转而执行其他任务,等做好了准备,再回过头执行第二段。
比如,有一个任务是读取文件进行处理,任务的第一段是向操作系统发出请求,要求读取文件。然后,程序执行其他任务,等到操作系统返回文件,再接着执行任务的第二段(处理文件)。这种不连续的执行,就叫做异步。
回调函数
JavaScript 语言对异步编程的实现,就是回调函数。所谓回调函数,就是把任务的第二段单独写在一个函数里面,等到重新执行这个任务的时候,就直接调用这个函数。回调函数的英语名字 callback,直译过来就是"重新调用"。
一个有趣的问题是,为什么 Node 约定,回调函数的第一个参数,必须是错误对象 err(如果没有错误,该参数就是 null)?
原因是执行分成两段,第一段执行完以后,任务所在的上下文环境就已经结束了。在这以后抛出的错误,原来的上下文环境已经无法捕捉,只能当作参数,传入第二段。
协程的 Generator 函数实现
Generator 函数是协程在 ES6 的实现,最大特点就是可以交出函数的执行权(即暂停执行)。
整个 Generator 函数就是一个封装的异步任务,或者说是异步任务的容器。异步操作需要暂停的地方,都用 yield 语句注明。Generator 函数的执行方法如下。
function* gen(x) {
var y = yield x + 2;
return y;
}
var g = gen(1);
g.next(); // { value: 3, done: false }
g.next(); // { value: undefined, done: true }
2
3
4
5
6
7
8
Generator 函数的数据交换和错误处理
Generator 函数可以暂停执行和恢复执行,这是它能封装异步任务的根本原因。除此之外,它还有两个特性,使它可以作为异步编程的完整解决方案:函数体内外的数据交换和错误处理机制。
next 返回值的 value 属性,是 Generator 函数向外输出数据;next 方法还可以接受参数,向 Generator 函数体内输入数据。
function* gen(x){
var y = yield x + 2;
return y;
}
var g = gen(1);
g.next() // { value: 3, done: false }
g.next(2) // { value: 2, done: true }
```js
Generator 函数内部还可以部署错误处理代码,捕获函数体外抛出的错误。
```js
function* gen(x){
try {
var y = yield x + 2;
} catch (e){
console.log(e);
}
return y;
}
var g = gen(1);
g.next();
g.throw('出错了');
// 出错了
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Thunk 函数
Thunk 函数是自动执行 Generator 函数的一种方法。
参数的求值策略
var x = 1;
function f(m) {
return m * 2;
}
f(x + 5);
2
3
4
5
6
7
上面代码先定义函数 f,然后向它传入表达式 x + 5。请问,这个表达式应该何时求值.一种意见是"传值调用"(call by value),即在进入函数体之前,就计算 x + 5 的值(等于 6),再将这个值传入函数 f。C 语言就采用这种策略。
另一种意见是“传名调用”(call by name),即直接将表达式 x+5 传入函数体,只在用到它的时候求值。Haskell 语言采用这种策略。
传值调用和传名调用,哪一种比较好?回答是各有利弊。传值调用比较简单,但是对参数求值的时候,实际上还没用到这个参数,有可能造成性能损失。
Thunk 函数的含义
编译器的“传名调用”实现,往往是将参数放到一个临时函数之中,再将这个临时函数传入函数体。这个临时函数就叫做 Thunk 函数。
function f(m) {
return m * 2;
}
f(x + 5);
// 等同于
var thunk = function () {
return x + 5;
};
function f(thunk) {
return thunk() * 2;
}
```js
这就是 Thunk函数的定义,它是“传名调用”的一种实现策略,用来替换某个表达式。
### JavaScript 语言的Thunk函数
JavaScript 语言是传值调用,它的 Thunk 函数含义有所不同。在 JavaScript 语言中,Thunk 函数替换的不是表达式,而是多参数函数,将其替换成一个只接受回调函数作为参数的单参数函数。
```js
// 正常版本的readFile(多参数版本)
fs.readFile(fileName, callback);
// Thunk版本的readFile(单参数版本)
var Thunk = function (fileName) {
return function (callback) {
return fs.readFile(fileName, callback);
};
};
var readFileThunk = Thunk(fileName);
readFileThunk(callback);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
上面代码中,fs 模块的 readFile 方法是一个多参数函数,两个参数分别为文件名和回调函数。经过转换器处理,它变成了一个单参数函数,只接受回调函数作为参数。这个单参数版本,就叫做 Thunk 函数。
任何函数,只要参数有回调函数,就能写成 Thunk 函数的形式。下面是一个简单的 Thunk 函数转换器。
// ES5版本
var Thunk = function(fn) {
return function() {
var args = Array.prototype.slice.call(arguments);
return function(callback) {
args.push(callback);
return fn.apply(this, args);
};
};
};
// ES6版本
const Thunk = function(fn) {
return function(...args) {
return function(callback) {
return fn.call(this, ...args, callback);
};
};
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
使用上面的转换器,生成 fs.readFile 的 Thunk 函数。
var readFileThunk = Thunk(fs.readFile);
readFileThunk(fileA)(callback);
2
Thunkify 模块
生产环境的转换器,建议使用 Thunkify 模块。
npm install thunkify
使用方式如下。
var thunkify = require('thunkify');
var fs = require('fs');
var read = thunkify(fs.readFile);
read('package.json')(function(err, str) {
// ...
});
2
3
4
5
6
7
Thunkify 的源码与上一节那个简单的转换器非常像。
function thunkify(fn) {
return function() {
var args = new Array(arguments.length);
var ctx = this;
for (var i = 0; i < args.length; ++i) {
args[i] = arguments[i];
}
return function(done) {
var called;
args.push(function() {
if (called) return;
called = true;
done.apply(null, arguments);
});
try {
fn.apply(ctx, args);
} catch (err) {
done(err);
}
};
};
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
它的源码主要多了一个检查机制,变量 called 确保回调函数只运行一次。
function f(a, b, callback) {
var sum = a + b;
callback(sum);
callback(sum);
}
var ft = thunkify(f);
var print = console.log.bind(console);
ft(1, 2)(print);
// 3
2
3
4
5
6
7
8
9
用于 Generator 函数的流程管理
Thunk 函数现在可以用于 Generator 函数的自动流程管理。Generator 函数可以自动执行。
function* gen() {
// ...
}
var g = gen();
var res = g.next();
while (!res.done) {
console.log(res.value);
res = g.next();
}
2
3
4
5
6
7
8
9
上面代码中,Generator 函数 gen 会自动执行完所有步骤。但是,这不适合异步操作。如果必须保证前一步执行完,才能执行后一步,上面的自动执行就不可行。这时,Thunk 函数就能派上用处。Thunk 函数真正的威力,在于可以自动执行 Generator 函数。下面就是一个基于 Thunk 函数的 Generator 执行器。
function run(fn) {
var gen = fn();
function next(err, data) {
var result = gen.next(data);
if (result.done) return;
result.value(next);
}
next();
}
function* g() {
// ...
}
run(g);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
上面代码的 run 函数,就是一个 Generator 函数的自动执行器。内部的 next 函数就是 Thunk 的回调函数。next 函数先将指针移到 Generator 函数的下一步(gen.next 方法),然后判断 Generator 函数是否结束(result.done 属性),如果没结束,就将 next 函数再传入 Thunk 函数(result.value 属性),否则就直接退出。
有了这个执行器,执行 Generator 函数方便多了。不管内部有多少个异步操作,直接把 Generator 函数传入 run 函数即可。当然,前提是每一个异步操作,都要是 Thunk 函数,也就是说,跟在 yield 命令后面的必须是 Thunk 函数.
var g = function* (){
var f1 = yield readFileThunk('fileA');
var f2 = yield readFileThunk('fileB');
// ...
var fn = yield readFileThunk('fileN');
};
run(g);
```js
## co模块
### 基本用法
下面是一个 Generator函数,用于依次读取两个文件。
```js
var gen = function* () {
var f1 = yield readFile('/etc/fstab');
var f2 = yield readFile('/etc/shells');
console.log(f1.toString());
console.log(f2.toString());
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
co 模块可以让你不用编写 Generator 函数的执行器。
var co = require('co');
co(gen);
2
co 函数返回一个 Promise 对象,因此可以用 then 方法添加回调函数。
co(gen).then(function() {
console.log('Generator 函数执行完成');
});
2
3
co 模块的原理
前面说过,Generator 就是一个异步操作的容器。它的自动执行需要一种机制,当异步操作有了结果,能够自动交回执行权。 两种方法可以做到这一点。
- 回调函数。将异步操作包装成 Thunk 函数,在回调函数里面交回执行权。
- Promise 对象。将异步操作包装成 Promise 对象,用 then 方法交回执行权。
co 模块其实就是将两种自动执行器(Thunk 函数和 Promise 对象),包装成一个模块。使用 co 的前提条件是,Generator 函数的 yield 命令后面,只能是 Thunk 函数或 Promise 对象。如果数组或对象的成员,全部都是 Promise 对象,也可以使用 co,详见后文的例子。
基于 Promise 对象的自动执行
var fs = require('fs');
var readFile = function(fileName) {
return new Promise(function(resolve, reject) {
fs.readFile(fileName, function(error, data) {
if (error) return reject(error);
resolve(data);
});
});
};
var gen = function*() {
var f1 = yield readFile('/etc/fstab');
var f2 = yield readFile('/etc/shells');
console.log(f1.toString());
console.log(f2.toString());
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
然后,手动执行上面的 Generator 函数。
var g = gen();
g.next().value.then(function(data) {
g.next(data).value.then(function(data) {
g.next(data);
});
});
2
3
4
5
6
7
手动执行其实就是用 then 方法,层层添加回调函数。理解了这一点,就可以写出一个自动执行器。
function run(gen) {
var g = gen();
function next(data) {
var result = g.next(data);
if (result.done) return result.value;
result.value.then(function(data) {
next(data);
});
}
next();
}
run(gen);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
co 模块的源码
首先,co 函数接受 Generator 函数作为参数,返回一个 Promise 对象。
function co(gen) {
var ctx = this;
return new Promise(function(resolve, reject) {});
}
2
3
4
5
在返回的 Promise 对象里面,co 先检查参数 gen 是否为 Generator 函数。如果是,就执行该函数,得到一个内部指针对象;如果不是就返回,并将 Promise 对象的状态改为 resolved。
function co(gen) {
var ctx = this;
return new Promise(function(resolve, reject) {
if (typeof gen === 'function') gen = gen.call(ctx);
if (!gen || typeof gen.next !== 'function') return resolve(gen);
});
}
2
3
4
5
6
7
8
接着,co 将 Generator 函数的内部指针对象的 next 方法,包装成 onFulfilled 函数。这主要是为了能够捕捉抛出的错误。
function co(gen) {
var ctx = this;
return new Promise(function(resolve, reject) {
if (typeof gen === 'function') gen = gen.call(ctx);
if (!gen || typeof gen.next !== 'function') return resolve(gen);
onFulfilled();
function onFulfilled(res) {
var ret;
try {
ret = gen.next(res);
} catch (e) {
return reject(e);
}
next(ret);
}
});
}
```js
最后,就是关键的next函数,它会反复调用自身。
```js
function next(ret) {
if (ret.done) return resolve(ret.value);
var value = toPromise.call(ctx, ret.value);
if (value && isPromise(value)) return value.then(onFulfilled, onRejected);
return onRejected(
new TypeError(
'You may only yield a function, promise, generator, array, or object, '
+ 'but the following object was passed: "'
+ String(ret.value)
+ '"'
)
);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
第一行,检查当前是否为 Generator 函数的最后一步,如果是就返回。
第二行,确保每一步的返回值,是 Promise 对象。
第三行,使用 then 方法,为返回值加上回调函数,然后通过 onFulfilled 函数再次调用 next 函数。
第四行,在参数不符合要求的情况下(参数非 Thunk 函数和 Promise 对象),将 Promise 对象的状态改为 rejected,从而终止执行。
处理并发的异步操作
co 支持并发的异步操作,即允许某些操作同时进行,等到它们全部完成,才进行下一步。 这时,要把并发的操作都放在数组或对象里面,跟在 yield 语句后面。
// 数组的写法
co(function*() {
var res = yield [Promise.resolve(1), Promise.resolve(2)];
console.log(res);
}).catch(onerror);
// 对象的写法
co(function*() {
var res = yield {
1: Promise.resolve(1),
2: Promise.resolve(2)
};
console.log(res);
}).catch(onerror);
2
3
4
5
6
7
8
9
10
11
12
13
14
实例:处理 Stream
Node 提供 Stream 模式读写数据,特点是一次只处理数据的一部分,数据分成一块块依次处理,就好像“数据流”一样。这对于处理大规模数据非常有利。Stream 模式使用 EventEmitter API,会释放三个事件。
- data 事件:下一块数据块已经准备好了。
- end 事件:整个“数据流”处理“完了。
- error 事件:发生错误
使用 Promise.race()函数,可以判断这三个事件之中哪一个最先发生,只有当 data 事件最先发生时,才进入下一个数据块的处理。从而,我们可以通过一个 while 循环,完成所有数据的读取。
const co = require('co');
const fs = require('fs');
const stream = fs.createReadStream('./les_miserables.txt');
let valjeanCount = 0;
co(function*() {
while (true) {
const res = yield Promise.race([
new Promise(resolve => stream.once('data', resolve)),
new Promise(resolve => stream.once('end', resolve)),
new Promise((resolve, reject) => stream.once('error', reject))
]);
if (!res) {
break;
}
stream.removeAllListeners('data');
stream.removeAllListeners('end');
stream.removeAllListeners('error');
valjeanCount += (res.toString().match(/valjean/gi) || []).length;
}
console.log('count:', valjeanCount); // count: 1120
});
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
上面代码采用 Stream 模式读取《悲惨世界》的文本文件,对于每个数据块都使用 stream.once 方法,在 data、end、error 三个事件上添加一次性回调函数。变量 res 只有在 data 事件发生时才有值,然后累加每个数据块之中 valjean 这个词出现的次数。