403Webshell
Server IP : 66.29.132.122  /  Your IP : 3.16.135.36
Web Server : LiteSpeed
System : Linux business142.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : admazpex ( 531)
PHP Version : 7.2.34
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/self/root/proc/self/root/proc/thread-self/root/opt/hc_python/lib64/python3.8/site-packages/sentry_sdk/integrations/redis/modules/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/self/root/proc/self/root/proc/thread-self/root/opt/hc_python/lib64/python3.8/site-packages/sentry_sdk/integrations/redis/modules/queries.py
"""
Code used for the Queries module in Sentry
"""

from sentry_sdk._types import TYPE_CHECKING
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations.redis.utils import _get_safe_command
from sentry_sdk.utils import capture_internal_exceptions


if TYPE_CHECKING:
    from redis import Redis
    from sentry_sdk.integrations.redis import RedisIntegration
    from sentry_sdk.tracing import Span
    from typing import Any


def _compile_db_span_properties(integration, redis_command, args):
    # type: (RedisIntegration, str, tuple[Any, ...]) -> dict[str, Any]
    """
    Assemble the properties for a Redis database span.

    Returns a dict with two entries: ``"op"`` (always ``OP.DB_REDIS``) and
    ``"description"`` (the result of ``_get_db_span_description`` for the
    given integration, command and arguments).
    """
    # Resolve the description first, then build the mapping in one go.
    span_description = _get_db_span_description(integration, redis_command, args)

    return dict(op=OP.DB_REDIS, description=span_description)


def _get_db_span_description(integration, command_name, args):
    # type: (RedisIntegration, str, tuple[Any, ...]) -> str
    """
    Build the span description for a Redis command, capped at
    ``integration.max_data_size`` characters.

    The description comes from ``_get_safe_command(command_name, args)``.
    ``command_name`` alone is used as the starting value, so it is what gets
    returned if the safe command is never assigned (presumably when
    ``capture_internal_exceptions`` suppresses an error raised while building
    it -- confirm against that helper).

    Truncation rules:

    * A falsy ``max_data_size`` (``None`` or ``0``) disables truncation.
    * A description no longer than the limit is returned unchanged.
    * Otherwise the description is cut so that, together with a trailing
      ``"..."`` marker, it is exactly ``max_data_size`` characters long.
    * If the limit is smaller than the marker itself, the description is
      hard-cut to the limit without a marker. Subtracting the marker length
      would otherwise yield a negative slice bound and a result longer than
      the untruncated input.
    """
    # Fallback value; only replaced if the safe command is built successfully.
    description = command_name

    with capture_internal_exceptions():
        description = _get_safe_command(command_name, args)

    max_size = integration.max_data_size
    if not max_size or len(description) <= max_size:
        return description

    marker = "..."
    if max_size < len(marker):
        # No room for the marker: a plain cut keeps the result within the limit.
        return description[:max_size]

    return description[: max_size - len(marker)] + marker


def _set_db_data_on_span(span, connection_params):
    # type: (Span, dict[str, Any]) -> None
    """
    Record Redis connection details on ``span``.

    ``SPANDATA.DB_SYSTEM`` is always set to ``"redis"``. The ``"db"``,
    ``"host"`` and ``"port"`` entries of ``connection_params`` are copied to
    ``SPANDATA.DB_NAME``, ``SPANDATA.SERVER_ADDRESS`` and
    ``SPANDATA.SERVER_PORT`` respectively, each only when the entry is
    present and not ``None``. The database value is converted with ``str``;
    host and port are stored as given.
    """
    span.set_data(SPANDATA.DB_SYSTEM, "redis")

    # (connection_params key, span data key, optional converter)
    optional_fields = (
        ("db", SPANDATA.DB_NAME, str),
        ("host", SPANDATA.SERVER_ADDRESS, None),
        ("port", SPANDATA.SERVER_PORT, None),
    )

    for param_key, span_key, convert in optional_fields:
        value = connection_params.get(param_key)
        if value is None:
            continue
        span.set_data(span_key, value if convert is None else convert(value))


def _set_db_data(span, redis_instance):
    # type: (Span, Redis[Any]) -> None
    """
    Record the connection details of ``redis_instance`` on ``span``.

    The details are read from
    ``redis_instance.connection_pool.connection_kwargs`` and handed to
    ``_set_db_data_on_span``. This is best-effort: any ``AttributeError``
    raised along the way is swallowed and the function returns normally.
    """
    try:
        pool = redis_instance.connection_pool
        connection_kwargs = pool.connection_kwargs
        _set_db_data_on_span(span, connection_kwargs)
    except AttributeError:
        # connection_kwargs may be missing in some cases
        return

Youez - 2016 - github.com/yon3zu
LinuXploit