SQLAlchemy 2.0 文档 – 异步IO支持 (二)

其他 piniu 28浏览 0评论

概要 – ORM

通过使用2.0风格的查询,AsyncSession类提供了完整的对象关系映射(ORM)功能。

在默认使用模式下,必须特别注意避免涉及ORM关系和列属性的延迟加载或其他过期属性访问;下一节“使用AsyncSession时防止隐式IO”将详细介绍这一点。

警告
在多个并发任务中使用单个AsyncSession实例是不安全的。有关背景信息,请参阅“在并发任务中使用AsyncSession”和“Session线程安全吗?在并发任务中共享AsyncSession安全吗?”两节。

下面的示例展示了一个完整的实例,包括映射器和会话配置:

from __future__ import annotations

import asyncio
import datetime
from typing import List

from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from sqlalchemy.orm import selectinload


class Base(AsyncAttrs, DeclarativeBase):
    pass

class B(Base):
    __tablename__ = "b"
    id: Mapped[int] = mapped_column(primary_key=True)
    a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
    data: Mapped[str]

class A(Base):
    __tablename__ = "a"
    id: Mapped[int] = mapped_column(primary_key=True)
    data: Mapped[str]
    create_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
    bs: Mapped[List[B]] = relationship()

async def insert_objects(async_session: async_sessionmaker[AsyncSession]) -> None:
    async with async_session() as session:
        async with session.begin():
            session.add_all(
                [
                    A(bs=[B(data="b1"), B(data="b2")], data="a1"),
                    A(bs=[], data="a2"),
                    A(bs=[B(data="b3"), B(data="b4")], data="a3"),
                ]
            )


async def select_and_update_objects(
    async_session: async_sessionmaker[AsyncSession],
) -> None:
    async with async_session() as session:
        stmt = select(A).order_by(A.id).options(selectinload(A.bs))
        result = await session.execute(stmt)
        for a in result.scalars():
            print(a, a.data)
            print(f"created at: {a.create_date}")
            for b in a.bs:
                print(b, b.data)
        result = await session.execute(select(A).order_by(A.id).limit(1))
        a1 = result.scalars().one()
        a1.data = "new data"
        await session.commit()
        # access attribute subsequent to commit; this is what
        # expire_on_commit=False allows
        print(a1.data)
        # alternatively, AsyncAttrs may be used to access any attribute
        # as an awaitable (new in 2.0.13)
        for b1 in await a1.awaitable_attrs.bs:
            print(b1, b1.data)


async def async_main() -> None:
    engine = create_async_engine("sqlite+aiosqlite://", echo=True)
    # async_sessionmaker: a factory for new AsyncSession objects.
    # expire_on_commit - don't expire objects after transaction commit
    async_session = async_sessionmaker(engine, expire_on_commit=False)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    await insert_objects(async_session)
    await select_and_update_objects(async_session)
    # for AsyncEngine created in function scope, close and
    # clean-up pooled connections
    await engine.dispose()


asyncio.run(async_main())
BEGIN (implicit)
...
CREATE TABLE a (
    id INTEGER NOT NULL,
    data VARCHAR NOT NULL,
    create_date DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
    PRIMARY KEY (id)
)
...
CREATE TABLE b (
    id INTEGER NOT NULL,
    a_id INTEGER NOT NULL,
    data VARCHAR NOT NULL,
    PRIMARY KEY (id),
    FOREIGN KEY(a_id) REFERENCES a (id)
)
...
COMMIT
BEGIN (implicit)
INSERT INTO a (data) VALUES (?) RETURNING id, create_date
[...] ('a1',)
...
INSERT INTO b (a_id, data) VALUES (?, ?) RETURNING id
[...] (1, 'b2')
...
COMMIT
BEGIN (implicit)
SELECT a.id, a.data, a.create_date
FROM a ORDER BY a.id
[...] ()
SELECT b.a_id AS b_a_id, b.id AS b_id, b.data AS b_data
FROM b
WHERE b.a_id IN (?, ?, ?)
[...] (1, 2, 3)
<A object at ...> a1
created at: ...
<B object at ...> b1
<B object at ...> b2
<A object at ...> a2
created at: ...
<A object at ...> a3
created at: ...
<B object at ...> b3
<B object at ...> b4
SELECT a.id, a.data, a.create_date
FROM a ORDER BY a.id
LIMIT ? OFFSET ?
[...] (1, 0)
UPDATE a SET data=? WHERE a.id = ?
[...] ('new data', 1)
COMMIT
new data
<B object at ...> b1
<B object at ...> b2

在上述示例中,使用可选的async_sessionmaker助手函数实例化了AsyncSession,该函数为具有固定参数集的新AsyncSession对象提供了一个工厂,这里的参数集包括将其与特定数据库URL的AsyncEngine相关联。然后,该对象被传递给其他方法,在Python异步上下文管理器(即async with:语句)中使用,以便在块结束时自动关闭;这相当于调用AsyncSession.close()方法。

