Jacky Liu's Blog
Python asyncore / asynchat 基本传输实验
---- 自从上回实验了 Python socket 的基本传输之后又受了些启发,于是想试试基于 non-blocking socket 的通信机制。Python 标准库里的 asynchat 自然是第一步。昨天写的实验程序实现了基于 asynchat 的基本通信,把前因后果总结一下:
实验前的考虑
---- 用 non-blocking socket 最基本的考虑自然是传输效率,尤其是遇到一大坨数据过来的时候,希望它能尽快传送完,这时最好其它线程通通都停掉,在传完之前不要再无谓地换来换去(反正其它线程此时应该也没甚实际工作要干),当然这只是愿望而已。既然做不到,就只能追求尽量快速的传输方式了。
---- 查资料得出的印象是 non-blocking socket 当然比较快,但是好的东西总是有代价,这是源于我个人的印象。因为非阻塞意味着没有延时,而没有延时环节的无限循环跑起来可能会很耗 CPU。但是另一方面来讲,既然非阻塞 socket 已经用得这么多,意味着底层实现很可能已经解决了这个问题,我的担心很可能是不成立的。这个还是要试试才知道。
结果总结
1. 关于 CPU 使用
---- 实验的结果,当服务端非正常退出以后,使用 asynchat 的客户端好像就会陷入死循环,没有任何异常的迹象,所以此时 polling loop 应该还是在运行的,而 CPU 使用并没有明显高于平时的水平,所以 non-blocking socket + polling loop 应该对 CPU 并没有特别大的压力,当然如果开十个 polling loop 可能会是另外一回事。
2. 对坏连接的处理
---- 关于死循环,这实际上就是一个网络通信模块有没有能力对付坏连接的问题。如果用 blocking socket,上一篇已经讲了,会抛出异常或返回 b'',编程时就可以做相应处理,而死循环显然不是个很讨喜的行为方式。我想了一下如果直接用底层的 select() 函数时会发生什么。因为我曾经在进程间通信的特性中体验过 select() / pooling loop 的用法,这里用作网络传输时应该也差不多。
---- 如果一个进程(或者网络连接)还没吱一声就挂了,那么与它相关的 fd(或 socket)应该仍然会被 select() 放在有效列表里返回,但是后续的 read()(或 recv())操作就会读到一个空的对象,这时你就知道这个进程或连接已经挂了。再看 asynchat 提供的那些寥寥的接口: 好像 collect_incoming_data() 跟 recv() 也差不多,能不能在里面判断,当 data 参数是空的就说明连接已经挂了呢?试了一下,不行。collect_incoming_data() 的 data 参数总是有效的,也就是说读不到内容时这个函数根本不会被调用。
---- 如果再要深究下去大概要去看 asynchat.py 了,但问题是 asynchat 完全是用 Python 写的而且只有几百行而已,它应该木有那么多考虑在里面,我想有这功夫还不如直接去试验 select()。
3. 在运行中更新连接
---- 另一个很重要的问题,我觉得网络通信模块应该提供在 pooling loop 运行的时候动态添加或删除连接的功能,但是很显然 asynchat 并没有这个特性的接口。试着动点脑筋 —— 因为连接对象是通过 asyncore.loop() 的一个参数: map 来传递的,能不能动态修改它呢?文档里说了 map 参数是普通的 dict 类型,dict 当然没有线程安全性,也就是说从 asyncore.loop() 所在的线程之外的地方动它肯定会让 loop() 崩溃。如果在同一个线程内呢?行不行呢?这大概又要去看 asyncore.py 了,卟啦卟啦 。。。如果有这功夫,卟啦卟啦 。。。
所以,总结的总结:
---- 我觉得上面的 2 跟 3 对于一个网络传输组件来说应该算是很基本的功能。如果缺了这两个功能,那这个网络传输组件应该没太大实际用处,大概只能用在一次性程序里。但是实话讲我对异步网络传输的了解还浅得很,俩月前我还不知道 socket 是什么,也不知道局域网该怎么连,所以以上这些都不是很确定。眼下先跑起来,以后有机会再修正吧。
---- 实验程序:
# -*- encoding: utf-8 -*- ''' 实验平台: Ubuntu 12.04 LTS / Linux 3.2.0-25-generic-pae 实验目标: 1. asynchat 模块的基本传输机制 2. 让服务端在未关闭 socket 的情况下终止运行,模拟连接异常中断的情况,检视后续行为。 实验过程: 1. 另起一个进程作为服务端,服务端仍然用 blocking socket + 多线程的方式,针对每个连接开启一个 发送线程和一个接收线程。服务端的任务是向客户端发送数据(一次 3 Mb)并接收客户端的回执。 2. 主进程内建立客户端对象并向服务端发起指定数目的连接。客户端使用 asynchat 模块,背后的机制 是 non-blocking socket + select()。连接建立以后向服务端发送一条 'start' 消息,通知服务端开始 发送,之后从服务端接收数据,收完一条之后发送一个回执。 3. 服务端完成指定次数的发送以后,在未关闭 socket 的情况下退出运行,检视后续的行为。 实验结果: 1. 客户端使用 asynchat 模块实现了基本传输功能。服务端非正常退出以后 CPU 使用一直保持低水平, 此时 polling loop 应该是在运行的,所以 non-blocking socket + polling loop 的方式应该对 CPU 并没有太大压力。 2.1. 如果服务端发送线程选择未关闭 socket 就退出,那么 asyncore.loop() 就会死循环,不抛出异常 2.2. 如果服务端发送线程选择先关闭 socket 再退出,那么 asyncore.loop() 就会抛出异常 ''' import asyncore import asynchat import socket import multiprocessing import time import threading __host_server__= '127.0.0.1' __port_server__= 9000 __host_client__= '127.0.0.1' __消息内容__= b'head' + b'.' * (3 * 2**20 - 8) + b'tail' # 一次 3 Mb __结束标志__= b'\r<--EOM-->\r' __发送次数__= 3 __连接个数__= 3 __接收缓存__= 2**15 __消息编码__= 'utf-8' #================================================== ↓ 服务端 ↓ ================================================== # 全局 socket,最后在主进程内关闭。 sock_server= socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock_server.bind((__host_server__, __port_server__)) sock_server.listen(1) # 开始监听 def 服务端进程(): 服务端对象= 服务端() for i in range(__连接个数__): try: sock, addr= sock_server.accept() except: print('服务端进程() -- server socket 已关闭,监听进程退出 ...') break print('服务端进程() -- 接收到来自 ' + str(addr) + ' 的连接。') 服务端对象.添加连接(addr=addr, sock=sock) time.sleep(0.3) # 等所有发送线程开始运行 服务端对象.join() # 服务端退出 ... print('服务端进程() -- 所有连接已终止,服务端进程退出 ...') class 服务端: def __init__(self): self._连接集= {} def 添加连接(self, addr, sock): 新连接= 服务端连接(addr=addr, sock=sock) self._连接集[addr]= 新连接 def join(self): for conn in self._连接集.values(): conn.join() class 服务端连接: def __init__(self, addr, sock): self._addr= addr self._sock= sock self._buff= b'' self._发送= False self._发送线程= threading.Thread(target=self._服务端发送线程, name='发送线程_'+str(self._addr), kwargs={}) self._接收线程= threading.Thread(target=self._服务端接收线程, name='接收线程_'+str(self._addr), kwargs={}) self._发送线程.start() self._接收线程.start() def _服务端发送线程(self): ''' ''' sock= self._sock addr= self._addr while not self._发送: time.sleep(0.3) print('_服务端发送线程() -- 与 ' + str(addr) + ' 的连接开始发送 ...') for i in range(1, __发送次数__+1): try: sock.sendall(__消息内容__ + __结束标志__) except: print('_服务端发送线程() -- 与 ' + str(addr) + ' 的连接已损坏,发送线程终止。') break print('_服务端发送线程() -- 与 ' + str(addr) + ' 的连接已发 ' + str(i) + ' 条。') print('_服务端发送线程() -- 与 ' + str(addr) + ' 的连接发送完毕,发送线程退出 ... ') time.sleep(3) # XXX: 非正常退出,未关闭 socket 的情况下就退出 # try: sock.shutdown(socket.SHUT_RDWR) # except: pass # sock.close() def _服务端接收线程(self): ''' ''' sock= self._sock addr= self._addr print('_服务端接收线程() -- 与 ' + str(addr) + ' 的连接开始接收 ...') while True: data= sock.recv(8192) if not data: print('_服务端接收线程() -- 与 ' + str(addr) + ' 的连接已损坏,接收线程终止。') break sidx= max(len(self._buff) - len(__结束标志__), 0) self._buff += data if __结束标志__ in self._buff[sidx:]: mlist= self._buff.split(__结束标志__) self._buff= mlist.pop(-1) for msg in mlist: msg= msg.decode(__消息编码__) print('_服务端接收线程() -- 接收到 ' + str(addr) + ' 发来的信息: "' + msg + '"') self._处理所收消息(msg=msg) try: sock.shutdown(socket.SHUT_RDWR) except: pass sock.close() def _处理所收消息(self, msg): if msg == 'start': self._发送= True def join(self): self._发送线程.join() self._接收线程.join() #================================================== ↓ 客户端 ↓ ================================================== class 客户端: def __init__(self): ''' ''' self._连接集= {} def 建立连接(self): ''' ''' for i in range(1, __连接个数__+1): addr= (__host_client__, 9000+i) self._连接集[addr]= 客户端连接(addr=addr) def start(self): ''' ''' for conn in self._连接集.values(): conn.start() map= {conn._sock.fileno(): conn for conn in self._连接集.values()} polling_loop= threading.Thread(target=asyncore.loop, name='polling_loop', kwargs={'map':map}) polling_loop.start() print('客户端.start() -- 客户端已开始运行 ...') class 客户端连接(asynchat.async_chat): def __init__(self, addr): self._addr= addr # self._sock= self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self._sock= socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.bind(addr) asynchat.async_chat.__init__(self, self._sock) self._buff= b'' self._msgs= [] self.ac_in_buffer_size= __接收缓存__ self.set_terminator(__结束标志__) self.connect( (__host_server__, __port_server__) ) def collect_incoming_data(self, data): if not data: # 说明连接已损坏 print('客户端连接.collect_incoming_data() -- 连接 ' + str(self._addr) + ' 已损坏。') self._buff += data def found_terminator(self): msg= self._buff.decode(__消息编码__) self._msgs.append(msg) self._buff= b'' print('客户端连接.found_terminator() -- 连接 ' + str(self._addr) + ' 共收到 ' + str(len(self._msgs)) + ' 条。') self.push( ('已收 ' + str(len(self._msgs)) + ' 条。').encode(__消息编码__) + __结束标志__ ) def start(self): self.push(b'start' + __结束标志__) #================================================== ↓ 主进程 ↓ ================================================== # 启动监听进程 服务端进程对象= multiprocessing.Process(name='服务端进程', target=服务端进程) 服务端进程对象.start() 客户端对象= 客户端() 客户端对象.建立连接() time.sleep(0.3) 客户端对象.start()
---- 以下是运行结果:
服务端进程() -- 接收到来自 ('127.0.0.1', 9001) 的连接。
服务端进程() -- 接收到来自 ('127.0.0.1', 9002) 的连接。
_服务端接收线程() -- 与 ('127.0.0.1', 9001) 的连接开始接收 ...
_服务端接收线程() -- 与 ('127.0.0.1', 9002) 的连接开始接收 ...
客户端.start() -- 客户端已开始运行 ...
_服务端接收线程() -- 接收到 ('127.0.0.1', 9001) 发来的信息: "start"
服务端进程() -- 接收到来自 ('127.0.0.1', 9003) 的连接。
_服务端接收线程() -- 接收到 ('127.0.0.1', 9002) 发来的信息: "start"
_服务端接收线程() -- 与 ('127.0.0.1', 9003) 的连接开始接收 ...
_服务端接收线程() -- 接收到 ('127.0.0.1', 9003) 发来的信息: "start"
_服务端发送线程() -- 与 ('127.0.0.1', 9001) 的连接开始发送 ...
_服务端发送线程() -- 与 ('127.0.0.1', 9002) 的连接开始发送 ...
_服务端发送线程() -- 与 ('127.0.0.1', 9002) 的连接已发 1 条。
_服务端发送线程() -- 与 ('127.0.0.1', 9001) 的连接已发 1 条。
_服务端发送线程() -- 与 ('127.0.0.1', 9003) 的连接开始发送 ...
客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9002) 共收到 1 条。
_服务端接收线程() -- 接收到 ('127.0.0.1', 9002) 发来的信息: "已收 1 条。"
客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9001) 共收到 1 条。
_服务端接收线程() -- 接收到 ('127.0.0.1', 9001) 发来的信息: "已收 1 条。"
_服务端发送线程() -- 与 ('127.0.0.1', 9002) 的连接已发 2 条。
_服务端发送线程() -- 与 ('127.0.0.1', 9003) 的连接已发 1 条。
_服务端发送线程() -- 与 ('127.0.0.1', 9001) 的连接已发 2 条。
客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9002) 共收到 2 条。
_服务端接收线程() -- 接收到 ('127.0.0.1', 9002) 发来的信息: "已收 2 条。"
客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9003) 共收到 1 条。
_服务端接收线程() -- 接收到 ('127.0.0.1', 9003) 发来的信息: "已收 1 条。"
客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9001) 共收到 2 条。
_服务端接收线程() -- 接收到 ('127.0.0.1', 9001) 发来的信息: "已收 2 条。"
_服务端发送线程() -- 与 ('127.0.0.1', 9002) 的连接已发 3 条。
_服务端发送线程() -- 与 ('127.0.0.1', 9002) 的连接发送完毕,发送线程退出 ...
_服务端发送线程() -- 与 ('127.0.0.1', 9003) 的连接已发 2 条。
_服务端发送线程() -- 与 ('127.0.0.1', 9001) 的连接已发 3 条。
_服务端发送线程() -- 与 ('127.0.0.1', 9001) 的连接发送完毕,发送线程退出 ...
客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9002) 共收到 3 条。
_服务端接收线程() -- 接收到 ('127.0.0.1', 9002) 发来的信息: "已收 3 条。"
客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9003) 共收到 2 条。
_服务端接收线程() -- 接收到 ('127.0.0.1', 9003) 发来的信息: "已收 2 条。"
客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9001) 共收到 3 条。
_服务端接收线程() -- 接收到 ('127.0.0.1', 9001) 发来的信息: "已收 3 条。"
_服务端发送线程() -- 与 ('127.0.0.1', 9003) 的连接已发 3 条。
_服务端发送线程() -- 与 ('127.0.0.1', 9003) 的连接发送完毕,发送线程退出 ...
客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9003) 共收到 3 条。
_服务端接收线程() -- 接收到 ('127.0.0.1', 9003) 发来的信息: "已收 3 条。"
^CException KeyboardInterrupt: KeyboardInterrupt() in <module 'threading' from '/usr/lib/python3.2/threading.py'> ignored
Process 服务端进程:
Traceback (most recent call last):
File "/usr/lib/python3.2/multiprocessing/process.py", line 267, in _bootstrap
self.run()
File "/usr/lib/python3.2/multiprocessing/process.py", line 116, in run
self._target(*self._args, **self._kwargs)
File "asynchat基本传输.py", line 83, in 服务端进程
服务端对象.join()
File "asynchat基本传输.py", line 101, in join
conn.join()
File "asynchat基本传输.py", line 196, in join
self._接收线程.join()
File "/usr/lib/python3.2/threading.py", line 854, in join
self._block.wait()
File "/usr/lib/python3.2/threading.py", line 235, in wait
waiter.acquire()
KeyboardInterrupt