Server IP : 66.29.132.122 / Your IP : 18.118.255.238 Web Server : LiteSpeed System : Linux business142.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64 User : admazpex ( 531) PHP Version : 7.2.34 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /proc/self/root/proc/self/root/proc/thread-self/root/proc/thread-self/root/proc/self/root/proc/thread-self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/api/server/ |
Upload File : |
import json import urllib.error import urllib.request import urllib.parse import asyncio from typing import List import time import logging from defence360agent.utils import retry_on, split_for_chunk from defence360agent.api.server import API, APIError logger = logging.getLogger(__name__) class ReputationAPI(API): REQUEST_URL = "/api/reputation/check" RESULT_URL = "/api/reputation/result" # during stress tests 'Request Entity Too Large' error has been caught, # in request size somewhere between 800000 and 900000 bytes # max domain length - 255, 800000 / 255 = 3137 # 3000 is the nearest 'round' number CHUNK_SIZE = 3000 WAIT_BEFORE_RETRY = 5 WAIT_FOR_RESULT = 1200 _SOCKET_TIMEOUT = 60 @classmethod async def check(cls, domains: List[str]) -> List[dict]: logger.info("DomainListRequest domains: %s", domains) loop = asyncio.get_event_loop() return await loop.run_in_executor(None, cls._check, domains) @classmethod def _check(cls, domains: List[str]) -> List[dict]: result_list = [] for chunk in split_for_chunk(domains, cls.CHUNK_SIZE): result = cls._check_chunk(chunk) next_chunk = cls._get_result(result["result_id"]) result_list += next_chunk return result_list @classmethod @retry_on(APIError, timeout=WAIT_FOR_RESULT) def _check_chunk(cls, chunk) -> dict: check_request = urllib.request.Request( cls._BASE_URL + cls.REQUEST_URL, method="POST", headers={"Content-Type": "application/json"}, data=json.dumps(dict(domains=chunk)).encode(), ) return cls.request(check_request) @classmethod @retry_on(APIError, timeout=WAIT_FOR_RESULT) def _get_result(cls, result_id: str): data = dict(result_id=result_id) url = "{}?{}".format( cls._BASE_URL + cls.RESULT_URL, urllib.parse.urlencode(data) ) request = urllib.request.Request(url) response = cls.request(request) result = response["result"] if result is None: # time inside sync executor time.sleep(cls.WAIT_BEFORE_RETRY) raise APIError("Response not ready yet") return result