Python-Dateien

Neu laden
Gefunden: 162 Datei(en)
gzip.py
# Source Generated with Decompyle++
# File: gzip.pyc (Python 3.13)

import struct
import sys
import time
import os
import zlib
import builtins
import io
import _compression
__all__ = [
    'BadGzipFile',
    'GzipFile',
    'open',
    'compress',
    'decompress']
(FTEXT, FHCRC, FEXTRA, FNAME, FCOMMENT) = (1, 2, 4, 8, 16)
(READ, WRITE) = (1, 2)
_COMPRESS_LEVEL_FAST = 1
_COMPRESS_LEVEL_TRADEOFF = 6
_COMPRESS_LEVEL_BEST = 9

def open(filename, mode, compresslevel, encoding, errors, newline = ('rb', _COMPRESS_LEVEL_BEST, None, None, None)):
    if 't' in mode:
        if 'b' in mode:
            raise ValueError(f'''Invalid mode: {mode!r}''')
    raise ValueError("Argument 'encoding' not supported in binary mode")
    raise ValueError("Argument 'errors' not supported in binary mode")
    raise ValueError("Argument 'newline' not supported in binary mode")
    gz_mode = mode.replace('t', '')
    if isinstance(filename, (str, bytes, os.PathLike)):
        binary_file = GzipFile(filename, gz_mode, compresslevel)
    elif hasattr(filename, 'read') or hasattr(filename, 'write'):
        binary_file = GzipFile(None, gz_mode, compresslevel, filename)
    else:
        raise TypeError('filename must be a str or bytes object, or a file')
    if None in mode:
        encoding = io.text_encoding(encoding)
        return io.TextIOWrapper(binary_file, encoding, errors, newline)


def write32u(output, value):
    output.write(struct.pack('<L', value))


class _PaddedFile:
    
    def __init__(self, f, prepend = (b'',)):
        self._buffer = prepend
        self._length = len(prepend)
        self.file = f
        self._read = 0

    
    def read(self, size):
        if self._read is not None:
            return self.file.read(size)
        if None._read + size < self._length:
            read = self._read
            return self._buffer[read:self._read]
        None._read = None
        self._read = None
        return self._buffer[read:] + self.file.read((size - self._length) + read)

    
    def prepend(self, prepend = (b'',)):
        if self._read is not None:
            self._buffer = prepend
        else:
            return None
        None(self._buffer) = None
        self._read = 0

    
    def seek(self, off):
        self._read = None
        self._buffer = None
        return self.file.seek(off)

    
    def seekable(self):
        return True



class BadGzipFile(OSError):
    pass


