最大限度地提高 FastAPI 效率:使用 py-cachify 极快地实现缓存和锁定
在快节奏的 web 开发世界中,性能至关重要。高效的缓存机制可以通过减少冗余计算和数据库查询来显着增强 api 的响应能力。在本文中,我们将探讨如何使用 sqlmodel 和 redis 将 py-cachify 库集成到 fastapi 应用程序中,以实现缓存和并发控制。
目录:
介绍
缓存是一种强大的技术,通过存储昂贵操作的结果并从快速访问存储中提供它们来提高 web 应用程序的性能。借助 py-cachify,我们可以无缝地将缓存添加到 fastapi 应用程序中,并利用 redis 进行存储。此外,py-cachify 提供并发控制工具,防止关键操作期间出现竞争情况。
在本教程中,我们将逐步在 fastapi 应用程序中设置 py-cachify 库,并使用用于 orm 的 sqlmodel 和用于缓存的 redis。
项目设置
让我们从设置项目环境开始。
先决条件
安装依赖项
通过诗歌开始一个新项目:
# create new projectpoetry new --name app py-cachify-fastapi-demo# enter the directorycd py-cachify-fastapi-demo# point poetry to use python3.12poetry env use python3.12# add dependenciespoetry add "fastapi[standard]" sqlmodel aiosqlite redis py-cachify
初始化 py-cachify
在使用 py-cachify 之前,我们需要使用 redis 客户端对其进行初始化。我们将使用 fastapi 的寿命参数来完成此操作。
# app/main.pyfrom contextlib import asynccontextmanagerfrom fastapi import fastapifrom py_cachify import init_cachifyfrom redis.asyncio import from_url@asynccontextmanagerasync def lifespan(_: fastapi): init_cachify( # replace with your redis url if it differs async_client=from_url('redis://localhost:6379/0'), ) yieldapp = fastapi(lifespan=lifespan)
在生命周期内,我们:
使用 sqlmodel 创建数据库模型
我们将创建一个简单的用户模型来与我们的数据库交互。
# app/db.pyfrom sqlmodel import field, sqlmodelclass user(sqlmodel, table=true): id: int | none = field(default=none, primary_key=true) name: str email: str
设置数据库引擎并在生命周期函数中创建表:
# app/db.py# adjust importsfrom sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine# add the following at the end of the filesqlite_file_name = 'database.db'sqlite_url = f'sqlite+aiosqlite:///{sqlite_file_name}'engine = create_async_engine(sqlite_url, echo=true)session_maker = async_sessionmaker(engine)# app/main.py# adjust imports and lifespan functionfrom sqlmodel import sqlmodelfrom .db import engine@asynccontextmanagerasync def lifespan(_: fastapi): init_cachify( async_client=from_url('redis://localhost:6379/0'), ) # create sql model tables async with engine.begin() as conn: await conn.run_sync(sqlmodel.metadata.create_all) yield
注意:为了简单起见,我们使用 sqlite,但您可以使用 sqlalchemy 支持的任何数据库。
构建 fastapi 端点
让我们创建端点来与我们的用户模型交互。
# app/main.py# adjust importsfrom fastapi import depends, fastapifrom sqlalchemy.ext.asyncio import asyncsessionfrom .db import user, engine, session_maker# database session dependencyasync def get_session(): async with session_maker() as session: yield sessionapp = fastapi(lifespan=lifespan)@app.post('/users/')async def create_user(user: user, session: asyncsession = depends(get_session)) -> user: session.add(user) await session.commit() await session.refresh(user) return user@app.get('/users/{user_id}')async def read_user(user_id: int, session: asyncsession = depends(get_session)) -> user | none: return await session.get(user, user_id)@app.put('/users/{user_id}')async def update_user(user_id: int, new_user: user, session: asyncsession = depends(get_session)) -> user | none: user = await session.get(user, user_id) if not user: return none user.name = new_user.name user.email = new_user.email session.add(user) await session.commit() await session.refresh(user) return user
缓存端点结果
现在,让我们缓存 read_user 端点的结果,以避免不必要的数据库查询。
端点代码将如下所示:
# app/main.py# add the importfrom py_cachify import cached@app.get('/users/{user_id}')@cached('read_user-{user_id}', ttl=300) # new decoratorasync def read_user(user_id: int, session: asyncsession = depends(get_session)) -> user | none: return await session.get(user, user_id)
使用@cached装饰器:
更新时重置缓存
当用户的数据更新时,我们需要重置缓存以确保客户端收到最新的信息。为了实现这一点,让我们修改 update_user 端点。
# app/main.py@app.put('/users/{user_id}')async def update_user(user_id: int, new_user: user, session: asyncsession = depends(get_session)) -> user | none: user = await session.get(user, user_id) if not user: return none user.name = new_user.name user.email = new_user.email session.add(user) await session.commit() await session.refresh(user) # reset cache for this user await read_user.reset(user_id=user_id) return user
通过调用 read_user.reset(user_id=user_id),我们:
在下面,缓存的装饰器动态包装您的函数,添加 .reset 方法。此方法模仿函数的签名和类型,这样根据原始函数,它将是同步或异步的,并且将接受相同的参数。
.reset 方法使用缓存装饰器中定义的相同密钥生成逻辑来识别要使哪个缓存条目无效。例如,如果您的缓存键模式是 user-{user_id},则调用await read_user.reset(user_id=123) 将专门定位并删除 user_id=123 的缓存条目。
锁定更新端点的执行
为了防止更新期间的竞争条件,我们将使用一次装饰器来锁定更新端点的执行。
# app/main.pyfrom py_cachify import oncefrom fastapi.responses import jsonresponse@app.put('/users/{user_id}', response_model=user)# add the locking decorator@once('update-user-{user_id}', return_on_locked=jsonresponse(content={'status': 'update in progress'}, status_code=226))async def update_user(user_id: int, new_user: user, session: asyncsession = depends(get_session)) -> user | none: user = await session.get(user, user_id) user.name = new_user.name user.email = new_user.email session.add(user) await session.commit() await session.refresh(user) # reset cache for this user await read_user.reset(user_id=user_id) return user
曾经:
(可选)您可以配置 @once 以引发异常或在已获取锁的情况下返回特定值。
运行应用程序
现在是时候运行和测试我们的应用程序了!
1) 启动 redis 服务器:
确保您的 redis 服务器在本地运行或可远程访问。您可以使用 docker 启动本地 redis 服务器:
docker run --name redis -p 6379:6379 -d redis
2) 运行 fastapi 应用程序:
一切设置完毕后,您可以使用 poetry 启动 fastapi 应用程序。导航到项目的根目录并执行以下命令:
poetry run fastapi dev app/main.py
3) 测试和使用缓存和锁定:
缓存: 在 read_user 函数中添加延迟(例如,使用 asyncio.sleep)以模拟长时间运行的计算。观察结果缓存后响应时间如何显着改善。
示例:
import asyncioasync def read_user(user_id: int, session: asyncsession = depends(get_session)) -> user | none: await asyncio.sleep(2) # simulate expensive computation or database call return await session.get(user, user_id)
并发和锁定:同样,在 update_user 函数中引入延迟,以观察并发更新尝试时锁的行为。
示例:
async def update_user(user_id: int, new_user: User, session: AsyncSession = Depends(get_session)) -> User | None: await asyncio.sleep(2) # Simulate update delay # update logic here…
这些延迟可以帮助您了解缓存和锁定机制的有效性,因为由于缓存,后续读取应该更快,并且应该通过锁定有效管理对同一资源的并发写入。
现在,您可以使用 postman 等工具或转到 http://127.0.0.1:8000/docs(当应用程序运行时!)来测试端点,并观察性能改进和并发控制的实际情况。
享受使用增强型 fastapi 应用程序进行实验的乐趣!
结论
通过将 py-cachify 集成到我们的 fastapi 应用程序中,我们释放了众多优势,增强了 api 的性能和可靠性。
让我们回顾一下一些关键优势:
对于那些渴望进一步探索的人,请查看py-cachify 的 github 存储库和官方文档以获取更深入的指导、教程和示例。
您可以在 github 此处访问本教程的完整代码。请随意克隆存储库并尝试实现以满足您项目的需求。
如果您发现 py-cachify 有益,请考虑在 github 上给它一颗星来支持该项目!您的支持有助于推动进一步的改进和新功能。
编码愉快!