Callbacks 和(han) Promise 和 Rx*

想想还有几个月就去台湾了,心里还有点小激动呢,标题卖个萌。


回头看了一下,发现自己最近几个月对callback和async这些玩意儿出奇的关心。原因无它,各种callback和async操作实在是太反 人 类了(此处人类特指地球上某统治级灵长类生物中跟其祖先最接近的一群——程序猿,BTW,反正都扯蛋了就多扯两句:人人都知道user-friendly重要,但却没有人比微软更注重programmer-friendly,就这一点,简直要为M$点赞,以后蛋疼了一定要写一篇关于大微软爱护程序员的软文),而如何抽象这些反人类的东西,正是计算机和编程领域里最有魅力的部分。

你得承认,类似:内存 = 竖着摆的矩形分栏状图形;stream = 横着摆带节点的水管状图形,都是一些类似“平面直角坐标系”这样的——你根本就不晓得笛卡尔是怎么想出来的——伟大抽象。没了这些抽象,很多更一步的概念和实现就会变得十分的困难。

嗯,图文并茂一下:

memory: memory

stream: stream

简单来说,抽象就是让你理解不了的东西变得容易(可以?)理解了,如果达不到这个目的,那就不是成功的抽象。

类似threading和asynchronous这样的东西就是难以理解的(光拼写就很难了),因为人的惯性思维都是串行的,非异步和单线程的编程也是串行的,当我写下

operation_one
operation_two

的时候,我一定想的是operation_one执行完毕operation_two开始执行这样的顺序。如果我没接触过threading或者async,当你告诉我“可能operation_two会比operation_one先执行完,可能他们两个操作还会瞬间就return了,不过到底会发生什么我也不能确定呢”,我一定会觉得你在耍流氓。

可async就是要耍流氓(后面都只写async不写threading了,因为multi-threading只是async底层的一种实现方式,async的抽象层次已经比threading高了,没必要在一起讲),至于async到底是怎么耍流氓的,先看下面的代码(nodejs):

var dirName = "foo";
fs.readdir(dirName, function(err, fileNames) {
	if (err) {
		throw err;
	}

	fileNames.forEach(function(fileName) {
		var fullPath = path.resolve(dirName, fileName);
		fs.readFile(fullPath, function(err, data) {
			if (err) {
				throw err;
			}

			console.log(data.length);
		});
	});
});

看起来挺直白的是吧?不就是读取某文件夹下所有文件,然后print出来他们的文件长度嘛。嗯,加个功能,在读取完所有文件后,打印个DONE出来。简单,这样写就行了:

var dirName = "foo";
fs.readdir(dirName, function(err, fileNames) {
	if (err) {
		throw err;
	}

	fileNames.forEach(function(fileName) {
		var fullPath = path.resolve(dirName, fileName);
		fs.readFile(fullPath, function(err, data) {
			if (err) {
				throw err;
			}

			console.log(data.length);
		});
	});

	console.log("DONE");
});

才没有行了DONE第一时间就print出来了,因为我们可爱的async,想要在正确的时候显示DONE,你得这样写:

var readCount = 0;
fileNames.forEach(function(fileName) {
	var fullPath = path.resolve(dirName, fileName);
	fs.readFile(fullPath, function(err, data) {
		if (err) {
			throw err;
		}

		console.log(data.length);

		if (++readCount == fileNames.length) {
			console.log("DONE");
		}
	});
});

先不论这种写法严格的正确与否(在有error的情况下,肯定要另说了),无缘无故多出一个变量(readCount)和一段逻辑(++if),为的只是实现一个在同步环境下十分顺理成章的操作,这本身就不可接受嘛。究其原因——nodejs以callback的方式设计async的API,就是渣决定,这话可不是我第一个说的,至于是在那个大牛的blog上最先看到的忘记了。结合开篇写到的抽象的部分,可以进一步升华出——以回调的方式抽象异步操作非常反人类——这一结论。

我不知道nodejs团队有没有承认自己的渣,只知道后来的api都多了类似readFileSync这样的对应的同步版本。多强调一次,我并不是说nodejs的异步+事件驱动的设计理念是渣,而是觉得以回调的方式提供API/抽象异步操作真心很渣

不过,勤劳勇敢的程序员有自己的特有的手段,来寻找珍贵的食材解决问题,自己写个library,拯救其他同类。让我们看看引入了npm上依赖数前五的async库,情况有何不同:

var dirName = "foo";
fs.readdir(dirName, function(err, fileNames) {
	if (err) {
		throw err;
	}

	var fullPaths = fileNames.map(function(fileName) {
		return path.resolve(dirName, fileName);
	});

	async.map(fullPaths, fs.readFile, function(err, datas) {
		if (err) {
			throw err
		}

		datas.forEach(function(data) {
			console.log(data.length);
		});

		console.log("DONE");
	})
})

async具体的玩法可以去看文档,从直观上来讲,代码好像清晰了一点,也多了点FP的高大上的味道。但是说到底,情况从“不知道何时该出现回调”到了“现在我能在all done的时候调用回调了”,有好转,但不多。

再来,用promise:

var readdirPromise = function(dirName) {
	var deferred = Q.defer();

	fs.readdir(dirName, function(err, fileNames) {
		if (err) {
			deferred.reject(err);
		} else {
			deferred.resolve(fileNames);
		}
	});

	return deferred.promise;
};

var readFilePromise = function(filePath) {
	var deferred = Q.defer();

	fs.readFile(filePath, function(err, data) {
		if (err) {
			deferred.reject(err);
		} else {
			deferred.resolve(data);
		}
	});

	return deferred.promise;
};
// below two lines are AWESOME!
// var readdirPromise = Q.denodeify(fs.readdir);
// var readFilePromise = Q.denodeify(fs.readFile);

var dirName = "foo";

readdirPromise(dirName).then(function(fileNames) {

	var readFilePromises = fileNames.map(function(fileName) {
		var fullPath = path.resolve(dirName, fileName);
		return readFilePromise(fullPath);
	});

	return Q.all(readFilePromises);
}).then(function(datas) {

	datas.forEach(function(data) {
		console.log(data.length);
	});

	console.log("DONE");
});

不看前面包装的部分,或者用Q的denodeify来封装,代码似乎更加顺眼了点,乍一看几乎是串行化地在写代码了。这也是我看来拯救callback式API最好的workaround了,注意是workaround而不是solution,这很重要!

糟,标题里面还有个Rx完全没提啊,因为我是今天才真正了解了一点Rx*,刚刚有点make sense的感觉,关于Rx*的以后再写吧。关于Rx这个文章写的很不错。

FYI:上面所有代码见这个gist

洋洋洒洒写了那么多最后还只有个workaround,那真正的solution是啥?下面有几行(我就是数体教,你打我啊)代码,你们自己感受一下:

using System;
using System.IO;
using System.Threading.Tasks;
using System.Text;

public class AsyncDemo
{
	public static void Main()
	{
		Task.Run(async ()=>{
			string readResult = await ReadAsync("foo.txt");
			int writeLength = await WriteAsync("foo1.txt", readResult);

			Console.WriteLine(writeLength);
		});

		Console.ReadLine();
	}

	public static async Task<string> ReadAsync(string filePath)
	{
		using(var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, true))
		{
			StringBuilder sb = new StringBuilder();

	        byte[] buffer = new byte[0x1000];
	        int numRead;
	        while ((numRead = await fs.ReadAsync(buffer, 0, buffer.Length)) != 0)
	        {
	            string text = Encoding.UTF8.GetString(buffer, 0, numRead);
	            sb.Append(text);
	        }

	        return sb.ToString();
		}
	}


	public static async Task<int> WriteAsync(string filePath,string text)
	{
		byte[] encodedText = Encoding.UTF8.GetBytes(text);

	    using (FileStream sourceStream = new FileStream(filePath,
	        FileMode.Append, FileAccess.Write, FileShare.None,
	        bufferSize: 4096, useAsync: true))
	    {
	        await sourceStream.WriteAsync(encodedText, 0, encodedText.Length);
	    }

	    return encodedText.Length;
	}
}

去掉其他奇奇怪怪的部分,只看这两行:

string readResult = await ReadAsync("foo.txt");
int writeLength = await WriteAsync("foo1.txt", readResult);

天都亮了!

FYI1:上面的代码是在mono 3.3.0的环境下测试通过了,最近也稍微看了一下mono,感觉非常不错。WaitToWriteStack.push(mono);

FYI2:别以为上面那个就是真·solution了,如果你不知道async里的坑,该错的代码一样会写错。只是说C#5.0的async/wait,只从代码层面看,已经很好了。不过,看起来好,就是最重要的。Looks good is All!