403Webshell
Server IP : 66.29.132.122  /  Your IP : 3.15.226.120
Web Server : LiteSpeed
System : Linux business142.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : admazpex ( 531)
PHP Version : 7.2.34
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/self/root/proc/thread-self/root/opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/subsys/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/self/root/proc/thread-self/root/opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/subsys//notifier.py
"""Send events via Notification service"""

import asyncio
import base64
import json

SOCKET_PATH = "/opt/imunify360/lib/event.sock"
SOCKET_TIMEOUT = 10.0  # seconds
_LEN_BYTES = 4
_MAX_SIZE = 1024 * 1024

CONFIG_UPDATED_EVENT_ID = "CONFIG_UPDATED"
USER_SCAN_STARTED_EVENT_ID = "USER_SCAN_STARTED"
USER_SCAN_FINISHED_EVENT_ID = "USER_SCAN_FINISHED"
USER_SCAN_MALWARE_FOUND_EVENT_ID = "USER_SCAN_MALWARE_FOUND"
CUSTOM_SCAN_STARTED_EVENT_ID = "CUSTOM_SCAN_STARTED"
CUSTOM_SCAN_FINISHED_EVENT_ID = "CUSTOM_SCAN_FINISHED"
CUSTOM_SCAN_MALWARE_FOUND_EVENT_ID = "CUSTOM_SCAN_MALWARE_FOUND"
SCRIPT_BLOCKED_EVENT_ID = "SCRIPT_BLOCKED"


def _prepare_event(event_id: str, user: str, body: dict) -> bytes:
    event = json.dumps(
        {
            "event_id": event_id,
            "user": user,
            "body": base64.b64encode(json.dumps(body).encode("utf-8")).decode(
                "utf-8"
            ),
        }
    )
    binary = event.encode("utf-8")
    if len(binary) > _MAX_SIZE:
        raise Exception(
            "message size {} exceeds limit of {}".format(
                len(binary), _MAX_SIZE
            )
        )
    return len(binary).to_bytes(_LEN_BYTES, byteorder="big") + binary


async def _send_event(event: bytes) -> None:
    _, writer = await asyncio.open_unix_connection(SOCKET_PATH)
    try:
        writer.write(event)
        await writer.drain()
    finally:
        writer.close()


async def trigger_event(event_id: str, user: str, body: dict) -> None:
    """Send an event with given event_id and user, having given body."""
    event = _prepare_event(event_id, user, body)
    await asyncio.wait_for(_send_event(event), SOCKET_TIMEOUT)


async def config_updated() -> None:
    """Send CONFIG_UPDATED event.

    This forces imunify-notifier to reread its config."""
    await trigger_event(CONFIG_UPDATED_EVENT_ID, "", {})

Youez - 2016 - github.com/yon3zu
LinuXploit