Python asyncore / asynchat 基本传输实验 - Jacky Liu's Blog
Python asyncore / asynchat 基本传输实验
---- 自从上回实验了 Python socket 的基本传输之后又受了些启发,于是想试试基于 non-blocking socket 的通信机制。Python 标准库里的 asynchat 自然是第一步。昨天写的实验程序实现了基于 asynchat 的基本通信,把前因后果总结一下:
实验前的考虑
---- 用 non-blocking socket 最基本的考虑自然是传输效率,尤其是遇到一大坨数据过来的时候,希望它能尽快传送完,这时最好其它线程通通都停掉,在传完之前不要再无谓地换来换去(反正其它线程此时应该也没甚实际工作要干),当然这只是愿望而已。既然做不到,就只能追求尽量快速的传输方式了。
---- 查资料得出的印象是 non-blocking socket 当然比较快,但是好的东西总是有代价,这是源于我个人的印象。因为非阻塞意味着没有延时,而没有延时环节的无限循环跑起来可能会很耗 CPU。但是另一方面来讲,既然非阻塞 socket 已经用得这么多,意味着底层实现很可能已经解决了这个问题,我的担心很可能是不成立的。这个还是要试试才知道。
结果总结
1. 关于 CPU 使用
---- 实验的结果,当服务端非正常退出以后,使用 asynchat 的客户端好像就会陷入死循环,没有任何异常的迹象,所以此时 polling loop 应该还是在运行的,而 CPU 使用并没有明显高于平时的水平,所以 non-blocking socket + polling loop 应该对 CPU 并没有特别大的压力,当然如果开十个 polling loop 可能会是另外一回事。
2. 对坏连接的处理
---- 关于死循环,这实际上就是一个网络通信模块有没有能力对付坏连接的问题。如果用 blocking socket,上一篇已经讲了,会抛出异常或返回 b'',编程时就可以做相应处理,而死循环显然不是个很讨喜的行为方式。我想了一下如果直接用底层的 select() 函数时会发生什么。因为我曾经在进程间通信的特性中体验过 select() / pooling loop 的用法,这里用作网络传输时应该也差不多。
---- 如果一个进程(或者网络连接)还没吱一声就挂了,那么与它相关的 fd(或 socket)应该仍然会被 select() 放在有效列表里返回,但是后续的 read()(或 recv())操作就会读到一个空的对象,这时你就知道这个进程或连接已经挂了。再看 asynchat 提供的那些寥寥的接口: 好像 collect_incoming_data() 跟 recv() 也差不多,能不能在里面判断,当 data 参数是空的就说明连接已经挂了呢?试了一下,不行。collect_incoming_data() 的 data 参数总是有效的,也就是说读不到内容时这个函数根本不会被调用。
---- 如果再要深究下去大概要去看 asynchat.py 了,但问题是 asynchat 完全是用 Python 写的而且只有几百行而已,它应该木有那么多考虑在里面,我想有这功夫还不如直接去试验 select()。
3. 在运行中更新连接
---- 另一个很重要的问题,我觉得网络通信模块应该提供在 pooling loop 运行的时候动态添加或删除连接的功能,但是很显然 asynchat 并没有这个特性的接口。试着动点脑筋 —— 因为连接对象是通过 asyncore.loop() 的一个参数: map 来传递的,能不能动态修改它呢?文档里说了 map 参数是普通的 dict 类型,dict 当然没有线程安全性,也就是说从 asyncore.loop() 所在的线程之外的地方动它肯定会让 loop() 崩溃。如果在同一个线程内呢?行不行呢?这大概又要去看 asyncore.py 了,卟啦卟啦 。。。如果有这功夫,卟啦卟啦 。。。
所以,总结的总结:
---- 我觉得上面的 2 跟 3 对于一个网络传输组件来说应该算是很基本的功能。如果缺了这两个功能,那这个网络传输组件应该没太大实际用处,大概只能用在一次性程序里。但是实话讲我对异步网络传输的了解还浅得很,俩月前我还不知道 socket 是什么,也不知道局域网该怎么连,所以以上这些都不是很确定。眼下先跑起来,以后有机会再修正吧。
---- 实验程序:
# -*- encoding: utf-8 -*- ''' 实验平台: Ubuntu 12.04 LTS / Linux 3.2.0-25-generic-pae 实验目标: 1. asynchat 模块的基本传输机制 2. 让服务端在未关闭 socket 的情况下终止运行,模拟连接异常中断的情况,检视后续行为。 实验过程: 1. 另起一个进程作为服务端,服务端仍然用 blocking socket + 多线程的方式,针对每个连接开启一个 发送线程和一个接收线程。服务端的任务是向客户端发送数据(一次 3 Mb)并接收客户端的回执。 2. 主进程内建立客户端对象并向服务端发起指定数目的连接。客户端使用 asynchat 模块,背后的机制 是 non-blocking socket + select()。连接建立以后向服务端发送一条 'start' 消息,通知服务端开始 发送,之后从服务端接收数据,收完一条之后发送一个回执。 3. 服务端完成指定次数的发送以后,在未关闭 socket 的情况下退出运行,检视后续的行为。 实验结果: 1. 客户端使用 asynchat 模块实现了基本传输功能。服务端非正常退出以后 CPU 使用一直保持低水平, 此时 polling loop 应该是在运行的,所以 non-blocking socket + polling loop 的方式应该对 CPU 并没有太大压力。 2.1. 如果服务端发送线程选择未关闭 socket 就退出,那么 asyncore.loop() 就会死循环,不抛出异常 2.2. 如果服务端发送线程选择先关闭 socket 再退出,那么 asyncore.loop() 就会抛出异常 ''' import asyncore import asynchat import socket import multiprocessing import time import threading __host_server__= '127.0.0.1' __port_server__= 9000 __host_client__= '127.0.0.1' __消息内容__= b'head' + b'.' * (3 * 2**20 - 8) + b'tail' # 一次 3 Mb __结束标志__= b'\r<--EOM-->\r' __发送次数__= 3 __连接个数__= 3 __接收缓存__= 2**15 __消息编码__= 'utf-8' #================================================== ↓ 服务端 ↓ ================================================== # 全局 socket,最后在主进程内关闭。 sock_server= socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock_server.bind((__host_server__, __port_server__)) sock_server.listen(1) # 开始监听 def 服务端进程(): 服务端对象= 服务端() for i in range(__连接个数__): try: sock, addr= sock_server.accept() except: print('服务端进程() -- server socket 已关闭,监听进程退出 ...') break print('服务端进程() -- 接收到来自 ' + str(addr) + ' 的连接。') 服务端对象.添加连接(addr=addr, sock=sock) time.sleep(0.3) # 等所有发送线程开始运行 服务端对象.join() # 服务端退出 ... print('服务端进程() -- 所有连接已终止,服务端进程退出 ...') class 服务端: def __init__(self): self._连接集= {} def 添加连接(self, addr, sock): 新连接= 服务端连接(addr=addr, sock=sock) self._连接集[addr]= 新连接 def join(self): for conn in self._连接集.values(): conn.join() class 服务端连接: def __init__(self, addr, sock): self._addr= addr self._sock= sock self._buff= b'' self._发送= False self._发送线程= threading.Thread(target=self._服务端发送线程, name='发送线程_'+str(self._addr), kwargs={}) self._接收线程= threading.Thread(target=self._服务端接收线程, name='接收线程_'+str(self._addr), kwargs={}) self._发送线程.start() self._接收线程.start() def _服务端发送线程(self): ''' ''' sock= self._sock addr= self._addr while not self._发送: time.sleep(0.3) print('_服务端发送线程() -- 与 ' + str(addr) + ' 的连接开始发送 ...') for i in range(1, __发送次数__+1): try: sock.sendall(__消息内容__ + __结束标志__) except: print('_服务端发送线程() -- 与 ' + str(addr) + ' 的连接已损坏,发送线程终止。') break print('_服务端发送线程() -- 与 ' + str(addr) + ' 的连接已发 ' + str(i) + ' 条。') print('_服务端发送线程() -- 与 ' + str(addr) + ' 的连接发送完毕,发送线程退出 ... ') time.sleep(3) # XXX: 非正常退出,未关闭 socket 的情况下就退出 # try: sock.shutdown(socket.SHUT_RDWR) # except: pass # sock.close() def _服务端接收线程(self): ''' ''' sock= self._sock addr= self._addr print('_服务端接收线程() -- 与 ' + str(addr) + ' 的连接开始接收 ...') while True: data= sock.recv(8192) if not data: print('_服务端接收线程() -- 与 ' + str(addr) + ' 的连接已损坏,接收线程终止。') break sidx= max(len(self._buff) - len(__结束标志__), 0) self._buff += data if __结束标志__ in self._buff[sidx:]: mlist= self._buff.split(__结束标志__) self._buff= mlist.pop(-1) for msg in mlist: msg= msg.decode(__消息编码__) print('_服务端接收线程() -- 接收到 ' + str(addr) + ' 发来的信息: "' + msg + '"') self._处理所收消息(msg=msg) try: sock.shutdown(socket.SHUT_RDWR) except: pass sock.close() def _处理所收消息(self, msg): if msg == 'start': self._发送= True def join(self): self._发送线程.join() self._接收线程.join() #================================================== ↓ 客户端 ↓ ================================================== class 客户端: def __init__(self): ''' ''' self._连接集= {} def 建立连接(self): ''' ''' for i in range(1, __连接个数__+1): addr= (__host_client__, 9000+i) self._连接集[addr]= 客户端连接(addr=addr) def start(self): ''' ''' for conn in self._连接集.values(): conn.start() map= {conn._sock.fileno(): conn for conn in self._连接集.values()} polling_loop= threading.Thread(target=asyncore.loop, name='polling_loop', kwargs={'map':map}) polling_loop.start() print('客户端.start() -- 客户端已开始运行 ...') class 客户端连接(asynchat.async_chat): def __init__(self, addr): self._addr= addr # self._sock= self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self._sock= socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.bind(addr) asynchat.async_chat.__init__(self, self._sock) self._buff= b'' self._msgs= [] self.ac_in_buffer_size= __接收缓存__ self.set_terminator(__结束标志__) self.connect( (__host_server__, __port_server__) ) def collect_incoming_data(self, data): if not data: # 说明连接已损坏 print('客户端连接.collect_incoming_data() -- 连接 ' + str(self._addr) + ' 已损坏。') self._buff += data def found_terminator(self): msg= self._buff.decode(__消息编码__) self._msgs.append(msg) self._buff= b'' print('客户端连接.found_terminator() -- 连接 ' + str(self._addr) + ' 共收到 ' + str(len(self._msgs)) + ' 条。') self.push( ('已收 ' + str(len(self._msgs)) + ' 条。').encode(__消息编码__) + __结束标志__ ) def start(self): self.push(b'start' + __结束标志__) #================================================== ↓ 主进程 ↓ ================================================== # 启动监听进程 服务端进程对象= multiprocessing.Process(name='服务端进程', target=服务端进程) 服务端进程对象.start() 客户端对象= 客户端() 客户端对象.建立连接() time.sleep(0.3) 客户端对象.start()
---- 以下是运行结果:
服务端进程() -- 接收到来自 ('127.0.0.1', 9001) 的连接。
服务端进程() -- 接收到来自 ('127.0.0.1', 9002) 的连接。
_服务端接收线程() -- 与 ('127.0.0.1', 9001) 的连接开始接收 ...
_服务端接收线程() -- 与 ('127.0.0.1', 9002) 的连接开始接收 ...
客户端.start() -- 客户端已开始运行 ...
_服务端接收线程() -- 接收到 ('127.0.0.1', 9001) 发来的信息: "start"
服务端进程() -- 接收到来自 ('127.0.0.1', 9003) 的连接。
_服务端接收线程() -- 接收到 ('127.0.0.1', 9002) 发来的信息: "start"
_服务端接收线程() -- 与 ('127.0.0.1', 9003) 的连接开始接收 ...
_服务端接收线程() -- 接收到 ('127.0.0.1', 9003) 发来的信息: "start"
_服务端发送线程() -- 与 ('127.0.0.1', 9001) 的连接开始发送 ...
_服务端发送线程() -- 与 ('127.0.0.1', 9002) 的连接开始发送 ...
_服务端发送线程() -- 与 ('127.0.0.1', 9002) 的连接已发 1 条。
_服务端发送线程() -- 与 ('127.0.0.1', 9001) 的连接已发 1 条。
_服务端发送线程() -- 与 ('127.0.0.1', 9003) 的连接开始发送 ...
客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9002) 共收到 1 条。
_服务端接收线程() -- 接收到 ('127.0.0.1', 9002) 发来的信息: "已收 1 条。"
客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9001) 共收到 1 条。
_服务端接收线程() -- 接收到 ('127.0.0.1', 9001) 发来的信息: "已收 1 条。"
_服务端发送线程() -- 与 ('127.0.0.1', 9002) 的连接已发 2 条。
_服务端发送线程() -- 与 ('127.0.0.1', 9003) 的连接已发 1 条。
_服务端发送线程() -- 与 ('127.0.0.1', 9001) 的连接已发 2 条。
客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9002) 共收到 2 条。
_服务端接收线程() -- 接收到 ('127.0.0.1', 9002) 发来的信息: "已收 2 条。"
客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9003) 共收到 1 条。
_服务端接收线程() -- 接收到 ('127.0.0.1', 9003) 发来的信息: "已收 1 条。"
客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9001) 共收到 2 条。
_服务端接收线程() -- 接收到 ('127.0.0.1', 9001) 发来的信息: "已收 2 条。"
_服务端发送线程() -- 与 ('127.0.0.1', 9002) 的连接已发 3 条。
_服务端发送线程() -- 与 ('127.0.0.1', 9002) 的连接发送完毕,发送线程退出 ...
_服务端发送线程() -- 与 ('127.0.0.1', 9003) 的连接已发 2 条。
_服务端发送线程() -- 与 ('127.0.0.1', 9001) 的连接已发 3 条。
_服务端发送线程() -- 与 ('127.0.0.1', 9001) 的连接发送完毕,发送线程退出 ...
客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9002) 共收到 3 条。
_服务端接收线程() -- 接收到 ('127.0.0.1', 9002) 发来的信息: "已收 3 条。"
客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9003) 共收到 2 条。
_服务端接收线程() -- 接收到 ('127.0.0.1', 9003) 发来的信息: "已收 2 条。"
客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9001) 共收到 3 条。
_服务端接收线程() -- 接收到 ('127.0.0.1', 9001) 发来的信息: "已收 3 条。"
_服务端发送线程() -- 与 ('127.0.0.1', 9003) 的连接已发 3 条。
_服务端发送线程() -- 与 ('127.0.0.1', 9003) 的连接发送完毕,发送线程退出 ...
客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9003) 共收到 3 条。
_服务端接收线程() -- 接收到 ('127.0.0.1', 9003) 发来的信息: "已收 3 条。"
^CException KeyboardInterrupt: KeyboardInterrupt() in <module 'threading' from '/usr/lib/python3.2/threading.py'> ignored
Process 服务端进程:
Traceback (most recent call last):
File "/usr/lib/python3.2/multiprocessing/process.py", line 267, in _bootstrap
self.run()
File "/usr/lib/python3.2/multiprocessing/process.py", line 116, in run
self._target(*self._args, **self._kwargs)
File "asynchat基本传输.py", line 83, in 服务端进程
服务端对象.join()
File "asynchat基本传输.py", line 101, in join
conn.join()
File "asynchat基本传输.py", line 196, in join
self._接收线程.join()
File "/usr/lib/python3.2/threading.py", line 854, in join
self._block.wait()
File "/usr/lib/python3.2/threading.py", line 235, in wait
waiter.acquire()
KeyboardInterrupt
2012年7月03日 15:34
那个「死循环」,叫作「无限等待」更好。这时系统在等待永远也不可能到来的事件,和死锁差不多了。Node.js 似乎不会出现这种问题。
2012年7月03日 17:47
@依云: 我不懂 js,曾经试着看过些概念也不得要领。我只是感觉作为脚本语言,Python 遵循的是很原生态的哲学,标准库里的那些特性真的非常非常的谦虚内敛。这个好也不好,对我这种菜鸟来说可能还是好处多些。如果其它脚本语言提供更加高级的功能,应该是一点都不奇怪的,尤其 js 这种高层而又有针对性的东西。
另外想问一下,你所知道的 tornado 是怎么对付那两个问题的,如果有连接挂了怎么办?还有能不能动态地添加或删除连接呢?
2012年7月03日 17:54
@依云: 或者不一定 tornado,任何网络传输库都行,因为它们在底层用的应该是一样的 非阻塞 socket + polling loop 的特性。
2012年7月03日 18:03
@Jacky Liu: Tornado 写应用的话,一般都是持续跑的,所以不用担心没事做干等待了。至于连接断掉,写 Web 的话框架已经处理好了,其它的自己 try 嘛。
Tornado 是可以任何时候添加/修改/移除文件描述符的(几乎所有I/O主循环的库都是这样)。
Tornado 在底层使用的是:Linux->epoll,大多数 BSD->kqueue,其余 fallback 到 select。
2012年7月03日 18:14
@依云: 果然!这再一次验证了 Python 标准库的原生态,还有菜鸟的苦逼!
2012年7月03日 19:49
@依云: 刚才看见 Tornado 的源码,还好还好。大部分也是 python 写的,也不算太大。现在对 python 不那么怵。看不懂,但关键应该在 ioloop 和 iostream。有空研究研究 。。。
2023年5月18日 12:01
Apart from the publicly available information you choose to provide, ekalyan-epass aims to offer better service in a number of ways, but we do not sell or otherwise share your personal information. We take every precaution to protect every email because we are highly conscious of email spam. On rare occasions,ekalyan-epass.in someone may see your letter. Apart from the publicly available information you choose to provide, Ekalyan-epass makes every effort to improve customer service, but we do not sell or otherwise share your personal information. We take every precaution to protect every email because we are highly conscious of email spam. On rare occasions, the general public might see your letter.
2023年6月11日 12:02
If you are interested in your favorite actors' net worth and detailed information, please take a look at celeb height wiki database.
2024年1月16日 17:20
We are a digital marketing agency that makes sure to deliver the highest ROI to increase your brand’s visibility over the internet.