Server IP : 66.29.132.122 / Your IP : 3.148.112.17 Web Server : LiteSpeed System : Linux business142.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64 User : admazpex ( 531) PHP Version : 7.2.34 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /proc/self/root/proc/self/root/proc/thread-self/root/proc/self/root/opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/simple_rpc/ |
Upload File : |
import logging from defence360agent.rpc_tools import lookup from defence360agent.rpc_tools.validate import ( ValidationError, validate_av_plus_license, ) from defence360agent.subsys.panels.base import PanelException from defence360agent.model.infected_domain import InfectedDomainList from defence360agent.subsys.panels import hosting_panel from defence360agent.api.server.reputation import ReputationAPI from defence360agent.model.simplification import run_in_executor logger = logging.getLogger(__name__) class ReputationManagementEndpoints(lookup.RootEndpoints): @lookup.bind("infected-domains") @validate_av_plus_license async def list_domains(self, limit, offset): existing_users = set(await hosting_panel.HostingPanel().get_users()) items, max_count = InfectedDomainList.get_by_user( existing_users, offset=offset, limit=limit ) return { "items": items, "max_count": max_count, } @lookup.bind("check-domains") @validate_av_plus_license async def check_domains(self): hp = hosting_panel.HostingPanel() # TODO: strange behaviour is detected # I think it's normal case for cPanel DNS only # we should do not process domains if it not found or panel # not available if not hp.is_installed(): raise ValidationError("No avaliable control panel found!") try: domains = await hp.get_user_domains() except PanelException as e: raise ValidationError(str(e)) if not domains: raise ValidationError("Domains not found") reputation_data = await ReputationAPI.check(domains) domain_to_user = ( await hosting_panel.HostingPanel().get_domain_to_owner() ) await run_in_executor( None, lambda: InfectedDomainList.refresh_domains( reputation_data, domain_to_user ), )