SQLAlchemy 是 Python 生态系统中最流行的 ORM。SQLAlchemy 设计非常优雅,分为了两部分——底层的
Core 和上层的传统 ORM。在 Python 乃至其他语言的大多数 ORM 中,都没有实现很好的分层设计,
比如 django 的 ORM,数据库链接和 ORM 本身完全混在一起。
SQLAlchemy 当前版本是 1.4. 有两套 API,1.3 及之前的老 API 和 1.4 及之后的 2.0 兼容 API。
本文中只介绍 2.0 API。
为什么要有 Core
Core 层主要实现了客户端连接池。我们知道,作为现代 Web 应用的核心,关系型数据库的并发连接
能力往往并不是很强,一般不要搞好多短连接过去,大多数情况下需要一个连接池。连接池大体分为
两种,一种在服务端,也就是一个专门的连接池中间件,给短连接每次分配一个长链接复用。另一种
在客户端,一般作为第三方库引入代码中。SQLAlchemy 的连接池显然是客户端的。在这个连接池中,
SQLAlchemy 维护了一定数量的长连接,当调用
connect
时,实际是从池子中取出了一个链接,
调用 close 时,实际是放回到了池子中一个链接。
创建链接
SQLAlchemy 中使用
create_engine
来创建连接(池)。create_engine 的参数是数据库的 URL。
from sqlalchemy import create_engine
engine = create_engine(
"mysql://user:password@localhost:3306/dbname",
echo=True,
future=True,
pool_size=5,
pool_recycle=3600,
engine = create_engine("sqlite:///:memory:", echo=True, future=True,
connect_args={"check_same_thread": False})
engine = create_engine('mysql+mysqldb://user:password@localhost/foo?charset=utf8mb4')
Core 层 -- 直接使用 SQL
CRUD
from sqlachemy import text
with engine.connect() as conn:
result = conn.execute(text("select * from users"))
print(result.all())
for row in result:
print(row.x, row.y)
print(row[0], row[1])
print(row["x"], row["y"])
result = conn.execute(
text("SELECT x, y FROM some_table WHERE y > :y"),
{"y": 2}
stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)
conn.execute(
text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
[{"x": 11, "y": 12}, {"x": 13, "y": 14}]
)
事务与 commit
SQLAlchemy 提供两种提交的方式,一种是手工 commit,一种是半自动 commit。官方文档建议使用
engine.begin()。还有一种完全自动的,每一行提交一次的 autocommit 方式,不建议使用。
with engine.connect() as conn:
conn.execute(text("CREATE TABLE some_table (x int, y int)"))
conn.execute(
text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
[{"x": 1, "y": 1}, {"x": 2, "y": 4}]
conn.commit()
with engine.begin() as conn:
conn.execute(
text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
[{"x": 6, "y": 8}, {"x": 9, "y": 10}]
)
ORM
Session
不是
线程安全的。但是一般情况下,web 框架应该在每个请求开始时获取一个 session,
所以也不是问题。
from sqlalchemy.orm import Session
with Session(engine) as session:
session.add(foo)
session.commit()
from sqlachemy.orm import sessionmaker
new_session = sessionmaker(engine)
with new_session() as session:
...
声明式 API
-
使用
__tablename__
指定数据库表名
-
使用 Column 声明每个字段
-
使用 Integer/String... 指定字段类型
-
使用 index 参数指定索引
-
使用 unique 参数指定唯一索引
-
使用
__table_args__
指定其他属性,比如联合索引
from datetime import datetime
from sqlalchemy import Integer, Column, String, func, UniqueConstraint
from sqlalchemy.orm import declarative_base, relationship
Base = declarative_base()
class User(Base):
__tablename__ = "users"
__table_args__ = (UniqueConstraint("name", "time_created"),)
id = Column(Integer, primary_key=True)
name = Column(String(30), index=True)
fullname = Column(String, unique=True)
description = deferred(Column(Text))
time_created = Column(DateTime(Timezone=True), default=datetime.now)
time_created = Column(DateTime(timezone=True), server_default=func.now())
time_updated = Column(DateTime(timezone=True), onupdate=func.now())
class Address(Base):
__tablename__ = "address"
id = Column(Integer, primary_key=True)
email_address = Column(String, nullable=False)
Base.metadata.create_all(engine)
User.__table__.create(engine)
外键
使用 relationship 来制定模型之间的关联关系。
一对多关系的双向映射
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import declarative_base, relationship, Session
Base = declarative_base()
class Group(Base):
__tablename__ = 'groups'
id = Column(Integer, primary_key=True)
name = Column(String)
members = relationship('User')
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
group_id = Column(Integer, ForeignKey('groups.id'))
group = relationship('Group', overlaps="members")
多对多映射,需要使用一个关联表。
class UserPermissions(Base):
__tablename__ = 'user_permissions'
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('users.id'))
permission_id = Column(String, ForeignKey('permissions.id'))
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
permissions = relationship('Permission', secondary="user_permissions", overlaps="users")
class Permission(Base):
__tablename__ = 'permissions'
id = Column(Integer, primary_key=True)
name = Column(String)
users = relationship('User', secondary="user_permissions", overlaps="permissions")
user1 = User(name='user1', group_id=1)
user2 = User(name='user2')
group1 = Group(name='group1')
group2 = Group(name='group2', members=[user2])
permission1 = Permission(name="open_file")
permission2 = Permission(name="save_file")
user1.permissions.append(permission1)
db.add_all([user1, user2, group1, group2, permission1, permission2])
db.commit()
print(user1.permissions[0].id)
其他的教程中大多使用
backref
来生成对应模型的属性,我并不喜欢这样,更倾向于在对应的模型中
都显式声明可以访问的属性。
CRUD
和 1.x API 不同,2.0 API 中不再使用 query,而是使用 select 来查询数据。网上的诸多教程还在
讲解 query API,最好不要参考这些教程了。
from sqlalchemy import select
stmt = select(User).where(User.name == "john").order_by(User.id)
stmt = select(User).filter_by(name="some_user")
result = session.execute(stmt)
for user in result.scalars():
print(user.name)
result = session.execute(select(User.name))
for row in result:
print(row.name)
user = session.get(User, pk=1)
from sqlalchemy import update
stmt = update(User).where(User.name == "john").values(name="John").execution_options(synchronize_session="fetch")
session.execute(stmt)
user.name = "John"
session.commit()
user.visit_count += 1
user.visit_count = User.visit_count + 1
session.add(user)
session.add_all([user1, user2, group1])
session.flush()
print(user.id)
session.delete(user)
加载外键关联模型
如果我们在读取一个 N 个记录的列表之后,再去数据库中一一读取每个项目的具体值,就会产生 N+1 个
查询。这就是数据库中最常犯的错误:N+1 问题。
默认情况下,查询中不会加载外键关联的模型,可以使用 selectinload 选项来加载外键,从而避免
N+1 问题。
select(Model).options(selectinload(Model.field))
- session.execute(select(User)).scalars().all() # 没有加载 parent 外键
+ session.execute(select(User).options(selectinload(User.groups))).scalars().all()
Selectinload 的原理在于使用了
select in
子查询,这也是名字的又来。除了 selectinload 外,
还可以使用传统的 joinedload,它的原理就是最普通的 join table
# 使用 joinedload 加载外键,注意需要使用 unique 方法,这是 2.0 中规定的。
session.execute(select(User).options(joinedload(User.groups))).unique().scalars().all()
在 2.0 中,更推荐使用 selectinload 而不是 joinedload,一般情况下,selectinload 都要好,
而且不用使用 unique.
外键的写入
SQLAlchemy 中,直接像处理数组一样处理外键就好了,这点非常方便。
user.permissions.append(open_permission)
user.permissions.remove(save_permission)
user.permissions.clear()
user.permissions = []
JSON 字段的特殊处理
大多数的数据库现在都支持 JSON 字段了,在 SQLAlchemy 中我们也可以直接从字段读取 json 对象
或者写入 json 对象。但是,千万不要直接对这个 json 对象做 update 并期望写回数据库中,这是
不可靠的。一定要复制后读写,然后再赋值回去。
article = session.get(Article, 1)
tags = copy.copy(article.tags)
tags.append("iOS")
article.tags = tags
session.commit()
批量插入
当需要插入大量数据的时候,如果依然采用逐个插入的方法,那么就会在和数据库的交互上浪费很多
时间,效率很低。MySQL 等大多数数据库都提供了
insert ... values (...), (...) ...
这种
批量插入的 API,在 SQLAlchemy 中也可以很好地利用这一点。
s = Session()
objects = [
User(name="u1"),
User(name="u2"),
User(name="u3")
s.bulk_save_objects(objects)
s.commit()
users = [
{"name": "u1"},
{"name": "u2"},
{"name": "u3"},
s.bulk_insert_mappings(User, users)
s.commit()
session.bulk_update_mappings(User, users)
从 1.X API 迁移到 2.0 API
- session.query(User).get(42)
+ session.get(User, 42)
- session.query(User).all()
+ session.execute(select(User)).scalars().all()
- session.query(User).filter_by(name="some_user").one()
+ session.execute(select(User).filter_by(name="some_user")).scalar_one()
- session.query(User).from_statememt(text("select * from users")).a..()
+ session.execute(select(User).from_statement(text("selct * from users"))).scalars().all()
- session.query(User).filter(User.name == "foo").update({"fullname": "FooBar"}, synchronize_session="evaluate")
+ session.execute(update(User).where(User.name == "foo").values(fullname="FooBar").execute_options(synchronize_session="evaluate"))
import
SQLAlchemy 的导出类型主要分布在以下三个部分:
from sqlalchemy import text, insert, select, create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, Session, sessionmaker