跳转至

request-id实现

RequestID实现

为单次请求赋予一个唯一的RequestID可以方便定位问题,获取调用栈和日志。比较常见的场景为,接收到请求后,为请求分配RequestID,本次请求产生的所有日志、第三方调用等均关联此RequestID,该RequestID在接口的返回体或者返回头中返回给调用者。

实现思路,常见的RequestID一般是在请求进入时分配并存入上下文(context)变量中。第一次实现的时候是将RequestID记录在了request.scope对象里,每次获取RequestID时均需要获取request对象,十分不方便。这里直接介绍当前使用的方案:借助 starlette-context 工具库,此库已经实现了RequestID的生成和关联,下面继续介绍RequestID在返回体中的自动返回和日志关联(可关联至gunicorn的AccessLog)。

requirements.txt

starlette-context==0.2.3

common/request_id.py

import logging

from starlette_context import context
from starlette_context.plugins.plugin_uuid import PluginUUIDBase

# Key of the request id: used as RequestIdPlugin.key and looked up in
# context.data by current_request_id().
REQUEST_ID_CTX_KEY = 'X-Request-ID'


class RequestIdPlugin(PluginUUIDBase):
    """UUID plugin registered under ``REQUEST_ID_CTX_KEY``."""

    key = REQUEST_ID_CTX_KEY

    def get_new_uuid(self) -> str:
        """Generate a UUID with the function mapped to ``self.version`` and return it as ``str``."""
        make_uuid = self.uuid_functions_mapper[self.version]
        fresh_uuid = make_uuid()
        return str(fresh_uuid)


def current_request_id():
    """Return the value stored under ``REQUEST_ID_CTX_KEY`` in the current context data."""
    request_data = context.data
    return request_data[REQUEST_ID_CTX_KEY]


class RequestIDLogFilter(logging.Filter):
    """
    Log filter that injects the current request id as ``log_record.request_id``.

    If the request id cannot be resolved, ``NO_REQUEST_ID`` is injected instead,
    so a format string using ``%(request_id)s`` still renders and the logging
    call itself does not raise.
    """

    # Placeholder written to the record when no request id is available.
    NO_REQUEST_ID = '-'

    def filter(self, log_record):
        """
        Attach ``request_id`` to the record and keep the record.

        :param log_record: the ``logging.LogRecord`` being processed
        :return: the same record (truthy, so the record is not dropped)
        """
        try:
            log_record.request_id = current_request_id()
        except (RuntimeError, KeyError):
            # NOTE(review): starlette-context is expected to raise RuntimeError
            # when ``context`` is read outside a request cycle -- verify for the
            # pinned version. KeyError covers a context without the request-id key.
            log_record.request_id = self.NO_REQUEST_ID
        return log_record

主要修改点:

  • RequestIdPlugin 重写了 get_new_uuid 函数,调整了下生成的uuid格式。
  • current_request_id会自动返回本次请求对应的request_id
  • RequestIDLogFilter 用于为日志对象添加request_id值

为App新增中间件 main.py

from starlette_context.middleware import ContextMiddleware 
from common.request_id import RequestIdPlugin

def get_application() -> FastAPI:
    """Create the FastAPI app, register the context middleware and mount ``api_router`` under ``/api``."""
    application = FastAPI(title="Demo", debug=config.DEBUG, version='1.0')

    # Middleware: ContextMiddleware with the request-id plugin
    application.add_middleware(ContextMiddleware, plugins=(RequestIdPlugin(),))

    # (further setup is elided in the original snippet)
    ...

    application.include_router(api_router, prefix='/api')
    return application

接口自动返回RequestID

common/response.py

from typing import Any, List

from pydantic import BaseModel, Field
from common.request_id import current_request_id


class Response(BaseModel):
    """Response body format shared by the API endpoints."""

    # Response code.
    code: int = Field(
        ...,
        example=0,
        description='响应码',
    )
    # Explanation of the response code: success or failed.
    message: str = Field(
        ...,
        example="success",
        description='响应码解释,success或者failed。',
    )
    # Explanation of the error, empty string by default.
    cause: Any = Field(
        '',
        example="资源未找到",
        description='异常解释。',
    )
    # Request id; produced by current_request_id() when the caller does not pass one.
    request_id: str = Field(
        default_factory=current_request_id,
        example="04950cb5-6b24-426b-a5ce-db6344982597",
        description='RequestId,用于定位单次请求。',
    )
    # Response payload; its shape depends on the API.
    result: Any = Field(
        ...,
        example="",
        description='响应数据,数据格式根据API变化。',
    )

views.py

@router.get("/demo", response_model=Response)
async def demo():
    """
    Demo endpoint returning a successful ``Response``.

    ``request_id`` is not passed explicitly; the ``Response`` model fills it
    through its ``default_factory``.
    """
    return Response(message='success', code=0, result='demo')

