三余无梦

冬者岁之余,夜者日之余,阴雨者时之余也

Node-thunkify Code Reading

thunkify

code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
function thunkify(fn) {
    assert('function' == typeof fn, 'function required');    

    return function () {
        var args = slice.call(arguments);    
        var ctx = this;
        return function (done) {   // done 是回调函数
            var called;
            args.push(function() {
                if (called) return;
                called = true;    
                done.apply(null, arguments); 
            }); // 将回调处理加入参数列表

            try {
                fn.apply(ctx, args);   // 函数处理 
            } catch (err) {
                done(err);   // 异常获取并用回调处理 
            }
        }
    }
};

例子

1
2
3
4
5
6
7
8
var thunkify = require('thunkify');
var fs = require('fs');


var read = thunkify(fs.readFile);

read('package.json', 'utf8')(function(err, str){
});

Koa.js Code Reading

TBC 继续挖坑待填…

Koa

源码简况

  • version: 0.8.2
  • 用到的库
    • escape-html: =1.0.1
    • statuses: =1.0.1
    • accepts: 1.0.0
    • type-is: 1.1.0
    • set-type: 1.0.0
    • mime-types: 1.0.0
    • finished: 1.2.0
    • co: 3.0.2
    • debug
    • fresh: 0.2.1
    • koa-compose: 2.2.0
    • koa-is-json: 1.0.0
    • cookies: 0.4.0
    • delegates: 0.0.3
    • delegates: 0.0.3 -已读 -13/5
    • dethroy: 1.0.0
    • vary: 0.1.0
    • error-inject: 1.0.0
    • parseurl: 1.2.0
    • only: 0.0.2 -已读 -9/5
  • 运行条件: node 版本大于0.11.9

从入口说起

1
2
3
4
5
6
7
8
var koa = require('koa');
var app = koa();

app.use(function *(){
    this.body = 'Hello World';    
});

app.listen(3000);

上面是一个最简单的服务,输出Hello World; 我们用到两个koa的api

  • app.use()
  • app.listen();

下面就先从app.listen() 开始

application.js

1
2
3
4
app.listen = function () {
    var server = http.createServer(this.callback()) ;  
    return server.listen.apply(server, arguments);
}

http.createServer([requestListener]) returns a new web server object

requestListener is a function which is automatically added to the 'request' 事件

由代码可以知道 this.callback() 就是一个 requestListener;

来到app.callback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
app.callback = function () {
    var mw = [respond].concat(this.middleware);   // 实际上 respond 就是最后一个需要执行的中间件, 负责返回数据的 
    var gen = compose(mw); 
    var fn = co(gen);  // co 和 compose 就是将一个个middle,按序执行的操作, 此时middle还没有运行
    var self = this;
    
    if (!this.listeners('error').length) this.on('error', this.onerror);
    
    return function (req, res) {
        res.statusCode = 404;    
        var ctx = self.createContext(req, res);
        finished(ctx, ctx.onerror);
        fn.call(ctx, ctx.onerror); // 这步运行了说明服务器接收到了请求, 然后一步步的运行middle 和 请求业务处理
    }
}

上面的代码基本上就解释了整个请求到回复的流程

附张图, 一图胜千言

gif

下面分解开来

  • createContext(req, res)
  • finished(ctx, ctx.onerror)
  • fn.call(ctx, ctx.onerror)

app.createContext(req, res)

  • 参数 req,res
  • 返回内容:context, context具体内容如下
  • context: Object.create(this.context)
  • context.app: request.app=response.app=
  • context.req: request.req = response.app = req
  • context.res: request.res = response.res = res
  • context.ctx: response.ctx = context
  • context.onerror: context.onerror.bind(context) this.context
  • context.originalUrl: request.originalUrl = req.url
  • context.cookie: new Cookies(req, res, this.keys) –> cookie TODO
  • context.accept: request.accept = accepts(app) –> accepts

finished(ctx, ctx.onerror)

TBC

Lazy.js Code Reading

Lazy.js

在看源码前, 可以先看下Lazy.js的基本思想 Lazy.js 的设计模式)

Lazy

1
2
3
4
5
Lazy([1, 2, 4]) // in [1, 2, 4] -> out instanceof Lazy.ArrayLikeSequence 
Lazy({ a: 'b'})
Lazy("hello world")
Lazy()
Lazy(null)
  • 输入参数 Array | object | string | | null
  • 输出 { source: [1, 2, 4] } | { source: {‘a’: b} } | { source: “hello world”} | { source: undefined } | { source: null }
  • 可以看到输入的参数被打包成相应的对象, 而这些对象都继承自 sequence

Sequence

