添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

在2.x的SQLALchemy中,查询语法为:

db.session.execute(db.select(...)) 构造一个查询从数据库中获取数据。

另外:查询是 SQLAlchemy 的功能,因此您需要阅读其有关 select 的教程来了解所有相关信息。

*常使用 Result.scalars() 方法来获取查询结果列表,使用 Result.scalar() 方法来获取单个结果。

1.x语法:

您可能会看到使用 模型类.query 来构建查询。这是一个较旧的查询接口,在 SQLAlchemy 中被视为遗留。

总结1.x和2.x语法区别:

1.x语法: 模型类.query() 是等同于 db.session.query(模型类, <查询语法>)

因为Flask-SQLAlchemy 向每个模型添加一个 query 对象。用于查询给定模型的实例。如: User.query db.session.query(User) 的快捷方式。

2.x语法: db.session.execute() 是sqlalchemy 2.x版本后的语法,flask-sqlalchemy3.x版本都是基于sqlalchemy 2.x的语法使用。

  • 2.x语法更加接*我们*时使用sql语句去查询数据。特别要注意的是2.x的各个方法调用都要按顺序,和sql一样,如where要在group_by前调用。
  • 使用 模型类.query() 语法最大的问题是没有比较完善的语法提示。

    旧版本的flask-sqlalchemy是基于sqlalchemy 1.x版本的,但是新版本依旧可以使用旧版本的查询语法。

    常用查询语句

    1.x和2.x语法相互参考对比: https://docs.sqlalchemy.org/en/20/changelog/migration_20.html#migration-orm-usage

    获取全部、一个数据的模型类

    # 查看所有对象的所有数据
    User.query.all() # flask-sqlalchemy 提供的快捷方式,实际上就是1.x的语法
    db.session.query(User).all() # 1.x 语法
    db.session.execute(db.select(User)).scalars().all() # 2.x 语法
    # 查询第一个用户
    User.query.first() # 返回的是用户模型类
    db.session.query(User).first()
    db.session.execute(db.select(User)).scalar()
    # 获取一个,如果不止一个则抛出异常
    db.session.query(User).one()
    # 获取一个,如果一个都没有获取到则返回none
    db.session.query(User).one_or_none()
    

    过滤查询filter、where方法

    基本语法:

    1.x语法:
    db.session.query(<模型类>).filter(<表达式>,<表达式>,...).all()|first()
    2.x语法:
    db.session.execute(sa.select(<模型类>).filter(<表达式>,<表达式>,...)).scalars().all()|scalar()
    表达式的组成一般为:
      <模型类.字段><表达式><值>
    多个表达式等同于你where的条件是and.
      还有一种方法就是连续多个filter().filter() 也可以实现and运算。
      2.x语法中,filter()方法等同于where()方法。where()更加接*你*时使用sql语句查询的语法。
    
    # 查询user.id 大于等于3的所有用户
    db.session.query(User).filter(User.id >= 3).all()
    db.session.execute(sa.select(User).filter(User.id>=3)).scalars().all()
    # 查询user.id 大于等于3的所有用户(select * from user where user.id>=3 and user.age>10)
    db.session.query(User).filter(User.id>=3, User.age>10)).all()
    db.session.execute(sa.select(User).filter(User.id>=3, User.age>10)).scalars().all()
    # 查看user.id不等于1的用户
    db.session.execute(sa.select(User).where(User.id != 1)).scalars().all()
    # 使用not_()来取反也是可以的
    db.session.execute(sa.select(User).where(sa.not_(User.id == 1))).scalars().all()
    

    空值、非空值判断

    # 空判断
    db.session.query(User).filter(User.gender==None).all()
    db.session.query(User).filter(User.gender.is_(None)).all()
    # 非空判断
    db.session.query(User).filter(User.gender!=None).all()
    db.session.query(User).filter(User.gender.isnot(None)).all()
    db.session.query(User).filter(sa.not_(User.gender==None)).all()
    

    模糊查询like、startswith、endswith

    # like(), sql中的%%
    pline("like() like表达式")
    print(User.query.filter(User.name.like("%g%")).all())
    # endswith()
    # 实际上用的也是like.., SQL:可以看到只是拼接一个like表达式字符串而已, WHERE (users.name LIKE concat(%(name_1)s, '%%'))
    pline("endswith() 字段结尾是否包含指定字符串")
    print(db.session.query(User).filter(User.name.endswith("g")).all())
    # startswith()
    pline("startswith() 字段开头是否包含指定字符串")
    print(db.session.query(User).filter(User.name.startswith("w")).all())
    # contains()
    # 实际上用的也是like..  SQL:WHERE (users.name LIKE concat('%%', %(name_1)s, '%%'))
    pline("contains() 字段是否包含指定字符串")
    print(User.query.filter(User.name.contains("n")).all())
    

    逻辑运算and_、or_、not_

    与运算and_()方法用于将多个条件组合在一起。

    # 方式一:直接在filter中使用分号,来给定多个表达式可以实现逻辑and运算
    db.session.query(User).filter(User.name.startswith("li"), User.email.startswith("li")).all()
    # 方式二:使用and_()方法,向方法参数中传递多个表达式
    from sqlalchemy import and_
    db.session.query(User).filter(sa.and_(User.name.startswith("li"), User.email.startswith("li"))).all()
    

    或运算必须使用or_()方法,这点和django的是一样的,django只能使用Q()对象 然后用 | 连接多个Q对象。

    # sql:SELECT * FROM users WHERE users.age > 20 or users.email like 'li%'
    # 1.x
    User.query.filter(sa.or_(User.age > 20, User.email.startswith("li"))).all()
    # 2.x
    db.session.execute(sa.select(User).where(
        sa.or_(
            User.age > 20, 
            User.email.startswith("li")
    )).scalars().all()
    

    取反not_()

    # 查询user.id不大于3的所有用户
    # sql: select * from users where not users.id>3
    db.session.query(User).filter(sa.not_(User.id > 3)).all()
    db.session.execute(sa.select(User).where(sa.not_(User.id > 3))).scalars().all()
    

    in_() 、notin_()查询

    # 查询用户id在1,2,3集合中的用户
    db.session.execute(
      sa.select(User).where(
        User.id.in_([1, 2, 3])
    ).scalars().all()
    # 查询用户id不在1,2,3集合中的用户
    db.session.execute(
      sa.select(User).where(
        User.id.notin_([1, 2, 3])
    ).scalars().all()
    

    排序order_by()

    # 根据用户id倒序排序
    # SELECT * FROM user ORDER BY user.id DESC
    db.session.query(User).order_by(User.id.desc()).all()
    db.session.query(User).order_by(sa.desc(User.id)).all()
    # 2.x语法
    db.session.execute(sa.select(User).order_by(User.id.desc())).scalars().all()
    db.session.execute(sa.select(User).order_by(sa.desc(User.id))).scalars().all()
    

    limit()、offset() 以及slice()

    # limit限制返回数
    db.session.execute(sa.select(User).limit(2)).scalars().all()
    # offset 偏移
    db.session.execute(sa.select(User).offset(2)).scalars().all()
    # slice(offset, limit) 这个方法是将limit和offset组合在一起了。表名意思就是切片。
    db.session.query(User).order_by(User.name).slice(1, 3).all()
    db.session.execute(sa.select(User).order_by(User.name).slice(1, 3)).scalars().all()
    

    分页 pagination对象

    没错...sqlalchemy给我们提供了分页查询的对象。

    分页对象只能通过SQLAlchemy.paginate() and Query.paginate()方法来创建。返回的对象是Pagination.

    Pagination类常用属性:

    page: int
    per_page: int
        每页多少条数据
    items: list[Any]
        当前页面上的项目。迭代分页对象相当于迭代当前页的所有项目。
    total: int | None
       所有页数的总项目数。也就是所有查询结果的总数量。
    pages: int
       分页的总页数。
    has_prev: bool
       是否还有上一页
    prev_num: int | None
       上一页的页码数,没有上一页就是None
    has_next: bool
       是否还有下一页
    next_num: int | None
       下一页的页码数,没有下一页就是None
    

    Pagination类常用方法:

    prev(*, error_out=False)
      查询上一页的 Pagination 对象
    next(*, error_out=False)
      查询下一页的 Pagination 对象
    

    分页查询实例

    # 1.x 语法
    # page: 第几页, 默认为1
    # per_page: 每页多少条数据,默认为20
    # max_per_page: 限制per_page的最大值,默认为100
    pn = db.session.query(User).paginate(page=1, per_page=2, max_per_page=10)
    pn2 = db.paginate(sa.select(User).order_by(User.id.desc()), page=2, per_page=3, max_per_page=10)
    # 直接迭代pn就等同于迭代当前页中的项目
    for item in pn:
      print(item)
    相等于下面的代码
    for item in pn.items:
      print(item)
    

    group_by()、having()、聚合函数

    group_by()、count()、max()、min()、sum()、avg() 等方法使用

    count()

    from sqlalchemy import func
    ## count() 数量统计
    # SELECT count(*) FROM user
    User.query.count() # count(*)
    db.session.query(User).count() # count(*)
    # 这样只能count某一列
    # SELECT count(user.id) FROM user
    db.session.scalar(sa.select(func.count(User.id)))
    # 这样就是count(*)啦
    # count(*)是包含null值的。
    db.session.scalar(sa.select(func.count()).select_from(User))
    # count User records, without
    # using a subquery.
    db.session.query(func.count(User.id))
    # return count of user "id" grouped
    # by "name"
    db.session.query(func.count(User.id)).\
            group_by(User.name)
    from sqlalchemy import distinct
    # count distinct "name" values
    #  SELECT count(DISTINCT users.name) AS count_1 FROM users
    db.session.query(func.count(distinct(User.name)))
    

    group_by()

    # 按照用户的性别分组并统计每组的人数
    # select users.gender, count(users.gender) from users group by users.gender
    >>> ret = db.session.query(User.gender, sa.func.count(User.gender)).group_by(User.gender).all()
    # 返回的结果是列表:里面嵌套一个Row对象,Row对象类似一个元组
    [(0, 4), (1, 6)]
    >>> type(ret[0])    
    <class 'sqlalchemy.engine.row.Row'>
    >>> ret[0]._fields
    ('gender',)
    # 可以通过.的方式获取,但是可以发现并没有count字段的属性,是因为这种都需要自己指定,类似sql中的as
    >>> ret[0].gender
    >>> ret = db.session.query(User.gender, sa.func.count(User.gender).label("count")).group_by(User.gender).all()
    >>> ret[0]._fields
    ('gender', 'count')
    >>> ret[0].count
    

    sum()、max()、min()、avg()

    # max()
    db.session.query(User.gender, sa.func.max(User.age).label("max_age")).group_by(User.gender).all()
    # min()
    db.session.query(User.gender, sa.func.min(User.age).label("min_age")).group_by(User.gender).all()
    # sum()
    db.session.query(User.gender, sa.func.sum(User.age).label("sum_age")).group_by(User.gender).all()
    # avg()
    db.session.query(User.gender, sa.func.avg(User.age).label("avg_age")).group_by(User.gender).all()
    

    having() 分组后过滤

    # 查看素有用户,按照年龄分组,并统计每个年龄组的总数,要求年龄大于20
    # SELECT user.age, count(*) as count FROM user group by user.age having user.age > 20
    db.session.execute(sa.select(User.age, sa.func.count("*").label("count")).group_by(User.age).having(User.age > 20)).all()
    

    使用原生SQL语句查询

    django中也有,是模型类.objects.raw(<sql语句>)

    # 1.x
    db.session.query(User).\
      from_statement(
        text("select * from users")
      all()
    # 2.x
    db.session.scalars(
      select(User).
      from_statement(
        text("select * from users")
    ).all()
    # 还可以使用变量
    db.session.query(User).from_statement(
        sa.text("select * from users where users.id > :num or users.age > :age").bindparams(num=2, age=30)
    ).all()
    

    使用变量查询:

     db.session.query(User).filter(sa.text("id>:id")).params(id=1).all()
     db.session.query(User).from_statement(
        sa.text("select * from users where users.id > :num or users.age > :age").bindparams(num=2, age=30)
    ).all()