403Webshell
Server IP : 66.29.132.122  /  Your IP : 18.219.71.21
Web Server : LiteSpeed
System : Linux business142.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : admazpex ( 531)
PHP Version : 7.2.34
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/self/root/opt/cloudlinux/venv/lib/python3.11/site-packages/sentry_sdk/integrations/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/self/root/opt/cloudlinux/venv/lib/python3.11/site-packages/sentry_sdk/integrations//fastapi.py
import asyncio
from copy import deepcopy

from sentry_sdk._types import TYPE_CHECKING
from sentry_sdk.hub import Hub, _should_send_default_pii
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.tracing import SOURCE_FOR_STYLE, TRANSACTION_SOURCE_ROUTE
from sentry_sdk.utils import transaction_from_function

if TYPE_CHECKING:
    from typing import Any, Callable, Dict
    from sentry_sdk.scope import Scope

try:
    from sentry_sdk.integrations.starlette import (
        StarletteIntegration,
        StarletteRequestExtractor,
    )
except DidNotEnable:
    raise DidNotEnable("Starlette is not installed")

try:
    import fastapi  # type: ignore
except ImportError:
    raise DidNotEnable("FastAPI is not installed")


# Fallback transaction name applied by _set_transaction_name_and_source when
# neither the endpoint function nor the matched route yields a usable name.
_DEFAULT_TRANSACTION_NAME = "generic FastAPI request"


class FastApiIntegration(StarletteIntegration):
    """Sentry integration for FastAPI, layered on the Starlette integration.

    The only FastAPI-specific setup performed here is the monkey-patching of
    ``fastapi.routing.get_request_handler`` done by
    ``patch_get_request_handler``.
    """

    # Name under which this integration is registered; also written to the
    # scope's ``_name`` by the patched request handler.
    identifier = "fastapi"

    @staticmethod
    def setup_once():
        # type: () -> None
        """Install the FastAPI request-handler patch."""
        patch_get_request_handler()


def _set_transaction_name_and_source(scope, transaction_style, request):
    # type: (Scope, str, Any) -> None
    """Derive a transaction name from ``request`` and store it on ``scope``.

    * ``transaction_style == "endpoint"``: the name is computed from the
      ``"endpoint"`` entry of ``request.scope`` via
      ``transaction_from_function``.
    * ``transaction_style == "url"``: the name is the ``path`` attribute of
      the ``"route"`` entry of ``request.scope``.

    When no non-empty name results (unknown style, missing entry, empty
    value), ``_DEFAULT_TRANSACTION_NAME`` is used together with
    ``TRANSACTION_SOURCE_ROUTE``; otherwise the source is looked up in
    ``SOURCE_FOR_STYLE`` by ``transaction_style``.
    """
    candidate = None

    if transaction_style == "endpoint":
        handler = request.scope.get("endpoint")
        candidate = transaction_from_function(handler) if handler else None
    elif transaction_style == "url":
        matched_route = request.scope.get("route")
        candidate = getattr(matched_route, "path", None) if matched_route else None

    # Any falsy candidate (None, empty string, ...) means "no usable name".
    if candidate:
        name, source = candidate, SOURCE_FOR_STYLE[transaction_style]
    else:
        name, source = _DEFAULT_TRANSACTION_NAME, TRANSACTION_SOURCE_ROUTE

    scope.set_transaction_name(name, source=source)


def patch_get_request_handler():
    # type: () -> None
    """Replace ``fastapi.routing.get_request_handler`` with a Sentry wrapper.

    The wrapper does two things each time FastAPI builds a request handler:

    1. If a ``dependant`` keyword argument is given and its ``call`` is a
       non-coroutine callable, ``dependant.call`` is replaced by a function
       that updates the scope profile's active thread id before delegating.
    2. The handler returned by the original factory is wrapped in a
       coroutine that, when the FastAPI integration is enabled, sets the
       transaction name/source and registers an event processor attaching
       request cookies/data to outgoing events.
    """
    original_factory = fastapi.routing.get_request_handler

    def _sentry_get_request_handler(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        # Only a keyword-passed dependant is inspected.
        dependant = kwargs.get("dependant")
        sync_endpoint = dependant.call if dependant else None

        if sync_endpoint is not None and not asyncio.iscoroutinefunction(
            sync_endpoint
        ):

            def _sentry_call(*call_args, **call_kwargs):
                # type: (*Any, **Any) -> Any
                with Hub.current.configure_scope() as scope:
                    profile = scope.profile
                    if profile is not None:
                        # NOTE(review): presumably needed because sync
                        # endpoints run on a different thread than the
                        # request handler -- confirm against FastAPI.
                        profile.update_active_thread_id()
                    return sync_endpoint(*call_args, **call_kwargs)

            dependant.call = _sentry_call

        inner_app = original_factory(*args, **kwargs)

        async def _sentry_app(*app_args, **app_kwargs):
            # type: (*Any, **Any) -> Any
            hub = Hub.current
            integration = hub.get_integration(FastApiIntegration)

            # With the integration disabled the handler is a plain
            # pass-through to the original app.
            if integration is not None:
                with hub.configure_scope() as scope:
                    request = app_args[0]

                    _set_transaction_name_and_source(
                        scope, integration.transaction_style, request
                    )

                    info = await StarletteRequestExtractor(
                        request
                    ).extract_request_info()

                    def event_processor(event, hint):
                        # type: (Dict[str, Any], Dict[str, Any]) -> Dict[str, Any]
                        # Merge the extracted request info into the event.
                        request_info = event.get("request", {})
                        if info:
                            # Cookies are only attached when sending default
                            # PII is allowed.
                            if "cookies" in info and _should_send_default_pii():
                                request_info["cookies"] = info["cookies"]
                            if "data" in info:
                                request_info["data"] = info["data"]
                        # Store a deep copy so the event does not share
                        # objects with the extracted info.
                        event["request"] = deepcopy(request_info)

                        return event

                    scope._name = FastApiIntegration.identifier
                    scope.add_event_processor(event_processor)

            return await inner_app(*app_args, **app_kwargs)

        return _sentry_app

    fastapi.routing.get_request_handler = _sentry_get_request_handler

Youez - 2016 - github.com/yon3zu
LinuXploit