Skip to main content
友田 陽大
Python backend
Python
SQLAlchemy
PostgreSQL
ORM
型安全
FastAPI
非同期処理
アーキテクチャ設計

SQLAlchemy 2.0 Practical Guide: Designing a Type-Safe ORM Data-Access Layer That Survives Production

Faithful to the SQLAlchemy 2.0 official docs, a thorough walkthrough — from type-safe model definitions with Mapped/mapped_column, the unified select() API, the Session's unit-of-work, loader strategies that crush N+1, asyncio support, connection-pool design, all the way to maximizing maintainability with the Repository pattern — explained from a production-operations viewpoint with concrete Python code.

Published
Reading time
17 min read
Author
友田 陽大
Share
Contents

Introduction: Why You Should Re-Learn "SQLAlchemy 2.0" Now

SQLAlchemy is the de facto standard ORM in Python. Yet most of the explanatory articles circulating on the web, the Stack Overflow answers, and the code generative AI outputs are still written in the 1.x legacy style (Column(), session.query(), Base = declarative_base()).

SQLAlchemy 2.0, officially released in January 2023, was not a mere version bump but a paradigm shift that placed the type system at its core. Complete type safety via Mapped[...] annotations, a select() API unifying Core and ORM, native asyncio support — these are the watershed that clearly divides "code that just works" from "code that survives production."

This article is not a rehash of an introduction. Its aim is to be faithful to the official documentation (docs.sqlalchemy.org/en/20) while being one notch clearer, and to break through, with concrete code, the walls you will inevitably hit in practice:

  • "I've been writing with session.query(), but I don't get what changes when migrating to the 2.0-style select()."
  • "The distinction between Mapped[int] and mapped_column(), and the handling of Optional, is fuzzy."
  • "Opening a list screen fires a flood of SQL (the N+1 problem). Should I use selectinload or joinedload?"
  • "After going async in FastAPI, accessing a relationship throws a MissingGreenlet error."
  • "In production (especially RDS / serverless), connection closed–type errors crop up sporadically."

The author designed and implemented the backend of a B2B SaaS that won the Minister of Economy, Trade and Industry Award in Python 3.11 / SQLAlchemy 2.0 / PostgreSQL 16, and ran it in production with a strict layered separation of Router → UseCase → Repository → Model. This article organizes the insights gained from that implementation, backed by the official documentation.


1. Type-Safe Model Definitions: The Correct Use of DeclarativeBase and Mapped

The 2.0-style model definition is fully integrated with Python's type annotations. First, here is a model faithful to the official "ORM Quickstart."

from typing import List, Optional

from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import (
    DeclarativeBase,
    Mapped,
    mapped_column,
    relationship,
)


class Base(DeclarativeBase):
    """全モデルの基底クラス。2.0では declarative_base() ではなく
    DeclarativeBase を継承するのが正攻法。"""


class User(Base):
    __tablename__ = "user_account"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30))
    fullname: Mapped[Optional[str]]  # NULL許容(str | None でも可)

    addresses: Mapped[List["Address"]] = relationship(
        back_populates="user",
        cascade="all, delete-orphan",
    )

    def __repr__(self) -> str:
        return f"User(id={self.id!r}, name={self.name!r})"


class Address(Base):
    __tablename__ = "address"

    id: Mapped[int] = mapped_column(primary_key=True)
    email_address: Mapped[str]  # 型アノテーションだけで NOT NULL が決まる
    user_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))

    user: Mapped["User"] = relationship(back_populates="addresses")

H3: Mapped[...] Decides "Type" and "NULL Constraint" at Once

This is the biggest change from 1.x. In 2.0, the type annotation itself carries schema information.

NotationGenerated columnMeaning
Mapped[int]INTEGER NOT NULLNon-Optional is NOT NULL
Mapped[Optional[str]]VARCHAR NULLOptional permits NULL
Mapped[str] = mapped_column(String(30))VARCHAR(30) NOT NULLType details supplemented by mapped_column

