Jacky Liu's Blog

Python asyncore / asynchat 基本传输实验

    ---- 自从上回实验了 Python socket 的基本传输之后又受了些启发,于是想试试基于 non-blocking socket 的通信机制。Python 标准库里的 asynchat 自然是第一步。昨天写的实验程序实现了基于 asynchat 的基本通信,把前因后果总结一下:

    实验前的考虑

    ---- 用 non-blocking socket 最基本的考虑自然是传输效率,尤其是遇到一大坨数据过来的时候,希望它能尽快传送完,这时最好其它线程通通都停掉,在传完之前不要再无谓地换来换去(反正其它线程此时应该也没甚实际工作要干),当然这只是愿望而已。既然做不到,就只能追求尽量快速的传输方式了。

    ---- 查资料得出的印象是 non-blocking socket 当然比较快,但是好的东西总是有代价,这是源于我个人的印象。因为非阻塞意味着没有延时,而没有延时环节的无限循环跑起来可能会很耗 CPU。但是另一方面来讲,既然非阻塞 socket 已经用得这么多,意味着底层实现很可能已经解决了这个问题,我的担心很可能是不成立的。这个还是要试试才知道。

    结果总结

    1. 关于 CPU 使用

    ---- 实验的结果,当服务端非正常退出以后,使用 asynchat 的客户端好像就会陷入死循环,没有任何异常的迹象,所以此时 polling loop 应该还是在运行的,而 CPU 使用并没有明显高于平时的水平,所以 non-blocking socket + polling loop 应该对 CPU 并没有特别大的压力,当然如果开十个 polling loop 可能会是另外一回事。

    2. 对坏连接的处理

    ---- 关于死循环,这实际上就是一个网络通信模块有没有能力对付坏连接的问题。如果用 blocking socket,上一篇已经讲了,会抛出异常或返回 b'',编程时就可以做相应处理,而死循环显然不是个很讨喜的行为方式。我想了一下如果直接用底层的 select() 函数时会发生什么。因为我曾经在进程间通信的特性中体验过 select() / pooling loop 的用法,这里用作网络传输时应该也差不多。

    ---- 如果一个进程(或者网络连接)还没吱一声就挂了,那么与它相关的 fd(或 socket)应该仍然会被 select() 放在有效列表里返回,但是后续的 read()(或 recv())操作就会读到一个空的对象,这时你就知道这个进程或连接已经挂了。再看 asynchat 提供的那些寥寥的接口: 好像 collect_incoming_data() 跟 recv() 也差不多,能不能在里面判断,当 data 参数是空的就说明连接已经挂了呢?试了一下,不行。collect_incoming_data() 的 data 参数总是有效的,也就是说读不到内容时这个函数根本不会被调用。

    ---- 如果再要深究下去大概要去看 asynchat.py 了,但问题是 asynchat 完全是用 Python 写的而且只有几百行而已,它应该木有那么多考虑在里面,我想有这功夫还不如直接去试验 select()。

    3. 在运行中更新连接

    ---- 另一个很重要的问题,我觉得网络通信模块应该提供在 pooling loop 运行的时候动态添加或删除连接的功能,但是很显然 asynchat 并没有这个特性的接口。试着动点脑筋 —— 因为连接对象是通过 asyncore.loop() 的一个参数: map 来传递的,能不能动态修改它呢?文档里说了 map 参数是普通的 dict 类型,dict 当然没有线程安全性,也就是说从 asyncore.loop() 所在的线程之外的地方动它肯定会让 loop() 崩溃。如果在同一个线程内呢?行不行呢?这大概又要去看 asyncore.py 了,卟啦卟啦 。。。如果有这功夫,卟啦卟啦 。。。

    所以,总结的总结:

    ---- 我觉得上面的 2 跟 3 对于一个网络传输组件来说应该算是很基本的功能。如果缺了这两个功能,那这个网络传输组件应该没太大实际用处,大概只能用在一次性程序里。但是实话讲我对异步网络传输的了解还浅得很,俩月前我还不知道 socket 是什么,也不知道局域网该怎么连,所以以上这些都不是很确定。眼下先跑起来,以后有机会再修正吧。

    ---- 实验程序:
 

# -*- encoding: utf-8 -*-



