-
Notifications
You must be signed in to change notification settings - Fork 4
Helo 快速上手指南
Helo 是一个简单的小型低级别异步(asyncio) Python ORM。它非常直白,非常容易上手使用。
Helo 可以在你的异步应用中帮助你轻松的构建出富有表达力的常用 SQL 语句,你只需以友好的对象化 API 来操作数据,而不用关心 SQL 语句编写、数据处理等细节。
- 支持的版本: Python 3.7+
- 目前仅支持 MySQL, 版本 5.7+
- 与 web 框架的结合,目前支持:
- 目前不支持表关系操作
请注意:使用异步 ORM 并不完全意味着可以使你的应用变快,而且有可能会使你的应用变得复杂。继续之前,你可以先阅读下 SQLAlchemy 的作者 Mike Bayer 的 这篇博客文章。
本篇上手指南主要介绍以下方面:
使用 helo, 首先你需要引入 helo
并使用 helo.G
实例化一个全局变量,假定称其为 db
:
当然,提前是你已经获取安装了 helo。
import helo
db = helo.G()
db
是一个全局单例对象,下面的介绍中我们将多次使用到它。
使用 helo 声明模型很简单,只需从 helo.Model
继承即可,下面给出几个简单的模型声明的例子:
class Person(helo.Model):
id = helo.BigAuto()
name = helo.VarChar(length=45, null=False)
class User(Person):
email = helo.Email(default='')
password = helo.VarChar(length=100, null=False)
create_at = helo.Timestamp(default=helo.ON_CREATE)
class Meta:
indexes = [helo.K('idx_ep', ['email', 'password'])]
class Employee(Person):
department = helo.Smallint()
salary = helo.Float(default=0)
class Post(helo.Model):
id = helo.Auto(comment='auto increment pk')
title = helo.VarChar(length=100)
content = helo.Text(encoding=helo.ENCODING.UTF8MB4)
author = helo.Int(default=0)
create_at = helo.Timestamp(default=helo.ON_CREATE)
update_at = helo.Timestamp(default=helo.ON_UPDATE)
class Meta:
indexes = [
helo.K('idx_title', 'title'),
helo.K('idx_author', 'author'),
]
内部类 Meta
可用于指定 db_name
, table_name
, engine
, indexes
, charset
, comment
等
Table 的元属性。
class Meta:
db = 'db_name'
name = 'table_name'
engine = helo.ENGINE.innodb
charset = helo.ENCODING.utf8mb4
indexes = []
comment = 'table comment'
其中 table_name
默认为 model 类名的 snake_case 风格名称,engine
默认为 InnoDB
,charset
默认为 utf8mb4
。
前面的模型声明只是定义了模型与真实表结构的映射关系,并非实际在数据库中创建了这些表结构。为此,我们需要先使用 helo 来与数据库建立连接,这里我们创建一个 MySQL 的数据库实例:
>>> await db.bind('mysql://user:pwd@localhost:3306/helo')
或者传递配置参数:
>>> await db.bind(user='user', password='pwd', db='helo')
如果你设置了环境变量 HELO_DATABASE_URL
,那么你不用再传递 url:
>>> await db.bind()
如果你想自定义 KEY 的值,可以在初始化
db
时通过env_key
参数来设置db = helo.G(env_key="YOUR_ENV_KEY")
bind
实际上为我们创建了一个数据库连接池:
>>> db.state
{'minsize': 1, 'maxsize': 15, 'size': 1, 'freesize': 1}
bind
给我们提供了很多关键字参数来允许我们自定义设置,详见 helo.db.Pool 类。例如:
>>> await db.bind('mysql://user:pwd@127.0.0.1:3306/db', maxsize=10, connect_timeout=15)
已经创建的连接池对象将是一个全局的单例对象,也就是说如果你已经为你的应用程序调用 bind
绑定了数据库,在此之前如果你没有使用 unbind
进行解绑,你将不能再继续使用 bind
再次绑定另一个数据库,否则你将会得到一个 helo.err.DuplicateBinding
错误。
如果你需要显式地断开与数据库的连接,关闭连接池,可以使用 unbind
:
>>> await db.unbind()
在小型的脚本中你可以使用 db.binder
来自动处理上下文:
>>> async with db.binder():
... pass
与数据库建立了连接之后,我们需要在数据库创建我们的表,以便于接下来进行数据的操作。
在真正的应用中,数据库表的设计创建与维护是单独分开,一般由专门的 DBA 来管理,当然 helo 也提供了基础的 DDL 支持。
下面我们在数据库创建它们:
>>> await db.create_tables([User, Employee, Post])
在应用项目中,我们通常将所有的模型声明单独放在一个模块中,在此假设模块名为 models
,则可以使用 create_all
为模块中所有的 model 创建表:
>>> from your.application import models
>>> await db.create_all(models)
当然你也可以使用 Model
的 create
方法来单独创建:
>>> await User.create()
Helo 提供了基本的操作数据库中数据的能力,支持丰富的可组合的逻辑运算表达,你可以轻松的完成富有表达力的 queries,以实现通过对象化的 API 来构建你想要的 SQL 语句(DML 和 DQL)的能力。
下面示例基本的增删改查的操作。
使用 helo 你可以有多种插入数据的方式选择,我们从创建一个 User
对象开始:
user = User(name='at7h', password='1111')
print(user.name, user.password) # at7h, 1111
# Now user.id is None, because it is not saved to the database
assert user.id is None
此时的 user 仅是内存中的一个对象,你需要通过 save
方法持久化到数据库中:
user_id = await user.save()
assert user_id == user.id == 1
我们可以修改它,并保存更改:
user.name = 'at8h'
user.email = 'g@at7h.com'
user_id = await user.save()
assert user_id == user.id == 1
请注意: 目前
save
操作是通过 MySQLREPLACE
语句实现,其根据对象的 PRIMARY KEY 属性或 UNIQUE KEY 属性的值来决定是否插入新行,请谨慎使用!该实现计划在后续版本中优化。
推荐使用下面几种方式来插入数据。
方法 add
, madd
可以用来添加单条或多条数据,它们是 insert
, minsert
的简单快捷方式:
user_id = await User.add(name='bobo', password='2222')
# Or: user_id = await User.add({'name': 'bobo', 'password': '2222'})
print(user_id) # 2
users = [{'name': 'mingz', 'password': '3333'},
{'name': 'xy69z', 'password': '4444'}]
# Or using user object list:
# users = [User(name='mingz', password='3333'),
# User(name='xy69z', password='4444')]
count = await User.madd(users)
print(count) # 2
方法 insert
和 minsert
是最正确的数据插入姿势,它们可以胜任多种数据形式,它们将返回一个 Insert
对象,要执行此操作,请不要忘了写 do()
哦 😉:
ret = await User.insert(name='poper', password='5555').do()
# Or: ret = await User.insert({'name': 'bingo', 'password': '8888'}).do()
assert ret.affected == 1
assert ret.last_id == 5
print(ret) # (1, 5)
# Inserting multiple
employees = [
{'name': 'at7h', 'department': 1},
{'name': 'bobo', 'department': 2},
]
ret = await Employee.minsert(employees).do()
print(ret) # (2, 1)
# Specify row tuples columns the tuple values correspond to
posts = [
('post1', 1),
('post2', 2),
]
ret = await Post.minsert(
posts, columns=[Post.title, Post.author]
).do()
print(ret) # (2, 1)
使用 insert_from
支持表间数据填充:
select = User.select(User.name).where(User.id.in_([3, 4, 5]))
ret = await Employee.insert_from(select, [Employee.name]).do()
print(ret) # (3, 3)
Helo 也有多种获取数据的方式选择,如简单获取单条数据可以使用 get
方法:
# By id
user = await User.get(1)
assert isinstance(user, User)
print(user.id, user.name, user.password) # 1, at7h, 1111
# Or by query
assert (await User.get(User.name == user.name)) == user
获取多条数据可以使用 mget
方法:
# By id list
uid_list = [1, 2, 3]
users = await User.mget(uid_list)
print(users.count) # 3
print(users) # [<User object at 1>, <User object at 2>, <User object at 3>]
# Specify columns
users = await User.mget(uid_list, columns=[User.id, User.name])
assert users[0].password is None
# Or by query
users = await User.mget((User.id < 2) | (User.name == 'mingz'))
print(users) # [<User object at 1>, <User object at 3>]
同样的,方法 get
和 mget
也是 select
的简单快捷版本,其只适合于已知主键或查询条件比较简单的场景,更多的时候我们还是需要使用 select
。
使用 select
方法可以帮助你以对象化 API 的方式轻松的构造你的 DQL,其支持丰富的可组合的逻辑条件表达式。
users = await User.select().order_by(
User.id.desc()
).limit(3).offset(2).all()
print(users) # [<User object at 5>, <User object at 4>, <User object at 3>]
方法
all()
以及下面提到的get()
,first()
,rows()
,paginate()
等方法类似于上面提到的do()
,都用于驱动执行此次查询,不要忘了哦。
比如我需要知道有没有使用 gmail 邮箱的用户:
is_exist = await User.select().where(
User.email.endswith('gmail.com')
).exist()
print(is_exist) # False
比如我想知道 2019 年 7 月以来共新增了多少用户:
user_count = await User.select().where(
User.create_at > datetime(2019, 7, 1)
).count()
print(user_count) # 4
再比如我们需要分页的获取今年写了 Python(title) 相关文章的用户:
users = await User.select().where(
User.id.in_(
Post.select(Post.author).where(
Post.update_at > datetime(2019, 1, 1),
Post.title.contains('Python')
).order_by(
Post.update_at.desc()
)
)
).paginate(1, 10)
print(users) # [<User object at 1>]
再比如我们想知道每个用户都写了多少篇文章:
user_posts = await User.select(
User.name, helo.F.COUNT(helo.SQL('1')).as_('posts')
).join(
Post, helo.JOINTYPE.LEFT, on=(User.id == Post.author)
).group_by(
User.name
).rows(100)
print(user_posts) # [{'name': 'at7h', 'posts': 1}]
如上所示,我们可以通过 helo.F
来使用 SQL 函数,比如我需要计算出每个月所有雇员薪资的总和:
salary_sum = await Employee.select(
helo.F.SUM(Employee.salary).as_('salary_sum')
).scalar()
print(salary_sum) # 30000.0
接下来,让我们尝试对数据库中的数据做一些修改操作。
比如你要更改某一位雇员的薪资 😋 :
ret = await Employee.update(salary=20000).where(
Employee.name == 'at7h'
).do()
print(ret.affected) # 1
或者,整体涨工资啦 👏:
ret = await Employee.update(
salary=Employee.salary + 1000
).where(
(Employee.department.in_([1, 2])) | (Employee.name == 'at7h')
).do()
最后我们来尝试删除表中的数据。
第一种方式,你可以使用 model 对象的 remove
方法来删除它对应于数据库中这一行的数据:
user = User(name='at7h', password='1111')
await user.save()
user = await User.get(user_id)
print(user.id) # 1
await user.remove()
user = await User.get(user_id)
print(user) # None
另一种更为通常的方式是使用 delete
方法:
ret = await Post.delete().where(
Post.create_at < datetime(2010, 1, 1)
).limit(
100
).do()
请注意: 永远不要忘记写
where
子句,是不是整个表都不想要了 😟 ?
另外,helo 支持 MySQL REPLACE
语句,提供了 replace
和 mreplace
两个方法,其用法与 insert
和 minsert
类似。当然,在使用它们之前你需要了解 MySQL REPLACE
语句的工作原理。
如果你正在使用 quart, 一个最小的应用示例是:
import quart
import helo
app = quart.Quart(__name__)
app.config["HELO_DATABASE_URL"] = "mysql://user:password@127.0.0.1:3306/db"
db = helo.G(app)
@app.route('/api/users')
async def users():
await User.insert(
name='at7h', email='g@test.com', password='xxxx'
).do()
user_list = await User.select().all(False)
return quart.jsonify(user_list)
app.run()
此时你不需要再显示的执行 db.bind
,binding 操作将会在你应用的第一个请求之前自动完成。
启动此服务:
$ curl http://127.0.0.1:5000/api/users
[{"email":"g@test.com","id":1,"name":"at7h","password":"xxxx"}]
Helo 中的 Model
和 Select
都支持迭代,helo 会自动帮你处理分页问题,以避免频繁的 IO 操作和过大的数据量获取。
async for post in Post:
print(post)
# <Post object at 1>
# <Post object at 2>
# <Post object at 3>
# <Post object at 4>
users = User.select().where(User.id < 5).order_by(User.id.desc())
async for user in users:
print(user)
# <User object at 4>
# <User object at 3>
# <User object at 2>
# <User object at 1>
当你使用 select
获取数据时,helo 默认会将行数据包装成为对应的 Model
对象,但是,当你使用了 helo.F
函数和 join
时可能会放弃加载到 Model
对象而使用原始的 helo.adict
字典。当然,你也可以通过 wrap
参数来显式指定使用字典类型的 row type。在大型项目中,这可能会显著提高速度并减少内存的使用。
users = await User.select(User.id, User.name).limit(2).all(wrap=False)
print(users) # [{'id': 1, 'name': 'at7h'}, {'id': 2, 'name': 'bobo'}]
assert users[0].name == 'at7h'
employee = await Employee.select().order_by(
Employee.salary.desc()
).first(False)
print(employee)
# {'id': 1, 'name': 'at7h', 'department': 1, 'salary': 15000.0}
有时你可能迫不得已想要执行一些原始的 SQL 语句,那么你可以使用 db.raw
函数来实现。
await db.raw("SELECT * FROM `user` WHERE `id` < %s;", params=[10])
为方便调试,有时候需要查看执行的 SQL,在 helo 中,你可以:
第一种方式,在初始化 db
时,设置 debug
为 True
即可,这样你将在日志输出中看到执行过的所有 SQL 语句。
db = helo.G(debug=True)
第二种方式主要是方便学习和调试,你可以使用 repr
函数(或在 REPR 环境中)和 str
函数来查看 Insert
, Update
, Select
等对象,我们拿上面的示例来举个例子:
>>> q1 = Employee.update(
... salary=Employee.salary + 1000
... ).where(
... (Employee.department.in_([1, 2])) | (Employee.name == 'at7h')
... )
>>> q1
Query(UPDATE `employee` SET `salary` = (`salary` + %s) WHERE ((`department` IN %s) OR (`name` = %s)); % ((1000.0,), (1, 2), 'at7h'))
>>> q2 = User.select(
... User.name, helo.F.COUNT(helo.SQL('1')).as_('posts')
... ).join(
... Post, helo.JOINTYPE.LEFT, on=(User.id == Post.author)
... ).group_by(
... User.name
... )
>>> print(q2)
SELECT `t1`.`name`, COUNT(1) AS `posts` FROM `user` AS `t1` LEFT JOIN `post` AS `t2` ON (`t1`.`id` = `t2`.`author`) GROUP BY `t1`.`name`; % ()
👋
至此,本篇上手指南就到此结束,感兴趣的同学可以关注后续的使用文档。
十分欢迎大家的使用,问题可随时与我交流,欢迎以任何形式提出任何问题或建议。
感谢 🤝