Even if you omit mapped_column() — as in email_address: Mapped[str] — the String type and the NOT NULL constraint are automatically derived from the str annotation. You attach mapped_column() only when you want extra control over the type (length, primary key, foreign key, etc.).

Why is this superior? From the IDE / type checker's (mypy, Pyright) viewpoint, the 1.x name = Column(String(30)) is merely a Column instance and cannot statically guarantee that user.name is a str. The 2.0 Mapped[str] makes mypy genuinely infer user.name as str — call .upper() on user.fullname (Optional[str]) and it is rejected at type-check time. This means you can carry the "Type Safety" of CLAUDE.md all the way to the ORM boundary.

⚠️ The difference from legacy: Column() still works in 2.0, but the canonical form under a Mapped[...] annotation is mapped_column(). Mixing Column() into new code disables type inference and becomes technical debt.

H3: Centralize Project Conventions with type_annotation_map (DRY)

Project conventions like "int is always BIGINT" or "datetime is always timezone-aware" become DRY violations if written per column. You can consolidate them in one place with DeclarativeBase's type_annotation_map.

import datetime
from sqlalchemy import BigInteger
from sqlalchemy.dialects.postgresql import TIMESTAMP


class Base(DeclarativeBase):
    type_annotation_map = {
        int: BigInteger,
        datetime.datetime: TIMESTAMP(timezone=True),
    }

From now on, just writing created_at: Mapped[datetime.datetime] automatically uses TIMESTAMP WITH TIME ZONE across all models. A convention change is done in a single point — a textbook practice of "ETC (Easy To Change)."


2. The Unified Query API: Drop session.query() and Migrate to select()

The heart of 2.0 is that Core and ORM can now use the same select() syntax.

from sqlalchemy import select

# 2.0スタイル:エンティティを直接イテレートできる
stmt = select(User).where(User.name.in_(["spongebob", "sandy"]))

for user in session.scalars(stmt):
    print(user)

H3: When to Use session.scalars() vs. session.execute().scalars()

This is a point of confusion even in the official docs, so let me clear it up.

When you receive the result of select(User) via session.execute(), each row is wrapped in a single-element tuple (Row).

session.execute(select(User)).all()
# → [(User(id=1),), (User(id=2),)]   ← タプルに包まれている

You want to receive the entity "bare" — this is the majority case. The shortcut for that is session.scalars(). Per the official definition, session.scalars(stmt) is exactly equivalent to session.execute(stmt).scalars().

session.scalars(select(User)).all()
# → [User(id=1), User(id=2)]         ← 裸のエンティティ

Guidance in practice:

What you wantThe API to use
Get a list of single entitiessession.scalars(stmt).all()
Get one by primary keysession.get(User, 5) (consults the identity map first)
Get exactly one (0/multiple raise)session.scalars(stmt).one() / .scalar_one()
Get one, allowing zerosession.scalars(stmt).one_or_none()
Multiple columns (select(User.name, User.id))Receive the Row tuple via session.execute(stmt)

When you take multiple columns, as in select(User.name, User.fullname), use execute() and receive them as Row tuples. The shortest mnemonic: "single entity → scalars, multiple columns → execute."

H3: session.query() "Works, but Is Legacy"

The official documentation clearly positions the Query object as a legacy construct (as of SQLAlchemy 2.0). The important nuance is:

The bulk of the Query API will not be removed from SQLAlchemy. Query became a very thin adapter that internally converts to a 2.0-style select() at execution time.

In other words, existing code won't break. But there is no longer any reason to write new code with query(); unifying on select() removes the notational gap with Core queries and lowers the learning cost and cognitive load.


3. Session and Unit of Work: Designing Transaction Boundaries

The Session is the manager of the "unit of work" in the SQLAlchemy ORM. Correct lifecycle management in production code is the lifeline of data integrity.

