当关系的一侧已存在于数据库中时,使用 SQLModel 插入多对多关系对象
我正在尝试使用 sqlmodel 在数据库中插入记录,其中数据如下所示。一个 house 对象,它有颜色和许多位置。地点也将与许多房屋相关联。输入为:
[ { "color": "red", "locations": [ {"type": "country", "name": "netherlands"}, {"type": "municipality", "name": "amsterdam"}, ], }, { "color": "green", "locations": [ {"type": "country", "name": "netherlands"}, {"type": "municipality", "name": "amsterdam"}, ], },]
这是我正在尝试做的事情的可重现示例:
import asynciofrom typing import listfrom sqlalchemy.ext.asyncio import create_async_enginefrom sqlalchemy.orm import sessionmakerfrom sqlmodel import field, relationship, sqlmodel, uniqueconstraintfrom sqlmodel.ext.asyncio.session import asyncsessiondatabase_url = "sqlite+aiosqlite:///./database.db"engine = create_async_engine(database_url, echo=true, future=true)async def init_db() -> none: async with engine.begin() as conn: await conn.run_sync(sqlmodel.metadata.create_all)sessionlocal = sessionmaker( autocommit=false, autoflush=false, bind=engine, class_=asyncsession, expire_on_commit=false,)class houselocationlink(sqlmodel, table=true): house_id: int = field(foreign_key="house.id", nullable=false, primary_key=true) location_id: int = field( foreign_key="location.id", nullable=false, primary_key=true )class location(sqlmodel, table=true): id: int = field(primary_key=true) type: str # country, county, municipality, district, city, area, street, etc name: str # amsterdam, germany, my street, etc houses: list["house"] = relationship( back_populates="locations", link_model=houselocationlink, ) __table_args__ = (uniqueconstraint("type", "name"),)class house(sqlmodel, table=true): id: int = field(primary_key=true) color: str = field() locations: list["location"] = relationship( back_populates="houses", link_model=houselocationlink, ) # other fields...data = [ { "color": "red", "locations": [ {"type": "country", "name": "netherlands"}, {"type": "municipality", "name": "amsterdam"}, ], }, { "color": "green", "locations": [ {"type": "country", "name": "netherlands"}, {"type": "municipality", "name": "amsterdam"}, ], },]async def add_houses(payload) -> list[house]: result = [] async with sessionlocal() as session: for item in payload: locations = [] for location in item["locations"]: locations.append(location(**location)) house = house(color=item["color"], locations=locations) result.append(house) session.add_all(result) await session.commit()asyncio.run(init_db())asyncio.run(add_houses(data))
问题是,当我运行此代码时,它尝试将重复的位置对象与房屋对象一起插入。我希望能够在这里使用 relationship,因为它使访问 house.locations 变得非常容易。
但是,我无法弄清楚如何阻止它尝试插入重复的位置。理想情况下,我有一个映射器函数来执行 get_or_create 位置。
我所见过的最能实现这一点的是 sqlalchemy 的关联代理。但看起来 sqlmodel 不支持这一点。
有人知道如何实现这一目标吗?如果您知道如何使用 sqlalchemy 而不是 sqlmodel 来完成此操作,我有兴趣查看您的解决方案。我还没有开始这个项目,所以如果它能让我的生活更轻松的话,我不妨使用 sqlalchemy。
我还尝试使用 sa_relationship_kwargs 进行调整,例如
sa_relationship_kwargs={ "lazy": "selectin", "cascade": "none", "viewonly": "true",}
但这会阻止将关联条目添加到 houselocationlink 表中。
任何指示将不胜感激。即使这意味着完全改变我的方法。
谢谢!
正确答案
我正在编写这个解决方案,因为您提到您愿意使用 sqlalchemy。正如您所提到的,您需要关联代理,但您还需要“唯一对象”。我已将其调整为异步查询(而不是同步)功能,与我的个人偏好保持一致,所有这些都没有显着改变逻辑。
import asynciofrom sqlalchemy import UniqueConstraint, ForeignKey, select, text, funcfrom sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped, relationshipfrom sqlalchemy.ext.asyncio import AsyncSession, create_async_enginefrom sqlalchemy.ext.associationproxy import AssociationProxy, association_proxyclass Base(DeclarativeBase): passclass UniqueMixin: cache = {} @classmethod async def as_unique(cls, session: AsyncSession, *args, **kwargs): key = cls, cls.unique_hash(*args, **kwargs) if key in cls.cache: return cls.cache[key] with session.no_autoflush: statement = select(cls).where(cls.unique_filter(*args, **kwargs)).limit(1) obj = (await session.scalars(statement)).first() if obj is None: obj = cls(*args, **kwargs) session.add(obj) cls.cache[key] = obj return obj @classmethod def unique_hash(cls, *args, **kwargs): raise NotImplementedError("Implement this in subclass") @classmethod def unique_filter(cls, *args, **kwargs): raise NotImplementedError("Implement this in subclass")class Location(UniqueMixin, Base): __tablename__ = "location" id: Mapped[int] = mapped_column(primary_key=True) name: Mapped[str] = mapped_column() type: Mapped[str] = mapped_column() house_associations: Mapped[list["HouseLocationLink"]] = relationship(back_populates="location") __table_args = (UniqueConstraint(type, name),) @classmethod def unique_hash(cls, name, type): # this is the key for the dict return type, name @classmethod def unique_filter(cls, name, type): # this is how you want to establish the uniqueness # the result of this filter will be the value in the dict return (cls.type == type) & (cls.name == name)class House(Base): __tablename__ = "house" id: Mapped[int] = mapped_column(primary_key=True) name: Mapped[str] = mapped_column() location_associations: Mapped[list["HouseLocationLink"]] = relationship(back_populates="house") locations: AssociationProxy[list[Location]] = association_proxy( "location_associations", "location", # you need this so you can directly add ``Location`` objects to ``House`` creator=lambda location: HouseLocationLink(location=location), )class HouseLocationLink(Base): __tablename__ = "houselocationlink" house_id: Mapped[int] = mapped_column(ForeignKey(House.id), primary_key=True) location_id: Mapped[int] = mapped_column(ForeignKey(Location.id), primary_key=True) location: Mapped[Location] = relationship(back_populates="house_associations") house: Mapped[House] = relationship(back_populates="location_associations")engine = create_async_engine("sqlite+aiosqlite:///test.sqlite")async def main(): data = [ { "name": "red", "locations": [ {"type": "country", "name": "Netherlands"}, {"type": "municipality", "name": "Amsterdam"}, ], }, { "name": "green", "locations": [ {"type": "country", "name": "Netherlands"}, {"type": "municipality", "name": "Amsterdam"}, ], }, ] async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) async with AsyncSession(engine) as session, session.begin(): for item in data: house = House( name=item["name"], locations=[await Location.as_unique(session, **location) for location in item["locations"]] ) session.add(house) async with AsyncSession(engine) as session: statement = select(func.count(text("*")), Location) assert await session.scalar(statement) == 2 statement = select(func.count(text("*")), House) assert await session.scalar(statement) == 2 statement = select(func.count(text("*")), HouseLocationLink) assert await session.scalar(statement) == 4asyncio.run(main())
您可以注意到断言确实通过,没有违反唯一约束,也没有多次插入。我留下了一些内联注释,其中提到了这段代码的“关键”方面。如果多次运行此代码,您会注意到仅添加了新的 house 对象和相应的 houselocationlink,而没有添加新的 location 对象。对于每个键值对,只会进行一次查询来缓存此行为。