db的读写操作可以算是io型,对于简单的sql,获取数据的数据可能远小于传输的时间,针对这种操作,异步的访问方式就比较占优势了。本篇主要介绍在python中如何借助aiomysql来实现db的异步读写
I. 异步基本使用 1. 配置 本地测试时的环境参数如下
mac操作系统 
python3.7.1 
aiomysql 0.0.19 
PyMySQL 0.9.2 
asyncio 3.4.3 
 
db参数 
mysq 5.7.22  
配置参数: user=root, password=, port=6579, host=127.0.0.1, db=test 
 
表信息及数据如下图
1 2 3 4 5 6 7 8 9 10 CREATE  TABLE  `user`  (  `id`  int (11 ) unsigned  NOT  NULL  AUTO_INCREMENT,   `name`  varchar (80 ) NOT  NULL  DEFAULT  ''  COMMENT  '用户名' ,   `pwd`  varchar (26 ) NOT  NULL  DEFAULT  ''  COMMENT  '密码' ,   `isDeleted`  tinyint (1 ) NOT  NULL  DEFAULT  '0' ,   `created`  varchar (13 ) NOT  NULL  DEFAULT  '0' ,   `updated`  varchar (13 ) NOT  NULL  DEFAULT  '0' ,   PRIMARY KEY  (`id` ),   KEY  `name`  (`name` ) ) ENGINE =InnoDB  AUTO_INCREMENT=1  DEFAULT  CHARSET =utf8mb4; 
2. 基础使用 在进入正式的操作之前,有必要先过一下使用步骤,先回顾一下使用PyMySql时的基本使用姿势
首先是与db建立连接 pymysql.connect(host='127.0.0.1', port=3306, user='root', password='', db='test', charset='utf8') 
根据连接得到的connect来创建光标cursor 
然后通过cursor.execute('sql')来执行具体的sql语句
针对增加,删除和修改的sql命令,需要额外的提交来connect.commit()来确保所有的操作被mysql接收并落盘 
如果是查询,则可以根据 cursor.fetchXXX来获取返回结果中的一条,多条或者所有数据 
 
 
最后回收线程,关闭连接 
 