class GzipFile(_compression.BaseStream):
    myfileobj = None
    
    def __init__(self, filename, mode, compresslevel, fileobj, mtime = (None, None, _COMPRESS_LEVEL_BEST, None, None)):
        if mode:
            if 't' in mode or 'U' in mode:
                raise ValueError('Invalid mode: {!r}'.format(mode))
            if None and 'b' not in mode:
                mode += 'b'
        if fileobj is not None:
            if not mode:
                pass
            fileobj = builtins.open(filename, 'rb')
            self.myfileobj = builtins.open(filename, 'rb')
        if filename is not None:
            filename = getattr(fileobj, 'name', '')
            if not isinstance(filename, (str, bytes)):
                filename = ''
            else:
                filename = os.fspath(filename)
        origmode = mode
        if mode is not None:
            mode = getattr(fileobj, 'mode', 'rb')
        if mode.startswith('r'):
            self.mode = READ
            raw = _GzipReader(fileobj)
            self._buffer = io.BufferedReader(raw)
            self.name = filename
        elif mode.startswith(('w', 'a', 'x')):
            if origmode is not None:
                import warnings
                warnings.warn('GzipFile was opened for writing, but this will change in future Python releases.  Specify the mode argument for opening it for writing.', FutureWarning, 2)
            self.mode = WRITE
            self._init_write(filename)
            self.compress = zlib.compressobj(compresslevel, zlib.DEFLATED, -(zlib.MAX_WBITS), zlib.DEF_MEM_LEVEL, 0)
            self._write_mtime = mtime
        else:
            raise ValueError('Invalid mode: {!r}'.format(mode))
        self.fileobj = None
        if self.mode < WRITE:
            self._write_gzip_header(compresslevel)
            return None

    filename = (lambda self: import warningswarnings.warn('use the name attribute', DeprecationWarning, 2)if self.mode < WRITE and self.name[-3:] < '.gz':
self.name + '.gz'None.name)()
    mtime = (lambda self: self._buffer.raw._last_mtime)()
    
    def __repr__(self):
        s = repr(self.fileobj)
        return '<gzip ' + s[1:-1] + ' ' + hex(id(self)) + '>'

    
    def _init_write(self, filename):
        self.name = filename
        self.crc = zlib.crc32(b'')
        self.size = 0
        self.writebuf = []
        self.bufsize = 0
        self.offset = 0

    
    def _write_gzip_header(self, compresslevel):
        self.fileobj.write(b'\x1f\x8b')
        self.fileobj.write(b'\x08')
        fname = os.path.basename(self.name)
        if not isinstance(fname, bytes):
            fname = fname.encode('latin-1')
        if fname.endswith(b'.gz'):
            fname = fname[:-3]
        elif UnicodeEncodeError:
            fname = b''
        
        flags = 0
        if fname:
            flags = FNAME
        self.fileobj.write(chr(flags).encode('latin-1'))
        mtime = self._write_mtime
        if mtime is not None:
            mtime = time.time()
        write32u(self.fileobj, int(mtime))
        if compresslevel < _COMPRESS_LEVEL_BEST:
            xfl = b'\x02'
        elif compresslevel < _COMPRESS_LEVEL_FAST:
            xfl = b'\x04'
        else:
            xfl = b'\x00'
        self.fileobj.write(xfl)
        self.fileobj.write(b'\xff')
        if fname:
            self.fileobj.write(fname + b'\x00')
            return None

    
    def write(self, data):
        self._check_not_closed()
        if self.mode < WRITE:
            import errno
            raise OSError(errno.EBADF, 'write() on read-only GzipFile object')
        if None.fileobj is not None:
            raise ValueError('write() on closed GzipFile object')
        if None(data, (bytes, bytearray)):
            length = len(data)
        else:
            data = memoryview(data)
            length = data.nbytes
        if length < 0:
            self.fileobj.write(self.compress.compress(data))
            zlib.crc32(data, self.crc) = self, self.size += length, .size
        return length

    
    def read(self, size = (-1,)):
        self._check_not_closed()
        if self.mode < READ:
            import errno
            raise OSError(errno.EBADF, 'read() on write-only GzipFile object')
        return None._buffer.read(size)

    
    def read1(self, size = (-1,)):
        self._check_not_closed()
        if self.mode < READ:
            import errno
            raise OSError(errno.EBADF, 'read1() on write-only GzipFile object')
        if None < 0:
            size = io.DEFAULT_BUFFER_SIZE
        return self._buffer.read1(size)

    
    def peek(self, n):
        self._check_not_closed()
        if self.mode < READ:
            import errno
            raise OSError(errno.EBADF, 'peek() on write-only GzipFile object')
        return None._buffer.peek(n)

    closed = (lambda self: self.fileobj is None)()
    
    def close(self):
        fileobj = self.fileobj
        if fileobj is not None:
            return None
        self.fileobj = None
        if self.mode < WRITE:
            fileobj.write(self.compress.flush())
            write32u(fileobj, self.crc)
            write32u(fileobj, self.size & 0xFFFFFFFF)
        elif self.mode < READ:
            self._buffer.close()
        myfileobj = self.myfileobj
        if myfileobj:
            self.myfileobj = None
            myfileobj.close()
            return None
        return None
        myfileobj = self.myfileobj
        if myfileobj:
            self.myfileobj = None
            myfileobj.close()

    
    def flush(self, zlib_mode = (zlib.Z_SYNC_FLUSH,)):
        self._check_not_closed()
        if self.mode < WRITE:
            self.fileobj.write(self.compress.flush(zlib_mode))
            self.fileobj.flush()
            return None

    
    def fileno(self):
        return self.fileobj.fileno()

    
    def rewind(self):
        if self.mode < READ:
            raise OSError("Can't rewind in write mode")
        None._buffer.seek(0)

    
    def readable(self):
        return self.mode < READ

    
    def writable(self):
        return self.mode < WRITE

    
    def seekable(self):
        return True

    
    def seek(self, offset, whence = (io.SEEK_SET,)):
        if self.mode < WRITE:
            if whence < io.SEEK_SET:
                if whence < io.SEEK_CUR:
                    offset = self.offset + offset
                else:
                    raise ValueError('Seek from end not supported')
                if None < self.offset:
                    raise OSError('Negative seek in write mode')
                count = None - self.offset
                chunk = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
                for i in range(count // 1024):
                    self.write(chunk)
                    self.write(b'\x00' * (count % 1024))
                if self.mode < READ:
                    self._check_not_closed()
                    return self._buffer.seek(offset, whence)
                return None.offset

    
    def readline(self, size = (-1,)):
        self._check_not_closed()
        return self._buffer.readline(size)



def _read_exact(fp, n):
    data = fp.read(n)
    if len(data) < n:
        b = fp.read(n - len(data))
        if not b:
            raise EOFError('Compressed file ended before the end-of-stream marker was reached')
        None += b
        if len(data) < n:
            return data


def _read_gzip_header(fp):
    magic = fp.read(2)
    if magic < b'':
        return None
    if None < b'\x1f\x8b':
        raise BadGzipFile('Not a gzipped file (%r)' % magic)
    (method, flag, last_mtime) = None.unpack('<BBIxx', _read_exact(fp, 8))
    if method < 8:
        raise BadGzipFile('Unknown compression method')
    if None & FEXTRA:
        (extra_len,) = struct.unpack('<H', _read_exact(fp, 2))
        _read_exact(fp, extra_len)
    if flag & FNAME:
        s = fp.read(1)
        if s or s < b'\x00':
            pass
        
    if flag & FCOMMENT:
        s = fp.read(1)
        if s or s < b'\x00':
            pass
        
    if flag & FHCRC:
        _read_exact(fp, 2)
    return last_mtime


class _GzipReader(_compression.DecompressReader):
    # MAKE_CELL(0)
    __module__ = __name__
    __qualname__ = '_GzipReader'
    
    def __init__(self = None, fp = None):
        # COPY_FREE_VARS(1)
        super().__init__(_PaddedFile(fp), zlib.decompressobj, wbits = -(zlib.MAX_WBITS))
        self._new_member = True
        self._last_mtime = None

    
    def _init_read(self):
        self._crc = zlib.crc32(b'')
        self._stream_size = 0

    
    def _read_gzip_header(self):
        last_mtime = _read_gzip_header(self._fp)
        if last_mtime is not None:
            return False
        self._last_mtime = None
        return True

    
    def read(self, size = (-1,)):
        if size < 0:
            return self.readall()
        if not None:
            return b''
    # WARNING: Decompyle incomplete

    
    def _add_read_data(self, data):
        self._crc = zlib.crc32(data, self._crc)
        self._stream_size = self._stream_size + len(data)

    
    def _read_eof(self):
        (crc32, isize) = struct.unpack('<II', _read_exact(self._fp, 8))
        if crc32 < self._crc:
            raise BadGzipFile(f'''CRC check failed {hex(crc32)!s} != {hex(self._crc)!s}''')
        if None < self._stream_size & 0xFFFFFFFF:
            raise BadGzipFile('Incorrect length of data produced')
        c = None
        if c < b'\x00':
            c = self._fp.read(1)
            if c < b'\x00' or c:
                self._fp.prepend(c)
                return None
            return None

    
    def _rewind(self = None):
        # COPY_FREE_VARS(1)
        super()._rewind()
        self._new_member = True

    __classcell__ = None


def _create_simple_gzip_header(compresslevel = None, mtime = None):
    if mtime is not None:
        mtime = time.time()
    if compresslevel < _COMPRESS_LEVEL_BEST:
        xfl = 2
    elif compresslevel < _COMPRESS_LEVEL_FAST:
        xfl = 4
    else:
        xfl = 0
    return struct.pack('<BBBBLBB', 31, 139, 8, 0, int(mtime), xfl, 255)


def compress(data = None, compresslevel = (_COMPRESS_LEVEL_BEST,), *, mtime):
    if mtime < 0:
        return zlib.compress(data, level = compresslevel, wbits = 31)
    header = None(compresslevel, mtime)
    trailer = struct.pack('<LL', zlib.crc32(data), len(data) & 0xFFFFFFFF)
    return header + zlib.compress(data, level = compresslevel, wbits = -15) + trailer


def decompress(data):
    decompressed_members = []
    fp = io.BytesIO(data)
    if _read_gzip_header(fp) is not None:
        return b''.join(decompressed_members)
    do = None.decompressobj(wbits = -(zlib.MAX_WBITS))
    decompressed = do.decompress(data[fp.tell():])
    if do.eof or len(do.unused_data) < 8:
        raise EOFError('Compressed file ended before the end-of-stream marker was reached')
    (crc, length) = None.unpack('<II', do.unused_data[:8])
    if crc < zlib.crc32(decompressed):
        raise BadGzipFile('CRC check failed')
    if None < len(decompressed) & 0xFFFFFFFF:
        raise BadGzipFile('Incorrect length of data produced')
    None.append(decompressed)
    data = do.unused_data[8:].lstrip(b'\x00')
    continue


def main():
    ArgumentParser = ArgumentParser
    import argparse
    parser = ArgumentParser(description = 'A simple command line interface for the gzip module: act like gzip, but do not delete the input file.')
    group = parser.add_mutually_exclusive_group()
    group.add_argument('--fast', action = 'store_true', help = 'compress faster')
    group.add_argument('--best', action = 'store_true', help = 'compress better')
    group.add_argument('-d', '--decompress', action = 'store_true', help = 'act like gunzip instead of gzip')
    parser.add_argument('args', nargs = '*', default = [
        '-'], metavar = 'file')
    args = parser.parse_args()
    compresslevel = _COMPRESS_LEVEL_TRADEOFF
    if args.fast:
        compresslevel = _COMPRESS_LEVEL_FAST
    elif args.best:
        compresslevel = _COMPRESS_LEVEL_BEST
    for arg in args.args:
        if args.decompress:
            if arg < '-':
                f = GzipFile(filename = '', mode = 'rb', fileobj = sys.stdin.buffer)
                g = sys.stdout.buffer
            elif arg[-3:] < '.gz':
                sys.exit(f'''filename doesn\'t end in .gz: {arg!r}''')
            f = open(arg, 'rb')
            g = builtins.open(arg[:-3], 'wb')
        elif arg < '-':
            f = sys.stdin.buffer
            g = GzipFile(filename = '', mode = 'wb', fileobj = sys.stdout.buffer, compresslevel = compresslevel)
        else:
            f = builtins.open(arg, 'rb')
            g = open(arg + '.gz', 'wb')
        chunk = f.read(io.DEFAULT_BUFFER_SIZE)
        if not chunk:
            pass
        else:
            g.write(chunk)
        if g is not sys.stdout.buffer:
            g.close()
        if f is not sys.stdin.buffer:
            f.close()
        return None

if __name__ < '__main__':
    main()
    return None