403Webshell
Server IP : 66.29.132.122  /  Your IP : 3.16.79.33
Web Server : LiteSpeed
System : Linux business142.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : admazpex ( 531)
PHP Version : 7.2.34
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/self/root/opt/cloudlinux/venv/lib/python3.11/site-packages/sentry_sdk/integrations/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/self/root/opt/cloudlinux/venv/lib/python3.11/site-packages/sentry_sdk/integrations/quart.py
from __future__ import absolute_import

import inspect
import threading

from sentry_sdk.hub import _should_send_default_pii, Hub
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.integrations._wsgi_common import _filter_headers
from sentry_sdk.integrations.asgi import SentryAsgiMiddleware
from sentry_sdk.scope import Scope
from sentry_sdk.tracing import SOURCE_FOR_STYLE
from sentry_sdk.utils import (
    capture_internal_exceptions,
    event_from_exception,
)

from sentry_sdk._functools import wraps
from sentry_sdk._types import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import Any
    from typing import Dict
    from typing import Union

    from sentry_sdk._types import EventProcessor

try:
    import quart_auth  # type: ignore
except ImportError:
    quart_auth = None

try:
    from quart import (  # type: ignore
        has_request_context,
        has_websocket_context,
        Request,
        Quart,
        request,
        websocket,
    )
    from quart.scaffold import Scaffold  # type: ignore
    from quart.signals import (  # type: ignore
        got_background_exception,
        got_request_exception,
        got_websocket_exception,
        request_started,
        websocket_started,
    )
    from quart.utils import is_coroutine_function  # type: ignore
except ImportError:
    raise DidNotEnable("Quart is not installed")

# Values accepted for the ``transaction_style`` argument of
# ``QuartIntegration``; its constructor raises ``ValueError`` for anything else.
TRANSACTION_STYLE_VALUES = ("endpoint", "url")


class QuartIntegration(Integration):
    """Sentry integration for Quart applications.

    Installing it connects Sentry's receivers to Quart's request, websocket
    and exception signals and patches the ASGI entry point and
    ``Scaffold.route``.
    """

    identifier = "quart"

    # Class-level placeholder; every instance overwrites it in ``__init__``.
    transaction_style = ""

    def __init__(self, transaction_style="endpoint"):
        # type: (str) -> None
        """Store the transaction naming style.

        :param transaction_style: one of ``TRANSACTION_STYLE_VALUES``.
        :raises ValueError: if ``transaction_style`` is not an accepted value.
        """
        style_is_known = transaction_style in TRANSACTION_STYLE_VALUES
        if not style_is_known:
            message = "Invalid value for transaction_style: %s (must be in %s)" % (
                transaction_style,
                TRANSACTION_STYLE_VALUES,
            )
            raise ValueError(message)

        self.transaction_style = transaction_style

    @staticmethod
    def setup_once():
        # type: () -> None
        """Connect the signal receivers and apply the Quart patches."""
        # Both "started" signals share one receiver.
        for started_signal in (request_started, websocket_started):
            started_signal.connect(_request_websocket_started)

        # All three exception signals are reported through the same receiver.
        for exception_signal in (
            got_background_exception,
            got_request_exception,
            got_websocket_exception,
        ):
            exception_signal.connect(_capture_exception)

        patch_asgi_app()
        patch_scaffold_route()


def patch_asgi_app():
    # type: () -> None
    """Replace ``Quart.__call__`` with a wrapper that runs the app through
    ``SentryAsgiMiddleware`` whenever the Quart integration is enabled."""
    original_call = Quart.__call__

    async def sentry_patched_asgi_app(self, scope, receive, send):
        # type: (Any, Any, Any, Any) -> Any
        integration_enabled = (
            Hub.current.get_integration(QuartIntegration) is not None
        )
        if integration_enabled:
            # Wrap the original ASGI callable, bound to this app instance.
            middleware = SentryAsgiMiddleware(
                lambda *a, **kw: original_call(self, *a, **kw)
            )
            # Pin the middleware's entry point to ``_run_asgi3``
            # (presumably the ASGI 3 code path -- confirm against
            # SentryAsgiMiddleware).
            middleware.__call__ = middleware._run_asgi3
            return await middleware(scope, receive, send)

        # Integration disabled: behave exactly like the unpatched app.
        return await original_call(self, scope, receive, send)

    Quart.__call__ = sentry_patched_asgi_app


