db的读写操作可以算是io型,对于简单的sql,获取数据的数据可能远小于传输的时间,针对这种操作,异步的访问方式就比较占优势了。本篇主要介绍在python中如何借助aiomysql来实现db的异步读写

I. 异步基本使用

1. 配置

本地测试时的环境参数如下

  • mac操作系统
  • python3.7.1
  • aiomysql 0.0.19
  • PyMySQL 0.9.2
  • asyncio 3.4.3

db参数

  • mysq 5.7.22
  • 配置参数: user=root, password=, port=6579, host=127.0.0.1, db=test

表信息及数据如下图

1
2
3
4
5
6
7
8
9
10
CREATE TABLE `user` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(80) NOT NULL DEFAULT '' COMMENT '用户名',
`pwd` varchar(26) NOT NULL DEFAULT '' COMMENT '密码',
`isDeleted` tinyint(1) NOT NULL DEFAULT '0',
`created` varchar(13) NOT NULL DEFAULT '0',
`updated` varchar(13) NOT NULL DEFAULT '0',
PRIMARY KEY (`id`),
KEY `name` (`name`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;

2. 基础使用

在进入正式的操作之前,有必要先过一下使用步骤,先回顾一下使用PyMySql时的基本使用姿势

  • 首先是与db建立连接 pymysql.connect(host='127.0.0.1', port=3306, user='root', password='', db='test', charset='utf8')
  • 根据连接得到的connect来创建光标cursor
  • 然后通过cursor.execute('sql')来执行具体的sql语句
    • 针对增加,删除和修改的sql命令,需要额外的提交来connect.commit()来确保所有的操作被mysql接收并落盘
    • 如果是查询,则可以根据 cursor.fetchXXX来获取返回结果中的一条,多条或者所有数据
  • 最后回收线程,关闭连接

对于异步的操作,基本步骤其实和上面也差不多,无非是某些耗时的io操作可以使用await来修饰,从而借助协程来提升性能,那么一个简单的使用case可以如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio
import aiomysql

async def basic_test():
conn = await aiomysql.connect(host="127.0.0.1", port=3306,
user='root', password='',
db='test', charset='utf8')
cursor = await conn.cursor()
await cursor.execute('select * from user limit 3')
result = await cursor.fetchall()
for record in result:
print("record: ", record)
await cursor.close()
conn.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(basic_test())

上面的使用姿势,和前面的对比,基本上没有什么区别,唯一的不同就是前面的直接调用;这里需要将调用封装成协程丢到事件循环中执行

下面将主要介绍下,如何使用连接池来实现各种db的异步操作

3. 初始化

接下来我们的目的是尽量开发一个通用的sql读写工具类,所以在脚本启动时,实现db的连接初始化;然后其他的db读写地方,从连接池中获取连接执行db操作,用完之后释放连接;最后在脚本结束时,关闭连接池实现资源回收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async def register():
'''
初始化,获取数据库连接池
:return:
'''
try:
print("start to connect db!")
pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
user='root', password='',
db='test', charset='utf8')
print("succeed to connect db!")
return pool
except asyncio.CancelledError:
raise asyncio.CancelledError
except Exception as ex:
print("mysql数据库连接失败:{}".format(ex.args[0]))
return False

4. 获取连接和Cursor

根据前面的学习,我们要操作db,前提是要现拿到连接connection, 然后通过connection来绑定光标cursor,最后通过cursor执行sql语句,所以我们接下来就是需要根据前面的pool来获取所需的参数

1
2
3
4
5
6
7
8
9
async def getCurosr(pool):
'''
获取db连接和cursor对象,用于db的读写操作
:param pool:
:return:
'''
conn = await pool.acquire()
cur = await conn.cursor()
return conn, cur

5. 批量写入操作

接下来的db操作就比较简单了,基本上和前面基础篇的使用姿势一样,并没有什么太大的区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def batchInsert(pool, sql, values):
start = now()
# 第一步获取连接和cursor对象
conn, cur = await getCurosr(pool)
try:
# 执行sql命令
await cur.executemany(sql, values)
await conn.commit()
# 返回sql执行后影响的行数
return cur.rowcount
finally:
# 最后不能忘记释放掉连接,否则最终关闭连接池会有问题
await pool.release(conn)
print("execute insert cost: ", now() - start)

简单看一下上面的使用姿势

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
now = lambda: time.time() * 1000

loop = asyncio.get_event_loop()
pool = loop.run_until_complete(register())

# 批量插入
records = [('一灰灰1', 'asdf', 0, int(time.time()), int(time.time())),
('一灰灰2', 'qwer', 0, int(time.time()), int(time.time()))]
sql = "insert into user(`name`, `pwd`, `isDeleted`, `created`, `updated`) values (%s, %s, %s, %s, %s)"
task = asyncio.ensure_future(batchInsert(pool, sql, records))
result = loop.run_until_complete(task)
print("insert res:", result)

# 最后关闭连接

loop.run_until_complete(close(pool))
loop.close()

输出结果如下

1
2
execute insert cost:  2.943115234375
insert res: 2

6. 查询

经过上面的写入操作之后,查询的逻辑也比较顺了,基本上也没有啥区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def query(pool, sql):
'''
查询, 一般流程是首先获取连接,光标,获取数据之后,则需要释放连接
:param pool:
:return:
'''
start = now()
conn, cur = await getCurosr(pool)
try:
await cur.execute(sql)
return await cur.fetchall()
finally:
await pool.release(conn)
print("execute %s cost %d" % (sql, now() - start))

对应的测试case如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
loop = asyncio.get_event_loop()
pool = loop.run_until_complete(register())

# 开始增删改查操作

# 查询
start = now()
tasks = [
asyncio.ensure_future(query(pool, 'select * from user where id<3 order by id desc limit 2')),
asyncio.ensure_future(query(pool, 'select * from user where id<7 order by id desc limit 2')),
asyncio.ensure_future(query(pool, 'select * from user where id<9 order by id desc limit 2')),
]
result = loop.run_until_complete(asyncio.gather(*tasks))
print("total cose: ", now() - start)
for res in result:
print("record: ", res)

# 最后关闭连接
loop.run_until_complete(close(pool))
loop.close()

输出结果如下:

1
2
3
4
5
6
7
8
9
start to connect db!
succeed to connect db!
execute select * from user where id<3 order by id desc limit 2 cost 3
execute select * from user where id<7 order by id desc limit 2 cost 3
execute select * from user where id<9 order by id desc limit 2 cost 3
total cose: 4.31298828125
record: ((2, '一灰灰_1542431296', '123456', 0, '1542431296', '1542431296'), (1, '一灰灰_1542431218', '123456', 0, '1542431218', '1542431218'))
record: ((6, '一灰灰_1542431296', '123456', 0, '1542431296', '1542431296'), (5, '一灰灰_1542431296', '123456', 0, '1542431296', '1542431296'))
record: ((8, '一灰灰_1542431296', '123456', 0, '1542431296', '1542431296'), (7, '一灰灰_1542431296', '123456', 0, '1542431296', '1542431296'))

7. 关闭

最后就是资源回收关闭,前面的测试用例中也都用到了

1
2
3
4
async def close(pool):
pool.close()
await pool.wait_closed()
print("close pool!")

II. 其他

参考

1. 一灰灰Bloghttps://liuyueyi.github.io/hexblog

一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

2. 声明

尽信书则不如,已上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

3. 扫描关注

一灰灰blog

QrCode

知识星球

goals