'''
实验平台:
	Ubuntu 12.04 LTS / Linux 3.2.0-25-generic-pae

实验目标:
	1. asynchat 模块的基本传输机制
	2. 让服务端在未关闭 socket 的情况下终止运行,模拟连接异常中断的情况,检视后续行为。

实验过程:
	1. 另起一个进程作为服务端,服务端仍然用 blocking socket + 多线程的方式,针对每个连接开启一个
	发送线程和一个接收线程。服务端的任务是向客户端发送数据(一次 3 Mb)并接收客户端的回执。

	2. 主进程内建立客户端对象并向服务端发起指定数目的连接。客户端使用 asynchat 模块,背后的机制
	是 non-blocking socket + select()。连接建立以后向服务端发送一条 'start' 消息,通知服务端开始
	发送,之后从服务端接收数据,收完一条之后发送一个回执。

	3. 服务端完成指定次数的发送以后,在未关闭 socket 的情况下退出运行,检视后续的行为。

实验结果:
	1. 客户端使用 asynchat 模块实现了基本传输功能。服务端非正常退出以后 CPU 使用一直保持低水平,
	此时 polling loop 应该是在运行的,所以 non-blocking socket + polling loop 的方式应该对 CPU
	并没有太大压力。

	2.1. 如果服务端发送线程选择未关闭 socket 就退出,那么 asyncore.loop() 就会死循环,不抛出异常
	2.2. 如果服务端发送线程选择先关闭 socket 再退出,那么 asyncore.loop() 就会抛出异常
'''



import asyncore
import asynchat

import socket
import multiprocessing
import time
import threading



__host_server__= '127.0.0.1'
__port_server__= 9000

__host_client__= '127.0.0.1'



__消息内容__= b'head' + b'.' * (3 * 2**20 - 8) + b'tail'	# 一次 3 Mb
__结束标志__= b'\r<--EOM-->\r'
__发送次数__= 3
__连接个数__= 3	
__接收缓存__= 2**15
__消息编码__= 'utf-8'



#================================================== ↓ 服务端 ↓ ==================================================

# 全局 socket,最后在主进程内关闭。
sock_server= socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock_server.bind((__host_server__, __port_server__))
sock_server.listen(1)	# 开始监听



def 服务端进程():

	服务端对象= 服务端()

	for i in range(__连接个数__):
		try:
			sock, addr= sock_server.accept()
		except:
			print('服务端进程() -- server socket 已关闭,监听进程退出 ...')
			break

		print('服务端进程() -- 接收到来自 ' + str(addr) + ' 的连接。')

		服务端对象.添加连接(addr=addr, sock=sock)

	time.sleep(0.3)		# 等所有发送线程开始运行

	服务端对象.join()

	# 服务端退出 ...
	print('服务端进程() -- 所有连接已终止,服务端进程退出 ...')



class 服务端:

	def __init__(self):
		self._连接集= {}

	def 添加连接(self, addr, sock):
		新连接= 服务端连接(addr=addr, sock=sock)
		self._连接集[addr]= 新连接

	def join(self):
		for conn in self._连接集.values():
			conn.join()