def patch_scaffold_route():
    # type: () -> None
    """Replace ``Scaffold.route`` so that synchronous view functions are
    wrapped before being registered.

    The wrapper records the executing thread's id on the active scope's
    profile (when there is one) and then calls the view. Coroutine views and
    non-function callables are registered unchanged.
    """
    original_route = Scaffold.route

    def _sentry_route(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        register = original_route(*args, **kwargs)

        def decorator(view_func):
            # type: (Any) -> Any
            is_sync_view = inspect.isfunction(
                view_func
            ) and not is_coroutine_function(view_func)
            if not is_sync_view:
                return register(view_func)

            @wraps(view_func)
            def _sentry_func(*args, **kwargs):
                # type: (*Any, **Any) -> Any
                current_hub = Hub.current
                if current_hub.get_integration(QuartIntegration) is None:
                    return view_func(*args, **kwargs)

                with current_hub.configure_scope() as sentry_scope:
                    profile = sentry_scope.profile
                    if profile is not None:
                        # Tell the profile which thread runs this view.
                        profile.active_thread_id = threading.current_thread().ident

                    return view_func(*args, **kwargs)

            return register(_sentry_func)

        return decorator

    Scaffold.route = _sentry_route


def _set_transaction_name_and_source(scope, transaction_style, request):
    # type: (Scope, str, Request) -> None

    try:
        name_for_style = {
            "url": request.url_rule.rule,
            "endpoint": request.url_rule.endpoint,
        }
        scope.set_transaction_name(
            name_for_style[transaction_style],
            source=SOURCE_FOR_STYLE[transaction_style],
        )
    except Exception:
        pass


async def _request_websocket_started(app, **kwargs):
    # type: (Quart, **Any) -> None
    """Signal receiver for ``request_started`` and ``websocket_started``.

    Sets the transaction name on the current scope and registers an event
    processor that attaches request data to outgoing events. Does nothing
    when the Quart integration is not enabled, or when neither a request nor
    a websocket context is active.

    :param app: the Quart application that emitted the signal.
    :param kwargs: extra signal arguments; unused.
    """
    hub = Hub.current
    integration = hub.get_integration(QuartIntegration)
    if integration is None:
        return

    # Resolve the object the event processor will read from. Start from
    # ``None`` so the name is always bound; previously a missing context
    # caused an UnboundLocalError further down. If both contexts report
    # active, the websocket wins (same precedence as before).
    request_websocket = None
    if has_request_context():
        request_websocket = request._get_current_object()
    if has_websocket_context():
        request_websocket = websocket._get_current_object()

    if request_websocket is None:
        # No active request/websocket: nothing to name or describe.
        return

    with hub.configure_scope() as scope:
        # Set the transaction name here, but rely on ASGI middleware
        # to actually start the transaction
        _set_transaction_name_and_source(
            scope, integration.transaction_style, request_websocket
        )

        evt_processor = _make_request_event_processor(
            app, request_websocket, integration
        )
        scope.add_event_processor(evt_processor)


def _make_request_event_processor(app, request, integration):
    # type: (Quart, Request, QuartIntegration) -> EventProcessor
    def inner(event, hint):
        # type: (Dict[str, Any], Dict[str, Any]) -> Dict[str, Any]
        # if the request is gone we are fine not logging the data from
        # it.  This might happen if the processor is pushed away to
        # another thread.
        if request is None:
            return event

        with capture_internal_exceptions():
            # TODO: Figure out what to do with request body. Methods on request
            # are async, but event processors are not.

            request_info = event.setdefault("request", {})
            request_info["url"] = request.url
            request_info["query_string"] = request.query_string
            request_info["method"] = request.method
            request_info["headers"] = _filter_headers(dict(request.headers))

            if _should_send_default_pii():
                request_info["env"] = {"REMOTE_ADDR": request.access_route[0]}
                _add_user_to_event(event)

        return event

    return inner


async def _capture_exception(sender, exception, **kwargs):
    # type: (Quart, Union[ValueError, BaseException], **Any) -> None
    """Signal receiver that reports ``exception`` to Sentry.

    The event is tagged with the ``quart`` mechanism and marked as unhandled.
    Does nothing when the Quart integration is not enabled on the current hub.
    """
    current_hub = Hub.current
    integration = current_hub.get_integration(QuartIntegration)
    if integration is None:
        return

    # If an integration is there, a client has to be there.
    options = current_hub.client.options  # type: Any
    mechanism = {"type": "quart", "handled": False}

    sentry_event, sentry_hint = event_from_exception(
        exception, client_options=options, mechanism=mechanism
    )
    current_hub.capture_event(sentry_event, hint=sentry_hint)


def _add_user_to_event(event):
    # type: (Dict[str, Any]) -> None
    """Record the ``quart_auth`` user's id under ``event["user"]["id"]``.

    Does nothing when ``quart_auth`` is not installed or exposes no current
    user. Errors while reading the user id are captured internally rather
    than raised.
    """
    if quart_auth is None or quart_auth.current_user is None:
        return

    with capture_internal_exceptions():
        # Create the "user" section first, then fill in the id.
        user_section = event.setdefault("user", {})
        user_section["id"] = quart_auth.current_user._auth_id

Youez - 2016 - github.com/yon3zu
LinuXploit