sequence 对象提供对 0或者更多连续元素的集合 的统一的 API 封装.为什么所有的操作需要一个 sequence. 看下面的例子

1
2
3
4
5
var seq = Lazy(source) // 1st sequence
        .map(func) // 2nd MappedSequence
       .filter(pred) // 3rd FilteredSequence
        .reverse() // 4th ReversedSequence
seq.each(function(x) { console.log(x); })

上面这个例子中 前四步除了创建对应的sequece没有做任何的遍历source或者别的操作。 只有在第5步调用 each 时,将一次性按照鍊條(chain)的順序处理source 得到最后的结果。所以lazy做的就是延迟遍历处理数据.

in fact, when i think about the performance of underscore and lazy.js; i cann’t understand why lazy is faster. lazy.js: 1 2s 3 underscore: 1 2 3 1 2 3 1 2 3 . so what’s the difference. lazy.js just hold off some process; i cann’t get it…. so continue to read code. >_<

1
function Sequence() {} # 创建Sequence构造函数

TBC

Generic-pool Reading

node-pool

why pool

  • 与数据建立连接关闭连接, 每次建立一个连接对象会有消耗!(在并发很高的情况消耗会更明显)

what node-pool can do ?

  • 动态调整连接数的连接池, 数据库频繁读写时, 就建多个连接, 当然连接数有最大值Max, 如果读写实际需要的连接数 > Max 那么加入队列里;如果数据库读写空闲,就释放多余的连接;

how node-pool do that ? –> Code Reading

  • 栗子

运行上面的例子

  1. 新建一个pool对象, 初始化一些变量
    • idleTimeoutMillis: 一个连接空闲时间最大值, 如果设置为30000, 那么一个连接空闲30000ms 后会自动关闭, 默认 30000ms
    • reapIntervalMillis: 每reapIntervalMillis检查空闲并移除, 默认1000ms
    • max: 连接池存在的最大连接数, 例子里设置为10
    • min: 连接池存在的最小连接数, 例子里默认为0(备注: 这里有个疑问,当我将min设为1, 从mongodb log 中可以看到 在空闲时间 连接是每隔idleTimeoutMillis关闭并新建连接, 问题是为什么不保持一个连接而要不停的关闭新建, 如果这样不如)
    • log: 可以自定义node-pool, 例子直接设为true 默认用node-pool提供的log
    • create: 应该创建一个item(db) 被 acquired, 然后调用其创建的 item 作为参数
    • destory: 在items毁之前,关闭所有资源使用的item(db)
  2. ensureMinimum, 确保有最小连接数
  3. 例子中如果给min赋一个大于0的值, 2, 那么就 createResource()两次
  4. 因为例子默认为0 跳过
  5. 至此已经建立pool, 下一步等待acquired, 资源准备就续, 就等消费了
  6. curl http://127.0.0.1:8080
  7. acquire(callback, priority)
    • waitingClients.enqueue(callback, priority): 将回调根据priority推入队列
    • dispense(): 由单词意思可知分配资源, 施行; 试着拿一个客户端工作, 清除空闲的item. 如果有等待的client, shift(), call its callback. 如果没有等待的client, 创建一个; 如果创建一个client将超过max, 把客户端加入等待list!代码见 dispense
  8. dispense() 结束,此时 waitingClients.size() = 0, availableObjects.length = 0
  9. release(): 如果client不再需要,将之返回pool, 代码见release
  10. removeIdle(): check and removes the available clients that have timed out. 见removeIdle

  11. me.destroy(): client to be destroyed.见destroy

dispense

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
function dispense() {
    var obj = null,
    objWithTimeout = null,
    err = null,
    clientCb = null,
    waitingCount = waitingClients.size();
    log("...");
    if (waitingCount > 0) { //此时waitingCount = 1
        while(availableObjects.length > 0) {
            log("...");
            objWithTimeout = avaiableObjects[0];
            if (!factory.validate(objWithTimeout.obj)) {
                me.destroy(objWithTimeout.obj);
                continue;
            } // 这一步验证对象
            avaiableObjects.shift(); // LIFO
            clientCb = waitingClients.dequeue(); // dequeue
            return clientCb(err, objWithTimeout.obj); // callback
        }
        if (count < factory.max) {
                createResource();
        }
    }
}

release

1
2
3
4
5
6
7
8
9
10
11
12
13
14
me.release = function(obj) {
    // 确保对象已经被释放
    if (availableObjects.some(function(objWithTimeout) {
        return (objWithTimeout.obj === obj);
        })) {
        log("release called twice for the same resource")
        return;
    }
    var objWithTimeout = { obj: obj, timeout: (new Date().getTime() + idleTimeoutMillis) };
    availableObjects.push(objWithTimeout); // 此时 availableObjects.size()=1
    log("timeout:..");
    dispense();
    scheduleRemoveIdle(); // 计划移走idle items(空闲项目) 延迟1000ms 执行 removeIdle
}