在并发任务中使用AsyncSession

AsyncSession对象是一个可变的、有状态的对象,它表示正在进行的单个有状态的数据库事务。在使用asyncio的并发任务时,例如使用asyncio.gather()等API,应为每个单独的任务使用一个单独的AsyncSession。

有关Session和AsyncSession在并发工作负载中的使用方式的一般描述,请参阅“Session线程安全吗?在并发任务中共享AsyncSession安全吗?”一节。

在使用AsyncSession时防止隐式IO

在使用传统的asyncio时,应用程序需要避免任何可能发生属性访问时IO操作的情况。以下是一些可用来帮助避免这种情况的技术,其中许多技术在前面的示例中已有说明。

・对于那些具有延迟加载关系、延迟列或表达式,或在过期场景下被访问的属性,可以利用AsyncAttrs混入。当将此混入添加到特定类中,或更广泛地添加到Declarative Base超类中时,它会提供一个访问器AsyncAttrs.awaitable_attrs,该访问器可将任何属性作为可等待对象传递:

from __future__ import annotations

from typing import List

from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import relationship


class Base(AsyncAttrs, DeclarativeBase):
    pass


class A(Base):
    __tablename__ = "a"

    # ... rest of mapping ...

    bs: Mapped[List[B]] = relationship()


class B(Base):
    __tablename__ = "b"

    # ... rest of mapping ...

在不使用即时加载的情况下,访问新加载的A实例上的A.bs集合通常会使用延迟加载,而延迟加载为了成功通常会向数据库发出IO操作,这在asyncio环境下会失败,因为asyncio不允许隐式IO。要在asyncio环境下直接访问此属性而无需任何预先加载操作,可以通过指定AsyncAttrs.awaitable_attrs前缀将该属性作为可等待对象来访问:

a1 = (await session.scalars(select(A))).one()
for b1 in await a1.awaitable_attrs.bs:
    print(b1)

AsyncAttrs混入为内部方法提供了一个简洁的外观,该方法也被AsyncSession.run_sync()方法所使用。

另见
异步属性

・通过使用SQLAlchemy 2.0中的“只写关系”特性,集合可以被替换为永远不会隐式发出IO的只写集合。使用此特性,集合永远不会被读取,只能通过显式SQL调用进行查询。有关与asyncio一起使用的只写集合的示例,请参阅“Asyncio集成”部分中的示例async_orm_writeonly.py。

在使用只写集合时,程序在集合方面的行为简单且易于预测。然而,缺点在于没有内置系统可以一次性加载多个这样的集合,而是需要手动执行。因此,下面的许多要点都是针对在使用asyncio的传统延迟加载关系时需要特别注意的具体技巧。

・如果不使用AsyncAttrs,可以用lazy=”raise”来声明关系,这样默认情况下它们就不会尝试执行SQL查询。为了加载集合,应使用即时加载。

・最实用的预加载策略是selectinload()预加载器,在前面的示例中,我们使用了该预加载器在await session.execute()调用的范围内预加载A.bs集合:

stmt = select(A).options(selectinload(A.bs))

・在构造新对象时,集合总是被分配一个默认的空集合,如上例中的列表:

A(bs=[], data="a2")

这允许在刷新上述A对象时,该对象上的.bs集合能够存在且可读;否则,当刷新A对象时,.bs将被卸载,并在访问时引发错误。

・AsyncSession的配置中,Session.expire_on_commit被设置为False,这样我们就可以在调用AsyncSession.commit()后访问对象的属性,就像最后那行访问属性的代码一样:

# create AsyncSession with expire_on_commit=False
async_session = AsyncSession(engine, expire_on_commit=False)

# sessionmaker version
async_session = async_sessionmaker(engine, expire_on_commit=False)

async with async_session() as session:
    result = await session.execute(select(A).order_by(A.id))

    a1 = result.scalars().first()

    # commit would normally expire all attributes
    await session.commit()

    # access attribute subsequent to commit; this is what
    # expire_on_commit=False allows
    print(a1.data)

其他指南包括:

・如果确实需要过期处理,应避免使用像AsyncSession.expire()这样的方法,而应使用AsyncSession.refresh()。通常不需要进行过期处理,因为在使用asyncio时,Session.expire_on_commit通常应设置为False。

・如果将所需的属性名称显式传递给 Session.refresh.attribute_names,则可以在 asyncio 下使用 AsyncSession.refresh() 显式加载延迟加载的关系,例如:

# assume a_obj is an A that has lazy loaded A.bs collection
a_obj = await async_session.get(A, [1])

# force the collection to load by naming it in attribute_names
await async_session.refresh(a_obj, ["bs"])

# collection is present
print(f"bs collection: {a_obj.bs}")

当然,最好预先使用预加载,以便在无需延迟加载的情况下,就已经准备好集合。