对于异步的操作,基本步骤其实和上面也差不多,无非是某些耗时的io操作可以使用await来修饰,从而借助协程来提升性能,那么一个简单的使用case可以如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import  asyncioimport  aiomysqlasync  def  basic_test () :    conn = await  aiomysql.connect(host="127.0.0.1" , port=3306 ,                                   user='root' , password='' ,                                   db='test' , charset='utf8' )     cursor = await  conn.cursor()     await  cursor.execute('select * from user limit 3' )     result = await  cursor.fetchall()     for  record in  result:         print("record: " , record)     await  cursor.close()     conn.close() loop = asyncio.get_event_loop() loop.run_until_complete(basic_test()) 
上面的使用姿势,和前面的对比,基本上没有什么区别,唯一的不同就是前面的直接调用;这里需要将调用封装成协程丢到事件循环中执行
下面将主要介绍下,如何使用连接池来实现各种db的异步操作
3. 初始化 接下来我们的目的是尽量开发一个通用的sql读写工具类,所以在脚本启动时,实现db的连接初始化;然后其他的db读写地方,从连接池中获取连接执行db操作,用完之后释放连接;最后在脚本结束时,关闭连接池实现资源回收
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 async  def  register () :    '''      初始化,获取数据库连接池     :return:     '''     try :         print("start to connect db!" )         pool = await  aiomysql.create_pool(host='127.0.0.1' , port=3306 ,                                     user='root' , password='' ,                                     db='test' , charset='utf8' )         print("succeed to connect db!" )         return  pool     except  asyncio.CancelledError:         raise  asyncio.CancelledError     except  Exception as  ex:         print("mysql数据库连接失败:{}" .format(ex.args[0 ]))         return  False  
4. 获取连接和Cursor 根据前面的学习,我们要操作db,前提是要现拿到连接connection, 然后通过connection来绑定光标cursor,最后通过cursor执行sql语句,所以我们接下来就是需要根据前面的pool来获取所需的参数
1 2 3 4 5 6 7 8 9 async  def  getCurosr (pool) :    '''      获取db连接和cursor对象,用于db的读写操作     :param pool:     :return:     '''     conn = await  pool.acquire()     cur = await  conn.cursor()     return  conn, cur 
5. 批量写入操作 接下来的db操作就比较简单了,基本上和前面基础篇的使用姿势一样,并没有什么太大的区别
1 2 3 4 5 6 7 8 9 10 11 12 13 14 async  def  batchInsert (pool, sql, values) :    start = now()          conn, cur = await  getCurosr(pool)     try :                  await  cur.executemany(sql, values)         await  conn.commit()                  return  cur.rowcount     finally :                  await  pool.release(conn)         print("execute insert cost: " , now() - start) 
简单看一下上面的使用姿势
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 now = lambda : time.time() * 1000  loop = asyncio.get_event_loop() pool = loop.run_until_complete(register())   records = [('一灰灰1' , 'asdf' , 0 , int(time.time()), int(time.time())),            ('一灰灰2' , 'qwer' , 0 , int(time.time()), int(time.time()))] sql = "insert into user(`name`, `pwd`, `isDeleted`, `created`, `updated`) values (%s, %s, %s, %s, %s)"  task = asyncio.ensure_future(batchInsert(pool, sql, records)) result = loop.run_until_complete(task) print("insert res:" , result) loop.run_until_complete(close(pool)) loop.close() 
输出结果如下
1 2 execute insert cost:  2.943115234375 insert res: 2 
6. 查询 经过上面的写入操作之后,查询的逻辑也比较顺了,基本上也没有啥区别
1 2 3 4 5 6 7 8 9 10 11 12 13 14 async  def  query (pool, sql) :    '''      查询, 一般流程是首先获取连接,光标,获取数据之后,则需要释放连接     :param pool:     :return:     '''     start = now()     conn, cur = await  getCurosr(pool)     try :         await  cur.execute(sql)         return  await  cur.fetchall()     finally :         await  pool.release(conn)         print("execute %s cost %d"  % (sql, now() - start)) 
对应的测试case如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 loop = asyncio.get_event_loop() pool = loop.run_until_complete(register()) start = now() tasks = [     asyncio.ensure_future(query(pool, 'select * from user where id<3 order by id desc limit 2' )),     asyncio.ensure_future(query(pool, 'select * from user where id<7 order by id desc limit 2' )),     asyncio.ensure_future(query(pool, 'select * from user where id<9 order by id desc limit 2' )), ] result = loop.run_until_complete(asyncio.gather(*tasks)) print("total cose: " , now() - start) for  res in  result:    print("record: " , res) loop.run_until_complete(close(pool)) loop.close() 
输出结果如下:
1 2 3 4 5 6 7 8 9 start to connect db! succeed to connect db! execute select * from user where  id<3 order by id desc limit  2 cost 3 execute select * from user where  id<7 order by id desc limit  2 cost 3 execute select * from user where  id<9 order by id desc limit  2 cost 3 total cose:  4.31298828125 record:  ((2, '一灰灰_1542431296' , '123456' , 0, '1542431296' , '1542431296' ), (1, '一灰灰_1542431218' , '123456' , 0, '1542431218' , '1542431218' )) record:  ((6, '一灰灰_1542431296' , '123456' , 0, '1542431296' , '1542431296' ), (5, '一灰灰_1542431296' , '123456' , 0, '1542431296' , '1542431296' )) record:  ((8, '一灰灰_1542431296' , '123456' , 0, '1542431296' , '1542431296' ), (7, '一灰灰_1542431296' , '123456' , 0, '1542431296' , '1542431296' )) 
7. 关闭 最后就是资源回收关闭,前面的测试用例中也都用到了
1 2 3 4 async  def  close (pool) :    pool.close()     await  pool.wait_closed()     print("close pool!" ) 
II. 其他 参考
一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛
2. 声明 尽信书则不如,已上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激
3. 扫描关注 一灰灰blog 
知识星球