removeIdle

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
 function removeIdle(){
    var toRemove = [],
        now = new Date().getTime(),
        i,
        al, tr,
        timeout;
    removeIdleScheduled = false;
    // go through the available(idle) items, check if they have timed out;
    for (i=0, al=availableObjects.length; i< al &&
        (refreshIdle) || (count - factory.min > toRemove.length));i+=1
        ){
        timeout = availableObjects[i].timeout;
        if (now >= timeout) {
           //client timed out, so destroy it 
           toRemove.push(availableObjects[i].obj);
            }
        }
        for (i=0, tr=toRemove.length; i<tr; i+=1) {
            me.destroy(toRemove[i]);
        }
        al = availableObjects.length;
        if (al > 0) {
           log("..");
           scheduleRemoveIdle();
        } else {
            log("removeIdle() all objects removed", 'verbose');
        }
     }

destroy

1
2
3
4
5
6
7
8
 me.destroy = function(obj) {
    count -= 1;
    availableOjbects = availableObjects.filter(function(objWithTimeout) {
       return (objWithTimeout.obj != obj)
        });//过滤掉return true 的对象
    factory.destroy(obj);
    ensureMinimum();
     }

Tornado Code Reading - tornado.concurrent

Future背景

Future 代表一个函数的调用结果. 函数返回一个值或者抛出一个异常, 所以future 包含一个值或者一个异常(可以通过future.result()获得这个结果,异常和返回值). Fuures存在与对应的函数结束前. 在 一个 多线程场景下,简单的调用 future.result()等待另外一个线程或者进程完成。在 异步场景下,你可以附带一个callback给future为了当调用结束可以得到通知。(with future.add_done_callback or io_loop.add_future)

了解 Future

Futures 已经在Python3.2应用了 concurrent.futures , 如果在python3.2之前版本用 可以 (pip install futures). 那在 tornado 中如果可以将用就用python包,否则将会使用一个兼容的类 tornado.concurrent.Future

class tornado.concurrent.Future

这个类封装了异步操作的结果。在同步程序中, Futures被用来等待一个线程或者进程池的结果。在Tornado一般用在 IOLoop.add_future 或者 在一个 gen.coroutine

1
2
3
4
5
6
7
"""
如果有 concurrent.futures 可用,那就用它; 否则用 _DummyFuture 
"""
if futures is None:
    Future = _DummyFuture
else:
    Future = futures.Future

_DummyFuture

  • result
1
2
3
4
5
6
7
8
"""
先检查有没有完成 ,没有完成 ,直接抛出异常!如果有异常就raise 异常,否则返回 result
"""
def result(self, timeout=None):
  self._check_done()
  if self._exception:
      raise self._exception
  return self._result
  • exception

这个函数先与result唯一的不同是 无异常的时候 return None ;

  • add_done_callback(fn)
1
2
3
4
5
6
7
8
"""
给Future加入一个回调。当它完成将会 以Future为参数调用回调函数. 
"""
def add_done_callback(self, fn):
  if self.done:
      fn(self)
  else:
      self._callbacks.append(fn)

TracebackFuture

存储 异常的追踪

DummyExecutor

1
2
3
4
5
6
7
8
9
10
class DummyExecutor(object):
  def submit(self, fn, *args, **kwargs):
      future = TracebackFuture()
      try:
          future.set_result(fn(*args, **kwargs))
      except Exception:
          future.set_exc_info(sys.exc_info())
      return future
  def shutdown(self, wait=True):
      pass

run_on_executor

1
2
3
4
5
6
7
8
9
10
11
12
13
def run_on_executor(fn):
    """
    异步的跑同步方法
    """
    @functools.wraps(fn):
    def wrapper(self, *args, **kwargs):
        callback = kwargs.pop("callback", None)
        future = self.executor.submit(fn, self, *args, **kwargs) #self.executor  理解为线程池
        if callback:
            self.io_loop.add_future(future, \
                lambda future: callback(future.result()))
        return future
    return wrapper

这里有关于run_on_executor的应用例子 https://gist.github.com/zs1621/7921770

return_future

  • what use: 让函数通过回调返回一个Future
  • how use: @return_future

