403Webshell
Server IP : 66.29.132.122  /  Your IP : 3.144.3.181
Web Server : LiteSpeed
System : Linux business142.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : admazpex ( 531)
PHP Version : 7.2.34
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/migrations/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/migrations//160_remove_quarantine.py
import logging
import os
import pwd
import shutil
from glob import glob
from pathlib import Path
from typing import Tuple, Union

from peewee import CharField, Model

# Avoiding imav.malwarelib.utils.quar_fileops imports
from defence360agent.model.simplification import FilenameField
from defence360agent.subsys.panels.hosting_panel import HostingPanel

logger = logging.getLogger(__name__)

# Name of the quarantine storage directory; only directories with exactly
# this name are ever removed by delete_quarantine_folder().
QUAR_NAME = ".imunify.quarantined"
# Parent directory of the fallback quarantine storage used by find_quar().
DEF_QUAR = "/var/imunify360"
# Value of the malware hit "status" column that selects the rows handled
# by migrate().
QUARANTINED = "quarantined"

# Glob patterns of directories that may contain a QUAR_NAME storage;
# migrate() joins each one with QUAR_NAME and removes every match.
QUARANTINE_PARENTS = [DEF_QUAR, "/var/www", "/home*"]


def get_model(db):
    """
    Build a minimal peewee model for the "malware_hits" table bound to *db*.

    The model is declared by hand instead of being taken from
    migrator.orm[] because of the custom field FilenameField.
    Only the columns this migration reads are declared.
    """

    class MalwareHit(Model):
        orig_file = FilenameField(null=False)
        status = CharField()

        class Meta:
            database = db
            db_table = "malware_hits"

    return MalwareHit


def migrate(_migrator, database, fake=False, delete_function=None, *_, **__):
    """
    Remove quarantine storages and the malware hits that point into them.

    Nothing is done when *fake* is true. Otherwise every hit whose status
    is QUARANTINED has its storage directory passed to the delete function
    and its row deleted; afterwards every path matching QUAR_NAME under
    one of QUARANTINE_PARENTS is passed to the delete function as well.

    *delete_function* is called with a single path argument; when it is
    not given (or falsy) delete_quarantine_folder is used. It exists so
    unit-tests can substitute the real deletion.
    """
    if fake:
        return

    remove = delete_function if delete_function else delete_quarantine_folder
    hit_model = get_model(database)
    quarantined_hits = hit_model.select().where(
        hit_model.status == QUARANTINED
    )

    # Storages referenced by the hits recorded in the database
    for record in quarantined_hits:
        storage = find_quar(record.orig_file)[0]
        remove(storage)
        record.delete_instance()

    # Storages that may exist in the well-known locations
    for location in QUARANTINE_PARENTS:
        pattern = os.path.join(location, QUAR_NAME)
        for storage in glob(pattern):
            remove(storage)


def rollback(*_, **__):
    """Do nothing: every argument is accepted and ignored."""
    return None


def delete_quarantine_folder(quarantine_path: Union[str, Path]):
    """
    Recursively delete *quarantine_path* when it is a quarantine folder.

    The path is removed only if its last component is QUAR_NAME and the
    path is equal to its own resolved form; in any other case the call
    does nothing. Errors raised while removing the tree are ignored.
    """
    target = Path(quarantine_path)

    # Never touch anything that is not named like a quarantine storage
    if target.name != QUAR_NAME:
        return
    # Skip paths that resolve to a different location
    if target != target.resolve():
        return

    logger.info("Deleting quarantine folder %s", target)
    shutil.rmtree(target, ignore_errors=True)


def find_quar(source: str) -> Tuple[Path, Path]:
    """
    Find file in quarantine by source path.

    This function is copied from agent code since it is to be removed.

    The parents of *source* are walked from the closest one upwards until
    one of them can be stat()-ed; the owner of that directory decides
    which quarantine storage is used.

    Returns a pair ``(storage, relative)``: the quarantine storage
    directory and the path of the file relative to the storage's base.

    The default pair (``DEF_QUAR/QUAR_NAME`` and *source* relative to
    "/") is returned when:

    * no parent of *source* exists;
    * the owner uid of the first existing parent has no passwd entry;
    * that owner is root;
    * HostingPanel().base_home_dir() raises FileNotFoundError or
      RuntimeError for the owner's home directory;
    * the resolved file location is not inside that base home directory.

    Raises ValueError when *source* is not an absolute path, because the
    default pair cannot be computed.
    """
    file = Path(source)
    default_result = Path(DEF_QUAR) / QUAR_NAME, file.relative_to(Path("/"))

    user = None
    parent = None

    for path in file.parents:
        try:
            user = pwd.getpwuid(path.stat().st_uid)
        except (FileNotFoundError, NotADirectoryError):
            # The parent is gone, or one of its components is now a
            # regular file (stat() fails with ENOTDIR): keep walking up
            # instead of aborting the whole migration.
            continue
        except KeyError:
            # The owner uid is unknown to the passwd database
            return default_result
        else:
            parent = path
            break

    # Prevent storing quarantine in '/'
    if user is None or user.pw_name == "root":
        return default_result

    # Resolve only the part that exists; the rest of the original path is
    # appended unchanged.
    resolved_place = parent.resolve() / file.relative_to(parent)

    try:
        # NOTE(review): base_dir is presumably a Path (it is joined with
        # "/" below) - confirm against HostingPanel.base_home_dir.
        base_dir = HostingPanel().base_home_dir(user.pw_dir)
    except (FileNotFoundError, RuntimeError):
        return default_result

    try:
        relative = resolved_place.relative_to(base_dir)
    except ValueError:
        # The file does not live under the owner's base home directory
        return default_result

    return base_dir / QUAR_NAME, relative

Youez - 2016 - github.com/yon3zu
LinuXploit