Tutorial GraphQL Python con FastAPI, Strawberry y SQLAlchemy (async)
Un tutorial GraphQL Python paso a paso para construir una API moderna, segura y performante con FastAPI GraphQL, Strawberry GraphQL y SQLAlchemy async, lista para producción con JWT Python y Docker FastAPI.
1) Requisitos previos e instalación
1.1 Prerrequisitos
- Python 3.11+ o 3.12+
- pip o pipx
- virtualenv o Poetry
- Git
- Docker y Docker Compose
- PostgreSQL (cliente psql opcional)
1.2 Verificaciones rápidas
1 2 3 4 5 |
python --version pip --version docker --version docker compose version |
Explicación: confirma que tu entorno tiene las herramientas necesarias. Si falta alguna, instálala antes de continuar.
1.3 Crear y activar entorno virtual
Con venv:
1 2 3 4 5 |
python -m venv .venv source .venv/bin/activate # Linux/macOS # En Windows (PowerShell): # .venv\Scripts\Activate.ps1 |
Explicación: aislamos dependencias del proyecto para evitar conflictos.
Con Poetry (opcional):
1 2 3 4 5 |
pipx install poetry poetry init # sigue el asistente poetry env use python3.12 poetry shell |
Explicación: Poetry gestiona entornos y dependencias de forma reproducible.
Checklist de la sección:
- [ ] Python y Docker instalados
- [ ] Entorno virtual creado y activo
- [ ] Git inicializado (git init)
2) Estructura del proyecto y configuración
2.1 Estructura recomendada
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
app/ api/ auth/ config/ db/ loaders/ models/ repos/ schemas/ utils/ __init__.py migrations/ scripts/ tests/ .env.example alembic.ini pyproject.toml # o requirements.txt Dockerfile .dockerignore docker-compose.yml |
Explicación: organizamos por capas (API, esquema, datos, repositorios) para escalabilidad y mantenibilidad.
ASCII del flujo de una request GraphQL:
1 2 3 4 |
Cliente -> /graphql (FastAPI) -> Strawberry -> Resolvers -> Repos/DB (SQLAlchemy async) |-> DataLoaders (evita N+1) |-> Auth (JWT en contexto) |
2.2 Configuración con Pydantic Settings v2
Archivo: app/config/settings.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
# app/config/settings.py from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic import AnyUrl from typing import Optional class Settings(BaseSettings): model_config = SettingsConfigDict(env_file='.env', env_file_encoding='utf-8', case_sensitive=False) APP_NAME: str = 'fastapi-strawberry-graphql' ENV: str = 'dev' # dev|test|prod DEBUG: bool = True DATABASE_URL: AnyUrl JWT_SECRET_KEY: str JWT_ALGORITHM: str = 'HS256' JWT_ACCESS_EXPIRES_MIN: int = 15 JWT_REFRESH_EXPIRES_MIN: int = 60 * 24 * 7 # 7 días ALLOWED_ORIGINS: str = '*' # CSV RATE_LIMIT: Optional[str] = '100/minute' settings = Settings() |
Explicación: Pydantic Settings lee variables desde .env según el entorno; evita hardcodear secretos.
Ejemplos de .env:
1 2 3 4 5 6 7 |
# .env (dev) ENV=dev DEBUG=true DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/appdb JWT_SECRET_KEY=supersecret ALLOWED_ORIGINS=http://localhost:5173,http://localhost:3000 |
Explicación: define credenciales y opciones; en prod, monta secretos vía variables de entorno.
Checklist de la sección:
- [ ] Estructura creada
- [ ] .env preparado para dev/test/prod
- [ ] Settings cargan sin errores
3) Dependencias e inicialización
3.1 Instalar dependencias
Paquetes clave: fastapi, strawberry-graphql[fastapi], uvicorn[standard], SQLAlchemy (async), asyncpg, alembic, pydantic, pydantic-settings, passlib[bcrypt], python-jose, aiodataloader, httpx, pytest, pytest-asyncio, sqlalchemy-utils, structlog, slowapi (opcional), y CORS (Starlette lo trae de serie).
pyproject.toml (ejemplo con versiones aproximadas)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
[tool.poetry] name = "fastapi-graphql-tutorial" version = "0.1.0" description = "API GraphQL con FastAPI, Strawberry y SQLAlchemy async" [tool.poetry.dependencies] python = "^3.12" fastapi = "^0.112.0" strawberry-graphql = {version = "^0.229.0", extras=["fastapi"]} uvicorn = {version = "^0.30.0", extras=["standard"]} sqlalchemy = "^2.0.32" asyncpg = "^0.29.0" alembic = "^1.13.2" pydantic = "^2.8.0" pydantic-settings = "^2.3.4" passlib = {version = "^1.7.4", extras=["bcrypt"]} python-jose = "^3.3.0" aiodataloader = "^0.4.1" httpx = "^0.27.0" pytest = "^8.3.1" pytest-asyncio = "^0.23.7" sqlalchemy-utils = "^0.41.2" structlog = "^24.2.0" slowapi = "^0.1.9" [tool.poetry.group.dev.dependencies] pytest-cov = "^5.0.0" [tool.pytest.ini_options] asyncio_mode = "auto" |
Explicación: fija versiones aproximadas para reproducibilidad. Con pip, puedes generar un requirements.txt con pip freeze para pinnear exactas.
Instalación con Poetry:
1 2 |
poetry install |
Con pip (requirements.txt simple):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
fastapi strawberry-graphql[fastapi] uvicorn[standard] sqlalchemy asyncpg alembic pydantic pydantic-settings passlib[bcrypt] python-jose aiodataloader httpx pytest pytest-asyncio sqlalchemy-utils structlog slowapi |
1 2 |
pip install -r requirements.txt |
Explicación: instala dependencias de ejecución y de pruebas.
Checklist de la sección:
- [ ] Dependencias instaladas
- [ ] Proyecto ejecuta
python -c "import fastapi, strawberry"
sin error
4) Base de datos y migraciones
4.1 PostgreSQL en Docker
docker-compose.yml (DB + API en la misma red)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
services: db: image: postgres:16-alpine environment: POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres POSTGRES_DB: appdb ports: - "5432:5432" healthcheck: test: ["CMD-SHELL", "pg_isready -U postgres"] interval: 5s timeout: 3s retries: 5 volumes: - pgdata:/var/lib/postgresql/data api: build: . env_file: .env ports: - "8000:8000" depends_on: db: condition: service_healthy volumes: pgdata: |
Explicación: levanta Postgres con healthcheck y un servicio API que usaremos luego.
4.2 SQLAlchemy async: motor y sesión
app/db/base.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
# app/db/base.py from sqlalchemy.orm import DeclarativeBase, declared_attr from sqlalchemy import MetaData convention = { "ix": "ix_%(column_0_label)s", "uq": "uq_%(table_name)s_%(column_0_name)s", "ck": "ck_%(table_name)s_%(constraint_name)s", "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s", "pk": "pk_%(table_name)s" } class Base(DeclarativeBase): metadata = MetaData(naming_convention=convention) @declared_attr.directive def __tablename__(cls) -> str: return cls.__name__.lower() |
Explicación: Declarative Base estándar con convención de nombres estable.
app/db/session.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
# app/db/session.py from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession from sqlalchemy.pool import NullPool from app.config.settings import settings engine = create_async_engine( str(settings.DATABASE_URL), echo=settings.DEBUG, poolclass=NullPool, # en prod, usa pool por defecto o configura tamaños pool_pre_ping=True, ) AsyncSessionLocal = async_sessionmaker(bind=engine, expire_on_commit=False, class_=AsyncSession) async def get_session() -> AsyncSession: async with AsyncSessionLocal() as session: yield session |
Explicación: crea un AsyncEngine y un factory de sesiones. En prod, ajusta pool_size y timeouts.
4.3 Alembic y migraciones
alembic.ini (fragmento clave)
1 2 3 4 |
[alembic] script_location = migrations sqlalchemy.url = sqlite:///./dev.db # Ignorado al usar env.py dinámico |
Explicación: usaremos env.py para leer DATABASE_URL desde settings.
migrations/env.py (async)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# migrations/env.py from logging.config import fileConfig from sqlalchemy import pool from sqlalchemy.engine import Connection from alembic import context from sqlalchemy.ext.asyncio import AsyncEngine from sqlalchemy import engine_from_config from app.db.base import Base from app.models import user, post, comment # importa modelos from app.config.settings import settings config = context.config if config.config_file_name is not None: fileConfig(config.config_file_name) target_metadata = Base.metadata def run_migrations_offline() -> None: url = str(settings.DATABASE_URL).replace('+asyncpg', '') context.configure(url=url, target_metadata=target_metadata, literal_binds=True) with context.begin_transaction(): context.run_migrations() def do_run_migrations(connection: Connection) -> None: context.configure(connection=connection, target_metadata=target_metadata) with context.begin_transaction(): context.run_migrations() async def run_migrations_online() -> None: connectable = AsyncEngine(engine_from_config( {"sqlalchemy.url": str(settings.DATABASE_URL)}, prefix="sqlalchemy.", poolclass=pool.NullPool, future=True, )) async with connectable.connect() as connection: await connection.run_sync(do_run_migrations) await connectable.dispose() if context.is_offline_mode(): run_migrations_offline() else: import asyncio asyncio.run(run_migrations_online()) |
Explicación: env.py usa la URL de settings y permite migraciones async.
Crea la migración inicial y aplica:
1 2 3 4 |
alembic init migrations alembic revision -m "init" --autogenerate alembic upgrade head |
Explicación: genera tablas a partir de modelos. Revisa el diff antes de aplicar.
Seeding simple (scripts/seed.py):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
# scripts/seed.py import asyncio from app.db.session import AsyncSessionLocal from app.models.user import User from app.models.post import Post from passlib.context import CryptContext pwd = CryptContext(schemes=["bcrypt"], deprecated="auto") async def main(): async with AsyncSessionLocal() as s: u = User(email="demo@example.com", hashed_password=pwd.hash("secret")) s.add(u) await s.flush() s.add_all([Post(title=f"Post {i}", body="Lorem", author_id=u.id) for i in range(1, 4)]) await s.commit() asyncio.run(main()) |
Explicación: inserta un usuario y algunos posts para pruebas.
Checklist de la sección:
- [ ] docker-compose up de DB funciona
- [ ] alembic upgrade head crea tablas
- [ ] datos de ejemplo insertados
5) Modelado y repositorios (SQLAlchemy async)
5.1 Modelos: User, Post, Comment
app/models/user.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
# app/models/user.py from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy import String from sqlalchemy.dialects.postgresql import UUID import uuid from app.db.base import Base class User(Base): id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) email: Mapped[str] = mapped_column(String(255), unique=True, index=True) hashed_password: Mapped[str] = mapped_column(String(255)) is_active: Mapped[bool] = mapped_column(default=True) posts: Mapped[list["Post"]] = relationship(back_populates="author", lazy="raise") |
Explicación: Users con UUID y relación 1-N hacia Post.
app/models/post.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
# app/models/post.py from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy import String, Text, ForeignKey from sqlalchemy.dialects.postgresql import UUID import uuid from app.db.base import Base class Post(Base): id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) title: Mapped[str] = mapped_column(String(200), index=True) body: Mapped[str] = mapped_column(Text()) author_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("user.id", ondelete="CASCADE"), index=True) author: Mapped["User"] = relationship(back_populates="posts", lazy="raise") comments: Mapped[list["Comment"]] = relationship(back_populates="post", cascade="all, delete-orphan", lazy="raise") |
Explicación: Posts con FK a User y relación a Comments.
app/models/comment.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
# app/models/comment.py from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy import Text, ForeignKey from sqlalchemy.dialects.postgresql import UUID import uuid from app.db.base import Base class Comment(Base): id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) body: Mapped[str] = mapped_column(Text()) post_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("post.id", ondelete="CASCADE"), index=True) author_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("user.id", ondelete="CASCADE"), index=True) post: Mapped["Post"] = relationship(back_populates="comments", lazy="raise") |
Explicación: Comentarios vinculados a Post y User (para simplificar, omitimos relación inversa User.comments).
5.2 Repositorios async (CRUD + paginación)
app/repos/user_repo.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# app/repos/user_repo.py from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.models.user import User class UserRepo: def __init__(self, session: AsyncSession): self.session = session async def get_by_id(self, user_id): res = await self.session.execute(select(User).where(User.id == user_id)) return res.scalar_one_or_none() async def get_by_email(self, email: str): res = await self.session.execute(select(User).where(User.email == email)) return res.scalar_one_or_none() async def create(self, email: str, hashed_password: str) -> User: u = User(email=email, hashed_password=hashed_password) self.session.add(u) await self.session.flush() return u |
Explicación: consultas básicas por id, email y creación.
app/repos/post_repo.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# app/repos/post_repo.py from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func from app.models.post import Post from typing import Sequence class PostRepo: def __init__(self, session: AsyncSession): self.session = session async def list(self, *, offset=0, limit=10, author_id=None, q: str | None = None) -> Sequence[Post]: stmt = select(Post) if author_id: stmt = stmt.where(Post.author_id == author_id) if q: stmt = stmt.where(Post.title.ilike(f"%{q}%")) stmt = stmt.offset(offset).limit(limit) res = await self.session.execute(stmt) return res.scalars().all() async def count(self, *, author_id=None, q: str | None = None) -> int: stmt = select(func.count()).select_from(Post) if author_id: stmt = stmt.where(Post.author_id == author_id) if q: stmt = stmt.where(Post.title.ilike(f"%{q}%")) res = await self.session.execute(stmt) return int(res.scalar() or 0) async def get(self, post_id): res = await self.session.execute(select(Post).where(Post.id == post_id)) return res.scalar_one_or_none() async def create(self, *, title: str, body: str, author_id) -> Post: p = Post(title=title, body=body, author_id=author_id) self.session.add(p) await self.session.flush() return p async def update(self, post: Post, **values) -> Post: for k, v in values.items(): setattr(post, k, v) await self.session.flush() return post async def delete(self, post: Post) -> None: await self.session.delete(post) |
Explicación: repositorio con paginación offset/limit y filtro básico por autor o búsqueda.
Checklist de la sección:
- [ ] Modelos creados y migraciones reflejan cambios
- [ ] Repos responden en REPL async
6) Esquema GraphQL con Strawberry
6.1 Tipos, inputs y scalars
app/schemas/graphql.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# app/schemas/graphql.py import strawberry from typing import Optional, List from datetime import datetime from strawberry.scalars import JSON from strawberry.types import Info from strawberry.schema_directive import Location from app.repos.user_repo import UserRepo from app.repos.post_repo import PostRepo from app.loaders.dataloaders import get_dataloaders @strawberry.type class UserType: id: strawberry.ID email: str @strawberry.type class PostType: id: strawberry.ID title: str body: str author: UserType @strawberry.field async def author(self, info: Info) -> UserType: loader = info.context["loaders"].user_by_id user = await loader.load(self.author.id if hasattr(self, 'author') else self.author_id) return UserType(id=str(user.id), email=user.email) @strawberry.type class CommentType: id: strawberry.ID body: str @strawberry.input class PostInput: title: str body: str @strawberry.type class PageInfo: total: int offset: int limit: int @strawberry.type class PostConnection: items: List[PostType] page_info: PageInfo @strawberry.enum class SortOrder: asc = "asc" desc = "desc" @strawberry.type class Query: @strawberry.field async def user(self, info: Info, id: strawberry.ID) -> Optional[UserType]: repo = UserRepo(info.context['session']) u = await repo.get_by_id(id) return UserType(id=str(u.id), email=u.email) if u else None @strawberry.field async def posts(self, info: Info, offset: int = 0, limit: int = 10, author_id: Optional[str] = None, q: Optional[str] = None) -> PostConnection: repo = PostRepo(info.context['session']) items = await repo.list(offset=offset, limit=limit, author_id=author_id, q=q) total = await repo.count(author_id=author_id, q=q) items_gql = [PostType(id=str(p.id), title=p.title, body=p.body, author=UserType(id=str(p.author_id), email="")) for p in items] return PostConnection(items=items_gql, page_info=PageInfo(total=total, offset=offset, limit=limit)) @strawberry.field async def post_count(self, info: Info) -> int: return await PostRepo(info.context['session']).count() @strawberry.type class Mutation: @strawberry.mutation async def register(self, info: Info, email: str, password: str) -> UserType: from passlib.context import CryptContext from app.auth.jwt import normalize_email pwd = CryptContext(schemes=["bcrypt"], deprecated="auto") repo = UserRepo(info.context['session']) u = await repo.create(email=normalize_email(email), hashed_password=pwd.hash(password)) await info.context['session'].commit() return UserType(id=str(u.id), email=u.email) @strawberry.mutation async def login(self, info: Info, email: str, password: str) -> JSON: from app.auth.jwt import authenticate_user, create_access_token, create_refresh_token user = await authenticate_user(info.context['session'], email, password) return JSON({ "access_token": create_access_token({"sub": str(user.id)}), "refresh_token": create_refresh_token({"sub": str(user.id)}) }) @strawberry.mutation async def create_post(self, info: Info, data: PostInput) -> PostType: user = info.context.get('user') if not user: raise Exception("Unauthorized") repo = PostRepo(info.context['session']) p = await repo.create(title=data.title, body=data.body, author_id=user.id) await info.context['session'].commit() return PostType(id=str(p.id), title=p.title, body=p.body, author=UserType(id=str(user.id), email=user.email)) schema = strawberry.Schema(query=Query, mutation=Mutation) def get_context(session, user): return {"session": session, "user": user, "loaders": get_dataloaders(session)} |
Explicación: definimos tipos, queries y mutations. posts implementa paginación offset/limit. Mutations incluyen registro y login.
6.2 DataLoaders para evitar N+1
app/loaders/dataloaders.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
# app/loaders/dataloaders.py from aiodataloader import DataLoader from sqlalchemy import select from app.models.user import User class UserByIdLoader(DataLoader): def __init__(self, session): super().__init__() self.session = session async def batch_load_fn(self, keys): res = await self.session.execute(select(User).where(User.id.in_(keys))) users = {u.id: u for u in res.scalars().all()} return [users.get(k) for k in keys] class Loaders: def __init__(self, session): self.user_by_id = UserByIdLoader(session) def get_dataloaders(session): return Loaders(session) |
Explicación: agrupa consultas por lote, mitigando el problema N+1 en resolutores.
6.3 Ejemplos GraphQL (GraphiQL/cURL)
Query listar posts:
1 2 3 4 5 6 7 |
query ListPosts($offset:Int!, $limit:Int!) { posts(offset:$offset, limit:$limit) { items { id title author { email } } pageInfo { total offset limit } } } |
Explicación: pagina resultados y trae autor vía DataLoader.
Mutations:
1 2 3 4 5 6 7 8 |
mutation Register { register(email:"demo@example.com", password:"secret") { id email } } mutation Login { login(email:"demo@example.com", password:"secret") } mutation CreatePost { createPost(data:{title:"Hola", body:"Mundo"}) { id title } } |
Explicación: registro, login (retorna tokens) y creación de post autenticada.
cURL (con token):
1 2 3 4 5 6 |
ACCESS=... # token de login curl -X POST http://localhost:8000/graphql \ -H "Content-Type: application/json" \ -H "Authorization: Bearer $ACCESS" \ -d '{"query":"mutation($t:String!){createPost(data:{title:\"T\", body:\"B\"}){id title}}"}' |
Explicación: invoca la mutation autenticada enviando Authorization.
Checklist de la sección:
- [ ] GraphiQL muestra el esquema
- [ ] Queries y Mutations funcionan
- [ ] DataLoader reduce queries (ver logs)
7) Integración con FastAPI
app/api/main.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# app/api/main.py from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from strawberry.fastapi import GraphQLRouter import structlog from slowapi import Limiter from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded from fastapi.responses import JSONResponse from app.config.settings import settings from app.schemas.graphql import schema, get_context from app.db.session import get_session from app.auth.jwt import get_user_from_request logger = structlog.get_logger() limiter = Limiter(key_func=get_remote_address) if settings.RATE_LIMIT else None app = FastAPI(title=settings.APP_NAME, debug=settings.DEBUG) app.add_middleware( CORSMiddleware, allow_origins=[o.strip() for o in settings.ALLOWED_ORIGINS.split(',')], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) if limiter: @app.middleware("http") async def rate_limit(request: Request, call_next): try: with limiter.limit(settings.RATE_LIMIT)(lambda req: None)(request): return await call_next(request) except RateLimitExceeded as e: return JSONResponse(status_code=429, content={"detail": "Too Many Requests"}) @app.get("/healthz") async def healthz(): return {"status": "ok"} async def context_getter(request: Request): async for session in get_session(): user = await get_user_from_request(session, request) return get_context(session, user) graphql_app = GraphQLRouter(schema, context_getter=context_getter, graphiql=settings.ENV != 'prod') app.include_router(graphql_app, prefix="/graphql") |
Explicación: monta /graphql con GraphiQL en dev, añade CORS, rate limiting opcional y endpoint /healthz.
Checklist de la sección:
- [ ] /healthz responde ok
- [ ] GraphiQL accesible en /graphql en dev
8) Autenticación y autorización (JWT Python)
app/auth/jwt.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# app/auth/jwt.py from datetime import datetime, timedelta, timezone from jose import jwt, JWTError from passlib.context import CryptContext from fastapi import Request from app.repos.user_repo import UserRepo from app.config.settings import settings pwd = CryptContext(schemes=["bcrypt"], deprecated="auto") def normalize_email(email: str) -> str: return email.strip().lower() def create_token(data: dict, expires_minutes: int) -> str: now = datetime.now(timezone.utc) payload = {**data, "iat": int(now.timestamp()), "exp": int((now + timedelta(minutes=expires_minutes)).timestamp())} return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM) def create_access_token(data: dict) -> str: return create_token(data, settings.JWT_ACCESS_EXPIRES_MIN) def create_refresh_token(data: dict) -> str: return create_token(data, settings.JWT_REFRESH_EXPIRES_MIN) async def authenticate_user(session, email: str, password: str): repo = UserRepo(session) user = await repo.get_by_email(normalize_email(email)) if not user or not pwd.verify(password, user.hashed_password): raise Exception("Invalid credentials") return user async def get_user_from_request(session, request: Request): auth = request.headers.get("Authorization", "") if not auth.startswith("Bearer "): return None token = auth.split()[1] try: payload = jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM], options={"verify_aud": False}) except JWTError: return None sub = payload.get("sub") if not sub: return None repo = UserRepo(session) return await repo.get_by_id(sub) |
Explicación: genera y verifica tokens con expiración y tolerancia por reloj implícita al usar iat/exp. Las contraseñas se almacenan con bcrypt.
Autorización por rol/scope: puedes extender el payload JWT con claim “roles” y validarlo en los resolutores antes de ejecutar lógica.
Checklist de la sección:
- [ ] Registro y login devuelven tokens
- [ ] Mutations protegidas fallan sin token
9) Manejo de errores y validación
- Validación: usa Inputs de Strawberry (tipos fuertes) y Pydantic para lógica interna si lo prefieres.
- Errores: lanza GraphQLError con extensiones para códigos.
Ejemplo básico en mutation:
1 2 3 4 5 |
from graphql import GraphQLError if not user: raise GraphQLError("Unauthorized", extensions={"code": "UNAUTHENTICATED"}) |
Explicación: GraphQL permite enriquecer el error para clientes.
Checklist de la sección:
- [ ] Errores coherentes y predecibles
- [ ] Inputs validan tipos
10) Paginación, filtros y ordenación
- Offset/limit implementado en PostRepo y Query.posts.
- Alternativa cursor-based (opcional): usa un cursor base64 con el id/created_at.
Ejemplo simple de cursor (idea):
1 2 |
cursor = base64(id) |
Explicación: en producción, codifica múltiples campos para orden estable y soporta direcciones.
Checklist de la sección:
- [ ] Paginación offset/limit probada
- [ ] Filtros por autor y búsqueda activos
11) Observabilidad y salud
Logging estructurado con structlog (config mínima):
1 2 3 4 5 6 7 8 9 10 11 |
# app/utils/logging.py import structlog structlog.configure( processors=[ structlog.processors.TimeStamper(fmt="ISO"), structlog.processors.JSONRenderer() ] ) logger = structlog.get_logger() |
Explicación: logs en JSON facilitan correlación y análisis. Agrega un request-id en middleware si lo deseas.
Endpoint /healthz ya implementado. Para readiness, añade chequeo a DB.
Checklist de la sección:
- [ ] Logs en JSON
- [ ] Healthz responde 200
12) Pruebas con pytest y httpx
tests/testgraphqlapi.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# tests/test_graphql_api.py import pytest from httpx import AsyncClient from app.api.main import app @pytest.mark.asyncio async def test_healthz(): async with AsyncClient(app=app, base_url="http://test") as ac: r = await ac.get("/healthz") assert r.status_code == 200 assert r.json()["status"] == "ok" @pytest.mark.asyncio async def test_register_and_login(monkeypatch): # En tests reales, usa una BD temporal (Docker) o transacciones con rollback async with AsyncClient(app=app, base_url="http://test") as ac: q = "mutation($e:String!,$p:String!){register(email:$e,password:$p){id email}}" r = await ac.post("/graphql", json={"query": q, "variables": {"e": "test@example.com", "p": "secret"}}) assert r.status_code == 200 q2 = "mutation($e:String!,$p:String!){login(email:$e,password:$p)}" r2 = await ac.post("/graphql", json={"query": q2, "variables": {"e": "test@example.com", "p": "secret"}}) assert r2.status_code == 200 assert "access_token" in r2.text |
Explicación: prueba endpoints básicos. Para integración real, configura una DB efímera y limpia datos tras cada test.
Cobertura:
1 2 |
pytest --cov=app -q |
CI: en GitHub Actions, agrega un servicio de Postgres y ejecuta migraciones antes de tests.
Checklist de la sección:
- [ ] Tests mínimos verdes
- [ ] Cobertura generada
13) Contenerización y ejecución (Docker FastAPI)
Dockerfile (multi-stage)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
# Builder FROM python:3.12-slim AS builder WORKDIR /app RUN apt-get update && apt-get install -y build-essential && rm -rf /var/lib/apt/lists/* COPY pyproject.toml* poetry.lock* /app/ RUN pip install poetry && poetry config virtualenvs.create false && poetry install --no-interaction --no-ansi --only main COPY . /app # Runtime FROM python:3.12-slim WORKDIR /app ENV PYTHONUNBUFFERED=1 COPY --from=builder /usr/local/lib/python3.12 /usr/local/lib/python3.12 COPY --from=builder /usr/local/bin /usr/local/bin COPY . /app EXPOSE 8000 CMD ["uvicorn", "app.api.main:app", "--host", "0.0.0.0", "--port", "8000"] |
Explicación: dos etapas reducen el tamaño final. En producción, puedes usar gunicorn con UvicornWorker.
Producción con Gunicorn:
1 2 |
gunicorn -k uvicorn.workers.UvicornWorker app.api.main:app -w 2 -b 0.0.0.0:8000 --timeout 60 |
Explicación: ajusta workers según CPU (2-4) y timeouts razonables.
.dockerignore
1 2 3 4 5 6 7 8 |
__pycache__ *.pyc .venv .mypy_cache .git .gitignore .env |
Explicación: evita copiar archivos innecesarios y secretos.
Ejecución:
1 2 |
docker compose up --build |
Explicación: construye la imagen y levanta API y DB con healthchecks.
Checklist de la sección:
- [ ] Imagen construida
- [ ] API accesible en 8000
14) Seguridad adicional y rendimiento
- CORS: restringe ALLOWED_ORIGINS en prod.
- Tamaño de request: configura clientmaxbody_size en el reverse proxy (Nginx) y valida inputs.
- Rate limiting: SlowAPI activo con umbrales prudentes.
- Anti N+1: DataLoader en campos con relaciones.
- Índices: añade índices para columnas filtradas (title, author_id).
- Performance SQL: usa EXPLAIN ANALYZE para identificar cuellos de botella.
- Cache lectura: considera Redis para queries frecuentes no mutables.
Checklist de la sección:
- [ ] CORS restringido
- [ ] Índices creados
- [ ] DataLoader aplicado
15) Checklist global, próximos pasos y FAQ
Checklist global de verificación:
- [ ] tutorial GraphQL Python implementado con FastAPI GraphQL y Strawberry GraphQL
- [ ] SQLAlchemy async funcionando con PostgreSQL y Alembic
- [ ] JWT Python para autenticación (access/refresh)
- [ ] Docker FastAPI con healthchecks
- [ ] Tests y cobertura básicos
Próximos pasos:
- Suscripciones GraphQL (WebSockets) para eventos en tiempo real.
- CDN/WAF delante de la API (Cloudflare) y un reverse proxy (Traefik/Nginx).
- Despliegue: Fly.io, Render, AWS ECS/Fargate o Kubernetes.
- Observabilidad avanzada: OpenTelemetry + Grafana/Loki/Tempo.
Resolución de problemas comunes
- Error de conexión a DB: verifica DATABASE_URL y que el servicio db esté healthy (docker compose ps).
- alembic revision no detecta modelos: asegúrate de importar modelos en migrations/env.py.
- Bcrypt error en Docker slim: instala build-essential en etapa builder (incluido en Dockerfile).
- 401 Unauthorized en mutations: confirma que envías Authorization: Bearer
y que no expiró. - N+1 en author: revisa que uses DataLoader en el field author.