看 tornado 源码的 test文件 concurrent_test.py;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class ReturnFutureTest(AsyncTestCase):
    @return_future
    def sync_future(self, callback): # 同步future
        print (callback, '+++++++++++++++') # d -> 对应log 的d
        callback(42)

    @return_future
    def async_future(self, callback): #异步future
        print (callback, '+++++++++++++++') # d -> 对应 log 的 d
        self.io_loop.add_callback(callback, 42)

    def test_sync_future(self): #测试同步 
        future = self.sync_future()
        self.assertEqual(future.result(), 42)

    def test_async_future(self): #测试异步
        future = self.async_future()
        self.assertFalse(future.done())
        self.io_loop.add_future(future, self.stop) #add_future
        future2 = self.wait()
        self.assertIs(future, future2)
        self.assertEqual(future.result(), 42)

联合 concurrent.py –> return_future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def return_future(f):
    replacer = ArgReplacer(f, 'callback') # 1 

    @fuctools.wraps(f)
    def wrapper(*args, **kwrags):
        print (args, kwrags, "argsssssssss") #a 对应下面的 log -> a
        future = TracebackFuture() # 2
        callback, args, kwargs = replacer.replace(
            lambda value=_NO_RESULT: future.set_result(value),
            args, kwargs) # 1  替代 f 函数的 `callback`,  
        print (callback, args, kwargs, 'callback') #b 对应下面的 log -> b

        def handle_error(typ, value, tb):
            future.set_exc_info((typ, value, tb))
            return True

        exc_info = None
        with ExceptionStackContext(handle_error): # 4
            try:
                result = f(*args, **kwargs)
                print (result, 'result-------------------') #c 对应下面的 log -> c
                if result is not None:
                        raise ReturnValueIgoredError(
                            "@return_future should not be used with functions"
                            "that return values")
            except:
                exc_info = sys.exc_info()
                raise
        if exc_info is not None:
            raise_exc_info(exc_info)

        if callback is not None:
            def run_callback(future):
                result = future.result()
                print (future, "+__+_+_+_+_") #e -> 对应下面 LOG -> e
                if result is _NO_RESULT:
                    callback()
                else:
                    callback(future.result())
            future.add_done_callback(wrap(run_callback)) # 6
        return future
    return wrapper

分几种情况 1. 同步无回调 2.同步有回调 3.异步无回调 4.异步有回调

  • 同步无回调LOG f(args, kwargs): future = sync_process()
    • a — (<test_return_future.ReturnFutureTest testMethod=test_no_callback>,) {} argsssssssss
    • b — None (<test_return_future.ReturnFutureTest testMethod=test_no_callback>,) {‘callback’: at 0xb6ba8f0c>} callback
    • c — None result—————————
    • d — (function at 0xb6ba8f0c>) +++++++++++++

从 a-b 可以理解 replacer.replace 的作用: 提取callback 的值, 并将callback 放入kwargs; 由c 可以知道 f() 函数是不会return 的; f()的结果只能由 future.result() 得到, 只要知道reuturn_future 是返回Future 本身是不会return 的, 如果return 就会报错; 由d 可知匿名函数fuction <lambda> at 0xb6ba8f0c赋值给了callback,而这个匿名函数的作用就是set_result

  • 同步有回调LOG f(args, kwargs, callback): future = sync_process() callback(future)
    • a — (<test_return_future.ReturnFutureTest testMethod=test_callback_kw>,) {‘callback’:>} argsssssssss
    • b> (<test_return_future.ReturnFutureTest testMethod=test_callback_kw>,) {‘callback’: at 0xb6bd8f44>} callback
    • c — None result—————————
    • d — 额额 function at 0xb6bd8f44 +++++++++++++
    • e — 额额 Future at 0xb6b9cc8cL state=finished returned int +++++

与无回调相比; 明显多出 e log; run_callback 就是将 future.result()作为callback的参数运行;

  • 异步无回调LOG f(args, kwargs): future = async_process()
  • a — (<test_return_future.ReturnFutureTest testMethod=test_async_future>,) {} argsssssssss
  • b — None (<test_return_future.ReturnFutureTest testMethod=test_async_future>,) {‘callback’: at 0x8518f44>} callback
  • c — None result—————————
  • d — (function at 0x8518f44) ++++++++++++++

与同步无回调相比; 看test_async_future(self), 将 f() 获得的 future –> self.io_loop.add_future(future, self.stop) –> self.stop(future) –> 最后通过 self.wait()获得的结果 就是 例子中42 . 现实中一般将 return_future 与 gen.engine 联合使用 , 通过在 gen.engine –> yield f() 获得结果

Tornado Code Reading - IOLoop

IOLoop

对着IOLoop 的源码瞅了一天,楞是没有明白, 为什么 父类实例可以调用子类方法?

最后看到configurable tornado 解读

看一下可配置接口的实现

configurable两函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@classmethod
def configurable_base(cls):
  return IOLoop

