# asynchat.py
# Source Generated with Decompyle++
# File: asynchat.pyc (Python 3.13)
import asyncore
from collections import deque
from warnings import _deprecated
# Message template passed to warnings._deprecated below.  The {name} and
# {remove} placeholders are presumably filled in by that helper -- TODO confirm.
_DEPRECATION_MSG = 'The {name} module is deprecated and will be removed in Python {remove}. The recommended replacement is asyncio'
# Runs at import time: reports this module (by its __name__) as deprecated,
# with (3, 12) given as the removal version.
# NOTE(review): the header above says this was compiled for Python 3.13, which
# is already past the stated removal version -- verify how `_deprecated`
# behaves in that situation before importing this module there.
_deprecated(__name__, _DEPRECATION_MSG, remove = (3, 12))
class async_chat(asyncore.dispatcher):
    """Dispatcher for terminator-delimited command/response protocols.

    This is an abstract class: subclasses must implement
    collect_incoming_data() and found_terminator().  Incoming bytes are
    buffered and split according to the terminator set with
    set_terminator(); outgoing data is queued in ``producer_fifo`` and
    written by initiate_send().
    """

    # Overridable defaults: how much to read per recv() and how much to
    # hand to send() at a time.
    ac_in_buffer_size = 65536
    ac_out_buffer_size = 65536

    # Encoding is off by default: str data only gets converted to bytes
    # when a subclass explicitly opts in by setting use_encoding.
    use_encoding = 0
    encoding = 'latin-1'

    def __init__(self, sock=None, map=None):
        """Set up the empty input/output buffers and the base dispatcher.

        Both arguments are optional and are forwarded unchanged to
        asyncore.dispatcher.__init__().
        """
        # Bytes received but not yet handed to collect_incoming_data();
        # used for terminator matching.
        self.ac_in_buffer = b''
        # Chunks accumulated by the default _collect_incoming_data().
        self.incoming = []
        # Outgoing queue: bytes-like chunks, producer objects (anything
        # with a more() method) or None as a "close when reached" marker.
        self.producer_fifo = deque()
        asyncore.dispatcher.__init__(self, sock, map)

    def collect_incoming_data(self, data):
        """Receive a chunk of incoming data; must be overridden."""
        raise NotImplementedError('must be implemented in subclass')

    def _collect_incoming_data(self, data):
        """Default collector: append *data* to ``self.incoming``."""
        self.incoming.append(data)

    def _get_data(self):
        """Return everything in ``self.incoming`` joined, and clear it."""
        d = b''.join(self.incoming)
        del self.incoming[:]
        return d

    def found_terminator(self):
        """Called when the terminator is reached; must be overridden."""
        raise NotImplementedError('must be implemented in subclass')

    def set_terminator(self, term):
        """Set the input delimiter.

        *term* may be a bytes string of any length, a non-negative integer
        (number of bytes to collect before found_terminator() fires) or
        None (collect everything).  A str is encoded with ``self.encoding``
        when ``use_encoding`` is set.

        Raises ValueError for a negative integer.
        """
        if isinstance(term, str) and self.use_encoding:
            term = bytes(term, self.encoding)
        elif isinstance(term, int) and term < 0:
            raise ValueError('the number of received bytes must be positive')
        self.terminator = term

    def get_terminator(self):
        """Return the terminator last stored by set_terminator()."""
        return self.terminator

    def handle_read(self):
        """Read from the socket and dispatch data up to each terminator.

        Pulls at most ``ac_in_buffer_size`` bytes, appends them to
        ``ac_in_buffer`` and then repeatedly feeds collect_incoming_data()
        and found_terminator() until the buffer is empty or only a possible
        partial terminator is left.
        """
        try:
            data = self.recv(self.ac_in_buffer_size)
        except BlockingIOError:
            # Nothing to read right now; try again on the next event.
            return
        except OSError:
            self.handle_error()
            return

        if isinstance(data, str) and self.use_encoding:
            data = bytes(data, self.encoding)
        self.ac_in_buffer = self.ac_in_buffer + data

        # A single recv() may deliver several data+terminator combinations,
        # hence the loop.  The terminator is re-read on every pass because
        # found_terminator() is allowed to change it.
        while self.ac_in_buffer:
            lb = len(self.ac_in_buffer)
            terminator = self.get_terminator()
            if not terminator:
                # No terminator (None, 0 or empty): collect it all.
                self.collect_incoming_data(self.ac_in_buffer)
                self.ac_in_buffer = b''
            elif isinstance(terminator, int):
                # Numeric terminator: count down the remaining bytes.
                n = terminator
                if lb < n:
                    self.collect_incoming_data(self.ac_in_buffer)
                    self.ac_in_buffer = b''
                    self.terminator = self.terminator - lb
                else:
                    self.collect_incoming_data(self.ac_in_buffer[:n])
                    self.ac_in_buffer = self.ac_in_buffer[n:]
                    self.terminator = 0
                    self.found_terminator()
            else:
                # Three cases:
                # 1) the buffer contains the terminator:
                #    collect the data before it, then transition
                # 2) the end of the buffer matches a terminator prefix:
                #    collect up to the prefix and wait for more input
                # 3) no match at all: collect everything
                terminator_len = len(terminator)
                index = self.ac_in_buffer.find(terminator)
                if index != -1:
                    if index > 0:
                        # Don't report an empty chunk.
                        self.collect_incoming_data(self.ac_in_buffer[:index])
                    self.ac_in_buffer = self.ac_in_buffer[index + terminator_len:]
                    self.found_terminator()
                else:
                    # Module-level helper: length of the terminator prefix
                    # found at the end of the buffer (0 if none).
                    index = find_prefix_at_end(self.ac_in_buffer, terminator)
                    if index:
                        if index != lb:
                            # Keep only the possible partial terminator.
                            self.collect_incoming_data(self.ac_in_buffer[:-index])
                            self.ac_in_buffer = self.ac_in_buffer[-index:]
                        break
                    else:
                        self.collect_incoming_data(self.ac_in_buffer)
                        self.ac_in_buffer = b''

    def handle_write(self):
        """Write event: try to send more queued data."""
        self.initiate_send()

    def handle_close(self):
        """Close event: close the channel."""
        self.close()

    def push(self, data):
        """Queue bytes-like *data* for sending and start sending.

        Data longer than ``ac_out_buffer_size`` is split into chunks of at
        most that size so each queue entry fits in one send() call.

        Raises TypeError if *data* is not bytes, bytearray or memoryview.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError('data argument must be byte-ish (%r)'
                            % (type(data),))
        sabs = self.ac_out_buffer_size
        if len(data) > sabs:
            for i in range(0, len(data), sabs):
                self.producer_fifo.append(data[i:i + sabs])
        else:
            self.producer_fifo.append(data)
        self.initiate_send()

    def push_with_producer(self, producer):
        """Queue *producer* (an object with a more() method) and start sending."""
        self.producer_fifo.append(producer)
        self.initiate_send()

    def readable(self):
        """Predicate for inclusion in the readable set: always true."""
        return True

    def writable(self):
        """Predicate for inclusion in the writable set.

        True while there is queued output, or while the channel is not
        connected yet.
        """
        return bool(self.producer_fifo) or not self.connected

    def close_when_done(self):
        """Close the channel automatically once the outgoing queue drains."""
        # None in the queue is the marker initiate_send() turns into
        # a handle_close() call.
        self.producer_fifo.append(None)

    def initiate_send(self):
        """Send the next piece of queued output, if any.

        Makes at most one send() call per invocation; whatever was not
        written stays at the head of the queue for the next write event.
        """
        while self.producer_fifo and self.connected:
            first = self.producer_fifo[0]
            # Empty string/buffer or the None close marker.
            if not first:
                del self.producer_fifo[0]
                if first is None:
                    self.handle_close()
                    return
                # Nothing to write for an empty entry; move on to the next
                # one instead of issuing a zero-byte send().
                continue

            # Bytes-like entries can be sliced; producer objects cannot,
            # which is how the two are told apart.
            obs = self.ac_out_buffer_size
            try:
                data = first[:obs]
            except TypeError:
                data = first.more()
                if data:
                    # Put the produced chunk ahead of its producer.
                    self.producer_fifo.appendleft(data)
                else:
                    # Producer is exhausted.
                    del self.producer_fifo[0]
                continue

            if isinstance(data, str) and self.use_encoding:
                data = bytes(data, self.encoding)

            try:
                num_sent = self.send(data)
            except OSError:
                self.handle_error()
                return

            if num_sent:
                if num_sent < len(data) or obs < len(first):
                    # Partial write: keep the unsent tail at the head.
                    self.producer_fifo[0] = first[num_sent:]
                else:
                    del self.producer_fifo[0]
            # We tried to send some actual data; wait for the next event.
            return

    def discard_buffers(self):
        """Drop all buffered input and queued output (emergencies only)."""
        self.ac_in_buffer = b''
        del self.incoming[:]
        self.producer_fifo.clear()
class simple_producer:
    """Producer that hands out a bytes-like object in fixed-size pieces."""

    def __init__(self, data, buffer_size=512):
        """Store *data* and the maximum size of each piece."""
        self.data = data
        self.buffer_size = buffer_size

    def more(self):
        """Return the next piece of data, or b'' once it is exhausted.

        While more than ``buffer_size`` bytes remain, exactly
        ``buffer_size`` bytes are returned; otherwise whatever is left is
        returned and the producer becomes empty.
        """
        if len(self.data) > self.buffer_size:
            result = self.data[:self.buffer_size]
            self.data = self.data[self.buffer_size:]
            return result
        result = self.data
        self.data = b''
        return result
def find_prefix_at_end(haystack, needle):
    """Return the length of the longest proper prefix of *needle* that
    *haystack* ends with, or 0 if there is none.

    Only proper prefixes are considered (at most ``len(needle) - 1``
    characters); a complete occurrence of *needle* is not reported here.
    An empty *needle* yields 0.
    """
    # Start from the longest proper prefix and shrink until one matches.
    # max() keeps an empty needle from producing a negative length.
    prefix_len = max(len(needle) - 1, 0)
    while prefix_len and not haystack.endswith(needle[:prefix_len]):
        prefix_len -= 1
    return prefix_len