Server IP : 66.29.132.122 / Your IP : 3.147.63.2 Web Server : LiteSpeed System : Linux business142.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64 User : admazpex ( 531) PHP Version : 7.2.34 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /proc/self/root/proc/thread-self/root/proc/thread-self/root/proc/self/root/opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/feature_management/ |
Upload File : |
import logging from itertools import chain from typing import List, Any from defence360agent.feature_management import hooks from defence360agent.feature_management.model import FeatureManagementPerms from defence360agent.model import instance from defence360agent.utils import execute_iterable_expression from .lookup import features logger = logging.getLogger(__name__) async def set_feature(user: str, feature: str, value: str) -> bool: """Sets a `feature` to `value` for a given `user`. Calls appropriate hook and returns its (bool) result. Logs the result of setting change. If hook fails rollbacks changes to database. """ with instance.db.atomic() as trx: perm = FeatureManagementPerms.get_perm(user) perm.set_feature(feature, value) hook = hooks.get_hook(feature) ok = hook(user, value) if ok: logger.info( "Applied setting %s=%s for user %s", feature, value, user ) else: logger.error( "Failed to apply setting %s=%s for user %s", feature, value, user, ) trx.rollback() return ok async def update_users( feature: str, users: List[str], value: Any, existing_users: List[str] ): result = {"succeeded": [], "failed": []} for user in users: if user not in existing_users: logger.warning("No such user: %s", user) continue if await set_feature(user, feature, value): result["succeeded"].append(user) else: result["failed"].append(user) return result async def update_default(feature: str, value: Any): perm = FeatureManagementPerms.get_default() perm.set_feature(feature, value) hook = hooks.get_hook(feature) return hook(None, value) async def sync_users(users: List[str]) -> bool: """Synchronize existing permissions with panel users""" panel_users = set(users) perm_users = FeatureManagementPerms.select(FeatureManagementPerms.user) perm_users = set(chain(*perm_users.tuples())) perms_to_remove = perm_users - panel_users perms_to_remove.remove(FeatureManagementPerms.DEFAULT) if perms_to_remove: logger.info("Remove permissions of users %s", perms_to_remove) def expression(perms_to_remove): return FeatureManagementPerms.delete().where( FeatureManagementPerms.user.in_(perms_to_remove) ) execute_iterable_expression(expression, list(perms_to_remove)) perms_to_add = panel_users - perm_users if perms_to_add: logger.info("Add permissions to users %s", perms_to_add) for user in perms_to_add: perm = FeatureManagementPerms.get_perm(user) for feature in features: value = perm.get_feature(feature) callback = hooks.get_hook(feature) callback(user, value) return bool(perms_to_add) or bool(perms_to_remove) async def reset_features(**features): """Sets feature values for all existing users in feature management database to given values in `features`.""" users = list( chain( *FeatureManagementPerms.select(FeatureManagementPerms.user) .where( FeatureManagementPerms.user != FeatureManagementPerms.DEFAULT ) .tuples() ) ) for user in users: for feature, value in features.items(): await set_feature(user, feature, value)