Jacky Liu's Blog
给程序添加了数据库组件
给程序添加了数据库组件,跑通了第一个测试任务。
数据库组件的内容:
1. 数据库组件包含一个数据库接口,用 subprocess.Popen() 对象实现,负责连上外部的 MySQL 服务器进程。具体见前面。
2. 包含一个任务队列,用 queue.Queue() 对象实现,内含 Query 任务对象,保证不同客户线程提交的 Query 任务被顺序执行。
3. 包含一个主控线程,用 threading.Thread() 对象实现,负责管理 Query 任务队列,逐个提取任务并执行。
4. 包含一个底层界面函数,负责向任务队列中添加任务对象。
5. 包含数目可扩展的多个高层界面函数,内部调用底层界面函数完成任务对象的添加,外部客户线程通过调用这些函数来完成数据库维护和查询任务。
6. 包含 Logger 对象,用 logging.Logger 实现,用来记录日志。
数据库组件作为主程序的一部分,在程序启动时初始化,向程序其它部分提供数据库查询服务。程序退出时,主程序通过调用合适的界面函数向任务队列里加入一个 “毒药”,主控线程提取到这个 “毒药” 后,就会拒绝接受新的查询任务,并启动组件的退出过程。
所有的 Query 任务对象,不论执行哪种查询任务,都必须符合一定的接口规范,所以使用类的继承机制是个自然选择。以下是 Query 对象基类的设计:
# -*- coding: utf-8 -*- import threading class QueryBase: def __init__(self, ilogger, kargs): self._ilogger= ilogger # 提交任务的客户线程携带的 Logger 对象。 self._name= kargs['name'] # kargs 是客户线程提交的信息,dict 类型,不同派生类有不同的内容 self._querystr= self._generate_querystr() # 发往 MySQL Server 的输入 self._outstr= '' # MySQL Server 的输出 # Parser 线程对象,先建立起来,暂时不运行。 self._parser= threading.Thread(target= self._thread_parser, name= self._name + '_parser') self._result= None # 根据 self._outstr 处理得到的 Python 数据结构,由 self._thread_parser() 负责填充 self._querydone= threading.Event() # 通知提交 Query 任务的客户线程任务已完成。由 self._thread_parser() 负责置位。 def _generate_querystr(self): ''' 负责根据初始化参数 kargs 里的值生成要送往 MySQL Server 的输入语句。 ''' return '' def _thread_parser(self): ''' 负责对 self._outstr 进行处理,得到 self._result 数据结构,并最终置位 self._querydone。 ''' pass def wait(self): ''' Database 模块的界面函数调用此函数来阻塞主调的客户线程,直到查询任务完成。 ''' self._querydone.wait() def start(self, dbif): ''' 此函数在 Database 模块的主控线程内运行,主控线程通过调用此函数来执行查询任务。 参数: dbif: 由 Database 模块的主控线程提供的一个接口函数,本函数通过调用此函数来获得 MySQL 服务器的输出。 执行过程: 1. 利用 dbif 获得 MySQL 服务器的输出,放在 self._outstr 里面。 2. 开始执行 self._parser 线程对象。self._parser 将负责填充 self._result 数据结构,并置位 self._querydone 3. self._parser 开始执行后此函数就可以退出,将控制权交还 Database 模块主控线程。 ''' self._outstr= dbif(ilogger=self._ilogger, querystr=self._querystr) self._parser.start() # 开始执行 self._parser 线程对象。