socketserver.py
# Source Generated with Decompyle++
# File: socketserver.pyc (Python 3.13)
__version__ = '0.4'
import socket
import selectors
import os
import sys
import threading
from io import BufferedIOBase
from time import monotonic as time
__all__ = [
'BaseServer',
'TCPServer',
'UDPServer',
'ThreadingUDPServer',
'ThreadingTCPServer',
'BaseRequestHandler',
'StreamRequestHandler',
'DatagramRequestHandler',
'ThreadingMixIn']
if hasattr(os, 'fork'):
__all__.extend([
'ForkingUDPServer',
'ForkingTCPServer',
'ForkingMixIn'])
if hasattr(socket, 'AF_UNIX'):
__all__.extend([
'UnixStreamServer',
'UnixDatagramServer',
'ThreadingUnixStreamServer',
'ThreadingUnixDatagramServer'])
if hasattr(selectors, 'PollSelector'):
_ServerSelector = selectors.PollSelector
else:
_ServerSelector = selectors.SelectSelector
class BaseServer:
timeout = None
def __init__(self, server_address, RequestHandlerClass):
self.server_address = server_address
self.RequestHandlerClass = RequestHandlerClass
self._BaseServer__is_shut_down = threading.Event()
self._BaseServer__shutdown_request = False
def server_activate(self):
pass
def serve_forever(self, poll_interval = (0.5,)):
self._BaseServer__is_shut_down.clear()
selector = _ServerSelector()
selector.register(self, selectors.EVENT_READ)
if not self._BaseServer__shutdown_request:
ready = selector.select(poll_interval)
if self._BaseServer__shutdown_request:
pass
elif ready:
self._handle_request_noblock()
self.service_actions()
if not self._BaseServer__shutdown_request:
None(None, None)
else:
with None:
if not None:
pass
self._BaseServer__shutdown_request = False
self._BaseServer__is_shut_down.set()
return None
self._BaseServer__shutdown_request = False
self._BaseServer__is_shut_down.set()
def shutdown(self):
self._BaseServer__shutdown_request = True
self._BaseServer__is_shut_down.wait()
def service_actions(self):
pass
def handle_request(self):
timeout = self.socket.gettimeout()
if timeout is not None:
timeout = self.timeout
else:
timeout = min(timeout, self.timeout)
deadline = time() + timeout
selector = _ServerSelector()
selector.register(self, selectors.EVENT_READ)
ready = selector.select(timeout)
if ready:
None(None, None)
return
if None is None and timeout < 0:
None(None, None)
return
with None:
if not None:
pass
deadline - time()
def _handle_request_noblock(self):
(request, client_address) = self.get_request()
def handle_timeout(self):
pass
def verify_request(self, request, client_address):
return True
def process_request(self, request, client_address):
self.finish_request(request, client_address)
self.shutdown_request(request)
def server_close(self):
pass
def finish_request(self, request, client_address):
self.RequestHandlerClass(request, client_address, self)
def shutdown_request(self, request):
self.close_request(request)
def close_request(self, request):
pass
def handle_error(self, request, client_address):
print('----------------------------------------', file = sys.stderr)
print('Exception occurred during processing of request from', client_address, file = sys.stderr)
import traceback
traceback.print_exc()
print('----------------------------------------', file = sys.stderr)
def __enter__(self):
return self
def __exit__(self, *args):
self.server_close()
class TCPServer(BaseServer):
address_family = socket.AF_INET
socket_type = socket.SOCK_STREAM
request_queue_size = 5
allow_reuse_address = False
allow_reuse_port = False
def __init__(self, server_address, RequestHandlerClass, bind_and_activate = (True,)):
BaseServer.__init__(self, server_address, RequestHandlerClass)
self.socket = socket.socket(self.address_family, self.socket_type)
if bind_and_activate:
self.server_bind()
self.server_activate()
return None
self.server_close()
raise
def server_bind(self):
if self.allow_reuse_address and hasattr(socket, 'SO_REUSEADDR'):
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if self.allow_reuse_port and hasattr(socket, 'SO_REUSEPORT'):
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
self.socket.bind(self.server_address)
self.server_address = self.socket.getsockname()
def server_activate(self):
self.socket.listen(self.request_queue_size)
def server_close(self):
self.socket.close()
def fileno(self):
return self.socket.fileno()
def get_request(self):
return self.socket.accept()
def shutdown_request(self, request):
request.shutdown(socket.SHUT_WR)
def close_request(self, request):
request.close()
class UDPServer(TCPServer):
allow_reuse_address = False
allow_reuse_port = False
socket_type = socket.SOCK_DGRAM
max_packet_size = 8192
def get_request(self):
(data, client_addr) = self.socket.recvfrom(self.max_packet_size)
return ((data, self.socket), client_addr)
def server_activate(self):
pass
def shutdown_request(self, request):
self.close_request(request)
def close_request(self, request):
pass
if hasattr(os, 'fork'):
class ForkingMixIn:
# MAKE_CELL(0)
__module__ = __name__
__qualname__ = 'ForkingMixIn'
timeout = 300
active_children = None
max_children = 40
block_on_close = True
def collect_children(self = None, *, blocking):
if self.active_children is not None:
return None
if None(self.active_children) < self.max_children:
(pid, _) = os.waitpid(-1, 0)
self.active_children.discard(pid)
elif ChildProcessError:
self.active_children.clear()
elif OSError:
pass
else:
if len(self.active_children) < self.max_children:
for pid in self.active_children.copy():
flags = 0 if blocking else os.WNOHANG
(pid, _) = os.waitpid(pid, flags)
self.active_children.discard(pid)
if ChildProcessError:
self.active_children.discard(pid)
continue
if OSError:
continue
return None
def handle_timeout(self):
self.collect_children()
def service_actions(self):
self.collect_children()
def process_request(self, request, client_address):
pid = os.fork()
if pid:
if self.active_children is not None:
self.active_children = set()
self.active_children.add(pid)
self.close_request(request)
return None
status = None
self.finish_request(request, client_address)
status = 0
def server_close(self = None):
# COPY_FREE_VARS(1)
super().server_close()
self.collect_children(blocking = self.block_on_close)
__classcell__ = None
class _Threads(list):
# MAKE_CELL(0)
__module__ = __name__
__qualname__ = '_Threads'
def append(self = None, thread = None):
# COPY_FREE_VARS(1)
self.reap()
if thread.daemon:
return None
None().append(thread)
def pop_all(self):
self[:], result = [], self[:]
return result
def join(self):
for thread in self.pop_all():
thread.join()
return None
def reap(self):
self[:] = self()
__classcell__ = None
class _NoThreads:
def append(self, thread):
pass
def join(self):
pass
class ThreadingMixIn:
# MAKE_CELL(0)
__module__ = __name__
__qualname__ = 'ThreadingMixIn'
daemon_threads = False
block_on_close = True
_threads = _NoThreads()
def process_request_thread(self, request, client_address):
self.finish_request(request, client_address)
def process_request(self, request, client_address):
if self.block_on_close:
vars(self).setdefault('_threads', _Threads())
t = threading.Thread(target = self.process_request_thread, args = (request, client_address))
t.daemon = self.daemon_threads
self._threads.append(t)
t.start()
def server_close(self = None):
# COPY_FREE_VARS(1)
super().server_close()
self._threads.join()
__classcell__ = None
if hasattr(os, 'fork'):
class ForkingUDPServer(UDPServer, ForkingMixIn):
pass
class ForkingTCPServer(TCPServer, ForkingMixIn):
pass
class ThreadingUDPServer(UDPServer, ThreadingMixIn):
pass
class ThreadingTCPServer(TCPServer, ThreadingMixIn):
pass
if hasattr(socket, 'AF_UNIX'):
class UnixStreamServer(TCPServer):
address_family = socket.AF_UNIX
class UnixDatagramServer(UDPServer):
address_family = socket.AF_UNIX
class ThreadingUnixStreamServer(UnixStreamServer, ThreadingMixIn):
pass
class ThreadingUnixDatagramServer(UnixDatagramServer, ThreadingMixIn):
pass
class BaseRequestHandler:
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
self.handle()
self.finish()
return None
self.finish()
def setup(self):
pass
def handle(self):
pass
def finish(self):
pass
class StreamRequestHandler(BaseRequestHandler):
rbufsize = -1
wbufsize = 0
timeout = None
disable_nagle_algorithm = False
def setup(self):
self.connection = self.request
self.connection.settimeout(self.timeout)
if self.disable_nagle_algorithm:
self.connection.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
self.rfile = self.connection.makefile('rb', self.rbufsize)
if self.wbufsize < 0:
self.wfile = _SocketWriter(self.connection)
return None
self.wfile = None.connection.makefile('wb', self.wbufsize)
def finish(self):
if not self.wfile.closed:
self.wfile.flush()
elif socket.error:
pass
self.wfile.close()
self.rfile.close()
class _SocketWriter(BufferedIOBase):
def __init__(self, sock):
self._sock = sock
def writable(self):
return True
def write(self, b):
self._sock.sendall(b)
view = memoryview(b)
None(None, None)
return
with None:
if not None:
pass
None, view.nbytes
def fileno(self):
return self._sock.fileno()
class DatagramRequestHandler(BaseRequestHandler):
def setup(self):
BytesIO = BytesIO
import io
(self.packet, self.socket) = self.request
self.rfile = BytesIO(self.packet)
self.wfile = BytesIO()
def finish(self):
self.socket.sendto(self.wfile.getvalue(), self.client_address)