2.0.4版本新增功能:新增对AsyncSession.refresh()和底层Session.refresh()方法的支持,以强制加载延迟加载的关系(如果这些关系在Session.refresh.attribute_names参数中明确命名)。在之前的版本中,即使关系在参数中命名,也会被默认跳过。

・避免使用Cascades中记录的“全部级联”选项,而是明确列出所需的级联特性。该“全部级联”选项意味着(其中包括)刷新-过期设置,这意味着在relationship()中未配置为立即加载的情况下,AsyncSession.refresh()方法将使相关对象的属性过期,但并不一定会刷新这些相关对象,从而使它们处于过期状态。

・除了上述提到的relationship()构造外,如果使用了deferred()列,则应为其采用适当的加载器选项。有关延迟加载列的背景信息,请参阅限制哪些列使用列延迟加载。

・在动态关系加载器中描述的“动态”关系加载策略默认情况下与asyncio方法不兼容。只有在asyncio下运行同步方法和函数中描述的AsyncSession.run_sync()方法内调用时,或者通过使用其.statement属性获取常规select语句时,才能直接使用该策略:

user = await session.get(User, 42)
addresses = (await session.scalars(user.addresses.statement)).all()
stmt = user.addresses.statement.where(Address.email_address.startswith("patrick"))
addresses_filter = (await session.scalars(stmt)).all()

SQLAlchemy 2.0版本中引入的只写技术完全兼容asyncio,应优先考虑使用。

另见
“动态”关系加载器被“只写”模式取代——迁移到2.0版本的注意事项

・如果将asyncio与不支持RETURNING的数据库(如MySQL 8)一起使用,除非使用Mapper.eager_defaults选项,否则新刷新对象上的服务器默认值(如生成的时间戳)将不可用。在SQLAlchemy 2.0中,当插入行时,这种行为会自动应用于使用RETURNING来获取新值的后端,如PostgreSQL、SQLite和MariaDB。

在asyncio下运行同步方法和函数

深度炼金术
这种方法本质上是在公开暴露SQLAlchemy最初能够提供asyncio接口的机制。虽然这样做在技术上没有问题,但总体而言,这种方法可能被视为“有争议的”,因为它违背了asyncio编程模型的一些核心理念,即任何可能导致IO调用的编程语句都必须包含await调用,以免程序没有明确指出每行代码中可能发生IO的位置。这种方法并没有改变这一总体思路,只是允许在函数调用的范围内对一系列同步IO指令免除这一规则,实质上是将它们打包成一个可等待的对象。

作为将传统SQLAlchemy“延迟加载”集成到asyncio事件循环中的一种替代方法,提供了一个名为AsyncSession.run_sync()的可选方法,该方法将在greenlet中运行任何Python函数,其中传统的同步编程概念在到达数据库驱动程序时将被转换为使用await。这里的一个假设方法是,面向asyncio的应用程序可以将与数据库相关的方法打包成函数,这些函数通过AsyncSession.run_sync()调用。

改变上述示例,如果我们不对A.bs集合使用selectinload(),我们可以在一个单独的函数中完成对这些属性访问的处理:

import asyncio

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine


def fetch_and_update_objects(session):
    """run traditional sync-style ORM code in a function that will be
    invoked within an awaitable.

    """

    # the session object here is a traditional ORM Session.
    # all features are available here including legacy Query use.

    stmt = select(A)

    result = session.execute(stmt)
    for a1 in result.scalars():
        print(a1)

        # lazy loads
        for b1 in a1.bs:
            print(b1)

    # legacy Query use
    a1 = session.query(A).order_by(A.id).first()

    a1.data = "new data"


async def async_main():
    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        echo=True,
    )
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

    async with AsyncSession(engine) as session:
        async with session.begin():
            session.add_all(
                [
                    A(bs=[B(), B()], data="a1"),
                    A(bs=[B()], data="a2"),
                    A(bs=[B(), B()], data="a3"),
                ]
            )

        await session.run_sync(fetch_and_update_objects)

        await session.commit()

    # for AsyncEngine created in function scope, close and
    # clean-up pooled connections
    await engine.dispose()


asyncio.run(async_main())

上述在“sync”运行器中运行特定函数的方法与在基于事件的编程库(如gevent)之上运行SQLAlchemy应用程序的方法有一些相似之处。两者的区别如下:

1. 与使用gevent时不同,我们可以继续使用标准的Python asyncio事件循环或任何自定义事件循环,而无需集成到gevent事件循环中。

2. 这里绝对没有“monkeypatching”。上述示例使用了真正的asyncio驱动,而底层的SQLAlchemy连接池也使用了Python内置的asyncio.Queue来管理连接池。

3. 该程序可以在异步/等待代码和使用同步代码的包含函数之间自由切换,几乎不会带来性能损失。它没有使用“线程执行器”或任何额外的等待线程或同步机制。

4. 底层网络驱动也使用了纯Python的asyncio概念,没有使用如gevent和eventlet等第三方网络库。

发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • *昵称:
  • *邮箱: