Server IP : 66.29.132.122 / Your IP : 3.142.54.83 Web Server : LiteSpeed System : Linux business142.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64 User : admazpex ( 531) PHP Version : 7.2.34 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /proc/self/root/proc/self/root/proc/thread-self/root/proc/thread-self/root/opt/cloudlinux/venv/lib64/python3.11/site-packages/urllib3/util/ |
Upload File : |
from __future__ import annotations import socket import typing from ..exceptions import LocationParseError from .timeout import _DEFAULT_TIMEOUT, _TYPE_TIMEOUT _TYPE_SOCKET_OPTIONS = typing.Sequence[typing.Tuple[int, int, typing.Union[int, bytes]]] if typing.TYPE_CHECKING: from .._base_connection import BaseHTTPConnection def is_connection_dropped(conn: BaseHTTPConnection) -> bool: # Platform-specific """ Returns True if the connection is dropped and should be closed. :param conn: :class:`urllib3.connection.HTTPConnection` object. """ return not conn.is_connected # This function is copied from socket.py in the Python 2.7 standard # library test suite. Added to its signature is only `socket_options`. # One additional modification is that we avoid binding to IPv6 servers # discovered in DNS if the system doesn't have IPv6 functionality. def create_connection( address: tuple[str, int], timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT, source_address: tuple[str, int] | None = None, socket_options: _TYPE_SOCKET_OPTIONS | None = None, ) -> socket.socket: """Connect to *address* and return the socket object. Convenience function. Connect to *address* (a 2-tuple ``(host, port)``) and return the socket object. Passing the optional *timeout* parameter will set the timeout on the socket instance before attempting to connect. If no *timeout* is supplied, the global default timeout setting returned by :func:`socket.getdefaulttimeout` is used. If *source_address* is set it must be a tuple of (host, port) for the socket to bind as a source address before making the connection. An host of '' or port 0 tells the OS to use the default. """ host, port = address if host.startswith("["): host = host.strip("[]") err = None # Using the value from allowed_gai_family() in the context of getaddrinfo lets # us select whether to work with IPv4 DNS records, IPv6 records, or both. # The original create_connection function always returns all records. family = allowed_gai_family() try: host.encode("idna") except UnicodeError: raise LocationParseError(f"'{host}', label empty or too long") from None for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM): af, socktype, proto, canonname, sa = res sock = None try: sock = socket.socket(af, socktype, proto) # If provided, set socket level options before connecting. _set_socket_options(sock, socket_options) if timeout is not _DEFAULT_TIMEOUT: sock.settimeout(timeout) if source_address: sock.bind(source_address) sock.connect(sa) # Break explicitly a reference cycle err = None return sock except OSError as _: err = _ if sock is not None: sock.close() if err is not None: try: raise err finally: # Break explicitly a reference cycle err = None else: raise OSError("getaddrinfo returns an empty list") def _set_socket_options( sock: socket.socket, options: _TYPE_SOCKET_OPTIONS | None ) -> None: if options is None: return for opt in options: sock.setsockopt(*opt) def allowed_gai_family() -> socket.AddressFamily: """This function is designed to work in the context of getaddrinfo, where family=socket.AF_UNSPEC is the default and will perform a DNS search for both IPv6 and IPv4 records.""" family = socket.AF_INET if HAS_IPV6: family = socket.AF_UNSPEC return family def _has_ipv6(host: str) -> bool: """Returns True if the system can bind an IPv6 address.""" sock = None has_ipv6 = False if socket.has_ipv6: # has_ipv6 returns true if cPython was compiled with IPv6 support. # It does not tell us if the system has IPv6 support enabled. To # determine that we must bind to an IPv6 address. # https://github.com/urllib3/urllib3/pull/611 # https://bugs.python.org/issue658327 try: sock = socket.socket(socket.AF_INET6) sock.bind((host, 0)) has_ipv6 = True except Exception: pass if sock: sock.close() return has_ipv6 HAS_IPV6 = _has_ipv6("::1")