Server IP : 66.29.132.122 / Your IP : 18.188.18.197 Web Server : LiteSpeed System : Linux business142.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64 User : admazpex ( 531) PHP Version : 7.2.34 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /proc/self/root/opt/cloudlinux/venv/lib64/python3.11/site-packages/clcommon/lib/ |
Upload File : |
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT # mysql_governor.py - module for interfacing with dbctl utility for get/set MySQL limits and user's status import os import xml.dom.minidom as xml import subprocess import json from xml.parsers.expat import ExpatError from typing import Tuple, Optional, Dict # NOQA from packaging.version import Version from clcommon.clexception import FormattedException from clcommon.utils_cmd import run_command, ExternalProgramFailed class GovernorStatus: ENABLED = 'enabled' ERROR = 'error' DISABLED = 'disabled' class MySQLGovException(FormattedException): pass class MySQLGovernorDisabled(MySQLGovException): """ Exception raised when dbgovernor daemon is offline """ def __init__(self): super().__init__({ 'message': "%(util)s is disabled in the system. " "Please, run \"%(command)s\" to start the service", 'context': { 'util': 'MySQL governor', 'command': 'service db_governor restart' } }) class MySQLGovernorAbsent(MySQLGovException): """ Exception raised when dbgovernor isn't installed """ def __init__(self): super().__init__({ 'message': "%(util)s not present in system", 'context': {'util': 'Governor'} }) def _get_exc_message(param): return { 'message': "Invalid %(param)s parameter", 'context': {'param': param} } class MySQLGovernor: """ MysqlGovernor library """ # Constants to calculate individial limits for cpu(current, short, middle period) # and read(current, short, middle, long periods) # cpu: # c, s = c*95%, m = c*87%, l = c*75% # io (read == write): # c = io / 2, s = c*83%, m = c*76%, l = c*59% # refer to LU-167 IO_PERCENTS = (0.83, 0.76, 0.59) # percents for calculate read/write limits CPU_PERCENTS = (0.95, 0.87, 0.75) # percents for calculate cpu limits _UTILITY_PATH = "/usr/sbin/dbctl" _PACKAGE_UTILITY_PATH = "/usr/share/lve/dbgovernor/governor_package_limitting.py" _GOVERNOR_BINARY_PATH = "/usr/sbin/db_governor" _CONTAINER_PATH = "/etc/container/mysql-governor.xml" # available for root only _CONTAINER_PATH_V2 = "/var/run/mysql-governor-config.xml" # available for everyone; governor-mysql >= 1.1-14 _S_DEFAULT = 'DEFAULT' def __init__(self): # List of ignored users in governor self._governor_ignored_users = None # MySQL Governor limits cache # Dictionary format: user_name -> (cpu_limit: int, io_limit: int, cpu_limit_marked: str, io_limit_marked: str) # cpu_limit: int, io_limit: int - "old" limits, without package differs marks. For backward compatibility # Both are integers always, IO limit - KB/s # cpu_limit_marked: str, io_limit_marked: str - "new" limits, with package differs marks. # Both are strings always, IO limit - KB/s # Examples: (150, 3072, '150', '3072'), (200, 3072, '*200', '3072'), etc self._governor_limits = None # Error flag self._is_governor_error = False # MySQL Governor presence flag self._is_governor_present = os.path.isfile(self._UTILITY_PATH) self._governor_mode = self._detect_governor_mode() self._governor_version = self.get_governor_version() def get_governor_mode(self): return self._governor_mode def _detect_governor_mode(self): if self._is_governor_present: try: governor_cfg = self._get_xml_config() return governor_cfg.getElementsByTagName('lve')[0].getAttribute('use') except (MySQLGovException, IndexError): return None else: return None def _run_dbctl_with_args(self, args, check_exit_code=False): """ Run dbctl utility with given arguments and handle common errors: - governor is down: MySQLGovernorDisabled :param check_exit_code: whether we should raise exception when dbctl returs code != 0 """ ret_code, std_out, std_err = run_command( [self._UTILITY_PATH] + list(args), return_full_output=True) if "can't connect to socket" in std_out: # Governor not started self._is_governor_error = True self._governor_limits = None self._governor_ignored_users = None raise MySQLGovernorDisabled() elif check_exit_code and (ret_code or std_err): raise MySQLGovException({'message': "dbctl error: %(output)s", 'context': {'output': std_err}}) return ret_code, std_out, std_err def get_governor_status(self): # type: () -> Tuple[str, Optional[MySQLGovException]] return self._detect_governor_status() def _detect_governor_status(self): # type: () -> Tuple[str, Optional[MySQLGovException]] if self.is_governor_present(): try: self._load_info() except MySQLGovException as e: return GovernorStatus.ERROR, e except ExternalProgramFailed as e: return GovernorStatus.ERROR, MySQLGovException({ 'message': str(e) }) else: return GovernorStatus.ENABLED, None return GovernorStatus.DISABLED, None def _get_xml_config(self): # type: () -> xml.Document config_path = self._get_config_path() try: with open(config_path, 'r', encoding='utf-8') as f: return xml.parseString(f.read()) except (IOError, OSError) as e: self._is_governor_error = True raise MySQLGovException( {'message': "An error occured while loading governor " "config from %(path)s. Error: %(error)s", 'context': {'path': config_path, 'error': str(e)}} ) from e except ExpatError as e: self._is_governor_error = True raise MySQLGovException( {'message': "An error occured while parsing governor " "config from %(path)s. Error: %(error)s", 'context': {'path': config_path, 'error': str(e)}} ) from e @staticmethod def _load_ignore_users_from_xml(governor_cfg): """ Loads information about igrored users :type governor_cfg: xml.Document :return: list of ignore users """ ignore_users_list = [] try: gov_data = governor_cfg.getElementsByTagName("governor")[0] except IndexError as e: raise MySQLGovException({ 'message': 'Malformed mysql-governor config. ' 'Unable to find element \'%(element)s\'.', 'context': {'element': 'governor'} }) from e users_data_list = gov_data.getElementsByTagName("user") for user_data in users_data_list: user_mode = user_data.getAttribute("mode") if user_mode == "ignore": # ignore_users_list.append(user_data.getAttribute("mysql_name")) # ignore only system users if user_data.getAttribute("name"): ignore_users_list.append(user_data.getAttribute("name")) return ignore_users_list def _calc_rw_io_limits(self, io): """ Calculate db R/W IO limits based on governor version :param io: requested limits in MB/s :return: string values suitable to pass to "dbctl set ..." """ # Only Governor >= 1.2-18 has support for limits in bytes # This check should be removed when new Governor will be in stable repo if self._is_governor_newer_then('1.2-17'): io_limits = io * 2 ** 20 # MB to Bytes limits_tmpl = "%sb,%sb,%sb,%sb" else: io_limits = io # Use MBytes as is if io_limits == 1: # This should prevent dropping to defaults even for old # Governor, and just set 1MB instead io_limits = 2 limits_tmpl = "%s,%s,%s,%s" read = limits_tmpl % self._percentage(int(io_limits // 2), self.IO_PERCENTS) write = limits_tmpl % self._percentage(int(io_limits // 2), self.IO_PERCENTS) return read, write def get_governor_version(self): if not self.is_governor_present(): return None try: res = subprocess.check_output([self._GOVERNOR_BINARY_PATH, '--version'], text=True) # example of valid output # res = 'governor-mysql version 1.2-36' version = res.strip().split(' ')[2] return version except (subprocess.CalledProcessError, OSError): return None def _is_governor_newer_then(self, version): current = self._governor_version if not current: return False # assume "No" if we can't determine version return Version(version) < Version(current) @staticmethod def _parse_line(line): """ Convert data line from dbctl to list :param line: Data line could be like: "default\t400/380/350/300\t953/791/724/562\t953/791/724/562" or: "default 400/380/350/300 1000/830/760/590 1000/830/760/590" depending on --kb/mb/bb option passed to dbctl :return: list: ['default', '400/380/350/300', '953/791/724/562', '953/791/724/562'] """ return [part for part in line.split() if part] @staticmethod def _percentage(value, percents): """ Calculate full list of governor limits by one value and percents koeff """ res = [value] for k in percents: res.append(int(value*k)) return tuple(res) def is_governor_present(self): """ Get governor presence flag :return: """ return self._is_governor_present def get_governor_status_by_username(self, username): """ Get MySQL governor status for supplied user :param username: Username for get status :return: Governor status: "watched"/"ignored" or None if error """ # Load Governor data self._load_info() if username in self._governor_ignored_users: return 'ignored' return 'watched' def get_limits_by_user(self, username, with_package_mark: bool = False) -> Tuple[int, int]: """ Get MySQL governor limits for supplied user :param username: Username for read limits :param with_package_mark: False - without package limits difference mark (for compatibility with non-package governor, used in cloudlinux-top/cloudlinux-statistics), True - with package limits difference mark :return: Tuple (CPU limit, IO limit). Examples: (150, 3072) - with_package_mark == False ('*150', '3072') - with_package_mark == True ('*150', '*4096') - with_package_mark == True * - user has individual limits, differ from package limit MySQLGovException will be thrown if governor not present or error """ # Load Governor data if need self._load_info() if username in self._governor_limits: user_cpu_limit, user_io_limit_kb, user_cpu_limit_marked, user_io_limit_marked =\ self._governor_limits[username] else: user_cpu_limit, user_io_limit_kb, user_cpu_limit_marked, user_io_limit_marked = \ self._governor_limits['default'] if with_package_mark: limits_for_return = (user_cpu_limit_marked, user_io_limit_marked) else: limits_for_return = (user_cpu_limit, user_io_limit_kb) return limits_for_return def set_governor_status_for_user(self, username, status): """ Set MySQLGovernor status for single user :param: `str` username: Username for set status :param: `bool` status: True for "monitor", False for "ignore" :return: `bool`: operation status result """ self._load_info() status_cmd = "monitor" if status else "ignore" ret, std_out, std_err = self._run_dbctl_with_args([status_cmd, username]) if std_err or ret: exc_message = {'message': "Set governor status error(%(ret)s): %(output)s", 'context': {'ret': ret, 'output': std_err or std_out}} raise MySQLGovException(exc_message) return 0 def set_restricted_status_for_user(self, username, status): """ Set user restricted with dbctl utility :param: `str` username: Username for set restricted status :param: `bool` status: True for "restricted", False for "unrestricted" :return: `bool`: operation status result """ self._load_info() status_cmd = "restrict" if status else "unrestrict" if username in ["root", "admin"]: username = "default" ret, std_out, std_err = self._run_dbctl_with_args([status_cmd, username]) if std_err or ret: exc_message = {'message': "Set user restrict error(%(ret)s): %(output)s", 'context': {'ret': ret, 'output': std_err or std_out}} raise MySQLGovException(exc_message) return 0 def set_unrestricted_status_for_all_users(self): """ Set user restricted with dbctl utility :return: `bool`: operation status result """ self._load_info() ret, std_out, std_err = self._run_dbctl_with_args(["unrestrict-all"]) if std_err or ret: exc_message = {'message': "Set all users unrestrict status error(%(ret)s): %(output)s", 'context': {'ret': ret, 'output': std_err or std_out}} raise MySQLGovException(exc_message) return 0 def get_restrict_status_by_username(self, username): """ Get MySQL governor status for supplied user :param username: Username for get status :return: Governor restricted status: "restricted"/"unrestricted" """ # Load Governor data self._load_info() if username in ["root", "admin"]: username = "default" if username in self._governor_restricted_users: return 'restricted' return 'unrestricted' def set_limits_for_user(self, username, cpu=None, io=None): # this function interface for full edit mode # def set_limits_for_user(self, username, cpu=None, io=None, read=None, # write=None): """ Set MySQLGovernor limits for user :param: username `str`: username for set limits :param: `int`|`list` cpu: governor cpu limit. when it param int - calculate by percentage other params :param: `int`|`list` io: io value means that read and write limits similar :param: `int`|`list` read: read limit :param: `int`|`list` write: write limit :return: 0 """ if cpu is None and io is None: # and read is None and write is None: return 0 self._load_info() cmd = ["set", username] if cpu is not None: if isinstance(cpu, int): cpu = ",".join(map(str, self._percentage(cpu, self.CPU_PERCENTS))) else: raise MySQLGovException(_get_exc_message('cpu')) # uncomment this lines for add full edit mode # elif isinstance(cpu, (list, tuple)) and len(cpu) == 4: # cpu = ",".join(map(str, cpu)) # else: # raise MySQLGovException(_get_exc_message('cpu')) cmd.append(f"--cpu={cpu}") if io is not None: # uncomment this line for add full edit mode # if io is not None or read is not None or write is not None: if isinstance(io, int): read, write = self._calc_rw_io_limits(io) else: raise MySQLGovException(_get_exc_message('io')) # uncomment this lines for add full edit mode # elif isinstance(io, (list, tuple)) and len(io) == 4: # read = write = ",".join(map(str, io)) # else: # if isinstance(read, int): # read = "%s,%s,%s,%s" % (read, read*0.83, read*0.76, read*0.59) # elif isinstance(read, (list, tuple)) and len(read) == 4: # read = ",".join(map(str, read)) # if isinstance(write, int): # write = "%s,%s,%s,%s" % (write, write*0.83, write*0.76, write*0.59) # elif isinstance(write, (list, tuple)) and len(write) == 4: # write = ",".join(map(str, write)) # else: # raise MySQLGovException(_get_exc_message('limit')) cmd.append(f"--read={read}") cmd.append(f"--write={write}") try: ret, std_out, std_err = self._run_dbctl_with_args(cmd) except ExternalProgramFailed as e: raise MySQLGovException(str(e)) from e if std_err or ret: exc_message = {'message': "Set all users unrestrict status error(%(ret)s): %(output)s", 'context': {'ret': ret, 'output': std_err or std_out}} raise MySQLGovException(exc_message) # Reset users limits cache self._governor_limits = None return 0 def _get_package_raw_limits_from_utility(self, package_name: Optional[str]) -> dict: """ Retrieve MySQL Governor package limits :param package_name: Package name. If None, get all packages name :return: Dict with limits. Example: {'pack1': {'cpu': 100, 'io': 900}, 'pack2': {'cpu': 100, 'io': 900}} """ if package_name: cmd_list = [self._PACKAGE_UTILITY_PATH, 'get', f"--package={package_name.encode().decode('unicode-escape')}", '--format=kb'] else: cmd_list = [self._PACKAGE_UTILITY_PATH, 'get', '--all', '--format=kb'] ret_code, std_out, std_err = run_command(cmd_list, return_full_output=True) if ret_code != 0 or std_err: raise MySQLGovException({ 'message': "'%(cmd)s' failed, stderr is: %(stderr)s", 'context': {'cmd': ' '.join(cmd_list), 'stderr': std_err} }) try: package_data_from_util = json.loads(std_out) except (json.JSONDecodeError, ) as e: raise MySQLGovException({ 'message': "%(util)s output invalid, error is: %(error)s", 'context': {'util': self._PACKAGE_UTILITY_PATH, 'error': str(e)} }) from e return package_data_from_util def _calc_package_limits_from_raw(self, cpu_limits_list: list, read_limits_list: list, write_limits_list: list) -> Tuple[int, int]: """ Calculate package limits from raw governor limits :param cpu_limits_list: CPU limits list :param read_limits_list: Read limits list :param write_limits_list: Write limits list :return: Tuple: (cpu_limit, io_limit) """ return int(cpu_limits_list[0]), self._get_user_io_limit(str(read_limits_list[0]), str(write_limits_list[0])) def get_package_limits(self, package_name: Optional[str] = None) -> Optional[Dict]: """ Retrieve MySQL Governor package limits :param package_name: Package name. If None, get all packages name :return: Dict with limits. Example: {'pack1': {'cpu': 100, 'io': 900}, 'pack2': {'cpu': 100, 'io': 900}} """ try: package_data_from_util = self._get_package_raw_limits_from_utility(package_name) except MySQLGovException: # pylint: disable=try-except-raise raise # Convert limits from governor packages_data = {} for pack_name, pack_limits in package_data_from_util.items(): cpu_limit, io_limit = self._calc_package_limits_from_raw(pack_limits['cpu'], pack_limits['read'], pack_limits['write']) packages_data[pack_name] = {'cpu': cpu_limit, 'io': io_limit} return packages_data def reset_user_limits_to_defaults(self, username: str, limit_names: list): """ Reset users limits to default :param username: User name ro reset limits :param limit_names: Limit names list to reset """ # /usr/share/lve/dbgovernor/governor_package_limitting.py reset_individual \ # --user=res1 --limits=mysql-cpu,mysql-io limits_string = ','.join(limit_names) cmd_list = [self._PACKAGE_UTILITY_PATH, 'reset_individual', f'--user={username}', f'--limits={limits_string}'] ret_code, _, std_err = run_command(cmd_list, return_full_output=True) # For reliability we check both retcode and std_err if ret_code != 0 or std_err: raise MySQLGovException({ 'message': "'%(cmd)s' is failed, stderr is: %(stderr)s", 'context': {'cmd': ' '.join(cmd_list), 'stderr': std_err} }) def _set_governor_limits_to_cpanel_package( # pylint: disable=too-many-branches self, package_name: str, cpu_limit: Optional[int], io_limit: Optional[int] ): """ Set MySQL Governor to cPanel package file. If limits not changed, package file will not be written :param package_name: Package name :param cpu_limit: MySQL CPU limit to set :param io_limit: MySQL IO limit to set """ from clcommon.utils import get_file_lines, write_file_lines # pylint: disable=import-outside-toplevel package_path = f'/var/cpanel/packages/{package_name}' try: cpanel_package_lines = get_file_lines(package_path) except (OSError, IOError, ): return if len(cpanel_package_lines) == 0: return lines_to_write = [] is_change_made = False # Find and change limit lines: # lve_mysql_cpu=4000 # lve_mysql_io=4096 for line in cpanel_package_lines: if line.startswith('lve_mysql_cpu') and cpu_limit is not None: parts = line.strip().split('=') if len(parts) != 2: continue if cpu_limit == 0: s_cpu_limit = self._S_DEFAULT else: s_cpu_limit = str(cpu_limit) s_old_cpu_limit = parts[1].strip() if s_old_cpu_limit != s_cpu_limit: lines_to_write.append(f'lve_mysql_cpu={s_cpu_limit}\n') is_change_made = True else: # MYSQL CPU limit unchanged, save old line lines_to_write.append(f'{line}\n') elif line.startswith('lve_mysql_io') and io_limit is not None: if io_limit == 0: s_io_limit = self._S_DEFAULT else: s_io_limit = str(io_limit) parts = line.strip().split('=') if len(parts) != 2: continue value = parts[1].strip() if value != s_io_limit: lines_to_write.append(f'lve_mysql_io={s_io_limit}\n') is_change_made = True else: # MYSQL IO limit unchanged, save old line lines_to_write.append(f'{line}\n') else: lines_to_write.append(line) if is_change_made: write_file_lines(package_path, lines_to_write, 'w') def _apply_package_limits_to_cpanel(self, package_name: str, cpu_limit: Optional[int], io_limit: Optional[int]): """ Apply all MySQL Governor packages limits to cpanel package file. In not cPanel, do nothing :param package_name: Package name to update :param cpu_limit: MySQL CPU limit to set, None - not change :param io_limit: MySQL IO limit to set, None - not change """ from clcommon.cpapi import getCPName # pylint: disable=import-outside-toplevel if getCPName() != 'cPanel': return self._set_governor_limits_to_cpanel_package(package_name, cpu_limit, io_limit) def set_package_limits(self, package_name: str, cpu_limit: Optional[int] = None, io_limit: Optional[int] = None): """ Set limits for Governor package :param package_name: Package name for set :param cpu_limit: MySQL CPU limit to set :param io_limit: MySQL CPU limit to set """ # Argument validation # /usr/share/lve/dbgovernor/governor_package_limitting.py set --package pack2 --cpu=200,201,202,203 # --read=500,501,502,503 --write=400,401,402,403 if package_name is None or cpu_limit is None and io_limit is None: raise MySQLGovException("MySQLGovernor.set_package_limits arguments error: " "Package name and at least one limit " f"should be provided. Current arguments: package name: {package_name}; " f"cpu limit is {cpu_limit}; IO limit is {io_limit};") cmd_list = [self._PACKAGE_UTILITY_PATH, 'set', '--package', package_name] # Check arguments and prepare command line parameters for governor utility if cpu_limit is not None: if isinstance(cpu_limit, int): cpu = ",".join(map(str, self._percentage(cpu_limit, self.CPU_PERCENTS))) else: raise MySQLGovException(_get_exc_message('cpu_limit')) cmd_list.append(f"--cpu={cpu}") if io_limit is not None: if isinstance(io_limit, int): read, write = self._calc_rw_io_limits(io_limit) else: raise MySQLGovException(_get_exc_message('io_limit')) cmd_list.append(f"--read={read}") cmd_list.append(f"--write={write}") ret_code, std_out, std_err = run_command(cmd_list, return_full_output=True) if ret_code != 0 or std_err: raise MySQLGovException({ 'message': "'%(command)s' is failed, stdout is: '%(stdout)s', stderr is: '%(stderr)s'", 'context': {'command': ' '.join(cmd_list), 'stdout': std_out, 'stderr': std_err} }) # Apply limits to cPanel package self._apply_package_limits_to_cpanel(package_name, cpu_limit, io_limit) def _get_config_path(self): """ Get config path for mysql-governor; :rtype: str|None """ if os.path.isfile(self._CONTAINER_PATH_V2): return self._CONTAINER_PATH_V2 return self._CONTAINER_PATH def _read_ignore_users(self): """Load ignore users list from container file""" try: governor_xml = self._get_xml_config() self._governor_ignored_users = \ self._load_ignore_users_from_xml(governor_xml) except MySQLGovException: self._governor_limits = None self._governor_ignored_users = None raise def _load_info(self): """ Loads users info from MySQL governor :return: None """ # Exit if governor data already loaded if self._governor_ignored_users is not None and self._governor_limits is not None: return # Exit if governor not present if not self._is_governor_present: self._is_governor_error = True raise MySQLGovernorAbsent() utility_exc_message = {'message': "%(utility)s output is invalid", 'context': {'utility': self._UTILITY_PATH}} self._read_ignore_users() # Load governor limits is_kb_limits_ok, gov_data_str = self._run_dbctl_list() _, gov_restricted_str, _ = \ self._run_dbctl_with_args(['list-restricted'], check_exit_code=True) self._governor_restricted_users = [ line.split()[0] for line in gov_restricted_str.strip().split('\n')[1:] ] # Parse dbctl output gov_data_lines = gov_data_str.split('\n') self._governor_limits = self._parse_dbctl_data_lines(gov_data_lines, is_kb_limits_ok, utility_exc_message) # Check default settings presence if 'default' not in self._governor_limits: self._is_governor_error = True self._governor_limits = None self._governor_ignored_users = None exc_message = { 'message': "There is no %(what)s found in %(where)s", 'context': {'what': 'default settings', 'where': f'{self._UTILITY_PATH} output'} } raise MySQLGovException(exc_message) @staticmethod def _get_user_io_limit(read_limit: str, write_limit: str): """ Calculates the io limit. Handles the situation when user's write or read io limit is less than 1mb/s (PTCLLIB-85). :type write_limit: str :type read_limit: str :rtype: int """ try: user_io_limit = int(read_limit) + int(write_limit) except ValueError: if read_limit == write_limit == "<1": user_io_limit = 1 elif write_limit == "<1": user_io_limit = int(read_limit) else: user_io_limit = int(write_limit) return user_io_limit def _run_dbctl_list(self, _is_incorrect_syntax=False): """ Executes dbctl list-marked --kb or dbctl list-marked :param _is_incorrect_syntax: True is emulate dbctl error. Only for testing! :return: Cortege (is_kb_limits_ok, stdout_str), where is_kb_limits_ok == True, if dbctl returned limits in KB, else - False stdout_str - dbctl stdout string """ ret_code, gov_data_str, _ = self._run_dbctl_with_args( ['list-marked', '--kb'], check_exit_code=True) # Check is KB limits supported is_kb_limits_ok = True if _is_incorrect_syntax or 'Incorrect syntax' in gov_data_str: # --kb option not supported, call without it _, gov_data_str, _ = self._run_dbctl_with_args( ['list'], check_exit_code=True) is_kb_limits_ok = False return is_kb_limits_ok, gov_data_str def _parse_dbctl_data_lines(self, data_lines_list, is_kb_limits_ok: bool, utility_exc_message: dict) -> dict: """ Converts data lines from dbctl stdout to dictionary :param data_lines_list: List of lines from dbctl stdout :param is_kb_limits_ok: Is limits already in KB/s :param utility_exc_message: Message dict for exception :return: Tuple(dict, dict) dbctl data dictionary. Example: {'default': (400, 1953124, '400', '1953124'), 'cltest1': (350, 2025138, '*350', '2025138') } """ governor_limits = {} for line in data_lines_list: line = line.strip() # Pass header line and empty lines if not line or 'cpu(%)' in line: continue # List: [0] - username, [1] - CPU limits, [2] - read limits, [3] - write limits, [4] - package limits marks user_limits_list = MySQLGovernor._parse_line(line) # Data format verification if len(user_limits_list) != 5 or len(user_limits_list[4]) != 2: self._is_governor_error = True self._governor_limits = None self._governor_ignored_users = None raise MySQLGovException(utility_exc_message) cpu_limits_list = user_limits_list[1].split('/') # '400/380/350/300' read_limits_list = user_limits_list[2].split('/') # '1000/830/760/590' write_limits_list = user_limits_list[3].split('/') # '1000/830/760/590' if len(cpu_limits_list) != 4 or len(read_limits_list) != 4 or len(write_limits_list) != 4: self._is_governor_error = True self._governor_limits = None self._governor_ignored_users = None raise MySQLGovException(utility_exc_message) user_name = user_limits_list[0] # Determine CPU limit as [0] limit user_cpu_limit = int(cpu_limits_list[0]) # Determine IO limit as read_limit[0] + write_limit[0] user_io_limit = self._get_user_io_limit(read_limits_list[0], write_limits_list[0]) # limit if is_kb_limits_ok else limit*1024 user_io_limit_kb = user_io_limit if is_kb_limits_ok else user_io_limit*1024 # Process package differ marks # Package limits marks are placed in user_limits_list[4] and shows as '+-' (for example) # There are 2 marks always, each or '-' or '+' # 1st mark - CPU limit # 2nd mark - IO limit # If mark is '-' - package limit, if '+' - individual (should be marked by '*') marks = user_limits_list[4] if marks[0] == '+': user_cpu_limit_marked = f'*{cpu_limits_list[0]}' else: user_cpu_limit_marked = cpu_limits_list[0] if marks[1] == '+': user_io_limit_marked = f'*{user_io_limit_kb}' else: user_io_limit_marked = str(user_io_limit_kb) # Add limits to dictionary governor_limits[user_name] = (user_cpu_limit, user_io_limit_kb, user_cpu_limit_marked, user_io_limit_marked) return governor_limits