@classmethod
def configurable_default(cls):
  if hasattr(select, "epoll"):
      from tornado.platform.epoll import EPollIOLoop
      return EPollIOLoop
  if hasattr(select, "kqueue"):
      #python 2.6+ on BSD or Mac
      from tornado.platform.kqueue import KQueueIOLoop
      return KQueueIOLoop
  from tornado.platform.select import SelectIOLoop
  return SelectIOLoop

Configurable类是可配置接口的父类, 可配置接口对外提供一致的接口标志, 但它的子类实现可以在运行时进行configure。一般跨平台时由于子类实现有多种选择, 这时候就可以使用配置接口, 例如 select 和 epoll。首先注意 Configurable 的两个函数: configurable_base 和 configurable_default, 两函数都需要被子类(即可配置接口类)覆盖重写。其中, base函数一般返回接口类自身, default 返回接口的默认子类实现, 除非接口指定了 __impl_class。IOLoop及其子类实现都没有实现初始化函数也没有构造函数, 七构造函数继承于 Configurable, 如下::

1
2
3
4
5
6
7
8
9
10
11
12
13
def __new__(cls, **kwargs):
  base = cls.configurable_base()
  args = {}
  if cls is base:
      impl = cls.configured_class()
      if base.__impl_kwargs:
          args.update(base.__impl_kwargs)
  else:
      impl = cls
  args.update(kwargs)
  instance = super(Configurable, cls).__new__(impl)
  instance.initialize(**args)
  return instance

当子类对象被构造时, 子类new被调用, 因此参数里的cls 指的是Configurable的子类(可配置接口类, 如IOLoop)。先得到base, IOLoop代码 可知, configurable_base返回的是自身类。由于 base 和 cls 是一样的, 所以调用 configured_class() 得到接口的子类实现(见configured_class) 其实就是调用 base的 configurable_default(?????TBD), 就是返回一个子类实现(epoll/kqueue/select之一),顺便把impl_kwargs合并到args 里 。然后调用Configurable类的父类(Object)的 new__方法, 生成一个impl的对象, 紧接着把args当参数调用该队想的initialize(继承PollIOLoop) , 返回该对象。 所以, 当构造IOLoop对象时, 实际得到的是EPollIOLoop或其它相关子类。可以看出, Configurable 类主要提供构造方法, 相当于对象工厂根据配置来生产对象, 同时开放configure接口以供配置。而子类按照约定调整配置即可得到不同对象, 代码得到了复用 或其它相关子类。可以看出, Configurable 类主要提供构造方法, 相当于对象工厂根据配置来生产对象, 同时开放configure接口以供配置。而子类按照约定调整配置即可得到不同对象, 代码得到了复用

上面的过程如果不好太理解 可以去看 example 这样大致能理解 ioloop 实例的初始化过程

configured_class

1
2
3
4
base = cls.configurable_base()
if cls.__impl_class is None:
  base._impl_class = cls.configurable_default()
return base.__impl_class

===

上面主要解释了 IOLoop 为什么能调用子类方法 以及 可配置接口的实现 下面来看 IOLoop 的对象 instance

IOLoop 实现了单例的概念, 具体见 IOLoop单例

理解了上面的概念 接着 TCPServer 最后的 add_handle !其实 此时的 object 已经是确定的 EOLoop 或者 Kqueue 的对象! 这里的 add_handle 是 它们的父类 POLoop 的 方法, 这明显就是继承了!

add_handler 代码如下, 首先把 处理方法的上下文 存入 handlers ,等调用时再恢复, 这个机制是 statck_context 见 statck_content 做到的。 第二步 先来看下 self.impl 从哪里来 –> self._impl = impl 此时需要知道是谁调用 initialize –> 这里初始化是在构造函数 new 里调用的, instance.initialize(**args)此时的instance 为 EPollIOLoop 实例 –> super(EPollIOLoop, self).initialize(impl=select.epoll, **kwargs) –> EPollIOLoop 的父类的 initialize() 很明显 impl 为 select.epollepoll

1
2
3
 def add_handler(self, fd, handler, events):
  self._handlers[fd] = statck_context.wrap(handler)
  self._impl.register(fd, events | self.ERROR)

这步呢 就是把 监听 fd 和 accept_handler方法进行关联, 至此事件分发到此就结束了

======

下面来看下 IOLoop 的主循环 start()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def start(self):
  if not logging.getLogger().handlers:
      logging.basicConfig
  if self._stopped:
      self._stopped = False
      return
  old_current = getattr(IOLoop._current, "instance", None)
  IOLoop._current.instance = self
  self._thread_ident = thread.get_ident() #Return the ‘thread identifier’ of the current thread. This is a nonzero integer
  self._running = True

  old_wakeup_fd = None
  if hasattr(singal, 'set_wakeup_fd') and os.name == 'posix':
      try:
          old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
          if old_wakeup_fd != -1:
              signal.set_walkeup_fd(old_wakeup_fd)
              old_walkup_fd = None
      except ValueError:
          pass

