Jacky Liu's Blog

给程序添加了数据库组件

给程序添加了数据库组件,跑通了第一个测试任务。

数据库组件的内容:

1. 数据库组件包含一个数据库接口,用 subprocess.Popen() 对象实现,负责连上外部的 MySQL 服务器进程。具体见前面

2. 包含一个任务队列,用 queue.Queue() 对象实现,内含 Query 任务对象,保证不同客户线程提交的 Query 任务被顺序执行。

3. 包含一个主控线程,用 threading.Thread() 对象实现,负责管理 Query 任务队列,逐个提取任务并执行。

4. 包含一个底层界面函数,负责向任务队列中添加任务对象。

5. 包含数目可扩展的多个高层界面函数,内部调用底层界面函数完成任务对象的添加,外部客户线程通过调用这些函数来完成数据库维护和查询任务。

6. 包含 Logger 对象,用 logging.Logger 实现,用来记录日志。

数据库组件作为主程序的一部分,在程序启动时初始化,向程序其它部分提供数据库查询服务。程序退出时,主程序通过调用合适的界面函数向任务队列里加入一个 “毒药”,主控线程提取到这个 “毒药” 后,就会拒绝接受新的查询任务,并启动组件的退出过程。

所有的 Query 任务对象,不论执行哪种查询任务,都必须符合一定的接口规范,所以使用类的继承机制是个自然选择。以下是 Query 对象基类的设计:

# -*- coding: utf-8 -*-

import threading

class QueryBase:
	
	def __init__(self, ilogger, kargs):
		self._ilogger= ilogger		# 提交任务的客户线程携带的 Logger 对象。
		self._name= kargs['name']	# kargs 是客户线程提交的信息,dict 类型,不同派生类有不同的内容
		
		self._querystr= self._generate_querystr()	# 发往 MySQL Server 的输入
		self._outstr= ''		# MySQL Server 的输出

		# Parser 线程对象,先建立起来,暂时不运行。
		self._parser= threading.Thread(target= self._thread_parser, name= self._name + '_parser')

		self._result= None	# 根据 self._outstr 处理得到的 Python 数据结构,由 self._thread_parser() 负责填充

		self._querydone= threading.Event()	# 通知提交 Query 任务的客户线程任务已完成。由 self._thread_parser() 负责置位。



	def _generate_querystr(self):
		'''
		负责根据初始化参数 kargs 里的值生成要送往 MySQL Server 的输入语句。
		'''
		return ''


	def _thread_parser(self):
		'''
		负责对 self._outstr 进行处理,得到 self._result 数据结构,并最终置位 self._querydone。
		'''
		pass


	def wait(self):
		'''
		Database 模块的界面函数调用此函数来阻塞主调的客户线程,直到查询任务完成。
		'''
		self._querydone.wait()


	def start(self, dbif):
		'''
		此函数在 Database 模块的主控线程内运行,主控线程通过调用此函数来执行查询任务。
		
		参数:
			dbif: 由 Database 模块的主控线程提供的一个接口函数,本函数通过调用此函数来获得 MySQL 服务器的输出。

		执行过程:
			1. 利用 dbif 获得 MySQL 服务器的输出,放在 self._outstr 里面。
			2. 开始执行 self._parser 线程对象。self._parser 将负责填充 self._result 数据结构,并置位 self._querydone
			3. self._parser 开始执行后此函数就可以退出,将控制权交还 Database 模块主控线程。
		'''

		self._outstr= dbif(ilogger=self._ilogger, querystr=self._querystr)
		self._parser.start()		# 开始执行 self._parser 线程对象。



在 Python 3 程序里连上 MySQL 服务器

准备给程序添加数据库组件。因为该死的 MySQLdb 模块还不支持 Python 3, 只能暂时用土办法,通过 subprocess 模块连上 MySQL 服务器,然后用 stdin/stdout 做交流。基本的交互机制已经在测试程序上验证通过:

 

测试代码不长,就图里那一段。用 subprocess.Popen() 新开一个 MySQL 进程,发送一段 SQL 语句给它,接收输出并显示,当不再有输出时就退出。在实际的程序里可以不停地执行查询任务,靠外部条件来退出。底部的 Vim 窗口里加载的是 PythonScriptAgent 功能模块,用来实时运行主窗口内的 Python 程序并且显示输出。

要点:

1. 开启 MySQL 服务器时要传递 ‘--unbuffered’ 参数,因为是程序之间交互,这样 MySQL 有了输出以后不会自己暂存,会立即输出到 Python 进程。

2. 向 MySQL 发送数据用 Popen().stdin.write() 函数。注意尾部一定要有一个 '\n' 字符,底层的 I/O 靠这个来确认发送。否则 MySQL 收不到数据。另外,紧接着要有一个 Popen().stdin.flush() 操作,通知 I/O 机制不要暂存,立即发送,否则 MySQL 也收不到数据。这个很重要,曾经有许多测试代码达不到效果都是因为栽在这个上面。

3. 读取 MySQL 的输出要通过比较底层的 os.read() 操作,通过 Popen().stdout.read() 或者 Popen().stdout.readline() 这些高层的操作是不行的,至少没试成过。可能涉及到底层 I/O 的缓存机制,目前不知道具体为什么。select.select() 函数的作用是监听 MySQL 的输出,如果有就开始读。这种进程间交互的机制是从 Vim 插件 Conque 里学来的。

4. 发送到 MySQL 的语句中混有 SELECT '---------------'; 这样的语句,这在进程交互中起到标志位的作用,实际的 Python 程序可以靠这个来辨别,只要读到 ‘-----------------’ 这个标志字串,就说明属于一个查询任务的输出已经完毕,Python 程序可以开始发送下一个任务的语句。微软出的 SQL 版本(不知道具体叫什么)里面有 PRINT 语句,可以方便调试。MySQL 没有,但是用 SELECT 可以达到相似的效果。




Host by is-Programmer.com | Power by Chito 1.3.3 beta | © 2007 LinuxGem | Design by Matthew "Agent Spork" McGee