class 服务端连接:

	def __init__(self, addr, sock):

		self._addr= addr
		self._sock= sock
		self._buff= b''

		self._发送= False
		self._发送线程= threading.Thread(target=self._服务端发送线程, name='发送线程_'+str(self._addr), kwargs={})
		self._接收线程= threading.Thread(target=self._服务端接收线程, name='接收线程_'+str(self._addr), kwargs={})

		self._发送线程.start()
		self._接收线程.start()



	def _服务端发送线程(self):
		'''

		'''
		sock= self._sock
		addr= self._addr

		while not self._发送:
			time.sleep(0.3)

		print('_服务端发送线程() -- 与 ' + str(addr) + ' 的连接开始发送 ...')

		for i in range(1, __发送次数__+1):
			try:
				sock.sendall(__消息内容__ + __结束标志__)
			except:
				print('_服务端发送线程() -- 与 ' + str(addr) + ' 的连接已损坏,发送线程终止。')
				break
			print('_服务端发送线程() -- 与 ' + str(addr) + ' 的连接已发 ' + str(i) + ' 条。')

		print('_服务端发送线程() -- 与 ' + str(addr) + ' 的连接发送完毕,发送线程退出 ... ')

		time.sleep(3)

		# XXX: 非正常退出,未关闭 socket 的情况下就退出
		#	try: sock.shutdown(socket.SHUT_RDWR)
		#	except: pass
		#	sock.close()



	def _服务端接收线程(self):
		'''

		'''
		sock= self._sock
		addr= self._addr

		print('_服务端接收线程() -- 与 ' + str(addr) + ' 的连接开始接收 ...')

		while True:
			data= sock.recv(8192)

			if not data:
				print('_服务端接收线程() -- 与 ' + str(addr) + ' 的连接已损坏,接收线程终止。')
				break

			sidx= max(len(self._buff) - len(__结束标志__), 0)
			self._buff += data

			if __结束标志__ in self._buff[sidx:]:
				mlist= self._buff.split(__结束标志__)
				self._buff= mlist.pop(-1)

				for msg in mlist:
					msg= msg.decode(__消息编码__)
					print('_服务端接收线程() -- 接收到 ' + str(addr) + ' 发来的信息: "' + msg + '"')
					self._处理所收消息(msg=msg)

		try: sock.shutdown(socket.SHUT_RDWR)
		except: pass
		sock.close()



	def _处理所收消息(self, msg):

		if msg == 'start':
			self._发送= True



	def join(self):
		self._发送线程.join()
		self._接收线程.join()



#================================================== ↓ 客户端 ↓ ==================================================

class 客户端:

	def __init__(self):
		'''

		'''
		self._连接集= {}



	def 建立连接(self):
		'''

		'''
		for i in range(1, __连接个数__+1):
			addr= (__host_client__, 9000+i)
			self._连接集[addr]= 客户端连接(addr=addr)



	def start(self):
		'''

		'''
		for conn in self._连接集.values():
			conn.start()

		map= {conn._sock.fileno(): conn for conn in self._连接集.values()}
		polling_loop= threading.Thread(target=asyncore.loop, name='polling_loop', kwargs={'map':map})
		polling_loop.start()

		print('客户端.start() -- 客户端已开始运行 ...')



class 客户端连接(asynchat.async_chat):

	def __init__(self, addr):

		self._addr= addr
		#	self._sock= self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
		self._sock= socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		self._sock.bind(addr)
		asynchat.async_chat.__init__(self, self._sock)
		self._buff= b''
		self._msgs= []
		self.ac_in_buffer_size= __接收缓存__
		self.set_terminator(__结束标志__)
		self.connect( (__host_server__, __port_server__) )



	def collect_incoming_data(self, data):
		if not data:	# 说明连接已损坏
			print('客户端连接.collect_incoming_data() -- 连接 ' + str(self._addr) + ' 已损坏。')

		self._buff += data



	def found_terminator(self):

		msg= self._buff.decode(__消息编码__)
		self._msgs.append(msg)
		self._buff= b''

		print('客户端连接.found_terminator() -- 连接 ' + str(self._addr) + ' 共收到 ' + str(len(self._msgs)) + ' 条。')

		self.push( ('已收 ' + str(len(self._msgs)) + ' 条。').encode(__消息编码__) + __结束标志__ )



	def start(self):
		self.push(b'start' + __结束标志__)



#================================================== ↓ 主进程 ↓ ==================================================

# 启动监听进程
服务端进程对象= multiprocessing.Process(name='服务端进程', target=服务端进程)
服务端进程对象.start()