上面这段代码 TBD

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
while True:
  poll_timeout = _POLL_TIMEOUT
  #Prevent IO event starvation by delaying new callbacks 
  # to the next iteration of the event loop.
  with self._callback_lock:
      callbacks = self._callbacks
      self._callbacks = []
  for callback in callbacks:
      self._run_callback(callback)
  # Closures may be holding on to a lot of memory, so allow
  # them to be freed before we go into our poll wait.
  callbacks = callback = None

  if self._timeouts:
      now = self.time()
      while self_timeouts:
          if self._timesouts[0].callback is None:
              # the timeout was cancelled
              heapq.heappop(self._timeouts)
              self._cancellations -= 1
          elif self._timeouts[0].deadline <= now:
              timeout = heapq.heappop(self._timeouts)
              self._run_callback(timeout.callback)
              del timeout
          else:
              seconds = self._timeouts[0].deadline - now
              poll_timeout = min(seconds, poll_timeout)
              break

Tornado Code Reading-TCPServer

TCPserver

一个非阻塞, 单进程的 TCP 服务器

为了使用 TCPServer, 定义一个子类改写其中的 handle_stream 方法

如果让服务器通过 SSL 传输, 如下例

1
2
3
4
TCPServer(ssl_options={
  "certfile": os.path.join(data_dir, "mydomain.crt"),
  "keyfile": os.path.join(data_dir, "mydomain.key"),
})

TCPServer simple single-process::

  • listen: 监听单个进程
1
2
3
server = TCPServer()
server.listen(8888)
IOLoop.instance().start()
  • bind / start: simple multi-process::
1
2
3
4
server = TCPServer()
server.bind(8888)
server.start(0) #Forks multiple sub-process
IOLoop.instance().start()

如果使用start接口, 一个 .IOLoop 不可以传入 TCPServer 结构里。 start将总是开始服务在默认的单例 .IOLoop

  • add_sockets: 高级多进程::
1
2
3
4
5
sockets = bind_sockets(8888)
tornaado.process.fork_processes(0)
server = TCPServer()
server.add_sockets(sockets)
IOLoop.instance().start()

add_sockets 接口更为复杂, 但是和tornado.process.fork_processes使用将会更明了, add_sockets也可以用在单进程服务中 , 如果你想要创建你的监听套节字而不是~tornado.netutil.bind_sockets


以上是怎么应用 下面看源码

初始化

1
2
3
4
5
6
7
8
def __init__(self, io_loop=None, ssl_options=None, max_buffer_size=None):
  self.io_loop = io_loop
  self.ssl_options = ssl_options
  self.sockets = {}
  self._pending_sockets = []
  self._started = False
  self.max_buffer_size = max_buffer_size
  #下面是检验 SSL 文件,此处略

listen

在给定的端口接受连接 这个方法可能被调用多次为了监听多个端口。listen 立即生效; 之后不必调用TCPServer.start , 但是, ,IOLoop 是必要的

1
2
sockets = bind_sockets(port, address=address)
self.add_sockets(sockets)

源码里的 bind_socketsbind_sockets, add_sockets 见下

add_sockets

让服务接受多个连接

sockets 参数是一个socket数组, add_sockets一般和 tornado.process.fork_processes联合使用 为了控制 多进程服务的启动

1
2
3
4
5
6
if self.io_loop is None:
  self.io_loop = IOLoop.current()

for sock in sockets:
  self._sockets[sock.fileno()] = sock
  add_accept_handler(sock, self._handle_connection, io_loop=self.io.loop) 

上面的 add_accept_handler 见 add_accept_handler add_accept_handler 第二个参数 self._handle_connection是个回调函数, 分析如下

_handle_connection在接受客户端的连接处理结束后会被调用,调用时传入连接和ioloop对象初始化 IOStream,用于对客户端的异步读写;然后调用 handle_stream(注意这里的handle_stream 文档说了如果你只是用tcpserver那么,handle_stream得自己重写, 如果用tornado的httpserver 那handle_stream 在 httpserver), 传入创建的IOStream对象初始化一个HTTPConnection, HTTPConnection 封装了IOStream 的一些操作, 用于处俩 HTTPRequest 并返回。 至此 HTTPServer的创建、启动、注册回调函数过程结束

