Introduction: Why You Should Re-Learn "SQLAlchemy 2.0" Now
SQLAlchemy is the de facto standard ORM in Python. Yet most of the explanatory articles circulating on the web, most Stack Overflow answers, and much of the code that generative AI produces are still written in the 1.x legacy style (Column(), session.query(), Base = declarative_base()).
SQLAlchemy 2.0, officially released in January 2023, was not a mere version bump but a paradigm shift that placed the type system at its core. Statically typed mapped attributes via Mapped[...] annotations, a select() API unifying Core and ORM, and native asyncio support: together these mark the watershed that divides "code that just works" from "code that survives production."
This article is not a rehash of an introduction. Its aim is to be faithful to the official documentation (docs.sqlalchemy.org/en/20) while being one notch clearer, and to break through, with concrete code, the walls you will inevitably hit in practice:
- "I've been writing with session.query(), but I don't get what changes when migrating to the 2.0-style select()."
- "The distinction between Mapped[int] and mapped_column(), and the handling of Optional, is fuzzy."
- "Opening a list screen fires a flood of SQL (the N+1 problem). Should I use selectinload or joinedload?"
- "After going async in FastAPI, accessing a relationship throws a MissingGreenlet error."
- "In production (especially RDS / serverless), connection closed-type errors crop up sporadically."
The author designed and implemented the backend of a B2B SaaS that won the Minister of Economy, Trade and Industry Award in Python 3.11 / SQLAlchemy 2.0 / PostgreSQL 16, and ran it in production with a strict layered separation of Router → UseCase → Repository → Model. This article organizes the insights gained from that implementation, backed by the official documentation.
1. Type-Safe Model Definitions: The Correct Use of DeclarativeBase and Mapped
The 2.0-style model definition is fully integrated with Python's type annotations. First, here is a model faithful to the official "ORM Quickstart."

    from typing import List, Optional

    from sqlalchemy import ForeignKey, String
    from sqlalchemy.orm import (
        DeclarativeBase,
        Mapped,
        mapped_column,
        relationship,
    )


    class Base(DeclarativeBase):
        """Base class for all models. In 2.0 the canonical approach is to
        subclass DeclarativeBase instead of calling declarative_base()."""


    class User(Base):
        __tablename__ = "user_account"

        id: Mapped[int] = mapped_column(primary_key=True)
        name: Mapped[str] = mapped_column(String(30))
        fullname: Mapped[Optional[str]]  # nullable (str | None also works)
        addresses: Mapped[List["Address"]] = relationship(
            back_populates="user",
            cascade="all, delete-orphan",
        )

        def __repr__(self) -> str:
            return f"User(id={self.id!r}, name={self.name!r})"


    class Address(Base):
        __tablename__ = "address"

        id: Mapped[int] = mapped_column(primary_key=True)
        email_address: Mapped[str]  # NOT NULL is determined by the type annotation alone
        user_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
        user: Mapped["User"] = relationship(back_populates="addresses")

Mapped[...] Decides "Type" and "NULL Constraint" at Once
This is the biggest change from 1.x. In 2.0, the type annotation itself carries schema information.
| Notation | Generated column | Meaning |
|---|---|---|
| Mapped[int] | INTEGER NOT NULL | Non-Optional is NOT NULL |
| Mapped[Optional[str]] | VARCHAR NULL | Optional permits NULL |
| Mapped[str] = mapped_column(String(30)) | VARCHAR(30) NOT NULL | Type details supplemented by mapped_column |
Even if you omit mapped_column() — as in email_address: Mapped[str] — the String type and the NOT NULL constraint are automatically derived from the str annotation. You attach mapped_column() only when you want extra control over the type (length, primary key, foreign key, etc.).
Why is this superior?
From the IDE / type checker's (mypy, Pyright) viewpoint, the 1.x name = Column(String(30)) is merely a Column instance and cannot statically guarantee that user.name is a str. The 2.0 Mapped[str] makes mypy genuinely infer user.name as str — call .upper() on user.fullname (Optional[str]) and it is rejected at type-check time. This means you can carry the "Type Safety" of CLAUDE.md all the way to the ORM boundary.
⚠️ The difference from legacy: Column() still works in 2.0, but the canonical form under a Mapped[...] annotation is mapped_column(). Mixing Column() into new code disables type inference and becomes technical debt.
Centralize Project Conventions with type_annotation_map (DRY)
Project conventions like "int is always BIGINT" or "datetime is always timezone-aware" become DRY violations if written per column. You can consolidate them in one place with DeclarativeBase's type_annotation_map.
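
    import datetime

    from sqlalchemy import BigInteger
    from sqlalchemy.dialects.postgresql import TIMESTAMP
    from sqlalchemy.orm import DeclarativeBase


    class Base(DeclarativeBase):
        # Python type -> SQL type, applied to every Mapped[...] annotation
        type_annotation_map = {
            int: BigInteger,
            datetime.datetime: TIMESTAMP(timezone=True),
        }
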
From now on, just writing created_at: Mapped[datetime.datetime] automatically uses TIMESTAMP WITH TIME ZONE across all models. A convention change is done in a single point — a textbook practice of "ETC (Easy To Change)."
2. The Unified Query API: Drop session.query() and Migrate to select()
The heart of 2.0 is that Core and ORM can now use the same select() syntax.

    from sqlalchemy import select

    # 2.0 style: iterate over entities directly
    stmt = select(User).where(User.name.in_(["spongebob", "sandy"]))
    for user in session.scalars(stmt):
        print(user)

When to Use session.scalars() vs. session.execute().scalars()
This is a point of confusion even in the official docs, so let me clear it up.
When you receive the result of select(User) via session.execute(), each row is wrapped in a single-element tuple (Row).

    session.execute(select(User)).all()
    # → [(User(id=1),), (User(id=2),)]  (each entity is wrapped in a tuple)

You want to receive the entity "bare" — this is the majority case. The shortcut for that is session.scalars(). Per the official definition, session.scalars(stmt) is exactly equivalent to session.execute(stmt).scalars().

    session.scalars(select(User)).all()
    # → [User(id=1), User(id=2)]  (bare entities)

Guidance in practice:
| What you want | The API to use |
|---|---|
| Get a list of single entities | session.scalars(stmt).all() |
| Get one by primary key | session.get(User, 5) (consults the identity map first) |
| Get exactly one (zero or multiple rows raise) | session.scalars(stmt).one() or session.execute(stmt).scalar_one() |
| Get one, allowing zero | session.scalars(stmt).one_or_none() |
| Multiple columns (select(User.name, User.id)) | Receive the Row tuple via session.execute(stmt) |
When you take multiple columns, as in select(User.name, User.fullname), use execute() and receive them as Row tuples. The shortest mnemonic: "single entity → scalars, multiple columns → execute."
session.query() "Works, but Is Legacy"
The official documentation clearly positions the Query object as a legacy construct (as of SQLAlchemy 2.0). The important nuance is:
The bulk of the Query API will not be removed from SQLAlchemy. Query became a very thin adapter that internally converts to a 2.0-style select() at execution time.
In other words, existing code won't break. But there is no longer any reason to write new code with query(); unifying on select() removes the notational gap with Core queries and lowers the learning cost and cognitive load.
3. Session and Unit of Work: Designing Transaction Boundaries
The Session is the manager of the "unit of work" in the SQLAlchemy ORM. Correct lifecycle management in production code is the lifeline of data integrity.
sessionmaker and the Context Manager

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/mydb")

    # Factory created once at application startup
    SessionFactory = sessionmaker(engine)

    # Open one session per request (the with block guarantees it is closed)
    with SessionFactory() as session:
        session.add(some_object)  # some_object: any mapped instance, e.g. a User
        session.commit()

When you want to make the transaction boundary explicit, combine it with session.begin(). On exiting the begin() block, a commit happens automatically; on an exception, a rollback.

    # Recommended: two context managers make the transaction explicit
    with SessionFactory() as session, session.begin():
        session.add(user)
        session.add(address)
    # Leaving the block commits automatically; an exception rolls back instead

Why is this superior?
If you hand-write commit() / rollback() / close() with try/except/finally, omissions will inevitably occur. By delegating to the context manager, you can guarantee at the code level the Reliability that "on an exception it is always rolled back, and the connection is always returned."
Understand the Trap of commit / flush / expire
This is the source of the most common bugs in production. Let me pin down the official behavior precisely.
- flush: sends pending changes to the DB as SQL but does not commit. By default, the session flushes automatically before issuing a query (autoflush).
- commit: commit() unconditionally calls flush() internally and then issues a COMMIT.
- expire after commit: after a commit, all objects in the session are expired and are reloaded on the next attribute access.
This "expire after commit" is the trap. Look at the following code.

    with SessionFactory() as session, session.begin():
        user = User(name="alice")
        session.add(user)

    # The block has exited: the commit is done and the session is closed.
    print(user.name)  # ❌ risk of DetachedInstanceError

After the commit, user becomes expired and attempts to reload on attribute access. But because exiting the with has closed the session, a DetachedInstanceError occurs.
The countermeasure depends on the use case.

    # Countermeasure 1: if you use attributes after commit, set expire_on_commit=False
    SessionFactory = sessionmaker(engine, expire_on_commit=False)

    # Countermeasure 2: extract the values you need before commit (copy them into a DTO)

In async (covered later), this "implicit reload after commit" causes an even more serious problem, so expire_on_commit=False becomes practically mandatory.
4. Eradicating the N+1 Problem: Choosing Loader Strategies Correctly
The first performance problem that always bites you when you put an ORM into production is N+1. The official documentation warns about it by name.
The lazyload() strategy causes one of the most common problems in the ORM, namely the N+1 problem.
What Is the N+1 Problem?
The default for relationships is lazy loading (lazy="select"). The following code looks harmless at a glance.

    users = session.scalars(select(User)).all()  # ① SELECT * FROM user_account
    for user in users:
        print(user.addresses)  # ② one SELECT * FROM address WHERE user_id = ? per user

If there are 100 users, then in addition to the single query ①, query ② fires 100 times, for 101 SQL queries in total. That is N+1. It is the textbook anti-pattern in which a list screen's response time grows linearly with the number of rows displayed, one extra database round trip per parent row.
A Quick Reference for Loader Strategies
The solution the docs recommend is declaring in advance, at query-issue time, how related data should be loaded (eager loading). You pass it to .options().
| Strategy | lazy= equivalent | Mechanism | Official recommended use |
|---|---|---|---|
| selectinload() | "selectin" | Bundle the parents' primary keys in an IN clause and fetch related rows in one additional SELECT | First choice for collections (one-to-many, many-to-many) |
| joinedload() | "joined" | Include in the same result set via a JOIN | The most general strategy for many-to-one (not many-to-many) |
| subqueryload() | "subquery" | Fetch related rows via a subquery | Mostly legacy (superseded by selectinload) |
| lazyload() | "select" | Lazy SELECT on access (default) | The cause of N+1 |
| raiseload() | "raise" | Disallows lazy loading and raises | A production guardrail |
selectinload for Collections, joinedload for Many-to-One
The official guidance is clear.
In most cases, selectin loading is the simplest and most efficient way to eagerly load collections.

    from sqlalchemy.orm import selectinload

    # One query for users plus one IN-clause query for all addresses: two in total
    stmt = select(User).options(selectinload(User.addresses))
    users = session.scalars(stmt).all()
    for user in users:
        print(user.addresses)  # no additional SQL is emitted

For a many-to-one (a single reference like Address.user), on the other hand, joinedload() is the most general.

    from sqlalchemy.orm import joinedload

    stmt = select(Address).options(joinedload(Address.user))
    addresses = session.scalars(stmt).all()

⚠️ Caution when reading collections with joinedload: using joinedload on a one-to-many or many-to-many collection duplicates the parent rows by the number of related rows. In that case, you must apply .unique() to the result (the docs state this explicitly). To avoid duplication and row-count inflation, you should choose selectinload for collections as a rule.
Forbid "Accidental N+1" in Production with raiseload()
The most practical technique is this. raiseload() is a guardrail that turns access to a relationship you did not explicitly eager-load into an exception.

    from sqlalchemy.orm import raiseload

    # A User fetched by this query raises as soon as addresses is accessed
    stmt = select(User).options(raiseload(User.addresses))

Why is this superior?
Because N+1 "just works," it tends to slip past tests and reviews and only surfaces under increased production traffic. If you set raiseload() as the default, you can detect, at development time as an exception, the very moment you forget to write an eager load, making N+1 structurally "impossible to occur." Using lazy="raise_on_sql", you can permit many-to-one accesses that already exist in the session and require no SQL, while forbidding only lazy loads that entail new SQL.
5. asyncio Support: Asynchronous Data Access in the FastAPI Era
In asynchronous frameworks like FastAPI, you use AsyncSession. However, there are pitfalls unique to it that the synchronous version lacks, and the official documentation warns about them strongly.
A Minimal, Officially Faithful Async ORM Setup

    import asyncio
    from typing import List

    from sqlalchemy import ForeignKey, select
    from sqlalchemy.ext.asyncio import (
        AsyncAttrs,
        async_sessionmaker,
        create_async_engine,
    )
    from sqlalchemy.orm import (
        DeclarativeBase,
        Mapped,
        mapped_column,
        relationship,
        selectinload,
    )


    class Base(AsyncAttrs, DeclarativeBase):
        """Mixing in AsyncAttrs enables await a.awaitable_attrs.<rel>."""


    class A(Base):
        __tablename__ = "a"

        id: Mapped[int] = mapped_column(primary_key=True)
        bs: Mapped[List["B"]] = relationship()


    class B(Base):
        __tablename__ = "b"

        id: Mapped[int] = mapped_column(primary_key=True)
        a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))


    async def main() -> None:
        # asyncpg is the usual async driver for PostgreSQL
        engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost/test")
        # expire_on_commit=False is practically mandatory in async (explained below)
        async_session = async_sessionmaker(engine, expire_on_commit=False)

        # Create the tables first so the example also runs against an empty database
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

        async with async_session() as session:
            async with session.begin():
                session.add_all([A(bs=[B(), B()]), A(bs=[B()])])

            # Lazy loading is unavailable in async, so eager-load with selectinload
            stmt = select(A).options(selectinload(A.bs))
            result = await session.scalars(stmt)
            for a in result:
                print(a, a.bs)  # safe: already eager-loaded

        await engine.dispose()


    asyncio.run(main())

The Three Big Async Pitfalls (Official Warnings)
The async-specific constraints the official docs spell out will bite you for certain if you don't know them.
① Implicit IO (lazy loading) is forbidden. In the synchronous version, attribute access to a.bs implicitly fires SQL, but in async this becomes a MissingGreenlet–type error. In the official words: "you must avoid points where IO could occur on attribute access." There are 3 workarounds:
- Eager load: attach things like selectinload(A.bs) to the query (most recommended).
- Use AsyncAttrs: explicitly await with await a.awaitable_attrs.bs.
- await session.refresh(a, ["bs"]): refresh with explicit attribute names.
② expire_on_commit=False is practically mandatory. As noted, after a commit, objects expire and a reload (= implicit IO) runs on the next access. Because that implicit IO is itself forbidden in async, set expire_on_commit=False so you can safely access attributes even after a commit.
③ An AsyncSession cannot be shared across concurrent tasks. The docs state plainly that "a single AsyncSession instance cannot be used safely by multiple concurrent tasks." In each branch of asyncio.gather(), always create a separate AsyncSession per task.
The Dependency-Injection Pattern in FastAPI
In practice, you define a dependency that hands out a session per request and reliably closes it after the response.

    from collections.abc import AsyncIterator

    from fastapi import Depends, FastAPI
    from sqlalchemy.ext.asyncio import AsyncSession

    app = FastAPI()


    async def get_session() -> AsyncIterator[AsyncSession]:
        # async_session: the async_sessionmaker factory, created once at startup
        async with async_session() as session:
            yield session


    # Usage in a router
    @app.get("/users/{user_id}")
    async def read_user(user_id: int, session: AsyncSession = Depends(get_session)):
        user = await session.get(User, user_id)
        ...

With a yield-based dependency, whether the handler's processing succeeds or ends with an exception, async with always closes the session. It is the standard move that achieves both reliability and resource-leak prevention.
6. Production Operations: Designing the Connection Pool Correctly
Code that worked locally produces sporadic connection closed / server closed the connection unexpectedly in production (especially AWS RDS and serverless) — most of that is a pool-configuration problem.
create_engine() integrates QueuePool by default, with the following defaults.
| Parameter | Default | Meaning |
|---|---|---|
| pool_size | 5 | Connections kept open at all times |
| max_overflow | 10 | Additional connections temporarily creatable beyond pool_size |
| pool_timeout | 30.0 sec | Upper bound on waiting to acquire a connection |
| pool_recycle | -1 (disabled) | Recreate connections older than N seconds |
| pool_pre_ping | False | Liveness-check with SELECT 1 on checkout |
The Two Parameters You Must Always Set in Production
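
    from sqlalchemy import create_engine

    engine = create_engine(
        "postgresql+psycopg2://me@db.example.com/mydb",
        pool_pre_ping=True,  # avoid checking out a connection that has already died
        pool_recycle=1800,   # recycle after 30 minutes (guards against DB/LB idle timeouts)
        pool_size=10,
        max_overflow=20,
    )
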
Why does this matter?
- pool_pre_ping=True: the connections the pool holds can be severed without your noticing by the idle timeout on the DB side or a load balancer (RDS Proxy, ELB, etc.). pre_ping sends a light ping just before using a connection and transparently reconnects if it's dead. This one setting eliminates most of the sporadic errors stemming from stale connections.
- pool_recycle: against settings where the DB severs connections after a certain time (like MySQL's wait_timeout), proactively recycling with a shorter interval prevents the disconnect before it happens.
💡 A note for serverless (AWS Lambda, etc.): because Lambda execution environments are frequently recreated, each instance holding a large pool eats up the DB's max connections. In serverless, the standard design is NullPool (no pooling) or leaning on an external pooler like RDS Proxy. Choose based on the trade-off between cost efficiency (connection count) and reliability, according to the execution environment.
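The two deployment shapes can be captured in one small factory. This is a sketch under stated assumptions: build_engine and its serverless flag are hypothetical names, and the demo runs against in-memory SQLite, which is why QueuePool is requested explicitly (for a PostgreSQL URL it is already the default and the poolclass argument could be dropped).

```python
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
from sqlalchemy.pool import NullPool, QueuePool


def build_engine(url: str, *, serverless: bool) -> Engine:
    """Return an engine whose pooling matches the execution environment."""
    if serverless:
        # No pooling: each checkout opens a connection, each release closes it.
        return create_engine(url, poolclass=NullPool)
    return create_engine(
        url,
        poolclass=QueuePool,  # explicit only because the demo URL is SQLite
        pool_pre_ping=True,   # test the connection on checkout
        pool_recycle=1800,    # replace connections older than 30 minutes
        pool_size=10,
        max_overflow=20,
    )


DEMO_URL = "sqlite+pysqlite:///:memory:"  # stand-in for the real database URL

long_lived = build_engine(DEMO_URL, serverless=False)
per_invocation = build_engine(DEMO_URL, serverless=True)

with long_lived.connect() as conn:
    ping = conn.execute(text("SELECT 1")).scalar_one()

print(type(long_lived.pool).__name__, long_lived.pool.size())
print(type(per_invocation.pool).__name__)
```

Keeping the decision in one function means the pool policy can follow an environment variable or deployment flag without touching any query code.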
7. Application: Maximizing the Maintainability of the Data-Access Layer with the Repository Pattern
Finally, combine the elements so far and elevate them into a structure that survives production. Of the Router → UseCase → Repository → Model layered separation I adopted in the B2B SaaS that won the Minister of Economy, Trade and Industry Award, let me take the Repository layer as an example.
If you write ORM queries (select(), loader strategies) directly in routers or use cases, business logic and the persistence concern get mixed and the SRP (single responsibility) collapses. The Repository creates this boundary.

    from collections.abc import Sequence

    from sqlalchemy import select
    from sqlalchemy.orm import Session, selectinload

    # User is the mapped model defined in section 1


    class UserRepository:
        """Handles only the persistence concern of the User aggregate.
        SQL knowledge is confined to this class."""

        def __init__(self, session: Session) -> None:
            self._session = session

        def get(self, user_id: int) -> User | None:
            # Simple primary-key lookup that benefits from the identity map
            return self._session.get(User, user_id)

        def list_with_addresses(self, limit: int = 100) -> Sequence[User]:
            # Prevent N+1 structurally: addresses are always loaded with selectinload
            stmt = (
                select(User)
                .options(selectinload(User.addresses))
                .order_by(User.id)
                .limit(limit)
            )
            return self._session.scalars(stmt).all()

        def add(self, user: User) -> None:
            # No commit here: the transaction boundary belongs to the layer above
            self._session.add(user)

The point is not to place commit() in the Repository. The transaction boundary (how far constitutes one unit of work) is the responsibility of the use-case layer, which coordinates across multiple Repositories.

    class RegisterUserUseCase:
        def __init__(self, session: Session) -> None:
            self._session = session
            self._users = UserRepository(session)

        def execute(self, name: str, email: str) -> int:
            user = User(name=name, addresses=[Address(email_address=email)])
            self._users.add(user)
            self._session.commit()  # the use case owns the transaction boundary
            return user.id

Why is this design superior? (Mapping to the CLAUDE.md principles)
- SRP: "how to write SQL (Repository)" and "the business procedure and transaction boundary (UseCase)" are separated, each with a single reason to change.
- ETC / testability: in a use-case test you can swap the Repository for a mock and verify business logic without a DB. Persistence optimizations like adding selectinload don't ripple up to the higher layer.
- Structural containment of N+1: with the convention that "list retrieval always goes through list_with_addresses," forgetting to attach a loader strategy (= N+1) becomes impossible at the design level.
- DRY: the same query doesn't scatter across multiple places; the Repository method is the single source of truth.
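The testability claim can be sketched with the standard library alone. Note the assumptions: unlike RegisterUserUseCase above, this variant receives its collaborators through the constructor so they can be swapped, and every name in it (NewUser, UserStore, UnitOfWork, the fakes, and the email check) is hypothetical and exists only for this illustration.

```python
from dataclasses import dataclass, field
from typing import Protocol


@dataclass(frozen=True)
class NewUser:  # stand-in for the ORM entity
    name: str
    email: str


class UserStore(Protocol):  # the part of the Repository the use case needs
    def add(self, user: NewUser) -> None: ...


class UnitOfWork(Protocol):  # the part of the Session the use case needs
    def commit(self) -> None: ...


@dataclass
class InMemoryUserStore:
    added: list[NewUser] = field(default_factory=list)

    def add(self, user: NewUser) -> None:
        self.added.append(user)


@dataclass
class SpyUnitOfWork:
    commits: int = 0

    def commit(self) -> None:
        self.commits += 1


class RegisterUser:
    def __init__(self, users: UserStore, uow: UnitOfWork) -> None:
        self._users = users
        self._uow = uow

    def execute(self, name: str, email: str) -> NewUser:
        if "@" not in email:  # hypothetical business rule
            raise ValueError(f"invalid email address: {email!r}")
        user = NewUser(name=name, email=email)
        self._users.add(user)
        self._uow.commit()  # the use case still owns the transaction boundary
        return user


store = InMemoryUserStore()
uow = SpyUnitOfWork()
use_case = RegisterUser(store, uow)

registered = use_case.execute("alice", "alice@example.com")

try:
    use_case.execute("bob", "not-an-email")
    rejected = False
except ValueError:
    rejected = True
```

The fakes verify two things without any database: a valid registration is stored and committed exactly once, and a rejected one never reaches the store or the commit.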
This is the skeleton that must not be broken even in solo development leveraging generative AI to build "fast, cheap, and safe." Even when you delegate implementation to AI, a human designs the layers' responsibilities and the verification gates (type checking, N+1 detection), so you achieve both speed and quality.
Conclusion: Moving the ORM from "Just Works" to "Survives Operations"
SQLAlchemy 2.0 is a modern, robust ORM deeply integrated with Python's type system. Let me re-list the key points covered.
- With Mapped[...] + mapped_column(), carry type safety all the way to the ORM boundary.
- Migrate to the unified select() API and session.scalars(), graduating from the legacy query().
- Understand the Session lifecycle and expire-after-commit, and safely manage transaction boundaries with context managers.
- Eradicate N+1 structurally with selectinload (collections) / joinedload (many-to-one) / raiseload (guard).
- In AsyncSession, avoid implicit IO and enforce expire_on_commit=False and eager loading.
- Prevent sporadic production connection errors with pool_pre_ping / pool_recycle.
- With the Repository pattern, confine the persistence concern and maximize maintainability, testability, and N+1 resistance.
The difference between "code that works" and "code you can operate for 10 years" lies in the accumulation of design judgments like these, one by one.
For further exploration, I recommend re-reading the official documentation's ORM Quickstart, Relationship Loading Techniques, and Asynchronous I/O (asyncio), with this article's design viewpoint in mind.
Consultation on Backend Design That Survives Production
The author has implemented and operated the SQLAlchemy 2.0 data-access-layer design explained here in the production environment of a B2B SaaS that won the Minister of Economy, Trade and Industry Award. I build, fast and high-quality with generative AI, the foundation directly connected to a business's reliability — type safety, N+1 countermeasures, transaction design, and multi-tenant data isolation. On product development using Python and PostgreSQL and on improving existing systems, feel free to consult me.