Source code for cligram.proxy_manager

"""Proxy management and testing for Telegram connections."""

import asyncio
import base64
import re
import time
from dataclasses import dataclass
from enum import Enum
from typing import TYPE_CHECKING, List, Optional, Tuple

import socks
from telethon import TelegramClient, sessions
from telethon.network import ConnectionTcpMTProxyRandomizedIntermediate

if TYPE_CHECKING:
    from . import Config


[docs] class ProxyType(Enum): """Type of proxy connection supported by the application.""" MTPROTO = "mtproto" """MTProto proxy protocol used by Telegram.""" SOCKS5 = "socks5" """SOCKS5 proxy protocol with optional authentication.""" DIRECT = "direct" """Direct connection without proxy."""
[docs] @dataclass class Proxy: """Proxy connection configuration. Supports both MTProto and SOCKS5 protocols with their respective authentication methods. For MTProto, requires secret key; for SOCKS5, supports optional username/password authentication. """ url: str """Original proxy URL string""" type: ProxyType """Type of proxy protocol to use""" host: str """Proxy server hostname or IP address""" port: int """Proxy server port number""" secret: Optional[str] = None """MTProto secret key (hex or base64 encoded)""" username: Optional[str] = None """SOCKS5 authentication username""" password: Optional[str] = None """SOCKS5 authentication password""" @property def is_direct(self) -> bool: """Check if the proxy is a direct connection.""" return self.type == ProxyType.DIRECT def _export(self): """Export proxy configuration for Telethon client. Returns: dict: Telethon Client-compatible proxy configuration parameters """ params = {} if self.type == ProxyType.MTPROTO: params["connection"] = ConnectionTcpMTProxyRandomizedIntermediate params["proxy"] = (self.host, self.port, self.secret) elif self.type == ProxyType.SOCKS5: params["proxy"] = ( socks.SOCKS5, self.host, self.port, True, self.username, self.password, ) return params def __eq__(self, other: object) -> bool: """Check equality between two Proxy instances.""" if not isinstance(other, Proxy): return False if self.type == ProxyType.DIRECT and other.type == ProxyType.DIRECT: return True return self.url == other.url def __hash__(self) -> int: """Generate hash for the Proxy instance.""" if self.type == ProxyType.DIRECT: return hash("direct_connection") return hash(self.url)
[docs] class ProxyManager: """Manages proxy connections and testing. Provides functionality to add, test, and select working proxies for Telegram connections. """
[docs] @classmethod def from_config( cls, config: "Config", exclude_direct: bool = False ) -> "ProxyManager": """Create ProxyManager instance from application config. Args: config: Application configuration object exclude_direct: Whether to exclude direct connection from proxy list even if it is enabled in config Returns: ProxyManager instance """ proxy_manager = cls() if config.telegram.connection.direct and not exclude_direct: proxy_manager._add_direct_proxy() for proxy in config.telegram.connection.proxies: proxy_manager.add_proxy(proxy) return proxy_manager
def __init__(self): """Initialize ProxyManager with empty proxy list.""" self.proxies: List[Proxy] = [] """List of configured proxy connections""" self.current_proxy: Optional[Proxy] = None """Currently selected working proxy"""
[docs] def add_proxy(self, proxy_url: str) -> Optional[Proxy]: """Add new proxy from URL string. Args: proxy_url: URL string in supported format Returns: Configured Proxy instance if parsing successful """ proxy = self._parse_proxy_url(proxy_url) if proxy and proxy not in self.proxies: self.proxies.append(proxy) return proxy
def _add_direct_proxy(self): """Add direct connection (no proxy) to the manager.""" direct_proxy = Proxy( url="", type=ProxyType.DIRECT, host="", port=0, ) if direct_proxy not in self.proxies: self.proxies.append(direct_proxy) def _decode_secret(self, secret: str) -> str: """Decode MTProto proxy secret from base64. Args: secret: Base64 encoded secret string Returns: Decoded hex string or original if decoding fails """ secret += "=" * ((4 - len(secret) % 4) % 4) try: return base64.b64decode(secret).hex() except Exception: return secret def _parse_proxy_url(self, proxy_url: str) -> Optional[Proxy]: """Parse proxy URL into proxy configuration. Supports formats: - mtproto://<secret>@<host>:<port> - tg://proxy?server=<host>&port=<port>&secret=<secret> - socks5://[<user>:<pass>@]<host>:<port> Args: proxy_url: Proxy URL string Returns: Proxy configuration object or None if parsing fails """ mtproto_match = re.match(r"mtproto://([^@]+)@([^:]+):(\d+)", proxy_url) mtproto_tg_match = re.match( r"(?:tg|https?://t\.me)/proxy\?server=([^&]+)&port=(\d+)&secret=([^&]+)", proxy_url, ) socks5_match = re.match( r"socks5://(?:([^:]+):([^@]+)@)?([^:]+):(\d+)", proxy_url ) if mtproto_match: secret, host, port = mtproto_match.groups() return Proxy( type=ProxyType.MTPROTO, host=host, port=int(port), secret=self._decode_secret(secret), url=proxy_url, ) elif mtproto_tg_match: host, port, secret = mtproto_tg_match.groups() return Proxy( type=ProxyType.MTPROTO, host=host, port=int(port), secret=self._decode_secret(secret), url=proxy_url, ) elif socks5_match: username, password, host, port = socks5_match.groups() return Proxy( type=ProxyType.SOCKS5, host=host, port=int(port), username=username, password=password, url=proxy_url, ) return None
[docs] async def test_proxies( self, filter: Optional[ProxyType] = None, exclusion: List[Proxy] = [], shutdown_event: Optional[asyncio.Event] = None, timeout: float = 30.0, oneshot: bool = False, ) -> List["ProxyTestResult"]: """Test configured proxies. The best proxy will be set as `current_proxy`. Args: filter: Optional proxy type to filter for testing exclusion: List of proxies to exclude from testing shutdown_event: Optional asyncio event to signal shutdown timeout: Timeout for proxy test in seconds oneshot: If True, stop testing after first successful proxy Returns: List of ProxyTestResult objects with test outcomes """ candidates = [ p for p in self.proxies if (not filter or p.type == filter) and p not in exclusion ] if not candidates: return [] results = await _test_proxies( candidates, timeout=timeout, shutdown_event=shutdown_event, oneshot=oneshot ) # Set first working proxy as current for result in results: if result.success: self.current_proxy = result.proxy break return results
MT_PING_TAG = b"\xee\xee\xee\xee"
[docs] @dataclass class ProxyTestResult: """Result of a proxy test, including latency and status.""" proxy: Proxy success: bool latency: Optional[float] = None error: Optional[str] = None @property def score(self) -> float: """Calculate proxy score (lower is better).""" if not self.success: return float("inf") return self.latency or float("inf") @property def is_good(self) -> bool: """Check if the proxy test was successful.""" return self.success and (self.latency is not None and self.latency < 1000)
async def _ping_mtproto( proxy: "Proxy", timeout: float = 30.0 ) -> Tuple[bool, Optional[float], Optional[str]]: """Test MTProto proxy using actual Telegram connection.""" return await _test_telegram_connection( timeout=timeout, connection=ConnectionTcpMTProxyRandomizedIntermediate, proxy=(proxy.host, proxy.port, proxy.secret), ) async def _ping_socks5( proxy: "Proxy", timeout: float = 5.0 ) -> Tuple[bool, Optional[float], Optional[str]]: """Test SOCKS5 proxy with timeout and error reporting.""" start = time.time() try: reader, writer = await asyncio.wait_for( asyncio.open_connection(proxy.host, proxy.port), timeout ) # Initial greeting writer.write(b"\x05\x01\x00") await writer.drain() resp = await asyncio.wait_for(reader.readexactly(2), timeout) if resp != b"\x05\x00": return False, None, "SOCKS5 no-auth refused" writer.write(b"\x05\x01\x00\x01\x00\x00\x00\x00\x00\x00") await writer.drain() header = await asyncio.wait_for(reader.readexactly(4), timeout) if header[0] != 5 or header[1] != 0: return False, None, f"SOCKS5 connect failed (REP={header[1]})" # Clean up remaining data atyp = header[3] try: if atyp == 1: await reader.readexactly(4 + 2) elif atyp == 3: await reader.readexactly(ord(await reader.readexactly(1)) + 2) elif atyp == 4: await reader.readexactly(16 + 2) else: return False, None, f"Unknown address type: {atyp}" except Exception: pass # Ignore errors in cleanup writer.close() try: await writer.wait_closed() except Exception: pass # Ignore close errors return True, (time.time() - start) * 1000, None except asyncio.TimeoutError: return False, None, "timed out" except Exception as e: return False, None, str(e) async def _ping_direct( proxy: "Proxy", timeout: float = 30.0 ) -> Tuple[bool, Optional[float], Optional[str]]: """Test direct connection to Telegram servers.""" return await _test_telegram_connection(timeout=timeout, proxy=proxy) async def _test_telegram_connection( timeout: float = 30.0, connection=None, proxy=None, ) -> Tuple[bool, Optional[float], Optional[str]]: """Helper function to test Telegram connection with or without proxy.""" start = time.time() client = None try: # Create temporary client for testing kwargs = { "timeout": timeout, "auto_reconnect": False, "connection_retries": 1, "retry_delay": 1, } if connection: kwargs["connection"] = connection if not isinstance(proxy, Proxy) or proxy.type != ProxyType.DIRECT: kwargs["proxy"] = proxy client = TelegramClient( None, # Memory session # type: ignore 1, # Dummy API ID "0" * 32, # Dummy API hash **kwargs, ) # Test basic connection await asyncio.wait_for(client.connect(), timeout) success = True error = None except asyncio.TimeoutError: success = False error = "timed out" except ConnectionError as e: if "Invalid DC" in str(e): success = True error = None else: success = False error = str(e) except Exception as e: success = False error = str(e) finally: if isinstance(client, TelegramClient): if ( isinstance(proxy, Proxy) and proxy.type == ProxyType.DIRECT and isinstance(client.session, sessions.Session) ): proxy.host = client.session.server_address proxy.port = client.session.port proxy.url = f"dc:{client.session.dc_id}" dc = client.disconnect() if dc is not None: await asyncio.shield(dc) latency = (time.time() - start) * 1000 if success else None return success, latency, error async def _test_proxy_task( proxy: "Proxy", timeout: float = 5.0, ) -> ProxyTestResult: """Test a proxy with timeout and shutdown support.""" if proxy.type == ProxyType.MTPROTO: test_func = _ping_mtproto elif proxy.type == ProxyType.SOCKS5: test_func = _ping_socks5 elif proxy.type == ProxyType.DIRECT: test_func = _ping_direct else: return ProxyTestResult(proxy, False, error="Unknown proxy type") try: success, latency, error = await test_func(proxy, timeout) except asyncio.CancelledError: success = False latency = None error = "Cancelled" except Exception as e: success = False latency = None error = str(e) return ProxyTestResult(proxy, success, latency, error) async def _test_proxies( proxies: List["Proxy"], timeout: float = 30.0, shutdown_event: Optional[asyncio.Event] = None, oneshot: bool = False, ) -> List[ProxyTestResult]: """Test multiple proxies concurrently and return sorted results.""" if not proxies: return [] # Create tasks explicitly using asyncio.create_task tasks = [asyncio.create_task(_test_proxy_task(p, timeout)) for p in proxies] results = [] async def wait_for_cancellation(): final_timeout = ( timeout + 0.5 ) # Slightly longer to ensure tasks complete before force stop try: if shutdown_event: await asyncio.wait_for( shutdown_event.wait(), timeout=final_timeout, ) else: await asyncio.sleep(final_timeout) except asyncio.TimeoutError: pass except asyncio.CancelledError: pass watcher = asyncio.create_task(wait_for_cancellation()) pending = set(tasks) # Use a set to track pending tasks pending.add(watcher) # type: ignore should_break = False try: while pending: done, pending = await asyncio.wait( pending, return_when=asyncio.FIRST_COMPLETED ) if watcher in done: # Shutdown event triggered or timeout elapsed break for task in done: if task is not watcher: result = await task if isinstance(result, ProxyTestResult): results.append(result) if oneshot and result.success: should_break = True break if should_break or (len(pending) == 1 and watcher in pending): break finally: # Cancel remaining tasks for task in pending: if task is not watcher and not task.done(): task.cancel() try: result = await task results.append(result) except asyncio.CancelledError: pass if not watcher.done(): watcher.cancel() try: await watcher except asyncio.CancelledError: pass # Sort by score (successful proxies first, then by latency) results.sort(key=lambda r: r.score) return results