Server IP : 66.29.132.122 / Your IP : 3.145.48.72 Web Server : LiteSpeed System : Linux business142.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64 User : admazpex ( 531) PHP Version : 7.2.34 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /proc/self/root/proc/self/root/proc/thread-self/root/proc/thread-self/root/proc/self/root/proc/thread-self/root/proc/self/root/proc/self/root/opt/cloudlinux/venv/lib64/python3.11/site-packages/astroid/brain/ |
Upload File : |
# Licensed under the LGPL: https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html
# For details: https://github.com/PyCQA/astroid/blob/main/LICENSE
# Copyright (c) https://github.com/PyCQA/astroid/blob/main/CONTRIBUTORS.txt

from astroid.bases import BoundMethod
from astroid.brain.helpers import register_module_extender
from astroid.builder import parse
from astroid.exceptions import InferenceError
from astroid.manager import AstroidManager
from astroid.nodes.scoped_nodes import FunctionDef


def _multiprocessing_transform():
    """Build the extension module registered for ``multiprocessing``.

    The returned module starts as a small stub defining ``Manager()``.
    It is then populated with every non-underscore name found in the
    ``locals`` of the first inferred values of ``DefaultContext()`` and
    ``BaseContext()``.

    If either inference fails (``InferenceError`` or ``StopIteration``),
    the stub module is returned without the context names.
    """
    module = parse(
        """
    from multiprocessing.managers import SyncManager
    def Manager():
        return SyncManager()
    """
    )
    # Multiprocessing uses a getattr lookup inside contexts,
    # in order to get the attributes they need. Since it's extremely
    # dynamic, we use this approach to fake it.
    node = parse(
        """
    from multiprocessing.context import DefaultContext, BaseContext
    default = DefaultContext()
    base = BaseContext()
    """
    )
    try:
        context = next(node["default"].infer())
        base = next(node["base"].infer())
    except (InferenceError, StopIteration):
        return module

    # ``context`` is processed first, so a name present in both is
    # overwritten by the one coming from ``base``.
    for owner in (context, base):
        for key, definitions in owner.locals.items():
            if key.startswith("_"):
                continue

            # Only the first entry recorded under each name is used.
            value = definitions[0]
            if isinstance(value, FunctionDef):
                # We need to rebind this, since otherwise
                # it will have an extra argument (self).
                value = BoundMethod(value, owner)
            module[key] = value
    return module


def _multiprocessing_managers_transform():
    """Build the extension module registered for ``multiprocessing.managers``.

    The module is parsed from a static stub that defines ``Namespace``,
    ``Value``, ``Array`` and a ``SyncManager`` class whose attributes are
    aliases of ``queue``, ``threading``, ``multiprocessing.pool`` and
    builtin objects.
    """
    return parse(
        """
    import array
    import threading
    import multiprocessing.pool as pool
    import queue

    class Namespace(object):
        pass

    class Value(object):
        def __init__(self, typecode, value, lock=True):
            self._typecode = typecode
            self._value = value
        def get(self):
            return self._value
        def set(self, value):
            self._value = value
        def __repr__(self):
            return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
        value = property(get, set)

    def Array(typecode, sequence, lock=True):
        return array.array(typecode, sequence)

    class SyncManager(object):
        Queue = JoinableQueue = queue.Queue
        Event = threading.Event
        RLock = threading.RLock
        Lock = threading.Lock
        BoundedSemaphore = threading.BoundedSemaphore
        Condition = threading.Condition
        Barrier = threading.Barrier
        Pool = pool.Pool
        list = list
        dict = dict
        Value = Value
        Array = Array
        Namespace = Namespace
        __enter__ = lambda self: self
        __exit__ = lambda *args: args
        def start(self, initializer=None, initargs=None):
            pass
        def shutdown(self):
            pass
    """
    )


# Import-time side effect: hook both transforms into the astroid manager.
register_module_extender(
    AstroidManager(), "multiprocessing.managers", _multiprocessing_managers_transform
)
register_module_extender(
    AstroidManager(), "multiprocessing", _multiprocessing_transform
)