181120-Python之Mysql异步使用篇aiomysql使用小结

文章目录
  1. I. 异步基本使用
    1. 1. 配置
    2. 2. 基础使用
    3. 3. 初始化
    4. 4. 获取连接和Cursor
    5. 5. 批量写入操作
    6. 6. 查询
    7. 7. 关闭
  2. II. 其他
    1. 1. 一灰灰Blog: https://liuyueyi.github.io/hexblog
    2. 2. 声明
    3. 3. 扫描关注

db的读写操作可以算是io型,对于简单的sql,获取数据的数据可能远小于传输的时间,针对这种操作,异步的访问方式就比较占优势了。本篇主要介绍在python中如何借助aiomysql来实现db的异步读写

I. 异步基本使用

1. 配置

本地测试时的环境参数如下

  • mac操作系统
  • python3.7.1
  • aiomysql 0.0.19
  • PyMySQL 0.9.2
  • asyncio 3.4.3

db参数

  • mysq 5.7.22
  • 配置参数: user=root, password=, port=6579, host=127.0.0.1, db=test

表信息及数据如下图

1
2
3
4
5
6
7
8
9
10
CREATE TABLE `user` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(80) NOT NULL DEFAULT '' COMMENT '用户名',
`pwd` varchar(26) NOT NULL DEFAULT '' COMMENT '密码',
`isDeleted` tinyint(1) NOT NULL DEFAULT '0',
`created` varchar(13) NOT NULL DEFAULT '0',
`updated` varchar(13) NOT NULL DEFAULT '0',
PRIMARY KEY (`id`),
KEY `name` (`name`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;

2. 基础使用

在进入正式的操作之前,有必要先过一下使用步骤,先回顾一下使用PyMySql时的基本使用姿势

  • 首先是与db建立连接 pymysql.connect(host='127.0.0.1', port=3306, user='root', password='', db='test', charset='utf8')
  • 根据连接得到的connect来创建光标cursor
  • 然后通过cursor.execute('sql')来执行具体的sql语句
    • 针对增加,删除和修改的sql命令,需要额外的提交来connect.commit()来确保所有的操作被mysql接收并落盘
    • 如果是查询,则可以根据 cursor.fetchXXX来获取返回结果中的一条,多条或者所有数据
  • 最后回收线程,关闭连接

对于异步的操作,基本步骤其实和上面也差不多,无非是某些耗时的io操作可以使用await来修饰,从而借助协程来提升性能,那么一个简单的使用case可以如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio
import aiomysql

async def basic_test():
conn = await aiomysql.connect(host="127.0.0.1", port=3306,
user='root', password='',
db='test', charset='utf8')
cursor = await conn.cursor()
await cursor.execute('select * from user limit 3')
result = await cursor.fetchall()
for record in result:
print("record: ", record)
await cursor.close()
conn.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(basic_test())

上面的使用姿势,和前面的对比,基本上没有什么区别,唯一的不同就是前面的直接调用;这里需要将调用封装成协程丢到事件循环中执行

下面将主要介绍下,如何使用连接池来实现各种db的异步操作

3. 初始化

接下来我们的目的是尽量开发一个通用的sql读写工具类,所以在脚本启动时,实现db的连接初始化;然后其他的db读写地方,从连接池中获取连接执行db操作,用完之后释放连接;最后在脚本结束时,关闭连接池实现资源回收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async def register():
'''
初始化,获取数据库连接池
:return:
'''
try:
print("start to connect db!")
pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
user='root', password='',
db='test', charset='utf8')
print("succeed to connect db!")
return pool
except asyncio.CancelledError:
raise asyncio.CancelledError
except Exception as ex:
print("mysql数据库连接失败:{}".format(ex.args[0]))
return False

4. 获取连接和Cursor

根据前面的学习,我们要操作db,前提是要现拿到连接connection, 然后通过connection来绑定光标cursor,最后通过cursor执行sql语句,所以我们接下来就是需要根据前面的pool来获取所需的参数

