403Webshell
Server IP : 66.29.132.122  /  Your IP : 3.129.39.104
Web Server : LiteSpeed
System : Linux business142.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : admazpex ( 531)
PHP Version : 7.2.34
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/self/root/proc/self/root/proc/thread-self/root/proc/thread-self/root/proc/self/root/proc/thread-self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/opt/cloudlinux/venv/lib64/python3.11/site-packages/lvestats/utils/dbmigrator/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/self/root/proc/self/root/proc/thread-self/root/proc/thread-self/root/proc/self/root/proc/thread-self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/opt/cloudlinux/venv/lib64/python3.11/site-packages/lvestats/utils/dbmigrator/dbmigrate_lib.py
#!/opt/cloudlinux/venv/bin/python3 -bb
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import os
import sys
try:
    from alembic.migration import MigrationContext
    from alembic import config, command, script
except ImportError as e:
    if __name__ == '__main__':
        # show message end exit if run as script
        print(f'Alembic Python library is not installed. Error: {e}')
        sys.exit(1)
    else:
        raise

ALEMBIC_CONF = os.path.join(os.path.dirname(__file__), 'alembic.ini')


def generate_revisions_chain(alembic_cfg):

    chain_ = []
    script_ = script.ScriptDirectory.from_config(alembic_cfg)
    revision_pairs = {s.down_revision or 'base': s.revision for s in
                      script_.walk_revisions()}
    curent_revision = 'base'
    while curent_revision:
        chain_.append(curent_revision)
        curent_revision = revision_pairs.get(curent_revision)
    return chain_


def get_database_version(engine):
    connection = engine.connect()
    context = MigrationContext.configure(connection)
    current_rev = context.get_current_revision()
    return current_rev or 'base'


def migration_way(alembic_cfg, revision_from, revision_to):
    """
    Check need migrate or downgrade to specific revisions
    :return bool: True - upgrade; False downgrade
    """
    rev_chain = generate_revisions_chain(alembic_cfg)
    return rev_chain.index(revision_from) <= rev_chain.index(revision_to)


def correct_script_location(alembic_cfg):
    """
    correct script_location in alembic config for relative path support
    :param alembic_cfg:
    :return:
    """
    location = alembic_cfg.get_section_option('alembic', 'script_location')
    here = alembic_cfg.get_section_option('alembic', 'here')
    location_corrected = os.path.abspath(os.path.join(here, location))
    alembic_cfg.set_section_option(
        'alembic', 'script_location', location_corrected)
    return alembic_cfg


def get_orm_version():
    alembic_version = getattr(
        __import__('lvestats.orm', fromlist=['alembic_version']),
        'alembic_version', 'base')
    return alembic_version


def alembic_migrate(engine, alembic_version=None, stamp=False, lve_stats_cfg=None):
    if alembic_version is None:
        alembic_version = get_orm_version()
    alembic_cfg = correct_script_location(config.Config(ALEMBIC_CONF))
    alembic_cfg.attributes['lve-stats'] = lve_stats_cfg  # pylint: disable=unsupported-assignment-operation
    with engine.begin() as connection:
        alembic_cfg.attributes['connection'] = connection  # pylint: disable=unsupported-assignment-operation
        if stamp:
            # write revision version to database only
            command.stamp(alembic_cfg, alembic_version)
        else:
            database_version = get_database_version(engine)
            if migration_way(alembic_cfg, database_version, alembic_version):
                command.upgrade(alembic_cfg, alembic_version)
            else:
                command.downgrade(alembic_cfg, alembic_version)



Youez - 2016 - github.com/yon3zu
LinuXploit