SQLAlchemy 2.0 文档 – 异步IO支持 (一)

其他 piniu 69浏览 0评论

异步I/O(asyncio)

支持Python的asyncio库。同时,通过使用与asyncio兼容的方言,支持核心库和ORM的使用。

警告
请阅读《Asyncio平台安装说明(包括Apple M1)》,以获取包括Apple M1架构在内的多种平台的重要安装说明。

另见
核心和ORM的异步IO支持 – 初步功能公告
Asyncio集成 – 示例脚本,展示了在asyncio扩展中核心和ORM的使用实例。

Asyncio平台安装说明(包括Apple M1)

asyncio扩展仅支持Python 3。它还依赖于greenlet库。在常见的机器平台上,此依赖项默认已安装,包括:

x86_64 aarch64 ppc64le amd64 win32

对于上述平台,已知greenlet提供了预构建的wheel文件。对于其他平台,greenlet默认不会进行安装;有关greenlet的当前文件列表,请访问Greenlet – 下载文件。请注意,这里省略了许多架构,包括Apple M1。

为了在安装SQLAlchemy时确保无论使用何种平台,greenlet依赖项都存在,可以按照以下步骤安装[asyncio] setuptools extra,该操作还将指示pip安装greenlet:

pip install sqlalchemy[asyncio]

请注意,在未预构建wheel文件的平台上安装greenlet意味着需要从源代码构建greenlet,这要求Python的开发库也已安装。

概要 – 核心

对于核心使用,create_async_engine()函数会创建一个AsyncEngine的实例,该实例随后提供传统Engine API的异步版本。AsyncEngine通过其AsyncEngine.connect()和AsyncEngine.begin()方法提供AsyncConnection,这两个方法都提供异步上下文管理器。然后,AsyncConnection可以使用AsyncConnection.execute()方法调用语句以提供缓冲的结果,或者使用AsyncConnection.stream()方法调用语句以提供流式服务器端AsyncResult:

import asyncio

from sqlalchemy import Column
from sqlalchemy import MetaData
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.ext.asyncio import create_async_engine

meta = MetaData()
t1 = Table("t1", meta, Column("name", String(50), primary_key=True))


async def async_main() -> None:
    engine = create_async_engine("sqlite+aiosqlite://", echo=True)
    async with engine.begin() as conn:
        await conn.run_sync(meta.drop_all)
        await conn.run_sync(meta.create_all)
        await conn.execute(
            t1.insert(), [{"name": "some name 1"}, {"name": "some name 2"}]
        )
    async with engine.connect() as conn:
        # select a Result, which will be delivered with buffered
        # results
        result = await conn.execute(select(t1).where(t1.c.name == "some name 1"))
        print(result.fetchall())
    # for AsyncEngine created in function scope, close and
    # clean-up pooled connections
    await engine.dispose()


asyncio.run(async_main())
BEGIN (implicit)
...
CREATE TABLE t1 (
    name VARCHAR(50) NOT NULL,
    PRIMARY KEY (name)
)
...
INSERT INTO t1 (name) VALUES (?)
[...] [('some name 1',), ('some name 2',)]
COMMIT
BEGIN (implicit)
SELECT t1.name
FROM t1
WHERE t1.name = ?
[...] ('some name 1',)
[('some name 1',)]
ROLLBACK

在上文中,可以使用AsyncConnection.run_sync()方法来调用特殊的DDL函数,如MetaData.create_all(),这些函数不包含可等待的钩子。

提示:
在将AsyncEngine对象用于将超出上下文并被垃圾回收的范围时,建议使用await来调用AsyncEngine.dispose()方法,如上述示例中的async_main函数所示。这可以确保连接池保持打开的任何连接都将在可等待的上下文中被正确释放。与使用阻塞IO时不同,由于没有机会调用await,SQLAlchemy无法在__del__或weakref终结器等方法中正确释放这些连接。当引擎超出作用域时,如果未明确释放,可能会导致标准输出中出现类似“RuntimeError: Event loop is closed within garbage collection”的警告。

AsyncConnection还通过AsyncConnection.stream()方法提供了一个“流式”API,该方法返回一个AsyncResult对象。此结果对象使用服务器端游标并提供一个异步/等待API,例如异步迭代器:

async with engine.connect() as conn:
    async_result = await conn.stream(select(t1))

    async for row in async_result:
        print("row: %s" % (row,))
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • *昵称:
  • *邮箱: