Integrating FastAPI with Structlog

Author: Daniel Herrmann

The built-in Python logging module does a great job. If you're wondering whether you need an additional logging library such as structlog or loguru, the answer might very well be "no". Before deciding to go down this route, please familiarize yourself with Python logging and read (yes, read!) the logging module tutorial. It's an excellent read!

A Structlog Primer

According to the logs section of the Twelve-Factor App (another highly recommended read), logs provide visibility into the behaviour of the running app. When running in local development mode, they should be easy to read for the developer. When running in production, they should be easy to parse for the logging environment. We, for example, use a combination of Fluent Bit, Fluentd and OpenSearch as our logging pipeline. All of this could be done with the standard logging module, though. Compare these log entries:

DEBUG:__main__:User example triggered deletion of hero 18bcb9d1-9ecd-4297-a7cc-591d0ededf7a
INFO:__main__:Hero 18bcb9d1-9ecd-4297-a7cc-591d0ededf7a deleted successfully 
2024-05-03T18:58:21.443989Z [debug    ] Triggering deletion of hero [app.worker] request_id=97133231229d4af2a0588e468049261c hero_id=18bcb9d1-9ecd-4297-a7cc-591d0ededf7a user_id=example
2024-05-03T18:58:21.509100Z [info     ] Hero deleted successfully [app.worker] request_id=97133231229d4af2a0588e468049261c hero_id=18bcb9d1-9ecd-4297-a7cc-591d0ededf7a user_id=example

The second example provides additional context for each log entry, allowing us to identify all log entries related to a particular request and to see which objects are attached. This is better, and it is perfectly fine for local development. For production, JSON output is even better, as it allows for direct ingestion without the need for sophisticated and potentially error-prone parsing:

{"request_id": "33fe32d22dd44868afbf36b0ed40a104", "logger": "arq.worker", "level": "info", "message": "33fe32d22dd44868afbf36b0ed40a104:print_hero", "asctime": "21:42:51", "timestamp": "2024-05-03T19:42:51.613223Z"}
{"request_id": "21b69ca806ac4e64a9af9f91d3d35ef3", "logger": "app.worker.hero", "level": "info", "timestamp": "2024-05-03T19:42:51.617299Z", "message": "Hero 18bcb9d1-9ecd-4297-a7cc-591d0ededf7a printed"}

Adding context to log messages can be quite tedious, especially when you need to add the parameters to each logging call. This is where bound loggers come in very handy. The structlog documentation is very good, so please head over there to understand the concepts. In short, bound loggers allow you to do this:

import structlog

logger = structlog.get_logger()
log = logger.bind(foo="bar")
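
Every message logged through log now carries foo=bar without having to repeat it on each call. A tiny illustration (the exact output format depends on the renderer we configure later):

log.info("hero created")
# ... [info     ] hero created    foo=bar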

Integrating Structlog with FastAPI

With the integration of structlog into FastAPI, the goal is to:

  • Be able to switch between human readable output and JSON output
  • Attach one request ID to each incoming request and add it to all log messages
  • Automatically attach certain request attributes (path, HTTP method, function, ...)
  • Make sure the uvicorn logger behaves well with structlog
  • Handle exceptions automatically and serialize them properly in the logs
  • Optionally, be able to quickly bind SQLAlchemy models to the logger

Log Settings

Assuming we're using pydantic-settings, first extend your configuration to include the necessary variables:

# src/core/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    LOG_LEVEL: str = "INFO"
    LOG_JSON_FORMAT: bool = False
    LOG_NAME: str = "your_app.app_logs"
    LOG_ACCESS_NAME: str = "your_app.access_logs"
    # Used by the logging middleware further below to decide whether to include stack traces
    LOG_INCLUDE_STACK: bool = False


settings = Settings()
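
Since these are regular pydantic-settings fields, they can be driven entirely by environment variables, which is how we later switch to JSON output in production. A quick sanity check of that behaviour (assuming pydantic-settings' default, case-insensitive environment lookup):

import os

# Simulate settings coming from the environment
os.environ["LOG_JSON_FORMAT"] = "true"
os.environ["LOG_LEVEL"] = "DEBUG"

settings = Settings()
assert settings.LOG_JSON_FORMAT is True
assert settings.LOG_LEVEL == "DEBUG"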

Configure the Logger

Structlog uses processors to format and/or filter the logs. Some of this code is taken from this excellent Gist on GitHub, modified a little and with the Datadog integration removed. Also, we're going to handle exceptions in the middleware (more on the reason for that later), so we're removing that code here as well. The goals are:

  • Configure the required preprocessors and processors to make structlog output everything the way we want
  • Add a formatter to the Python logging module to convert existing logs to the correct format
  • Prepare uvicorn loggers to be able to enrich them with structured information

# src/core/logger.py
import logging
import re
import sys
from typing import Any

import structlog
from structlog.types import EventDict, Processor

from .config import settings


def drop_color_message_key(_, __, event_dict: EventDict) -> EventDict:
    """
    Uvicorn logs the message a second time in the extra `color_message`, but we don't
    need it. This processor drops the key from the event dict if it exists.
    """
    event_dict.pop("color_message", None)
    return event_dict


def setup_logging(json_logs: bool = False, log_level: str = "INFO"):
    timestamper = structlog.processors.TimeStamper(fmt="iso")

    shared_processors: list[Processor] = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.stdlib.ExtraAdder(),
        drop_color_message_key,
        timestamper,
        structlog.processors.StackInfoRenderer(),
    ]

    if json_logs:
        # Format the exception only for JSON logs, as we want to pretty-print them when
        # using the ConsoleRenderer
        shared_processors.append(structlog.processors.format_exc_info)

    structlog.configure(
        processors=shared_processors
        + [
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

    log_renderer: structlog.types.Processor
    if json_logs:
        log_renderer = structlog.processors.JSONRenderer()
    else:
        log_renderer = structlog.dev.ConsoleRenderer()

    formatter = structlog.stdlib.ProcessorFormatter(
        # These run ONLY on `logging` entries that do NOT originate within
        # structlog.
        foreign_pre_chain=shared_processors,
        # These run on ALL entries after the pre_chain is done.
        processors=[
            # Remove _record & _from_structlog.
            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
            log_renderer,
        ],
    )

    # Reconfigure the root logger to use our structlog formatter, effectively emitting the logs via structlog
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(log_level.upper())

    for _log in ["uvicorn", "uvicorn.error"]:
        # Make sure the logs are handled by the root logger
        logging.getLogger(_log).handlers.clear()
        logging.getLogger(_log).propagate = True

    # Uvicorn logs are re-emitted with more context. We effectively silence them here
    logging.getLogger("uvicorn.access").handlers.clear()
    logging.getLogger("uvicorn.access").propagate = False

Important notes:

  • structlog.processors.format_exc_info is used to consistently parse exception information and stack traces. For this to work, the exception must be passed via the exc_info argument of the log call.
  • Native uvicorn access logs are suppressed and re-emitted by the middleware using structlog, enriched with structured information
  • The json_logs argument determines whether the ConsoleRenderer (for local development) or the JSONRenderer (for production environments) is used
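
To activate all of this, setup_logging must be called once at application startup, before the first log message is emitted. A minimal sketch, assuming it is wired up in your application entry point:

# src/main.py (hypothetical entry point)
from .core.config import settings
from .core.logger import setup_logging

# Configure structlog and the stdlib root logger exactly once at startup
setup_logging(json_logs=settings.LOG_JSON_FORMAT, log_level=settings.LOG_LEVEL)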

Thin logging wrapper

With native structlog, there are two ways to attach contextual information to your log entries:

Bound Logger

See the structlog documentation on bound logger for more details.

import structlog

log1 = structlog.get_logger()
log1.debug("This is a message without context")

log2 = log1.bind(foo="bar")
log2.debug("This is a message with context")

log1 == log2  # Will be False

There are two things I don't like too much about this:

  • Binding a value returns a new logger instance, which then needs to be used instead of the original one
  • The values bound to the logger are not context-aware, which means once bound, they apply to all requests handled with that logger instance

Context Variables

Built on the standard library contextvars module, this integration allows us to bind values to the logger that are local to a certain context. This fits very well with FastAPI, as the bound values will be local to each request. There is a nice writeup by the FastAPI creator on GitHub on this topic.

import structlog

structlog.contextvars.clear_contextvars()

log = structlog.get_logger()
log.debug("This is a message without context")

structlog.contextvars.bind_contextvars(a=1, b=2)
log.debug("This is a message with context")

This looks much better already. At the beginning of the request, we can clear any existing contextvars, assign a unique request ID and other important information and therefore make sure that every log emitted within the request context will contain this information.

Writing this every time still seems a bit verbose though. Also, one use case we hit very often is attaching SQLAlchemy objects (or rather, their IDs) to the request context. Our goal was to simplify this as much as possible:

# instead of doing this every time
log.bind(user_id=user_object.id)

# we'd want to do this and have the logger figure it out behind the scenes
log.bind(user_object)

Therefore, we usually implement a thin wrapper, which translates the easy-to-use bind and unbind syntax to the structlog.contextvars methods and handles SQLAlchemy models for us. Model names are translated to snake case, so binding a model called MyNiceModel results in a key my_nice_model with the model's id as value. If you use a different field as primary key, you may need to adapt this part for your case.

import re
from typing import Any

import structlog

from .config import settings
from .models import Base  # your SQLAlchemy declarative base; adjust the import to your project layout


class FastAPIStructLogger:
    def __init__(self, log_name=settings.LOG_NAME):
        self.logger = structlog.stdlib.get_logger(log_name)

    @staticmethod
    def _to_snake_case(name):
        return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()

    def bind(self, *args, **new_values: Any):
        for arg in args:
            if not issubclass(type(arg), Base):
                self.logger.error(
                    "Unsupported argument when trying to log."
                    f"Unnamed argument must be a subclass of Base. Invalid argument: {type(arg).__name__}"
                )
                continue

            key = self._to_snake_case(type(arg).__name__)

            structlog.contextvars.bind_contextvars(**{key: arg.id})

        structlog.contextvars.bind_contextvars(**new_values)

    @staticmethod
    def unbind(*keys: str):
        structlog.contextvars.unbind_contextvars(*keys)

    def debug(self, event: str | None = None, *args: Any, **kw: Any):
        self.logger.debug(event, *args, **kw)

    def info(self, event: str | None = None, *args: Any, **kw: Any):
        self.logger.info(event, *args, **kw)

    def warning(self, event: str | None = None, *args: Any, **kw: Any):
        self.logger.warning(event, *args, **kw)

    warn = warning

    def error(self, event: str | None = None, *args: Any, **kw: Any):
        self.logger.error(event, *args, **kw)

    def critical(self, event: str | None = None, *args: Any, **kw: Any):
        self.logger.critical(event, *args, **kw)

    def exception(self, event: str | None = None, *args: Any, **kw: Any):
        self.logger.exception(event, *args, **kw)
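
With this in place, binding context becomes a one-liner. A short usage sketch (Hero is a hypothetical SQLAlchemy model deriving from Base):

import uuid

log = FastAPIStructLogger()

# Hypothetical model instance; any model deriving from Base works the same way
hero = Hero(id=uuid.uuid4())

log.bind(hero, user_id="example")  # bound as hero=<hero.id> and user_id="example"
log.info("Hero loaded")            # both keys appear on this and every later log in the request

log.unbind("user_id")              # individual keys can be removed again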

Logging Middleware

Most of the actual magic is still to come; it's mainly implemented in the following middleware. Its job is to automatically adjust the contextvars to include request parameters, implement structured uvicorn access logging and handle exceptions gracefully.

First, we need a way to uniquely identify requests. For this, we're going to use the asgi-correlation-id package, which provides a ready-to-use middleware for ASGI-based web servers such as uvicorn. The default behaviour is to watch for the presence of the X-Request-ID header. If present (e.g. if your frontend generates a unique request ID), it will use this value; if not, it will auto-generate a random value and add it to the HTTP response. This allows for scenarios where you can display the request or correlation ID to the user in case of an error, and it's easy to find related log entries.
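
To illustrate that behaviour, here is a hypothetical check using FastAPI's TestClient against the app we'll assemble further below (by default, an incoming value must be a valid UUID, otherwise a fresh one is generated):

from fastapi.testclient import TestClient

from .main import app  # hypothetical module exposing the FastAPI app

client = TestClient(app)

# No X-Request-ID sent: the middleware generates one and adds it to the response
response = client.get("/api/v1/hero/18bcb9d1-9ecd-4297-a7cc-591d0ededf7a/ability")
print(response.headers["X-Request-ID"])

# A valid UUID sent by the client is used as-is and echoed back
response = client.get(
    "/api/v1/hero/18bcb9d1-9ecd-4297-a7cc-591d0ededf7a/ability",
    headers={"X-Request-ID": "97133231-229d-4af2-a058-8e468049261c"},
)
assert response.headers["X-Request-ID"] == "97133231-229d-4af2-a058-8e468049261c"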

The logging middleware itself then simply clears the contextvars at the beginning of each request, binds the detected correlation ID and then adds additional details such as the HTTP path, the HTTP method and so on to the log as well. You can of course customize this to your needs.

We also want to automatically handle exceptions that are raised from within the code. FastAPI does provide built-in exception hooks, which are quite flexible and powerful. These exception hooks are, however, not used when the exception is raised (a) in a background task or (b) in a middleware. As we would need to re-raise the exceptions in our logging middleware, we cannot rely on the hooks but instead handle exceptions directly in the middleware as well. There is an extremely nice writeup, and a recommended read, about this topic.

import time
from typing import TypedDict

import structlog
from asgi_correlation_id import correlation_id
from starlette.responses import JSONResponse
from starlette.types import ASGIApp, Receive, Scope, Send
from uvicorn.protocols.utils import get_path_with_query_string

from backend.core.config import settings

app_logger = structlog.stdlib.get_logger(settings.LOG_NAME)
access_logger = structlog.stdlib.get_logger(settings.LOG_ACCESS_NAME)


class AccessInfo(TypedDict, total=False):
    status_code: int
    start_time: float


class StructLogMiddleware:
    def __init__(self, app: ASGIApp):
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        # If the request is not an HTTP request, we don't need to do anything special
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(request_id=correlation_id.get())

        info = AccessInfo()

        # Inner send function
        async def inner_send(message):
            if message["type"] == "http.response.start":
                info["status_code"] = message["status"]
            await send(message)

        try:
            info["start_time"] = time.perf_counter_ns()
            await self.app(scope, receive, inner_send)
        except Exception as e:
            app_logger.exception(
                "An unhandled exception was caught by last resort middleware",
                exception_class=e.__class__.__name__,
                exc_info=e,
                stack_info=settings.LOG_INCLUDE_STACK,
            )
            info["status_code"] = 500
            response = JSONResponse(
                status_code=500,
                content={
                    "error": "Internal Server Error",
                    "message": "An unexpected error occurred.",
                },
            )
            await response(scope, receive, send)
        finally:
            process_time = time.perf_counter_ns() - info["start_time"]
            client_host, client_port = scope["client"]
            http_method = scope["method"]
            http_version = scope["http_version"]
            url = get_path_with_query_string(scope)

            # Recreate the Uvicorn access log format, but add all parameters as structured information
            access_logger.info(
                f"""{client_host}:{client_port} - "{http_method} {scope["path"]} HTTP/{http_version}" {info["status_code"]}""",
                http={
                    "url": str(url),
                    "status_code": info["status_code"],
                    "method": http_method,
                    "request_id": correlation_id.get(),
                    "version": http_version,
                },
                network={"client": {"ip": client_host, "port": client_port}},
                duration=process_time,
            )

Note that this implementation uses two different loggers:

  • Application logs (name configured using settings.LOG_NAME): this is mainly used for your own logs. It will automatically include the additional fields listed above
  • Uvicorn access logs (name configured using settings.LOG_ACCESS_NAME): this contains the uvicorn access logs, re-emitted using structured information

Bring it all together

Finally, the FastAPI app itself needs to be configured accordingly.

from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

from asgi_correlation_id import CorrelationIdMiddleware
from fastapi import FastAPI

from .middleware import StructLogMiddleware


# Do your lifespan stuff here such as opening DB connections, ...
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator:
    yield


app = FastAPI(lifespan=lifespan)

app.add_middleware(StructLogMiddleware)
app.add_middleware(CorrelationIdMiddleware)

Keep in mind that the order of middlewares is important. They're applied in an "onion" model, where each added middleware gets wrapped around the application, including any previously added middlewares. For the particular example above, the resulting stack looks like this, which makes sure the request ID is generated first:

CorrelationIdMiddleware PRE-Code
    StructLogMiddleware PRE-Code
        FastAPI App
    StructLogMiddleware POST-Code
CorrelationIdMiddleware POST-Code

Use the new logging solution

With all this work done, we can finally go ahead and use our new logging setup. This is obviously just a toy example, but it should show the general usage.

import uuid

from fastapi import APIRouter, HTTPException
from fastapi_async_sqlalchemy import db

from app import models, schemas
from app.core import FastAPIStructLogger
from app.crud import crud_hero

log = FastAPIStructLogger()
router = APIRouter(prefix="/hero", tags=["heroes"])


# Demonstrate logger usage
@router.get("/{hero_id}/ability", response_model=schemas.HeroSchema)
async def get_hero_ability(hero_id: uuid.UUID):

    # Bind the requested hero ID to the logging context
    log.bind(requested_hero_id=hero_id)
    hero = await crud_hero.get(db.session, hero_id)

    if not hero:
        log.warning("Hero not found")
        raise HTTPException(status_code=404, detail="Hero not found")
    await db.session.refresh(hero, attribute_names=["ability"])

    log.info("Successfully fetched hero ability")
    return hero.ability

In local development, the output looks like this:

# Successful request
2024-05-04T12:39:38.923009Z [info     ] Successfully fetched hero ability [mksp.backend.app] request_id=1620de462c854bbf9164404d05997d70 requested_hero_id=UUID('18bcb9d1-9ecd-4297-a7cc-591d0ededf7a')
2024-05-04T12:39:38.924908Z [info     ] 127.0.0.1:54518 - "GET /api/v1/hero/18bcb9d1-9ecd-4297-a7cc-591d0ededf7a/ability HTTP/1.1" 200 [mksp.backend.access] duration=82099875 http={'url': '/api/v1/hero/18bcb9d1-9ecd-4297-a7cc-591d0ededf7a/ability', 'status_code': 200, 'method': 'GET', 'request_id': '1620de462c854bbf9164404d05997d70', 'version': '1.1'} network={'client': {'ip': '127.0.0.1', 'port': 54518}} request_id=1620de462c854bbf9164404d05997d70

# Non-existent request
2024-05-04T12:40:11.174939Z [warning  ] Hero not found                 [mksp.backend.app] request_id=1c2ce023eda3490eb92fc4153823e636 requested_hero_id=UUID('e4e242a9-ed29-4adf-b15b-403d25e98e0b')
2024-05-04T12:40:11.177626Z [info     ] 127.0.0.1:54529 - "GET /api/v1/hero/e4e242a9-ed29-4adf-b15b-403d25e98e0b/ability HTTP/1.1" 404 [mksp.backend.access] duration=9780292 http={'url': '/api/v1/hero/e4e242a9-ed29-4adf-b15b-403d25e98e0b/ability', 'status_code': 404, 'method': 'GET', 'request_id': '1c2ce023eda3490eb92fc4153823e636', 'version': '1.1'} network={'client': {'ip': '127.0.0.1', 'port': 54529}} request_id=1c2ce023eda3490eb92fc4153823e636

Or, if we enable JSON output by setting the LOG_JSON_FORMAT environment variable to true:

{"event": "Hero not found", "request_id": "6fb1e9a3b27541ffbdd3e95e0376e514", "requested_hero_id": "UUID('18bcb9d1-9ecd-4297-a7cc-591d0edadf7a')", "logger": "mksp.backend.app", "level": "warning", "timestamp": "2024-05-04T12:41:56.312632Z"}
{"http": {"url": "/api/v1/hero/18bcb9d1-9ecd-4297-a7cc-591d0edadf7a/ability", "status_code": 404, "method": "GET", "request_id": "6fb1e9a3b27541ffbdd3e95e0376e514", "version": "1.1"}, "network": {"client": {"ip": "127.0.0.1", "port": 54761}}, "duration": 105338709, "event": "127.0.0.1:54761 - \"GET /api/v1/hero/18bcb9d1-9ecd-4297-a7cc-591d0edadf7a/ability HTTP/1.1\" 404", "request_id": "6fb1e9a3b27541ffbdd3e95e0376e514", "logger": "mksp.backend.access", "level": "info", "timestamp": "2024-05-04T12:41:56.314768Z"}

This can easily be parsed by whatever log ingestion pipeline you use, with all contextual information included.