Helo quick start guide
Helo is a simple and small low-level asynchronous ORM using Python asyncio.
Helo can help you easily build expressive common SQL statements in your asynchronous applications. You only need to use friendly object-oriented APIs to manipulate data without caring about the details of SQL statement writing and data processing.
- Requires: Python 3.7+
- Currently supports only MySQL, version 5.7+
- Integration with web framework:
- Table relationships are not supported yet
Note: Using an asynchronous ORM does not completely mean that your application can be made faster, and it may complicate your application. Before moving on, you should read this blog post from Mike Bayer, the author of SQLAlchemy.
This guide will cover the following:
This assumes you already have helo installed; if not, see the installation instructions.
To use helo, first import helo and instantiate a global variable with helo.G, assuming it is called db:
import helo
db = helo.G()
db is a global singleton object; we will use it many times in the following introduction.
Defining a model with helo is simple: just inherit from helo.Model. A few simple examples of model definitions are given below:
class Person(helo.Model):
    id = helo.BigAuto()
    name = helo.VarChar(length=45, null=False)

class User(Person):
    email = helo.Email(default='')
    password = helo.VarChar(length=100, null=False)
    create_at = helo.Timestamp(default=helo.ON_CREATE)

    class Meta:
        indexes = [helo.K('idx_ep', ['email', 'password'])]

class Employee(Person):
    department = helo.Smallint()
    salary = helo.Float(default=0)

class Post(helo.Model):
    id = helo.Auto(comment='auto increment pk')
    title = helo.VarChar(length=100)
    content = helo.Text(encoding=helo.ENCODING.UTF8MB4)
    author = helo.Int(default=0)
    create_at = helo.Timestamp(default=helo.ON_CREATE)
    update_at = helo.Timestamp(default=helo.ON_UPDATE)

    class Meta:
        indexes = [
            helo.K('idx_title', 'title'),
            helo.K('idx_author', 'author'),
        ]
The inner class Meta can be used to specify the meta attributes (db_name, table_name, engine, indexes, charset, comment, etc.) of the table.
class Meta:
    db = 'db_name'
    name = 'table_name'
    engine = helo.ENGINE.innodb
    charset = helo.ENCODING.utf8mb4
    indexes = []
    comment = 'table comment'
Among them, table_name defaults to the snake_case form of the model class name, engine defaults to InnoDB, and charset defaults to utf8mb4.
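To illustrate the default naming rule, here is a minimal sketch of a CamelCase to snake_case conversion. The function name to_snake_case is hypothetical and this is not helo's actual implementation; it only shows the kind of mapping described above.

```python
import re


def to_snake_case(class_name: str) -> str:
    """Convert a CamelCase class name to snake_case (illustrative only)."""
    # Insert an underscore at every boundary between a lowercase letter
    # or digit and a following capital letter, then lowercase everything.
    return re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", class_name).lower()


print(to_snake_case("User"))         # user
print(to_snake_case("UserProfile"))  # user_profile
```

Under this rule the models above would map to tables named user, employee and post.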
The model declaration only defines the mapping between the model and the real table structure; it does not create the tables in the database. To do that, we first need to connect to the database with helo. Let's connect to a MySQL database:
>>> await db.bind('mysql://user:pwd@localhost:3306/helo')
Or by keyword arguments:
>>> await db.bind(user='user', password='pwd', db='helo')
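The URL form and the keyword form carry the same information. As a rough sketch using only the standard library (not helo's own URL parsing), the URL above breaks down as follows. The user, password and db keys mirror the keyword arguments shown; the host and port keys are assumptions added for illustration.

```python
from urllib.parse import urlparse

url = urlparse("mysql://user:pwd@localhost:3306/helo")

# Map each URL component to a connection parameter.
params = {
    "user": url.username,
    "password": url.password,
    "host": url.hostname,        # assumed key name
    "port": url.port,            # assumed key name
    "db": url.path.lstrip("/"),
}
print(params)
# {'user': 'user', 'password': 'pwd', 'host': 'localhost', 'port': 3306, 'db': 'helo'}
```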
If you set the environment variable HELO_DATABASE_URL, you can call bind without arguments:
>>> await db.bind()
If you want to use a different environment variable name, pass it as env_key:
db = helo.G(env_key="YOUR_ENV_KEY")
bind actually creates a database connection pool for us, whose state you can inspect:
>>> db.state
{'minsize': 1, 'maxsize': 15, 'size': 1, 'freesize': 1}
bind accepts many keyword arguments; see the helo.db.Pool class for details. For example:
>>> await db.bind('mysql://user:pwd@127.0.0.1:3306/db', maxsize=10, connect_timeout=15)
The connection pool that has been created is a global singleton object. That is, once you have called bind to bind a database for your application, you cannot call bind again to bind another database unless you first call unbind; otherwise you will get a helo.err.DuplicateBinding error.
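The single-binding rule can be pictured with a small stand-in. The ToyDB class and the local DuplicateBinding exception below are hypothetical and only mimic the behaviour described above; they are not helo's code and open no real connections.

```python
import asyncio


class DuplicateBinding(Exception):
    """Local stand-in for helo.err.DuplicateBinding."""


class ToyDB:
    """Hypothetical object that allows only one active binding at a time."""

    def __init__(self):
        self._url = None

    async def bind(self, url):
        if self._url is not None:
            raise DuplicateBinding(f"already bound to {self._url}")
        self._url = url  # a real implementation would create the pool here

    async def unbind(self):
        self._url = None  # a real implementation would close the pool here


async def main():
    db = ToyDB()
    await db.bind("mysql://user:pwd@localhost:3306/helo")
    try:
        await db.bind("mysql://user:pwd@localhost:3306/other")
    except DuplicateBinding as exc:
        print("second bind rejected:", exc)
    await db.unbind()
    await db.bind("mysql://user:pwd@localhost:3306/other")  # fine after unbind
    return db._url


final_url = asyncio.run(main())
```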
If you need to explicitly disconnect from the database and close the connection pool, use unbind:
>>> await db.unbind()
In a small script, you can use db.binder as a context manager to handle binding and unbinding automatically:
>>> async with db.binder():
...     pass
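Assuming binder binds on entry and unbinds on exit, its effect is the usual try/finally pattern. The sketch below uses stand-in bind and unbind coroutines that only record events (they are hypothetical, not helo functions) to show the order of operations.

```python
import asyncio
from contextlib import asynccontextmanager

events = []


async def bind():
    events.append("bind")      # stand-in for: await db.bind(...)


async def unbind():
    events.append("unbind")    # stand-in for: await db.unbind()


@asynccontextmanager
async def binder():
    await bind()
    try:
        yield
    finally:
        await unbind()         # runs even if the body raises


async def main():
    async with binder():
        events.append("work")


asyncio.run(main())
print(events)  # ['bind', 'work', 'unbind']
```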
Once the connection to the database is established, we need to create the tables before we can work with data.
In real applications, the design, creation and maintenance of database tables are usually kept separate and managed by a dedicated DBA. Helo nevertheless provides basic DDL support.
Let's create them in the database:
>>> await db.create_tables([User, Employee, Post])
In a project, we usually put all the model definitions in a single module. Assuming the module is named models, you can use create_all to create tables for all models in the module:
>>> from your.application import models
>>> await db.create_all(models)
You can also use the create method of a Model to create its table:
>>> await User.create()
Helo provides the basic ability to operate on data in the database and supports rich combinations of logical expressions. You can write expressive queries and build SQL statements through object-oriented APIs.
The following examples cover the basic CRUD (Create, Retrieve, Update and Delete) operations.
With helo you have a variety of ways to insert data. Let's start by creating a User object:
user = User(name='at7h', password='1111')
print(user.name, user.password) # at7h, 1111
# Now user.id is None, because it is not saved to the database
assert user.id is None
Now the user object exists only in memory. You need to persist it to the database with the save method:
user_id = await user.save()
assert user_id == user.id == 1
We can modify it and save the changes:
user.name = 'at8h'
user.email = 'g@at7h.com'
user_id = await user.save()
assert user_id == user.id == 1
Note: The save operation is currently implemented with the MySQL REPLACE statement, which decides whether to insert a new row based on the value of the object's PRIMARY KEY or UNIQUE KEY property. Please use it with caution! This implementation is planned to be optimized in subsequent releases.
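The reason for the caution is that REPLACE deletes the conflicting row and inserts a new one, so any column not supplied falls back to its default. The sketch below demonstrates this with the standard library's sqlite3 module as a stand-in, since SQLite's REPLACE follows the same delete-then-insert model for this case; it does not use helo or MySQL, and the table name users is chosen only for the demo.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users ("
    " id INTEGER PRIMARY KEY,"
    " name TEXT NOT NULL,"
    " email TEXT NOT NULL DEFAULT '')"
)
conn.execute(
    "INSERT INTO users (id, name, email) VALUES (1, 'at7h', 'g@at7h.com')"
)

# REPLACE finds the row with id = 1, deletes it, and inserts a new one.
# The email column is not listed, so it falls back to its default.
conn.execute("REPLACE INTO users (id, name) VALUES (1, 'at8h')")

rows = conn.execute("SELECT id, name, email FROM users").fetchall()
print(rows)  # [(1, 'at8h', '')]
```

The previous email value is gone after the REPLACE, which is different from what an UPDATE of the name column would do.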
The following methods are recommended for inserting data:
The methods add and madd can be used to add a single row or multiple rows of data. They are simple shortcuts for insert and minsert:
user_id = await User.add(name='bobo', password='2222')
# Or: user_id = await User.add({'name': 'bobo', 'password': '2222'})
print(user_id) # 2
users = [{'name': 'mingz', 'password': '3333'},
         {'name': 'xy69z', 'password': '4444'}]
# Or using a list of User objects:
# users = [User(name='mingz', password='3333'),
#          User(name='xy69z', password='4444')]
count = await User.madd(users)
print(count) # 2
The methods insert and minsert are the standard data insertion methods and accept several forms of input. They return an Insert object; to execute it, don't forget to call do():
ret = await User.insert(name='poper', password='5555').do()
# Or: ret = await User.insert({'name': 'bingo', 'password': '8888'}).do()
assert ret.affected == 1
assert ret.last_id == 5
print(ret) # (1, 5)
# Inserting multiple rows
employees = [
    {'name': 'at7h', 'department': 1},
    {'name': 'bobo', 'department': 2},
]
ret = await Employee.minsert(employees).do()
print(ret) # (2, 1)
# Insert row tuples and specify which columns the tuple values correspond to
posts = [
    ('post1', 1),
    ('post2', 2),
]
ret = await Post.minsert(
    posts, columns=[Post.title, Post.author]
).do()
print(ret) # (2, 1)
Use insert_from to fill a table with data selected from another table:
select = User.select(User.name).where(User.id.in_([3, 4, 5]))
ret = await Employee.insert_from(select, [Employee.name]).do()
print(ret) # (3, 3)
Helo also has multiple options for retrieving data. For example, to get a single row, you can use the get method:
# By id
user = await User.get(1)
assert isinstance(user, User)
print(user.id, user.name, user.password) # 1, at7h, 1111
# Or by query
assert (await User.get(User.name == user.name)) == user
To get multiple rows, use the mget method:
# By id list
uid_list = [1, 2, 3]
users = await User.mget(uid_list)
print(users.count) # 3
print(users) # [<User object at 1>, <User object at 2>, <User object at 3>]
# Specify columns
users = await User.mget(uid_list, columns=[User.id, User.name])
assert users[0].password is None
# Or by query
users = await User.mget((User.id < 2) | (User.name == 'mingz'))
print(users) # [<User object at 1>, <User object at 3>]
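Conditions such as (User.id < 2) | (User.name == 'mingz') work because comparison and bitwise operators on columns build condition objects instead of evaluating to booleans. The following toy sketch shows that general technique; the Column and Expr classes are hypothetical and are not helo's internals.

```python
class Expr:
    """A tiny SQL condition: a template plus its parameters."""

    def __init__(self, sql, params):
        self.sql = sql
        self.params = params

    def __or__(self, other):
        return Expr(f"({self.sql} OR {other.sql})", self.params + other.params)

    def __and__(self, other):
        return Expr(f"({self.sql} AND {other.sql})", self.params + other.params)


class Column:
    """A hypothetical column whose comparisons return Expr objects."""

    def __init__(self, name):
        self.name = name

    def __eq__(self, value):
        return Expr(f"(`{self.name}` = %s)", [value])

    def __lt__(self, value):
        return Expr(f"(`{self.name}` < %s)", [value])


cond = (Column("id") < 2) | (Column("name") == "mingz")
print(cond.sql)     # ((`id` < %s) OR (`name` = %s))
print(cond.params)  # [2, 'mingz']
```

The parentheses around each comparison matter: in Python, | binds more tightly than < and ==, so without them the expression would be grouped differently.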
Similarly, the methods get and mget are simple shortcuts for select. They are only suitable when the primary key is known or the query condition is relatively simple. More often we need to use select.
The select method helps you construct your DQL through an object-oriented API. It supports rich and composable logical conditional expressions.
users = await User.select().order_by(
    User.id.desc()
).limit(3).offset(2).all()
print(users) # [<User object at 3>, <User object at 2>, <User object at 1>]
The method all() and the following get(), first(), rows(), paginate(), etc. are similar to the do() above: they execute the query. Don't forget to call one of them.
For example, I need to know whether any users have a Gmail mailbox:
is_exist = await User.select().where(
    User.email.endswith('gmail.com')
).exist()
print(is_exist) # False
For example, I want to know how many users have been added since July 2019:
from datetime import datetime

user_count = await User.select().where(
    User.create_at > datetime(2019, 7, 1)
).count()
print(user_count) # 4
As another example, we need to paginate the users who have written articles with Python in the title this year:
users = await User.select().where(
    User.id.in_(
        Post.select(Post.author).where(
            Post.update_at > datetime(2019, 1, 1),
            Post.title.contains('Python')
        ).order_by(
            Post.update_at.desc()
        )
    )
).paginate(1, 10)
print(users) # [<User object at 1>]
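Pagination is conventionally a thin layer over LIMIT and OFFSET. The sketch below assumes that paginate(page, size) takes a 1-based page number, which the call paginate(1, 10) suggests but which this guide does not confirm; the helper name page_to_limit_offset is hypothetical.

```python
def page_to_limit_offset(page: int, size: int):
    """Translate a 1-based page number into (LIMIT, OFFSET) values."""
    if page < 1 or size < 1:
        raise ValueError("page and size must be positive")
    return size, (page - 1) * size


print(page_to_limit_offset(1, 10))  # (10, 0)
print(page_to_limit_offset(3, 10))  # (10, 20)
```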
As another example, we want to know how many articles each user wrote:
user_posts = await User.select(
    User.name, helo.F.COUNT(helo.SQL('1')).as_('posts')
).join(
    Post, helo.JOINTYPE.LEFT, on=(User.id == Post.author)
).group_by(
    User.name
).rows(100)
print(user_posts) # [{'name': 'at7h', 'posts': 1}]
As shown above, SQL functions are available through helo.F.
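One detail of the SQL behind the LEFT JOIN query is worth knowing: COUNT(1) counts joined rows, so a user with no posts still yields a count of 1, whereas counting a column of the joined table yields 0. The sketch below shows this in plain SQL with the standard library's sqlite3 module as a stand-in for MySQL; it does not use helo, and the table names are chosen only for the demo.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE post (id INTEGER PRIMARY KEY, title TEXT, author INTEGER);
    INSERT INTO users (id, name) VALUES (1, 'at7h'), (2, 'bobo');
    INSERT INTO post (title, author) VALUES ('post1', 1);
""")

query = """
    SELECT t1.name, COUNT(1) AS count_rows, COUNT(t2.id) AS count_posts
    FROM users AS t1
    LEFT JOIN post AS t2 ON t1.id = t2.author
    GROUP BY t1.name
    ORDER BY t1.name
"""
rows = conn.execute(query).fetchall()
print(rows)  # [('at7h', 1, 1), ('bobo', 1, 0)]
```

Here bobo has written nothing, yet COUNT(1) reports 1 because the LEFT JOIN keeps one row with NULL post columns for that user.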
For example, I need to calculate the total salary paid to all employees each month:
salary_sum = await Employee.select(
    helo.F.SUM(Employee.salary).as_('salary_sum')
).scalar()
print(salary_sum) # 30000.0
Next, let's try to make some modifications to the data in the database.
For example, to change the salary of an employee:
ret = await Employee.update(salary=20000).where(
    Employee.name == 'at7h'
).do()
print(ret.affected) # 1
Or to give an overall salary increase:
ret = await Employee.update(
    salary=Employee.salary + 1000
).where(
    (Employee.department.in_([1, 2])) | (Employee.name == 'at7h')
).do()
Finally, let's try to delete the data in the table.
The first way is to use the remove method of a model object to delete the corresponding row in the database:
user = User(name='at7h', password='1111')
user_id = await user.save()
user = await User.get(user_id)
print(user.id) # 1
await user.remove()
user = await User.get(user_id)
print(user) # None
Another, more common way is to use the delete method:
ret = await Post.delete().where(
    Post.create_at < datetime(2010, 1, 1)
).limit(
    100
).do()
Note: Never forget the where clause, unless you want to delete every row in the table!
Helo supports the MySQL REPLACE statement through two methods, replace and mreplace, whose usage is similar to insert and minsert. Before using them, you need to understand how the MySQL REPLACE statement works.
If you're using quart, a minimal application example is:
import quart
import helo

app = quart.Quart(__name__)
app.config["HELO_DATABASE_URL"] = "mysql://user:password@127.0.0.1:3306/db"
db = helo.G(app)

# The User model is assumed to be defined as shown earlier in this guide.
@app.route('/api/users')
async def users():
    await User.insert(
        name='at7h', email='g@test.com', password='xxxx'
    ).do()
    user_list = await User.select().all(False)
    return quart.jsonify(user_list)

app.run()
You no longer need to call db.bind explicitly; the binding is completed before the first request of your application.
Run it:
$ curl http://127.0.0.1:5000/api/users
[{"email":"g@test.com","id":1,"name":"at7h","password":"xxxx"}]
Both Model and Select in helo support iteration. Helo automatically handles paging to avoid frequent IO operations and excessive data acquisition.
async for post in Post:
    print(post)
# <Post object at 1>
# <Post object at 2>
# <Post object at 3>
# <Post object at 4>
users = User.select().where(User.id < 5).order_by(User.id.desc())
async for user in users:
    print(user)
# <User object at 4>
# <User object at 3>
# <User object at 2>
# <User object at 1>
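Automatic paging during iteration can be pictured as an async generator that fetches rows in batches and yields them one at a time. The sketch below is a generic illustration of that pattern, not helo's implementation; fetch_page, iterate and the batch size are hypothetical, and an in-memory list stands in for the table.

```python
import asyncio

ROWS = list(range(1, 11))  # pretend table with ids 1..10


async def fetch_page(offset, limit):
    """Stand-in for one 'SELECT ... LIMIT limit OFFSET offset' round trip."""
    await asyncio.sleep(0)  # yield control, as real I/O would
    return ROWS[offset:offset + limit]


async def iterate(batch_size=4):
    """Yield rows one by one while fetching them in batches."""
    offset = 0
    while True:
        batch = await fetch_page(offset, batch_size)
        if not batch:
            break
        for row in batch:
            yield row
        offset += batch_size


async def main():
    return [row async for row in iterate()]


result = asyncio.run(main())
print(result)  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```

With a batch size of 4, the ten rows are read in three non-empty round trips instead of ten, and never all at once.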
When using select to retrieve data, helo wraps each row in the corresponding Model object by default.
However, when you use helo.F functions and join, helo may skip wrapping rows in Model objects and return plain helo.adict objects instead. You can also use the wrap argument to explicitly request adict rows.
In large projects, this can significantly increase speed and reduce memory usage.
users = await User.select(User.id, User.name).limit(2).all(wrap=False)
print(users) # [{'id': 1, 'name': 'at7h'}, {'id': 2, 'name': 'bobo'}]
assert users[0].name == 'at7h'
employee = await Employee.select().order_by(
    Employee.salary.desc()
).first(False)
print(employee)
# {'id': 1, 'name': 'at7h', 'department': 1, 'salary': 15000.0}
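Judging from the examples above, an adict row prints like a dictionary and also allows attribute access. A minimal sketch of such a type is shown below; the AttrDict class is hypothetical and only imitates the behaviour visible in this guide, not helo.adict itself.

```python
class AttrDict(dict):
    """A dict whose keys can also be read and written as attributes."""

    def __getattr__(self, key):
        try:
            return self[key]
        except KeyError:
            raise AttributeError(key) from None

    def __setattr__(self, key, value):
        self[key] = value


row = AttrDict(id=1, name="at7h")
print(row)          # {'id': 1, 'name': 'at7h'}
print(row.name)     # at7h
print(row["name"])  # at7h
```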
Sometimes you may need to execute raw SQL statements; in that case you can use the db.raw function.
await db.raw("SELECT * FROM `user` WHERE `id` < %s;", params=[10])
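Passing values through params rather than formatting them into the SQL string keeps user input from changing the meaning of the statement. The sketch below shows the difference with the standard library's sqlite3 module as a stand-in; note that sqlite3 uses ? as its placeholder where the db.raw call above uses %s, and the table here is created only for the demo.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO users (name) VALUES (?)", [("at7h",), ("bobo",)])

malicious = "at7h' OR '1'='1"

# Unsafe: the input becomes part of the SQL text and changes its meaning.
unsafe = conn.execute(
    f"SELECT name FROM users WHERE name = '{malicious}' ORDER BY id"
).fetchall()

# Safe: the input is passed as a parameter and compared as a plain value.
safe = conn.execute(
    "SELECT name FROM users WHERE name = ? ORDER BY id", (malicious,)
).fetchall()

print(unsafe)  # [('at7h',), ('bobo',)]
print(safe)    # []
```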
With helo, you can easily view the SQL statements being executed.
The first way is to set debug=True when initializing db, so that all executed SQL statements appear in the log output.
db = helo.G(debug=True)
The second way is mainly convenient for learning and debugging.
You can use the repr function (or simply evaluate the object in the REPL) and the str function to view objects such as Insert, Update and Select.
Let us take the examples above:
>>> q1 = Employee.update(
...     salary=Employee.salary + 1000
... ).where(
...     (Employee.department.in_([1, 2])) | (Employee.name == 'at7h')
... )
>>> q1
Query(UPDATE `employee` SET `salary` = (`salary` + %s) WHERE ((`department` IN %s) OR (`name` = %s)); % ((1000.0,), (1, 2), 'at7h'))
>>> q2 = User.select(
...     User.name, helo.F.COUNT(helo.SQL('1')).as_('posts')
... ).join(
...     Post, helo.JOINTYPE.LEFT, on=(User.id == Post.author)
... ).group_by(
...     User.name
... )
>>> print(q2)
SELECT `t1`.`name`, COUNT(1) AS `posts` FROM `user` AS `t1` LEFT JOIN `post` AS `t2` ON (`t1`.`id` = `t2`.`author`) GROUP BY `t1`.`name`; % ()
This is the end of this quick start guide. If you are interested, you can continue with the follow-up documentation.
Everyone is welcome to use helo. If you have any questions, please feel free to get in touch; issues and suggestions are welcome in any form.
Thanks!