403Webshell
Server IP : 66.29.132.122  /  Your IP : 3.135.184.186
Web Server : LiteSpeed
System : Linux business142.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : admazpex ( 531)
PHP Version : 7.2.34
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/self/root/proc/thread-self/root/proc/thread-self/root/proc/self/root/opt/imunify360/venv/lib64/python3.11/site-packages/sentry_sdk/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/self/root/proc/thread-self/root/proc/thread-self/root/proc/self/root/opt/imunify360/venv/lib64/python3.11/site-packages/sentry_sdk//sessions.py
import os
import uuid
import time
from datetime import datetime
from threading import Thread, Lock
from contextlib import contextmanager

from sentry_sdk._types import MYPY
from sentry_sdk.utils import format_timestamp

if MYPY:
    import sentry_sdk

    from typing import Optional
    from typing import Union
    from typing import Any
    from typing import Dict
    from typing import Generator

    from sentry_sdk._types import SessionStatus


def is_auto_session_tracking_enabled(hub=None):
    # type: (Optional[sentry_sdk.Hub]) -> bool
    """Utility function to find out if session tracking is enabled."""
    if hub is None:
        hub = sentry_sdk.Hub.current
    should_track = hub.scope._force_auto_session_tracking
    if should_track is None:
        exp = hub.client.options["_experiments"] if hub.client else {}
        should_track = exp.get("auto_session_tracking")
    return should_track


@contextmanager
def auto_session_tracking(hub=None):
    # type: (Optional[sentry_sdk.Hub]) -> Generator[None, None, None]
    """Starts and stops a session automatically around a block."""
    if hub is None:
        hub = sentry_sdk.Hub.current
    should_track = is_auto_session_tracking_enabled(hub)
    if should_track:
        hub.start_session()
    try:
        yield
    finally:
        if should_track:
            hub.end_session()


def _make_uuid(
    val,  # type: Union[str, uuid.UUID]
):
    # type: (...) -> uuid.UUID
    if isinstance(val, uuid.UUID):
        return val
    return uuid.UUID(val)


TERMINAL_SESSION_STATES = ("exited", "abnormal", "crashed")


class SessionFlusher(object):
    def __init__(
        self,
        flush_func,  # type: Any
        flush_interval=10,  # type: int
    ):
        # type: (...) -> None
        self.flush_func = flush_func
        self.flush_interval = flush_interval
        self.pending = {}  # type: Dict[str, Any]
        self._thread = None  # type: Optional[Thread]
        self._thread_lock = Lock()
        self._thread_for_pid = None  # type: Optional[int]
        self._running = True

    def flush(self):
        # type: (...) -> None
        pending = self.pending
        self.pending = {}
        self.flush_func(list(pending.values()))

    def _ensure_running(self):
        # type: (...) -> None
        if self._thread_for_pid == os.getpid() and self._thread is not None:
            return None
        with self._thread_lock:
            if self._thread_for_pid == os.getpid() and self._thread is not None:
                return None

            def _thread():
                # type: (...) -> None
                while self._running:
                    time.sleep(self.flush_interval)
                    if self.pending and self._running:
                        self.flush()

            thread = Thread(target=_thread)
            thread.daemon = True
            thread.start()
            self._thread = thread
            self._thread_for_pid = os.getpid()
        return None

    def add_session(
        self, session  # type: Session
    ):
        # type: (...) -> None
        self.pending[session.sid.hex] = session.to_json()
        self._ensure_running()

    def kill(self):
        # type: (...) -> None
        self._running = False

    def __del__(self):
        # type: (...) -> None
        self.kill()


class Session(object):
    def __init__(
        self,
        sid=None,  # type: Optional[Union[str, uuid.UUID]]
        did=None,  # type: Optional[str]
        timestamp=None,  # type: Optional[datetime]
        started=None,  # type: Optional[datetime]
        duration=None,  # type: Optional[float]
        status=None,  # type: Optional[SessionStatus]
        release=None,  # type: Optional[str]
        environment=None,  # type: Optional[str]
        user_agent=None,  # type: Optional[str]
        ip_address=None,  # type: Optional[str]
        errors=None,  # type: Optional[int]
        user=None,  # type: Optional[Any]
    ):
        # type: (...) -> None
        if sid is None:
            sid = uuid.uuid4()
        if started is None:
            started = datetime.utcnow()
        if status is None:
            status = "ok"
        self.status = status
        self.did = None  # type: Optional[str]
        self.started = started
        self.release = None  # type: Optional[str]
        self.environment = None  # type: Optional[str]
        self.duration = None  # type: Optional[float]
        self.user_agent = None  # type: Optional[str]
        self.ip_address = None  # type: Optional[str]
        self.errors = 0

        self.update(
            sid=sid,
            did=did,
            timestamp=timestamp,
            duration=duration,
            release=release,
            environment=environment,
            user_agent=user_agent,
            ip_address=ip_address,
            errors=errors,
            user=user,
        )

    def update(
        self,
        sid=None,  # type: Optional[Union[str, uuid.UUID]]
        did=None,  # type: Optional[str]
        timestamp=None,  # type: Optional[datetime]
        started=None,  # type: Optional[datetime]
        duration=None,  # type: Optional[float]
        status=None,  # type: Optional[SessionStatus]
        release=None,  # type: Optional[str]
        environment=None,  # type: Optional[str]
        user_agent=None,  # type: Optional[str]
        ip_address=None,  # type: Optional[str]
        errors=None,  # type: Optional[int]
        user=None,  # type: Optional[Any]
    ):
        # type: (...) -> None
        # If a user is supplied we pull some data form it
        if user:
            if ip_address is None:
                ip_address = user.get("ip_address")
            if did is None:
                did = user.get("id") or user.get("email") or user.get("username")

        if sid is not None:
            self.sid = _make_uuid(sid)
        if did is not None:
            self.did = str(did)
        if timestamp is None:
            timestamp = datetime.utcnow()
        self.timestamp = timestamp
        if started is not None:
            self.started = started
        if duration is not None:
            self.duration = duration
        if release is not None:
            self.release = release
        if environment is not None:
            self.environment = environment
        if ip_address is not None:
            self.ip_address = ip_address
        if user_agent is not None:
            self.user_agent = user_agent
        if errors is not None:
            self.errors = errors

        if status is not None:
            self.status = status

    def close(
        self, status=None  # type: Optional[SessionStatus]
    ):
        # type: (...) -> Any
        if status is None and self.status == "ok":
            status = "exited"
        if status is not None:
            self.update(status=status)

    def to_json(self):
        # type: (...) -> Any
        rv = {
            "sid": str(self.sid),
            "init": True,
            "started": format_timestamp(self.started),
            "timestamp": format_timestamp(self.timestamp),
            "status": self.status,
        }  # type: Dict[str, Any]
        if self.errors:
            rv["errors"] = self.errors
        if self.did is not None:
            rv["did"] = self.did
        if self.duration is not None:
            rv["duration"] = self.duration

        attrs = {}
        if self.release is not None:
            attrs["release"] = self.release
        if self.environment is not None:
            attrs["environment"] = self.environment
        if self.ip_address is not None:
            attrs["ip_address"] = self.ip_address
        if self.user_agent is not None:
            attrs["user_agent"] = self.user_agent
        if attrs:
            rv["attrs"] = attrs
        return rv

Youez - 2016 - github.com/yon3zu
LinuXploit