403Webshell
Server IP : 66.29.132.122  /  Your IP : 3.139.239.135
Web Server : LiteSpeed
System : Linux business142.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : admazpex ( 531)
PHP Version : 7.2.34
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/self/root/proc/self/root/proc/thread-self/root/opt/hc_python/lib64/python3.8/site-packages/sentry_sdk/integrations/grpc/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/self/root/proc/self/root/proc/thread-self/root/opt/hc_python/lib64/python3.8/site-packages/sentry_sdk/integrations/grpc//server.py
import sentry_sdk
from sentry_sdk.consts import OP
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN
from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_CUSTOM

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import Callable, Optional
    from google.protobuf.message import Message

try:
    import grpc
    from grpc import ServicerContext, HandlerCallDetails, RpcMethodHandler
except ImportError:
    raise DidNotEnable("grpcio is not installed")


class ServerInterceptor(grpc.ServerInterceptor):  # type: ignore
    """gRPC server interceptor that runs unary-unary handlers inside a Sentry transaction.

    Handlers that are missing or that are not unary-unary are passed through
    untouched; only unary-unary handlers are wrapped.
    """

    def __init__(self, find_name=None):
        # type: (ServerInterceptor, Optional[Callable[[ServicerContext], str]]) -> None
        """Create the interceptor.

        :param find_name: optional callable that receives the servicer context
            and returns the transaction name. When omitted (or falsy), the
            default ``ServerInterceptor._find_name`` is used.
        """
        self._find_method_name = find_name or ServerInterceptor._find_name

        super().__init__()

    def intercept_service(self, continuation, handler_call_details):
        # type: (ServerInterceptor, Callable[[HandlerCallDetails], RpcMethodHandler], HandlerCallDetails) -> RpcMethodHandler
        """Resolve the next handler and wrap it when it is a unary-unary handler.

        :param continuation: callable that yields the next handler for the
            given call details.
        :param handler_call_details: details of the incoming call, forwarded
            unchanged to ``continuation``.
        :returns: the original handler when there is none or it has no
            ``unary_unary`` behavior; otherwise a new unary-unary handler that
            reuses the original (de)serializers and calls the original
            behavior inside an isolation scope.
        """
        handler = continuation(handler_call_details)
        if not handler or not handler.unary_unary:
            return handler

        def behavior(request, context):
            # type: (Message, ServicerContext) -> Message
            with sentry_sdk.isolation_scope():
                name = self._find_method_name(context)

                # Without a name there is nothing to label a transaction with,
                # so the call runs in the isolation scope only.
                if not name:
                    return handler.unary_unary(request, context)

                # The invocation metadata is handed to continue_from_headers as
                # the header mapping (presumably so an incoming trace is
                # continued rather than a new one started).
                metadata = dict(context.invocation_metadata())

                transaction = Transaction.continue_from_headers(
                    metadata,
                    op=OP.GRPC_SERVER,
                    name=name,
                    source=TRANSACTION_SOURCE_CUSTOM,
                    origin=SPAN_ORIGIN,
                )

                # Exceptions raised by the handler propagate through the
                # context manager unchanged; no explicit re-raise is needed.
                with sentry_sdk.start_transaction(transaction=transaction):
                    return handler.unary_unary(request, context)

        return grpc.unary_unary_rpc_method_handler(
            behavior,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )

    @staticmethod
    def _find_name(context):
        # type: (ServicerContext) -> str
        """Return the called method name decoded from the context's call details.

        NOTE: this reads the private ``_rpc_event`` attribute of the context,
        so it depends on grpc internals rather than a public API.
        """
        return context._rpc_event.call_details.method.decode()

Youez - 2016 - github.com/yon3zu
LinuXploit