1
2
3
4
5
6
7
8
9
async def getCurosr(pool):
'''
获取db连接和cursor对象,用于db的读写操作
:param pool:
:return:
'''
conn = await pool.acquire()
cur = await conn.cursor()
return conn, cur

5. 批量写入操作

接下来的db操作就比较简单了,基本上和前面基础篇的使用姿势一样,并没有什么太大的区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def batchInsert(pool, sql, values):
start = now()
# 第一步获取连接和cursor对象
conn, cur = await getCurosr(pool)
try:
# 执行sql命令
await cur.executemany(sql, values)
await conn.commit()
# 返回sql执行后影响的行数
return cur.rowcount
finally:
# 最后不能忘记释放掉连接,否则最终关闭连接池会有问题
await pool.release(conn)
print("execute insert cost: ", now() - start)

简单看一下上面的使用姿势

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
now = lambda: time.time() * 1000

loop = asyncio.get_event_loop()
pool = loop.run_until_complete(register())

# 批量插入
records = [('一灰灰1', 'asdf', 0, int(time.time()), int(time.time())),
('一灰灰2', 'qwer', 0, int(time.time()), int(time.time()))]
sql = "insert into user(`name`, `pwd`, `isDeleted`, `created`, `updated`) values (%s, %s, %s, %s, %s)"
task = asyncio.ensure_future(batchInsert(pool, sql, records))
result = loop.run_until_complete(task)
print("insert res:", result)

# 最后关闭连接

loop.run_until_complete(close(pool))
loop.close()

输出结果如下

1
2
execute insert cost:  2.943115234375
insert res: 2

6. 查询

经过上面的写入操作之后,查询的逻辑也比较顺了,基本上也没有啥区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def query(pool, sql):
'''
查询, 一般流程是首先获取连接,光标,获取数据之后,则需要释放连接
:param pool:
:return:
'''
start = now()
conn, cur = await getCurosr(pool)
try:
await cur.execute(sql)
return await cur.fetchall()
finally:
await pool.release(conn)
print("execute %s cost %d" % (sql, now() - start))

对应的测试case如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
loop = asyncio.get_event_loop()
pool = loop.run_until_complete(register())

# 开始增删改查操作

# 查询
start = now()
tasks = [
asyncio.ensure_future(query(pool, 'select * from user where id<3 order by id desc limit 2')),
asyncio.ensure_future(query(pool, 'select * from user where id<7 order by id desc limit 2')),
asyncio.ensure_future(query(pool, 'select * from user where id<9 order by id desc limit 2')),
]
result = loop.run_until_complete(asyncio.gather(*tasks))
print("total cose: ", now() - start)
for res in result:
print("record: ", res)

# 最后关闭连接
loop.run_until_complete(close(pool))
loop.close()

输出结果如下:

1
2
3
4
5
6
7
8
9
start to connect db!
succeed to connect db!
execute select * from user where id<3 order by id desc limit 2 cost 3
execute select * from user where id<7 order by id desc limit 2 cost 3
execute select * from user where id<9 order by id desc limit 2 cost 3
total cose: 4.31298828125
record: ((2, '一灰灰_1542431296', '123456', 0, '1542431296', '1542431296'), (1, '一灰灰_1542431218', '123456', 0, '1542431218', '1542431218'))
record: ((6, '一灰灰_1542431296', '123456', 0, '1542431296', '1542431296'), (5, '一灰灰_1542431296', '123456', 0, '1542431296', '1542431296'))
record: ((8, '一灰灰_1542431296', '123456', 0, '1542431296', '1542431296'), (7, '一灰灰_1542431296', '123456', 0, '1542431296', '1542431296'))

7. 关闭

最后就是资源回收关闭,前面的测试用例中也都用到了

1
2
3
4
async def close(pool):
pool.close()
await pool.wait_closed()
print("close pool!")

II. 其他

参考

1. 一灰灰Bloghttps://liuyueyi.github.io/hexblog

一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

2. 声明

尽信书则不如,已上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

3. 扫描关注

一灰灰blog

QrCode

知识星球

goals

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×