403Webshell
Server IP : 66.29.132.122  /  Your IP : 18.219.64.172
Web Server : LiteSpeed
System : Linux business142.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : admazpex ( 531)
PHP Version : 7.2.34
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/self/root/proc/thread-self/root/opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/feature_management/plugins/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/self/root/proc/thread-self/root/opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/feature_management/plugins//native.py
from logging import getLogger
from typing import Dict

from defence360agent.feature_management.constants import (
    AV,
    EXTENSION_DEFAULTS,
    FEATURE_EXT_VARIABLES,
    NATIVE_EXTENSION_NAME,
    PROACTIVE,
)
from defence360agent.feature_management.control import (
    is_native_feature_management_enabled,
)
from defence360agent.feature_management.model import FeatureManagementPerms
from defence360agent.feature_management.utils import set_feature
from defence360agent.plugins.event_monitor_message_processor import (
    SettingsChangeBase,
)
from defence360agent.subsys.panels.cpanel import packages

# Module-level logger named after this module, shared by all handlers below.
logger = getLogger(__name__)


class NativeFeatureManagementSettingsChange(SettingsChangeBase):
    """Settings-change handler for the features listed in ``FEATURES_LIST``.

    Keeps ``FeatureManagementPerms`` records in step with account removals
    and renames, and resolves per-package feature settings from the package
    extension variables named in ``FEATURE_EXT_VARIABLES``.
    """

    # Features whose values are extracted from incoming messages.
    FEATURES_LIST = [PROACTIVE, AV]

    async def _process_account_removed(self, message):
        """Run the base handling, then drop the removed user's permissions.

        The user name is taken from the ``"user"`` key of *message*, falling
        back to ``"username"``; nothing is deleted when neither is set.
        """
        await super()._process_account_removed(message)
        user = message.get("user") or message.get("username")
        if user:
            logger.info("Resetting FM settings for %s", user)
            FeatureManagementPerms.delete().where(
                FeatureManagementPerms.user == user
            ).execute()

    async def _process_modify(self, message):
        """Run the base handling, then follow a user rename if one occurred.

        A rename is detected by the presence of ``"old_username"`` in
        ``message.data``; matching permission rows are re-pointed at
        ``message.username``.
        """
        await super()._process_modify(message)
        if "old_username" in message.data:  # User renamed
            old_username = message.data["old_username"]
            FeatureManagementPerms.update(user=message.username).where(
                FeatureManagementPerms.user == old_username
            ).execute()

    async def _get_settings_from_message(self, message):
        """Return ``{feature: value}`` for every feature in ``FEATURES_LIST``.

        A feature maps to ``None`` when its extension variable is unknown or
        is not present in ``message.data``.
        """
        settings = {}
        for feature in self.FEATURES_LIST:
            try:
                value = message.data[FEATURE_EXT_VARIABLES[feature]]
            except KeyError:
                # Raised by either lookup: no extension variable registered
                # for the feature, or the message does not carry it.
                value = None
            settings[feature] = value
        return settings

    @classmethod
    async def _get_package_settings(
        cls, package_name: str, add_to_package: bool
    ) -> Dict[str, str]:
        """Return the feature settings stored for *package_name*.

        :param package_name: name of the hosting package to inspect.
        :param add_to_package: when true and the package info lacks the
            extension fields, add the extension to the package with
            ``EXTENSION_DEFAULTS``.
        :return: ``{feature: value}`` read from the package info, or the
            defaults from :meth:`_default_settings` when any extension field
            is missing or the package does not exist.
        """
        logger.info("Getting package settings %s", package_name)
        try:
            # Both package helpers are awaited inside this ``try`` so that
            # ``PackageNotExistError`` from either of them is handled below.
            # Previously the lookup ran before the ``try`` and the extension
            # was added from within a sibling ``except`` clause, so the
            # handler could never fire and the error reached the caller.
            pkg_info = await packages.get_package_info(package_name)
            try:
                return {
                    feature: pkg_info[pe_var]
                    for feature, pe_var in FEATURE_EXT_VARIABLES.items()
                }
            except KeyError:
                logger.info(
                    "No extension's fields in package settings %s", pkg_info
                )
                if add_to_package:
                    await packages.add_extension(
                        NATIVE_EXTENSION_NAME, pkg_info, **EXTENSION_DEFAULTS
                    )
        except packages.PackageNotExistError:
            logger.warning("Package %s doesn't exist", package_name)

        return cls._default_settings()

    @staticmethod
    def _default_settings() -> Dict[str, str]:
        """Return ``{feature: default}`` built from ``EXTENSION_DEFAULTS``."""
        return {
            feature: EXTENSION_DEFAULTS[pe_var]
            for feature, pe_var in FEATURE_EXT_VARIABLES.items()
        }

    async def on_settings_change(self, user, feature, value):
        """Apply the new *value* of *feature* for *user* via ``set_feature``."""
        await set_feature(user, feature, value)

    def _message_is_relatable(self, message):
        """Accept every message; this handler does no filtering here."""
        return True

    async def is_enabled(self):
        """Return the result of ``is_native_feature_management_enabled()``."""
        return await is_native_feature_management_enabled()

Youez - 2016 - github.com/yon3zu
LinuXploit