1
2
3
4
5
6
7
8
9
#如果self.ssl_options不为空,处理ssl代码略
try:
  if self.ssl_options in not None:
      stream = SSLTOSTREAM(connection, io_loop=self.io_loop, max_buffer_size=self.max_buffer_size)
  else:
      stream = IOSTream(connection, io_loop=self.io_loop, max_buffer_size=self.max_buffer_size)
  self.handle_stream(stream, address)
except Excetpion:
  app_log.error('Error in connection callback', exc_info=True)

从上面的分析和源码 可知 服务器的工作流程 socket->bind->listen创建 listen socket 监听客户端, 并将每个listen socket 的 fd 注册到IOLoop的单例实例中; 当 listen socket 可读时回调 _handle_events 处理客户端请求;在与客户端通信的过程中使用 IOStream 封装读写缓冲区, 实现与客户端的异步读写。 下面我们将具体了解listen socket 的 fd 被注册到IOLoop的单例实例中 见 IOLoop


netutil

bind_sockets – 解释:创建监听套节字绑定到给定的端口和地址

  • 参数

    • address: IP地址或主机名。如果是主机名,服务将会监听所有跟此域名有关的IP
    • Family: socket.AF_INETsocket.AF_INET6
    • backlog: 这个参数跟socket.listen()<socket.socket.listen>一样
    • flags:
  • 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
sockets = []
if address == "":
  address = None
if not socket.has_ipv6 and family == socket.AF_UNSPEC:
  family = socket.AF_INET
if flags is None:
  flags = socket.AI_PASSIVE
  #见 [getaddrinfo](https://github.com/zs1621/pythostudy/wiki/socket)
  #`getaddrinfo`会返回服务器的所有网卡信息, 每个网卡都要监听客户端的请求并返回创建的sockets
for res in set(socket.getaddrinfo(address, port, family, socket.SOCK.STREAM, 0, flags)):
  af, socktype, proto, canoname, sockaddr = res
  try:
      #创建套节字
      sock = socket.socket(af, socktype, proto)
  except socket.error as e:
      if e.args[0] == errno.EAFNOSUPPORT:
          continue
      raise
  set_close_exec(sock.fileno())
  if os.name != 'nt':
      #TCP连接中,recv等函数默认为阻塞模式(block),
      #即直到有数据到来之前函数不会返回,
      #而我们有时则需要一种超时机制使其在一定时间后返回而不管是否有数据到来,
      #这里我们就会用到setsockopt()函数 见 [setsockopt](http://blog.chinaunix.net/uid-25749806-id-348637.html)
      sock.setsockopt(socket.SOL_SOCKET, socket.SOREUSERADDR, 1)
  #在linux ipv6 sockets 也接受 ipv4,这样的话不能同时绑定 ipv4 和 ipv6.为了方便,在ipv6 sockets中总是禁用ipv4 
  if af == socket.AF_INET6:
      if hasattr(socket, "IPPROTO_IPV6"):
          sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY)
  sock.setblocking(0)
  sock.bind(sockaddr)
  #默认设定等待被处理的连接最大个数
  sock.listen(backlog)
  sockets.append(sock)
return sockets

add_accept_handler: 添加一个IOLoop事件去接受新的连接在 sock

当一个连接被接受了, callback(connection, address)(connection是socket对象, address是连接的另外结尾处的地址)将会运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if io_loop is None:
  io_loop = IOLoop.current()
def accept_handler(fd, events):
  while True:
      try:
          connection, address = sock.accept()
      except socket.error as e:
          #EWOULDBLOCK 和 EAGAIN 表明我们
          #已经接受了每个可以接受的连接
          #具体见[EWOULDBLOCK](http://stackoverflow.com/questions/3647539/socket-error-errno-ewouldblock)
          if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
              return
          #ECONNABORTED 表明有个连接还在接受队列时被关闭了
          if e.args[0] == errno.ECONNABORTED:
              continue
          raise
      callback(connection, address)
io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)

Visionmedia/send Reading

send入口

1
exports = module.exports = send;

send 函数返回 SendStream 构造函数

1
2
3
function send(req, path, options) {
  return new SendStream(req, path, options);
}

来看SendStream

  • 参数
    • req 即 request
    • path : string 路径
    • options:
1
2
3
function SendStream(req, path, options) {
  ....
}
  • 继承 Stream.prototype
