Server IP : 66.29.132.122 / Your IP : 18.218.125.85 Web Server : LiteSpeed System : Linux business142.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64 User : admazpex ( 531) PHP Version : 7.2.34 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils/ |
Upload File : |
import asyncio
import io
import json
import socket
import urllib.parse
import urllib.request
from functools import partial
from logging import getLogger
from pathlib import Path

from defence360agent.contracts.config import ANTIVIRUS_MODE

logger = getLogger(__name__)


class ZendeskAPIError(Exception):
    """Raised when the Zendesk API answers with an unexpected response.

    The constructor arguments are stored as the ``error``, ``description``
    and ``details`` attributes; ``description`` is also used as the
    exception message.
    """

    def __init__(self, error, description, details):
        self.error = error
        self.description = description
        self.details = details
        super().__init__(description)


_API_URL_TMPL = "https://cloudlinux.zendesk.com/api/v2/{}"
_HC_URL_TMPL = "https://cloudlinux.zendesk.com/hc/requests/{}"

# Identifiers for custom fields in API
_PRODUCT_ID = 33267569
_DOCTOR_ID = 43297669
_CLN_ID = 43148369
_PRIVACY_POLICY_ID = 12355021509788


async def send_request(
    sender_email,
    subject,
    description,
    doctor_key=None,
    cln=None,
    attachments=None,
):
    """
    Send request to support of Imunify360 via Zendesk API

    *attachments*, if given, is an iterable of file paths that are uploaded
    first; the resulting upload token is attached to the request comment.
    *doctor_key* and *cln* are added as custom fields only when truthy.

    Return the value of `_post_support_request()`: the url of the created
    request, or None if the ticket was suspended.

    Raise ZendeskAPIError if the API response is not the expected one.
    """
    # Uploading attachments to Zendesk
    upload_token = await _upload_attachments(attachments)

    # Creating comment object: setting description and attaching
    # uploads token
    comment = dict(body=description)
    if upload_token is not None:
        comment["uploads"] = [upload_token]

    # Author of request (the e-mail is used as the display name too)
    requester = dict(name=sender_email, email=sender_email)

    # Custom fields for support convenience
    custom_fields = [
        {
            "id": _PRODUCT_ID,
            "value": "pr_imunify_av" if ANTIVIRUS_MODE else "pr_im360",
        },
        {"id": _PRIVACY_POLICY_ID, "value": True},
    ]
    if doctor_key:
        custom_fields.append({"id": _DOCTOR_ID, "value": doctor_key})
    if cln:
        custom_fields.append({"id": _CLN_ID, "value": cln})

    # Ready request
    request = dict(
        requester=requester,
        subject=subject,
        comment=comment,
        custom_fields=custom_fields,
    )

    return await _post_support_request(request)


def _post_data(url, data: bytes, headers, *, params=None, timeout=None):
    """HTTP POST *data* to *url* with given *headers*.

    Add query *params* to the *url* if given.

    Return (http_status, decoded_json_response) tuple. For an HTTP error
    response whose body is missing, empty or not valid JSON, the second
    item is an empty dict so that the caller can still act on the status.

    Raise TimeoutError on socket timeout; other OSError instances that do
    not carry an HTTP status code are re-raised unchanged.
    """
    if params is not None:  # add params to the url
        p = urllib.parse.urlparse(url)
        query = p.query
        if query:
            query += "&"
        query += urllib.parse.urlencode(params)
        url = urllib.parse.urlunparse(
            (p.scheme, p.netloc, p.path, p.params, query, p.fragment)
        )

    def decode_as_json(response):
        return json.load(
            io.TextIOWrapper(
                response,
                encoding=response.headers.get_content_charset("utf-8"),
            )
        )

    try:
        with urllib.request.urlopen(
            urllib.request.Request(url, data=data, headers=headers),
            timeout=timeout,
        ) as response:
            return (response.code, decode_as_json(response))  # http status
    except socket.timeout as exc:
        raise TimeoutError from exc
    except OSError as e:
        if not hasattr(e, "code"):
            raise
        # HTTPError
        if e.fp is None:
            return (e.code, {})
        try:
            return (e.code, decode_as_json(e))
        except ValueError:
            # Empty or non-JSON error body (JSONDecodeError and
            # UnicodeDecodeError are both ValueError subclasses): keep the
            # HTTP status instead of masking it with a decoding error.
            logger.warning(
                "Non-JSON body in HTTP %s response from %s", e.code, url
            )
            return (e.code, {})


async def _post_support_request(request):
    """Return url of the support request or None if request is suspended,
    because we are not able to obtain the id of the ticket if it is
    suspended.

    Raise ZendeskAPIError if the status is not 201, or if a 201 response
    contains neither "request" nor "suspended_ticket".
    """
    url = _API_URL_TMPL.format("requests.json")
    headers = {"Content-Type": "application/json"}
    data = json.dumps(dict(request=request), sort_keys=True).encode("ascii")

    # The HTTP call is blocking, so it is run in the default executor.
    loop = asyncio.get_running_loop()
    status, result = await loop.run_in_executor(
        None, _post_data, url, data, headers
    )

    if status != 201:
        raise ZendeskAPIError(
            result.get("error", "UNKNOWN ERROR"),
            result.get("description"),
            result.get("details", {}),
        )

    request_data = result.get("request")
    if request_data:
        return _HC_URL_TMPL.format(request_data["id"])
    if "suspended_ticket" in result:
        return None
    raise ZendeskAPIError(
        "Response error", "UNKNOWN ERROR", "{!r}".format(result)
    )


def _upload_file(path, params):
    """Read the file at *path* and POST its content to the uploads endpoint.

    Blocking helper meant to be run in an executor. Return the
    (http_status, decoded_json_response) tuple of `_post_data()`.
    """
    return _post_data(
        _API_URL_TMPL.format("uploads.json"),
        data=path.read_bytes(),
        headers={"Content-Type": "application/binary"},
        params=params,
    )


async def _upload_attachments(attachments):
    """Upload *attachments* (an iterable of file paths) to Zendesk.

    Return the upload token taken from the first successful upload, or None
    if *attachments* is None or no upload succeeded. Once a token is known
    it is passed along with the following uploads. A failed upload is
    logged and skipped.
    """
    upload_token = None
    if attachments is None:
        return upload_token

    loop = asyncio.get_running_loop()
    for attachment in attachments:
        path = Path(attachment)
        params = {"filename": path.name}
        if upload_token is not None:
            params["token"] = upload_token

        # Both the file read and the HTTP call are blocking, so they are
        # executed together outside of the event loop thread.
        status, result = await loop.run_in_executor(
            None, partial(_upload_file, path, params)
        )

        if status != 201:
            # The error body may be empty, so do not index it directly.
            logger.warning(
                "Failed to upload file %s to Zendesk: %s",
                attachment,
                result.get("error", "UNKNOWN ERROR"),
            )
            continue

        if upload_token is None:
            upload_token = result["upload"]["token"]

    return upload_token