403Webshell
Server IP : 66.29.132.122  /  Your IP : 18.224.70.204
Web Server : LiteSpeed
System : Linux business142.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : admazpex ( 531)
PHP Version : 7.2.34
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/plugins/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/plugins/event_monitor_message_processor.py
import asyncio
import logging
import os
from abc import ABC, abstractmethod
from collections import defaultdict
from heapq import heappop, heappush
from typing import Dict

from defence360agent.contracts.config import ConfigFile, Core
from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.plugins import BaseMessageProcessor, expect

logger = logging.getLogger()


class EventProcessorBase(BaseMessageProcessor, ABC):
    def __init__(self, loop):
        # note: empty list is a heap (no need for heapify here)
        self._msg_buf = defaultdict(list)
        self._loop = loop

    def add_message(self, message):
        heappush(
            self._msg_buf[message["username"]], (message["timestamp"], message)
        )

    async def process_messages(self):
        await asyncio.gather(
            *(
                self.process_user_messages(user_messages)
                for user_messages in self._msg_buf.values()
            )
        )

    @expect(MessageType.cPanelEvent)
    async def process_event(self, message):
        if not self._message_is_relatable(message):  # pragma: no cover
            return

        if message.hook == "Modify":
            await self._process_modify(message)
        elif message.hook == "Create":
            await self._process_create(message)
        elif message.hook == "change_package":
            await self._process_change_package(message)
        elif message.hook == "Remove":
            await self._process_account_removed(message)

    async def process_user_messages(self, messages):
        for _ in range(len(messages)):
            await self.process_message(heappop(messages)[1])

    @abstractmethod
    async def _process_modify(self, message):
        """Modify hook"""

    @abstractmethod
    async def _process_create(self, message):
        """Create hook"""

    @abstractmethod
    async def _process_change_package(self, message):
        """change_package hook"""

    @abstractmethod
    async def _process_account_removed(self, message):
        """Remove hook"""

    @abstractmethod
    def _message_is_relatable(self, message):
        """Whether the message should be processed"""

    @abstractmethod
    async def is_enabled(self):
        """Whether messages should be processed"""


class SettingsChangeBase(EventProcessorBase, ABC):
    """Process hook event messages from cPanel"""

    async def _process_modify(self, message):
        package_field = "plan" if "plan" in message.data else "exclude"
        await self._get_settings_and_update(message, package_field)

    async def _process_create(self, message):
        await self._get_settings_and_update(message, "plan", True)

    async def _process_change_package(self, message):
        await self._get_settings_and_update(message, "new_pkg", True)

    async def _process_account_removed(self, message):
        pass

    async def _get_settings_and_update(
        self,
        message,
        package_field: str,
        add_to_package: bool = False,
    ) -> None:
        logger.info("Get settings from %s", message)
        settings = await self._get_settings_from_message(message)
        await self._apply_settings(
            message, package_field, add_to_package, settings
        )

    async def _apply_settings(
        self, message, package_field, add_to_package, settings
    ):
        logger.info("Step 1 %s ", settings)
        # Do nothing if there are no values for Imunify360 features
        # in the message for Modify hook
        if (
            message.get("plan") is None
            and message["hook"] == "Modify"
            and all(value is None for value in settings.values())
        ):
            return
        if not all(settings.values()):
            try:
                package_name = message.data[package_field]
            except KeyError:
                logger.warning("No information about package in message")
                fallback_settings = self._default_settings()
            else:
                fallback_settings = await self._get_package_settings(
                    package_name, add_to_package
                )
            for feature, value in settings.items():
                if value is None:
                    settings[feature] = fallback_settings[feature]
        logger.info(
            "Settings specified in hook message %s for %s",
            settings,
            message["username"],
        )
        for feature, value in settings.items():
            await self.on_settings_change(message["username"], feature, value)

    @abstractmethod
    def _message_is_relatable(self, message):
        """Whether the message should be processed"""

    @abstractmethod
    async def on_settings_change(self, user, feature, value):
        """What to do after settings were changed (e.g. sync the DB)"""

    @staticmethod
    @abstractmethod
    def _default_settings() -> Dict[str, str]:
        """Get default package settings"""

    @abstractmethod
    async def _get_settings_from_message(self, message):
        """Retrieve settings from the message"""

    @classmethod
    @abstractmethod
    async def _get_package_settings(
        cls, package_name: str, add_to_package: bool
    ) -> Dict[str, str]:
        """Get current package settings"""

    @abstractmethod
    async def is_enabled(self):
        """Whether messages should be processed"""


class UserConfigProcessor(EventProcessorBase):
    def _message_is_relatable(self, message):
        return True

    async def is_enabled(self):
        return True

    async def _process_account_removed(self, message):
        user = message.get("user") or message.get("username")
        if user:
            try:
                os.unlink(ConfigFile(user).path)
            except FileNotFoundError:
                pass
            except OSError:  # pragma: no cover
                logger.warning(
                    "Cannot delete Imunify360 config file for user %s", user
                )

    async def _process_modify(self, message):
        if old_username := message.data.get("old_username"):  # User renamed
            os.rename(
                os.path.join(Core.USER_CONFDIR, old_username),
                os.path.join(Core.USER_CONFDIR, message.username),
            )

    async def _process_create(self, message):
        """Create hook"""

    async def _process_change_package(self, message):
        """change_package hook"""

Youez - 2016 - github.com/yon3zu
LinuXploit