1
SendStream.prototype.__proto__ = Stream.prototype;
  • 下面都是 SendStream 原型链的方法

    • hidden: 赋值 this._hidden, return this
    • index: 赋值默认的 index path, this._index, return this
    • root: 赋值根路径, this._root
    • maxage: 赋值最大缓存时间 , this._maxage
    • error: 根据status–>触发error
    • redirect: 如果有监听事件directory的监听者 那么触发directory
    • isMalicious: 检测pathname是否有潜在的问题,判断方法如果 没有_rootpath包含.. 那么 就是有异常的路径;
    • hasTrailingSlash: 判断路径最后一位是不是/
    • hasLeadingDot: 就是文件是否有后缀 形如aa.html
    • isCachable: 是否缓存, 判断方法 (res.statusCode >= 200 && res.statusCode < 300) || 304 == res.statusCode
    • isFresh: 判断缓存是否是新的 判断方法 引用模块 fresh
    • removeContentHeaderFields 去除包含 content的头 key
    • onStatError:
  • 主体方法 pipe , 参数 res

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
var self = this
, args = arguments
, path = this.path
, root = this._root

this.res = res;
//判断uri的有效性
path = utils.decode(path);
if (-1 == path) return this.error(400);

//如果`path`里包含`\0` ,表明`path`为空
if (~path.indexOf('\0')) return this.error(400);

//连接 this._root 和 path
if (root) path = normalize(join(this._root, path));

//如果异常路径禁止访问
if (this.isMalicious()) return this.error(403);

//此时`path`已经是 `this._root`和 `path`的结合, 所以此种情况不会出现,如果有root
if (root && 0 != path.indexOf(root)) return this.error(403)

//如果_hidden为`false`,那么不支持隐藏文件,此时文件如果以`.`开头那么就不支持 
if (!this._hidden && this.hasLeadingDot()) return this.error(404);

//index 文件支持
if (this._index && this.hasTrailingSlash()) path += this._index

//最后看path 的信息
fs.stat(path, function(err, stat){
  if (err) return self.onStatError(err); //上面已介绍
  if (stat.isDirectory) return self.redirect(self.path); //见上
  self.emit('file', path, stat); //见下
  self.send(path, stat);//见下
});
  • 方法 send
    • path: 路径
    • stat: 文件状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
var options = this.options;
var len = stat.size;
var res = this.res;
var req = this.req;
var ranges = req.headers.range; //截取部分文件之用
var offset = options.start || 0;

//赋值header
this.setHeader(stat);

//赋值 content-type
this.type(path);

//条件 GET 支持, isFresh() 见 [fresh](https://github.com/visionmedia/node-fresh/blob/master/index.js)
if (this.isConditionGET()
&& this.isCachable()
&& this.isFresh()) {
  return this.notModified();
}

//
len = Math.max(0, len - offset);
if (options.end != undefined) {
  var bytes = options.end - offset + 1;
  if (len > bytes) len = bytes;
}

// Range 支持
if (ranges) {
  ... 
}

// content-length
res.setHeader('Content-Length', len);

// HEAD support
if ('HEAD' == req.method) return res.end();

this.stream(path, options); //见下
  • 方法 stream
    • 参数
      • path : 路径
      • options
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
var self = this;
var res = this.res;
var req = this.req;

//pipe 把流数据加入 `res`管道
var stream = fs.createReadStream(path, options);
this.emit('stream', stream);
stream.pipe(res);

//socket 关闭, 

req.on('close', stream.destroy.bind(stream));

//error处理
steam.on('error', function(err){
  //不回复
  if (res._header) {
      console.error(err.stack);
      req.destroy();
      return;
  }
  erro.status = 500;
  self.emit('error', err);
});

//end
stream.on('end', function(){
  self.emit('end');   
});

Tornado Code Reading

服务器建立

HTTPServer

应用

1
2
3
4
5
6
application = web.Application([
(r"/", MainPageHandler),  
])
http_server = httpserver.HTTPServer(application)
http_server.listen(8080)
ioloop.IOLoop.instance().start()

对照应用例子理解源码

1
2
3
4
5
6
7
8
def __init__(self, request_callback, no_keep_alive=False, io_loop=None,
                 xheaders=False, ssl_options=None, protocol=None, **kwargs):
        self.request_callback = request_callback
        self.no_keep_alive = no_keep_alive
        self.xheaders = xheaders
        self.protocol = protocol
        TCPServer.__init__(self, io_loop=io_loop, ssl_options=ssl_options,
                           **kwargs) #HTTPServer 继承自 TCPServer, 初始化TCPServer

参数

  • request_callback: 必须产生一个http回复, 例子中 application 即是回复
  • xheaders: True(支持通过x-real-ipx-forwarded-for获取ip) False(当torando之前有反向代理或者负载均衡self.request.remote_ip只能获得127.0.0.1)
  • ssl_options: ssl传输数据

ssl_options 使用例子

1
2
3
4
HTTPServer(applicaton, ssl_options={
   "certfile": os.path.join(data_dir, "mydomain.crt"),
   "keyfile": os.path.join(data_dir, "mydomain.key"),
  }) 

下面应该说下 TCPServer 主体内容在 TCPServer,

TCPServer