Server IP : 66.29.132.122 / Your IP : 18.118.95.164 Web Server : LiteSpeed System : Linux business142.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64 User : admazpex ( 531) PHP Version : 7.2.34 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /proc/self/root/proc/self/root/proc/thread-self/root/proc/thread-self/root/proc/self/root/proc/thread-self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/imav/malwarelib/scan/cleaners/ |
Upload File : |
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ from asyncio import gather from datetime import timedelta from defence360agent.internals.global_scope import g from imav.malwarelib.scan.queue_supervisor_sync import ( QueueSupervisorSync, ) from imav.malwarelib.scan.ai_bolit.detached import ( AiBolitDetachedScan, ) from defence360agent.mr_proper import BaseCleaner from defence360agent.utils import antivirus_mode class UncompletedScansCleaner(BaseCleaner): """ Cleaner for uncompleted (including aborted) detached scans. """ if antivirus_mode.enabled: _DETACHED_OPERATIONS = [ AiBolitDetachedScan, ] else: from imav.malwarelib.scan.mds.detached import ( MDSDetachedCleanup, MDSDetachedRestore, MDSDetachedScan, ) _DETACHED_OPERATIONS = [ AiBolitDetachedScan, MDSDetachedCleanup, MDSDetachedRestore, MDSDetachedScan, ] @classmethod async def cleanup(cls) -> None: # We need to bring the queue to a consistent state on agent start. # Running 'check_detached_operation' and '_recheck_scan_queue' in async # manner leads to already started scan re-running (Wrong 'QueuedScan' # state on 'recheck') coros = [ op.check_detached_operation_dir(g.sink) for op in cls._DETACHED_OPERATIONS ] await gather(*coros) # We could do a recheck in case some of the scans are 'ABORTED' since # handle_aborted_process is called for them all, so they are removed # from queue and would not be re-run QueueSupervisorSync(sink=g.sink).recheck()