单次请求的返回如下:

{'cause': '',
 'code': 0,
 'message': 'success',
 'request_id': '99cf0bf2-42a0-4ed4-a396-da611a370d06',
 'result': 'demo'}

日志关联

common/constants.py

# Plain log line: timestamp, pid, source file and line, level, message.
LOG_FORMAT = (
    '%(asctime)s [pid:%(process)d] '
    '%(filename)s(line:%(lineno)d) '
    '%(levelname)s %(message)s'
)
# Same layout with the request id inserted before the message.
REQUEST_ID_LOG_FORMAT = (
    '%(asctime)s [pid:%(process)d] '
    '%(filename)s(line:%(lineno)d) '
    '%(levelname)s [%(request_id)s] %(message)s'
)

common/log_handler.py

import json
import logging
import os
from copy import copy
from logging.handlers import WatchedFileHandler

from uvicorn.logging import AccessFormatter

import config
from common.constants import LOG_FORMAT, REQUEST_ID_LOG_FORMAT
from common.request_id import RequestIDLogFilter, current_request_id
from common.request_time import current_request_time

class LogHandler(logging.Logger):
    """
    Logger that attaches its own stream and/or file handler on construction.

    Every handler is formatted with the class attribute ``log_format``, which
    subclasses may override.
    """
    log_format = LOG_FORMAT

    def __init__(self, name, level=None, stream=True, file=True):
        """
        :param name: logger name; also used as the base name of the log file
        :param level: logging level; ``config.LOG_LEVEL`` is used when falsy
        :param stream: attach a ``logging.StreamHandler`` when true
        :param file: attach a ``WatchedFileHandler`` when true
        """
        self.name = name
        if level:
            self.level = level
        else:
            self.level = config.LOG_LEVEL
        logging.Logger.__init__(self, self.name, level=self.level)
        # Always define the attribute so resetName() is safe when file=False.
        self.file_handler = None
        if stream:
            self.__setStreamHandler__()
        if file:
            self.__setFileHandler__()

    def __setFileHandler__(self, level=None):
        """
        Create a ``WatchedFileHandler`` for ``<name>.log`` and attach it.

        :param level: handler level; the logger's own level is used when falsy
        :return: None
        """
        # NOTE(review): LOG_PATH is not imported in the visible snippet --
        # presumably it should come from ``config``; confirm.
        file_name = os.path.join(LOG_PATH, f'{self.name}.log')
        file_handler = WatchedFileHandler(filename=file_name)
        if not level:
            file_handler.setLevel(self.level)
        else:
            file_handler.setLevel(level)
        formatter = logging.Formatter(self.log_format)

        file_handler.setFormatter(formatter)
        # Remembered so resetName() can detach and close it later.
        self.file_handler = file_handler
        self.addHandler(file_handler)

    def __setStreamHandler__(self, level=None):
        """
        Create a ``logging.StreamHandler`` and attach it.

        :param level: handler level; the logger's own level is used when falsy
        :return: None
        """
        stream_handler = logging.StreamHandler()
        formatter = logging.Formatter(self.log_format)
        stream_handler.setFormatter(formatter)
        if not level:
            stream_handler.setLevel(self.level)
        else:
            stream_handler.setLevel(level)
        self.addHandler(stream_handler)

    def resetName(self, name):
        """
        Rename the logger and re-point its file handler to ``<name>.log``.

        When no file handler is attached (``file=False``) only the name changes.

        :param name: new logger name
        :return: None
        """
        self.name = name
        old_handler = self.file_handler
        if old_handler is None:
            return
        self.removeHandler(old_handler)
        # removeHandler() only detaches the handler; close it so the old log
        # file is released.
        old_handler.close()
        self.__setFileHandler__()


class RequestLogHandler(LogHandler):
    """
    ``LogHandler`` using ``REQUEST_ID_LOG_FORMAT``.

    A ``RequestIDLogFilter`` is installed on the logger so that the
    ``%(request_id)s`` field of the format can be rendered.
    """
    log_format = REQUEST_ID_LOG_FORMAT

    def __setFileHandler__(self, level=None):
        """Attach the file handler, then make sure the request-id filter is installed."""
        super(RequestLogHandler, self).__setFileHandler__(level=level)
        self._ensure_request_id_filter()

    def __setStreamHandler__(self, level=None):
        """Attach the stream handler, then make sure the request-id filter is installed."""
        super(RequestLogHandler, self).__setStreamHandler__(level=level)
        self._ensure_request_id_filter()

    def _ensure_request_id_filter(self):
        """Add a ``RequestIDLogFilter`` to this logger unless one is already present."""
        # The filter lives on the logger, not on a handler, so one instance is
        # enough; adding one per handler setup would run it repeatedly per record.
        if not any(isinstance(existing, RequestIDLogFilter) for existing in self.filters):
            self.addFilter(RequestIDLogFilter())