H3: sessionmaker and the Context Manager

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/mydb")

# アプリ起動時に一度だけ生成するファクトリ
SessionFactory = sessionmaker(engine)

# リクエスト単位でセッションを開く(with で確実にクローズ)
with SessionFactory() as session:
    session.add(some_object)
    session.commit()

When you want to make the transaction boundary explicit, combine it with session.begin(). On exiting the begin() block, a commit happens automatically; on an exception, a rollback.

# 推奨:with の二段構えでトランザクションを明示
with SessionFactory() as session, session.begin():
    session.add(user)
    session.add(address)
    # ブロックを抜けると自動 commit、例外なら自動 rollback

Why is this superior? If you hand-write commit() / rollback() / close() with try/except/finally, omissions will inevitably occur. By delegating to the context manager, you can guarantee at the code level the Reliability that "on an exception it is always rolled back, and the connection is always returned."

H3: Understand the Trap of commit / flush / expire

This is the source of the most common bugs in production. Let me pin down the official behavior precisely.

  • flush: sends pending changes to the DB as SQL (but does not commit). By default, an automatic flush before issuing a query (autoflush) happens.
  • commit: commit() internally calls flush() unconditionally and then issues a COMMIT.
  • expire after commit: after a commit, all objects in the session become expired and are reloaded on the next access.

This "expire after commit" is the trap. Look at the following code.

with SessionFactory() as session, session.begin():
    user = User(name="alice")
    session.add(user)
# ブロックを抜けて commit 済み。session も閉じている。

print(user.name)  # ❌ DetachedInstanceError の危険

After the commit, user becomes expired and attempts to reload on attribute access. But because exiting the with has closed the session, a DetachedInstanceError occurs.

The countermeasure depends on the use case.

# 対策1:commit 後も属性を使うなら expire_on_commit=False
SessionFactory = sessionmaker(engine, expire_on_commit=False)

# 対策2:必要な値を commit 前に取り出しておく(DTO へ詰め替える)

In async (covered later), this "implicit reload after commit" causes an even more serious problem, so expire_on_commit=False becomes practically mandatory.


4. Eradicating the N+1 Problem: Choosing Loader Strategies Correctly

The first performance problem that always bites you when you put an ORM into production is N+1. The official documentation warns about it by name.

The lazyload() strategy causes one of the most common problems in the ORM, namely the N+1 problem.

H3: What Is the N+1 Problem?

The default for relationships is lazy loading (lazy="select"). The following code looks harmless at a glance.

users = session.scalars(select(User)).all()  # ① SELECT * FROM user_account
for user in users:
    print(user.addresses)  # ② user ごとに SELECT * FROM address WHERE user_id = ?

If there are 100 users, then in addition to the single , fires 100 times — 101 SQL queries total. That is N+1. It is the textbook anti-pattern where a list screen's response degrades exponentially.

H3: A Quick Reference for Loader Strategies

The solution the docs recommend is declaring in advance, at query-issue time, how related data should be loaded (eager loading). You pass it to .options().

Strategylazy= equivalentMechanismOfficial recommended use
selectinload()"selectin"Bundle the parents' primary keys in an IN clause and fetch related rows in one additional SELECTFirst choice for collections (one-to-many, many-to-many)
joinedload()"joined"Include in the same result set via a JOINThe most general strategy for many-to-one (not many-to-many)
subqueryload()"subquery"Fetch related rows via a subqueryMostly legacy (superseded by selectinload)
lazyload()"select"Lazy SELECT on access (default)The cause of N+1
raiseload()"raise"Disallows lazy loading and raisesA production guardrail

H3: selectinload for Collections, joinedload for Many-to-One

The official guidance is clear.

In most cases, selectin loading is the simplest and most efficient way to eagerly load collections.

from sqlalchemy.orm import selectinload

# users の取得は1回、addresses の取得も IN句でまとめて1回。計2回で済む
stmt = select(User).options(selectinload(User.addresses))
users = session.scalars(stmt).all()

for user in users:
    print(user.addresses)  # 追加のSQLは飛ばない

For a many-to-one (a single reference like Address.user), on the other hand, joinedload() is the most general.

from sqlalchemy.orm import joinedload

stmt = select(Address).options(joinedload(Address.user))
addresses = session.scalars(stmt).all()

⚠️ Caution when reading collections with joinedload: using joinedload on a one-to-many or many-to-many collection duplicates the parent rows by the number of related rows. In that case, you must apply .unique() to the result (the docs state this explicitly). To avoid duplication and row-count inflation, you should choose selectinload for collections as a rule.

H3: Forbid "Accidental N+1" in Production with raiseload()

The most practical technique is this. raiseload() is a guardrail that turns access to a relationship you did not explicitly eager-load into an exception.

from sqlalchemy.orm import raiseload

# このクエリで取得した User は、addresses に触れると例外になる
stmt = select(User).options(raiseload(User.addresses))

Why is this superior? Because N+1 "just works," it tends to slip past tests and reviews and only surfaces under increased production traffic. If you set raiseload() as the default, you can detect, at development time as an exception, the very moment you forget to write an eager load, making N+1 structurally "impossible to occur." Using lazy="raise_on_sql", you can permit many-to-one accesses that already exist in the session and require no SQL, while forbidding only lazy loads that entail new SQL.


5. asyncio Support: Asynchronous Data Access in the FastAPI Era

In asynchronous frameworks like FastAPI, you use AsyncSession. However, there are pitfalls unique to it that the synchronous version lacks, and the official documentation warns about them strongly.

H3: A Minimal, Officially Faithful Async ORM Setup

import asyncio
from typing import List

from sqlalchemy import ForeignKey, select
from sqlalchemy.ext.asyncio import (
    AsyncAttrs,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.orm import (
    DeclarativeBase,
    Mapped,
    mapped_column,
    relationship,
    selectinload,
)


class Base(AsyncAttrs, DeclarativeBase):
    """AsyncAttrs を混ぜると await a.awaitable_attrs.<rel> が使えるようになる。"""


class A(Base):
    __tablename__ = "a"
    id: Mapped[int] = mapped_column(primary_key=True)
    bs: Mapped[List["B"]] = relationship()


class B(Base):
    __tablename__ = "b"
    id: Mapped[int] = mapped_column(primary_key=True)
    a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))


async def main() -> None:
    # PostgreSQL の非同期ドライバは asyncpg が定番
    engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost/test")

    # expire_on_commit=False が非同期では実質必須(理由は後述)
    async_session = async_sessionmaker(engine, expire_on_commit=False)

    async with async_session() as session:
        async with session.begin():
            session.add_all([A(bs=[B(), B()]), A(bs=[B()])])

        # 非同期では遅延ロードできないので、selectinload で先読みする
        stmt = select(A).options(selectinload(A.bs))
        result = await session.scalars(stmt)
        for a in result:
            print(a, a.bs)  # ← 先読み済みなので安全

    await engine.dispose()


asyncio.run(main())

H3: The Three Big Async Pitfalls (Official Warnings)

The async-specific constraints the official docs spell out will bite you for certain if you don't know them.

① Implicit IO (lazy loading) is forbidden. In the synchronous version, attribute access to a.bs implicitly fires SQL, but in async this becomes a MissingGreenlet–type error. In the official words: "you must avoid points where IO could occur on attribute access." There are 3 workarounds:

  1. Eager load: attach things like selectinload(A.bs) to the query (most recommended).
  2. Use AsyncAttrs: explicitly await with await a.awaitable_attrs.bs.
  3. await session.refresh(a, ["bs"]): refresh with explicit attribute names.

expire_on_commit=False is practically mandatory. As noted, after a commit, objects expire and a reload (= implicit IO) runs on the next access. Because that implicit IO is itself forbidden in async, set expire_on_commit=False so you can safely access attributes even after a commit.

③ An AsyncSession cannot be shared across concurrent tasks. The docs state plainly that "a single AsyncSession instance cannot be used safely by multiple concurrent tasks." In each branch of asyncio.gather(), always create a separate AsyncSession per task.

H3: The Dependency-Injection Pattern in FastAPI

In practice, you define a dependency that hands out a session per request and reliably closes it after the response.

from collections.abc import AsyncIterator
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession


async def get_session() -> AsyncIterator[AsyncSession]:
    async with async_session() as session:
        yield session


# ルーターでの利用
@app.get("/users/{user_id}")
async def read_user(user_id: int, session: AsyncSession = Depends(get_session)):
    user = await session.get(User, user_id)
    ...

With a yield-based dependency, whether the handler's processing succeeds or ends with an exception, async with always closes the session. It is the standard move that achieves both reliability and resource-leak prevention.


6. Production Operations: Designing the Connection Pool Correctly

Code that worked locally produces sporadic connection closed / server closed the connection unexpectedly in production (especially AWS RDS and serverless) — most of that is a pool-configuration problem.

create_engine() integrates QueuePool by default, with the following defaults.

ParameterDefaultMeaning
pool_size5Connections kept open at all times
max_overflow10Additional connections temporarily creatable beyond pool_size
pool_timeout30.0 secUpper bound on waiting to acquire a connection
pool_recycle-1 (disabled)Recreate connections older than N seconds
pool_pre_pingFalseLiveness-check with SELECT 1 on checkout

H3: The Two Parameters You Must Always Set in Production

engine = create_engine(
    "postgresql+psycopg2://me@db.example.com/mydb",
    pool_pre_ping=True,   # 切れた接続を掴む事故を防ぐ
    pool_recycle=1800,    # 30分でリサイクル(DB/LB のアイドルタイムアウト対策)
    pool_size=10,
    max_overflow=20,
)

Why does this matter?

  • pool_pre_ping=True: the connections the pool holds can be severed without your noticing by the idle timeout on the DB side or a load balancer (RDS Proxy, ELB, etc.). pre_ping sends a light ping just before using a connection and transparently reconnects if it's dead. This one setting eliminates most of the sporadic errors stemming from stale connection.
  • pool_recycle: against settings where the DB severs connections after a certain time (like MySQL's wait_timeout), proactively recycling with a shorter interval prevents the disconnect before it happens.

💡 A note for serverless (AWS Lambda, etc.): because Lambda execution environments are frequently recreated, each instance holding a large pool eats up the DB's max connections. In serverless, the standard design is NullPool (no pooling) or leaning on an external pooler like RDS Proxy. Choose based on the trade-off between cost efficiency (connection count) and reliability, according to the execution environment.


7. Application: Maximizing the Maintainability of the Data-Access Layer with the Repository Pattern

Finally, combine the elements so far and elevate them into a structure that survives production. Of the Router → UseCase → Repository → Model layered separation I adopted in the B2B SaaS that won the Minister of Economy, Trade and Industry Award, let me take the Repository layer as an example.

If you write ORM queries (select(), loader strategies) directly in routers or use cases, business logic and the persistence concern get mixed and the SRP (single responsibility) collapses. The Repository creates this boundary.

from collections.abc import Sequence
from sqlalchemy import select
from sqlalchemy.orm import Session, selectinload


class UserRepository:
    """User 集約の永続化の関心だけを担う。SQLの知識をここに閉じ込める。"""

    def __init__(self, session: Session) -> None:
        self._session = session

    def get(self, user_id: int) -> User | None:
        # identity map を活用する単純な主キー取得
        return self._session.get(User, user_id)

    def list_with_addresses(self, limit: int = 100) -> Sequence[User]:
        # N+1 を構造的に防ぐ:addresses を必ず selectinload する
        stmt = (
            select(User)
            .options(selectinload(User.addresses))
            .order_by(User.id)
            .limit(limit)
        )
        return self._session.scalars(stmt).all()

    def add(self, user: User) -> None:
        # commit はここでは行わない。トランザクション境界は上位層が握る
        self._session.add(user)

The point is not to place commit() in the Repository. The transaction boundary (how far constitutes one unit of work) is the responsibility of the use-case layer, which coordinates across multiple Repositories.

class RegisterUserUseCase:
    def __init__(self, session: Session) -> None:
        self._session = session
        self._users = UserRepository(session)

    def execute(self, name: str, email: str) -> int:
        user = User(name=name, addresses=[Address(email_address=email)])
        self._users.add(user)
        self._session.commit()  # トランザクション境界はユースケースが握る
        return user.id

Why is this design superior? (Mapping to the CLAUDE.md principles)

  • SRP: "how to write SQL (Repository)" and "the business procedure and transaction boundary (UseCase)" are separated, each with a single reason to change.
  • ETC / testability: in a use-case test you can swap the Repository for a mock and verify business logic without a DB. Persistence optimizations like adding selectinload don't ripple up to the higher layer.
  • Structural containment of N+1: with the convention that "list retrieval always goes through list_with_addresses," forgetting to attach a loader strategy (= N+1) becomes impossible at the design level.
  • DRY: the same query doesn't scatter across multiple places; the Repository method is the single source of truth.

This is the skeleton that must not be broken even in solo development leveraging generative AI to build "fast, cheap, and safe." Even when you delegate implementation to AI, a human designs the layers' responsibilities and the verification gates (type checking, N+1 detection), so you achieve both speed and quality.


Conclusion: Moving the ORM from "Just Works" to "Survives Operations"

SQLAlchemy 2.0 is a modern, robust ORM deeply integrated with Python's type system. Let me re-list the key points covered.

  1. With Mapped[...] + mapped_column(), carry type safety all the way to the ORM boundary.
  2. Migrate to the unified select() API and session.scalars(), graduating from the legacy query().
  3. Understand the Session lifecycle and expire-after-commit, and safely manage transaction boundaries with context managers.
  4. Eradicate N+1 structurally with selectinload (collections) / joinedload (many-to-one) / raiseload (guard).
  5. In AsyncSession, avoid implicit IO and enforce expire_on_commit=False and eager loading.
  6. Prevent sporadic production connection errors with pool_pre_ping / pool_recycle.
  7. With the Repository pattern, confine the persistence concern and maximize maintainability, testability, and N+1 resistance.

The difference between "code that works" and "code you can operate for 10 years" lies in the accumulation of design judgments like these, one by one.

For further exploration, I recommend re-reading the official documentation's ORM Quickstart, Relationship Loading Techniques, and Asynchronous I/O (asyncio), with this article's design viewpoint in mind.


Consultation on Backend Design That Survives Production

The author has implemented and operated the SQLAlchemy 2.0 data-access-layer design explained here in the production environment of a B2B SaaS that won the Minister of Economy, Trade and Industry Award. I build, fast and high-quality with generative AI, the foundation directly connected to a business's reliability — type safety, N+1 countermeasures, transaction design, and multi-tenant data isolation. On product development using Python and PostgreSQL and on improving existing systems, feel free to consult me.

友田

友田 陽大

Developer of a METI Minister's Award–winning product. With TypeScript + Python + AWS, I deliver SaaS, industry DX, and production-grade generative AI (RAG) end to end — from requirements to infrastructure and operations — single-handedly.

Got a challenge?

From design to implementation and operations — solo × generative AI

Implementation like this article's, end to end from requirements to production. Start with a free 30-minute technical consult and tell me about your situation.

Available for both project-based (contract) and advisory engagements. Start with a free 30-minute consult.

Also worth reading