客户端对象= 客户端()
客户端对象.建立连接()
time.sleep(0.3)
客户端对象.start()



    ---- 以下是运行结果:

        服务端进程() -- 接收到来自 ('127.0.0.1', 9001) 的连接。
        服务端进程() -- 接收到来自 ('127.0.0.1', 9002) 的连接。
        _服务端接收线程() -- 与 ('127.0.0.1', 9001) 的连接开始接收 ...
        _服务端接收线程() -- 与 ('127.0.0.1', 9002) 的连接开始接收 ...
        客户端.start() -- 客户端已开始运行 ...
        _服务端接收线程() -- 接收到 ('127.0.0.1', 9001) 发来的信息: "start"
        服务端进程() -- 接收到来自 ('127.0.0.1', 9003) 的连接。
        _服务端接收线程() -- 接收到 ('127.0.0.1', 9002) 发来的信息: "start"
        _服务端接收线程() -- 与 ('127.0.0.1', 9003) 的连接开始接收 ...
        _服务端接收线程() -- 接收到 ('127.0.0.1', 9003) 发来的信息: "start"
        _服务端发送线程() -- 与 ('127.0.0.1', 9001) 的连接开始发送 ...
        _服务端发送线程() -- 与 ('127.0.0.1', 9002) 的连接开始发送 ...
        _服务端发送线程() -- 与 ('127.0.0.1', 9002) 的连接已发 1 条。
        _服务端发送线程() -- 与 ('127.0.0.1', 9001) 的连接已发 1 条。
        _服务端发送线程() -- 与 ('127.0.0.1', 9003) 的连接开始发送 ...
        客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9002) 共收到 1 条。
        _服务端接收线程() -- 接收到 ('127.0.0.1', 9002) 发来的信息: "已收 1 条。"
        客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9001) 共收到 1 条。
        _服务端接收线程() -- 接收到 ('127.0.0.1', 9001) 发来的信息: "已收 1 条。"
        _服务端发送线程() -- 与 ('127.0.0.1', 9002) 的连接已发 2 条。
        _服务端发送线程() -- 与 ('127.0.0.1', 9003) 的连接已发 1 条。
        _服务端发送线程() -- 与 ('127.0.0.1', 9001) 的连接已发 2 条。
        客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9002) 共收到 2 条。
        _服务端接收线程() -- 接收到 ('127.0.0.1', 9002) 发来的信息: "已收 2 条。"
        客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9003) 共收到 1 条。
        _服务端接收线程() -- 接收到 ('127.0.0.1', 9003) 发来的信息: "已收 1 条。"
        客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9001) 共收到 2 条。
        _服务端接收线程() -- 接收到 ('127.0.0.1', 9001) 发来的信息: "已收 2 条。"
        _服务端发送线程() -- 与 ('127.0.0.1', 9002) 的连接已发 3 条。
        _服务端发送线程() -- 与 ('127.0.0.1', 9002) 的连接发送完毕,发送线程退出 ...
        _服务端发送线程() -- 与 ('127.0.0.1', 9003) 的连接已发 2 条。
        _服务端发送线程() -- 与 ('127.0.0.1', 9001) 的连接已发 3 条。
        _服务端发送线程() -- 与 ('127.0.0.1', 9001) 的连接发送完毕,发送线程退出 ...
        客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9002) 共收到 3 条。
        _服务端接收线程() -- 接收到 ('127.0.0.1', 9002) 发来的信息: "已收 3 条。"
        客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9003) 共收到 2 条。
        _服务端接收线程() -- 接收到 ('127.0.0.1', 9003) 发来的信息: "已收 2 条。"
        客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9001) 共收到 3 条。
        _服务端接收线程() -- 接收到 ('127.0.0.1', 9001) 发来的信息: "已收 3 条。"
        _服务端发送线程() -- 与 ('127.0.0.1', 9003) 的连接已发 3 条。
        _服务端发送线程() -- 与 ('127.0.0.1', 9003) 的连接发送完毕,发送线程退出 ...
        客户端连接.found_terminator() -- 连接 ('127.0.0.1', 9003) 共收到 3 条。
        _服务端接收线程() -- 接收到 ('127.0.0.1', 9003) 发来的信息: "已收 3 条。"


        ^CException KeyboardInterrupt: KeyboardInterrupt() in <module 'threading' from '/usr/lib/python3.2/threading.py'> ignored
        Process 服务端进程:
        Traceback (most recent call last):
          File "/usr/lib/python3.2/multiprocessing/process.py", line 267, in _bootstrap
            self.run()
          File "/usr/lib/python3.2/multiprocessing/process.py", line 116, in run
            self._target(*self._args, **self._kwargs)
          File "asynchat基本传输.py", line 83, in 服务端进程
            服务端对象.join()
          File "asynchat基本传输.py", line 101, in join
            conn.join()
          File "asynchat基本传输.py", line 196, in join
            self._接收线程.join()
          File "/usr/lib/python3.2/threading.py", line 854, in join
            self._block.wait()
          File "/usr/lib/python3.2/threading.py", line 235, in wait
            waiter.acquire()
        KeyboardInterrupt

 




Host by is-Programmer.com | Power by Chito 1.3.3 beta | © 2007 LinuxGem | Design by Matthew "Agent Spork" McGee