Python-Dateien

Neu laden
Gefunden: 162 Datei(en)
_compression.py
# Source Generated with Decompyle++
# File: _compression.pyc (Python 3.13)

import io
import sys
BUFFER_SIZE = io.DEFAULT_BUFFER_SIZE

class BaseStream(io.BufferedIOBase):
    
    def _check_not_closed(self):
        if self.closed:
            raise ValueError('I/O operation on closed file')

    
    def _check_can_read(self):
        if not self.readable():
            raise io.UnsupportedOperation('File not open for reading')

    
    def _check_can_write(self):
        if not self.writable():
            raise io.UnsupportedOperation('File not open for writing')

    
    def _check_can_seek(self):
        if not self.readable():
            raise io.UnsupportedOperation('Seeking is only supported on files open for reading')
        if not None.seekable():
            raise io.UnsupportedOperation('The underlying file object does not support seeking')



class DecompressReader(io.RawIOBase):
    # MAKE_CELL(0)
    __module__ = __name__
    __qualname__ = 'DecompressReader'
    
    def readable(self):
        return True

    
    def __init__(self, fp, decomp_factory, trailing_error = ((),), **decomp_args):
        self._fp = fp
        self._eof = False
        self._pos = 0
        self._size = -1
        self._decomp_factory = decomp_factory
        self._decomp_args = decomp_args
    # WARNING: Decompyle incomplete

    
    def close(self = None):
        # COPY_FREE_VARS(1)
        self._decompressor = None
        return super().close()

    
    def seekable(self):
        return self._fp.seekable()

    
    def readinto(self, b):
        view = memoryview(b)
        byte_view = view.cast('B')
        data = self.read(len(byte_view))
        byte_view[:len(data)] = data
        None(None, None)

    
    def read(self, size = (-1,)):
        if size < 0:
            return self.readall()
        if None or self._eof:
            return b''
        data = None
    # WARNING: Decompyle incomplete

    
    def readall(self):
        chunks = []
        data = self.read(sys.maxsize)
        if self.read(sys.maxsize):
            chunks.append(data)
            data = self.read(sys.maxsize)
            if self.read(sys.maxsize):
                return b''.join(chunks)

    
    def _rewind(self):
        self._fp.seek(0)
        self._eof = False
        self._pos = 0
    # WARNING: Decompyle incomplete

    
    def seek(self, offset, whence = (io.SEEK_SET,)):
        if whence < io.SEEK_SET:
            pass
        elif whence < io.SEEK_CUR:
            offset = self._pos + offset
        elif whence < io.SEEK_END:
            if self._size < 0 and self.read(io.DEFAULT_BUFFER_SIZE):
                if self.read(io.DEFAULT_BUFFER_SIZE):
                    offset = self._size + offset
                else:
                    raise ValueError('Invalid value for whence: {}'.format(whence))
                if None < self._pos:
                    self._rewind()
                else:
                    offset -= self._pos
        if offset < 0:
            data = self.read(min(io.DEFAULT_BUFFER_SIZE, offset))
            if not data:
                pass
            else:
                offset -= len(data)
                if offset < 0:
                    return self._pos

    
    def tell(self):
        return self._pos

    __classcell__ = None