views.py:

# Logger whose records carry the request id.
log = RequestLogHandler('demo')


@router.get("/demo", response_model=Response)
async def demo():
    """Log one line and answer with a successful ``Response``."""
    log.info('demo')
    payload = Response(code=0, message='success', result='demo')
    return payload

输出日志:

2020-10-01 16:27:53,065 [pid:19120] views.py(line:26) INFO [d42f6cfb-2a0f-4673-a3c9-d8c00d2e58b4] demo

关联AccessLog

FastAPI当前使用的部署方案是 nginx->gunicorn->uvicorn->fastapi,nginx做负载均衡,gunicorn负责管理调度uvicorn,uvicorn再将具体请求传递给fastapi。gunicorn部署的时候一般指定worker type为uvicorn.workers.UvicornWorker,具体可以参考 uvicorn-deployment。为了让Access日志能够显示RequestID,需要对默认的Worker进行重写。

common/workers.py

import logging

from uvicorn.workers import UvicornWorker

from common import constants
from common.log_handler import RequestAccessFormatter


class RequestUvicornWorker(UvicornWorker):
    """Custom worker that writes access logs in ``constants.ACCESS_LOG_FORMAT``."""

    def __init__(self, *args, **kwargs):
        """Initialise the base worker, then re-format and share the access-log handlers."""
        super().__init__(*args, **kwargs)
        uvicorn_access_log = logging.getLogger("uvicorn.access")
        worker_access_log = self.log.access_log
        # Each handler gets its own formatter instance.
        for access_handler in worker_access_log.handlers:
            access_formatter = RequestAccessFormatter(constants.ACCESS_LOG_FORMAT)
            access_handler.setFormatter(access_formatter)
        # Point the "uvicorn.access" logger at the worker's access-log handlers and level.
        uvicorn_access_log.handlers = worker_access_log.handlers
        uvicorn_access_log.setLevel(worker_access_log.level)

common/log_handler.py

from common.request_time import current_request_time
from uvicorn.logging import AccessFormatter


class RequestAccessFormatter(AccessFormatter):
    """Access-log formatter that adds ``request_id`` to each record and never colours output."""

    def __init__(self, fmt=None, datefmt=None, style="%", use_colors=None):
        """Forward ``fmt``, ``datefmt`` and ``style`` to the base class; ``use_colors`` is ignored."""
        super(RequestAccessFormatter, self).__init__(fmt=fmt, datefmt=datefmt, style=style)
        # Colours are switched off regardless of the argument.
        self.use_colors = False

    def get_status_code(self, record):
        """Return the record's ``status_code`` entry unchanged."""
        return record.__dict__["status_code"]

    def formatMessage(self, record):
        """Format a copy of ``record`` extended with the current request id."""
        enriched = copy(record)
        enriched.__dict__.update({'request_id': current_request_id()})
        return super().formatMessage(enriched)

common/constants.py

# Access-log line: timestamp, request id, status code, client address, request line.
ACCESS_LOG_FORMAT = (
    '[%(asctime)s] %(request_id)s %(status_code)s %(client_addr)s '
    '- "%(request_line)s"'
)

gunicorn使用的配置文件 gun.py

import multiprocessing
import config as local_config

# Listen address built from the project's config module.
bind = f'{local_config.HOST}:{local_config.PORT}'
# Twice the CPU count reported by multiprocessing.
workers = multiprocessing.cpu_count() * 2
backlog = 2048
# Custom worker from common/workers.py that installs the request-aware access-log formatter.
worker_class = 'common.workers.RequestUvicornWorker'
proc_name = 'gunicorn.proc'
# Relative paths -- presumably resolved against the directory gunicorn is started from; confirm.
pidfile = 'log/gunicorn.pid'
errorlog = 'log/error.log'
accesslog = 'log/access.log'
loglevel = 'info'
daemon = True
# NOTE(review): gunicorn's worker timeout is expressed in seconds -- confirm 10 is enough for the slowest endpoint.
timeout = 10
# NOTE(review): gunicorn documents 0 as "unlimited" for the request-line size -- confirm this is intended.
limit_request_line = 0

使用gunicorn启动服务后,access.log展示的日志格式将如下:

[2020-09-30 15:28:54,793] ff1216ac-f46e-4997-901b-c749c306e744 200 192.168.11.1:53607 - "GET /redoc HTTP/1.1"

ff1216ac-f46e-4997-901b-c749c306e744 就是本次请求对应的RequestID。

RequestTime实现

Gunicorn部署Flask的时候,是可以自动计算请求耗时的。但是使用Gunicorn+uvicorn之后,Access日志是由uvicorn负责打印的,uvicorn(uvicorn==0.11.8)当前好像还是不支持请求耗时计算。查看单次请求的耗时是个刚需,和RequestID的实现方式类似,依旧借助 starlette-context 实现,改造如下:

common/middlewares.py

from contextvars import Token

from starlette.middleware.base import RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import Response
from starlette_context import _request_scope_context_storage
from starlette_context.middleware import ContextMiddleware as BaseContextMiddleware


class ContextMiddleware(BaseContextMiddleware):
    """
    Context middleware that does not reset the request context after the response.

    The context built by ``set_context`` is stored in
    ``_request_scope_context_storage`` and intentionally left in place when
    ``dispatch`` returns.
    """

    async def dispatch(
            self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        """
        Store the request context, call the next handler and let every plugin
        enrich the response.

        :param request: incoming request
        :param call_next: next handler in the middleware chain
        :return: the response produced by ``call_next``
        """
        # The token returned by set() is deliberately discarded: the context is
        # never reset here, presumably so code running after dispatch (e.g. the
        # access-log formatter) can still read it.
        _request_scope_context_storage.set(await self.set_context(request))

        response = await call_next(request)
        for plugin in self.plugins:
            await plugin.enrich_response(response)
        return response

主要的修改点是之前请求完成后,会重置全局上下文变量,删除了这个重置动作。

common/request_time.py

import time
from typing import Union

from starlette.requests import Request
from starlette.responses import Response
from starlette_context import _request_scope_context_storage
from starlette_context import context
from starlette_context.plugins.plugin import Plugin

# Context key of the request start timestamp (RequestTimePlugin.key).
REQUEST_START_TIME_CTX_KEY = 'X-Request-Start-Time'
# Context key of the elapsed time written by RequestTimePlugin.enrich_response().
REQUEST_TIME_CTX_KEY = 'X-Request-Time'


class RequestTimePlugin(Plugin):
    """Plugin that records the request start time and later replaces it with the elapsed time."""

    key = REQUEST_START_TIME_CTX_KEY

    async def process_request(self, request: Request) -> Union[str, int, dict, float]:
        """Return the current ``time.time()`` value as the value of this plugin's key."""
        assert isinstance(self.key, str)
        started_at = time.time()
        return started_at

    async def enrich_response(self, response: Response) -> None:
        """Pop the start time from the stored context and save the elapsed time under ``REQUEST_TIME_CTX_KEY``."""
        request_data: dict = _request_scope_context_storage.get()
        started_at = request_data.pop(self.key)
        elapsed = time.time() - started_at
        request_data[REQUEST_TIME_CTX_KEY] = elapsed
        _request_scope_context_storage.set(request_data)


def current_request_time(precision=3):
    """
    Return the request time from the context data, rounded to ``precision`` digits.

    Uses the value stored under ``REQUEST_TIME_CTX_KEY`` when present; otherwise
    the time elapsed since the value stored under ``REQUEST_START_TIME_CTX_KEY``.
    """
    request_data = context.data
    if REQUEST_TIME_CTX_KEY not in request_data:
        return round(time.time() - request_data[REQUEST_START_TIME_CTX_KEY], precision)
    return round(request_data[REQUEST_TIME_CTX_KEY], precision)

common/log_handler.py

class RequestAccessFormatter(AccessFormatter):
    """Access-log formatter that adds ``request_time`` and ``request_id`` to each record, without colours."""

    def __init__(self, fmt=None, datefmt=None, style="%", use_colors=None):
        """Forward ``fmt``, ``datefmt`` and ``style`` to the base class; ``use_colors`` is ignored."""
        super(RequestAccessFormatter, self).__init__(fmt=fmt, datefmt=datefmt, style=style)
        # Colours are switched off regardless of the argument.
        self.use_colors = False

    def get_status_code(self, record):
        """Return the record's ``status_code`` entry unchanged."""
        return record.__dict__["status_code"]

    def formatMessage(self, record):
        """Format a copy of ``record`` extended with the request time and the request id."""
        enriched = copy(record)
        extra_fields = {
            'request_time': current_request_time(),
            'request_id': current_request_id(),
        }
        enriched.__dict__.update(extra_fields)
        return super().formatMessage(enriched)

common/constants.py

# Access-log line: timestamp, request id, status code, request time, client address, request line.
ACCESS_LOG_FORMAT = (
    '[%(asctime)s] %(request_id)s %(status_code)s '
    '%(request_time)ss %(client_addr)s '
    '- "%(request_line)s"'
)

main.py:

# Middleware: register the context middleware with the request-id and request-time plugins
application.add_middleware(ContextMiddleware, plugins=(RequestIdPlugin(), RequestTimePlugin()))

worker的修改参考RequestID部分的介绍。

最终的AccessLog格式大致为:

[2020-10-01 16:58:27,183] 079fa7bd-e737-40c5-9edf-6aac536595ad 200 0.01s 172.16.120.13:52621 - "PUT /demo HTTP/1.1"