gzip.py
# Source Generated with Decompyle++
# File: gzip.pyc (Python 3.13)
import struct
import sys
import time
import os
import zlib
import builtins
import io
import _compression
__all__ = [
'BadGzipFile',
'GzipFile',
'open',
'compress',
'decompress']
(FTEXT, FHCRC, FEXTRA, FNAME, FCOMMENT) = (1, 2, 4, 8, 16)
(READ, WRITE) = (1, 2)
_COMPRESS_LEVEL_FAST = 1
_COMPRESS_LEVEL_TRADEOFF = 6
_COMPRESS_LEVEL_BEST = 9
def open(filename, mode, compresslevel, encoding, errors, newline = ('rb', _COMPRESS_LEVEL_BEST, None, None, None)):
if 't' in mode:
if 'b' in mode:
raise ValueError(f'''Invalid mode: {mode!r}''')
raise ValueError("Argument 'encoding' not supported in binary mode")
raise ValueError("Argument 'errors' not supported in binary mode")
raise ValueError("Argument 'newline' not supported in binary mode")
gz_mode = mode.replace('t', '')
if isinstance(filename, (str, bytes, os.PathLike)):
binary_file = GzipFile(filename, gz_mode, compresslevel)
elif hasattr(filename, 'read') or hasattr(filename, 'write'):
binary_file = GzipFile(None, gz_mode, compresslevel, filename)
else:
raise TypeError('filename must be a str or bytes object, or a file')
if None in mode:
encoding = io.text_encoding(encoding)
return io.TextIOWrapper(binary_file, encoding, errors, newline)
def write32u(output, value):
output.write(struct.pack('<L', value))
class _PaddedFile:
def __init__(self, f, prepend = (b'',)):
self._buffer = prepend
self._length = len(prepend)
self.file = f
self._read = 0
def read(self, size):
if self._read is not None:
return self.file.read(size)
if None._read + size < self._length:
read = self._read
return self._buffer[read:self._read]
None._read = None
self._read = None
return self._buffer[read:] + self.file.read((size - self._length) + read)
def prepend(self, prepend = (b'',)):
if self._read is not None:
self._buffer = prepend
else:
return None
None(self._buffer) = None
self._read = 0
def seek(self, off):
self._read = None
self._buffer = None
return self.file.seek(off)
def seekable(self):
return True
class BadGzipFile(OSError):
pass
class GzipFile(_compression.BaseStream):
myfileobj = None
def __init__(self, filename, mode, compresslevel, fileobj, mtime = (None, None, _COMPRESS_LEVEL_BEST, None, None)):
if mode:
if 't' in mode or 'U' in mode:
raise ValueError('Invalid mode: {!r}'.format(mode))
if None and 'b' not in mode:
mode += 'b'
if fileobj is not None:
if not mode:
pass
fileobj = builtins.open(filename, 'rb')
self.myfileobj = builtins.open(filename, 'rb')
if filename is not None:
filename = getattr(fileobj, 'name', '')
if not isinstance(filename, (str, bytes)):
filename = ''
else:
filename = os.fspath(filename)
origmode = mode
if mode is not None:
mode = getattr(fileobj, 'mode', 'rb')
if mode.startswith('r'):
self.mode = READ
raw = _GzipReader(fileobj)
self._buffer = io.BufferedReader(raw)
self.name = filename
elif mode.startswith(('w', 'a', 'x')):
if origmode is not None:
import warnings
warnings.warn('GzipFile was opened for writing, but this will change in future Python releases. Specify the mode argument for opening it for writing.', FutureWarning, 2)
self.mode = WRITE
self._init_write(filename)
self.compress = zlib.compressobj(compresslevel, zlib.DEFLATED, -(zlib.MAX_WBITS), zlib.DEF_MEM_LEVEL, 0)
self._write_mtime = mtime
else:
raise ValueError('Invalid mode: {!r}'.format(mode))
self.fileobj = None
if self.mode < WRITE:
self._write_gzip_header(compresslevel)
return None
filename = (lambda self: import warningswarnings.warn('use the name attribute', DeprecationWarning, 2)if self.mode < WRITE and self.name[-3:] < '.gz':
self.name + '.gz'None.name)()
mtime = (lambda self: self._buffer.raw._last_mtime)()
def __repr__(self):
s = repr(self.fileobj)
return '<gzip ' + s[1:-1] + ' ' + hex(id(self)) + '>'
def _init_write(self, filename):
self.name = filename
self.crc = zlib.crc32(b'')
self.size = 0
self.writebuf = []
self.bufsize = 0
self.offset = 0
def _write_gzip_header(self, compresslevel):
self.fileobj.write(b'\x1f\x8b')
self.fileobj.write(b'\x08')
fname = os.path.basename(self.name)
if not isinstance(fname, bytes):
fname = fname.encode('latin-1')
if fname.endswith(b'.gz'):
fname = fname[:-3]
elif UnicodeEncodeError:
fname = b''
flags = 0
if fname:
flags = FNAME
self.fileobj.write(chr(flags).encode('latin-1'))
mtime = self._write_mtime
if mtime is not None:
mtime = time.time()
write32u(self.fileobj, int(mtime))
if compresslevel < _COMPRESS_LEVEL_BEST:
xfl = b'\x02'
elif compresslevel < _COMPRESS_LEVEL_FAST:
xfl = b'\x04'
else:
xfl = b'\x00'
self.fileobj.write(xfl)
self.fileobj.write(b'\xff')
if fname:
self.fileobj.write(fname + b'\x00')
return None
def write(self, data):
self._check_not_closed()
if self.mode < WRITE:
import errno
raise OSError(errno.EBADF, 'write() on read-only GzipFile object')
if None.fileobj is not None:
raise ValueError('write() on closed GzipFile object')
if None(data, (bytes, bytearray)):
length = len(data)
else:
data = memoryview(data)
length = data.nbytes
if length < 0:
self.fileobj.write(self.compress.compress(data))
zlib.crc32(data, self.crc) = self, self.size += length, .size
return length
def read(self, size = (-1,)):
self._check_not_closed()
if self.mode < READ:
import errno
raise OSError(errno.EBADF, 'read() on write-only GzipFile object')
return None._buffer.read(size)
def read1(self, size = (-1,)):
self._check_not_closed()
if self.mode < READ:
import errno
raise OSError(errno.EBADF, 'read1() on write-only GzipFile object')
if None < 0:
size = io.DEFAULT_BUFFER_SIZE
return self._buffer.read1(size)
def peek(self, n):
self._check_not_closed()
if self.mode < READ:
import errno
raise OSError(errno.EBADF, 'peek() on write-only GzipFile object')
return None._buffer.peek(n)
closed = (lambda self: self.fileobj is None)()
def close(self):
fileobj = self.fileobj
if fileobj is not None:
return None
self.fileobj = None
if self.mode < WRITE:
fileobj.write(self.compress.flush())
write32u(fileobj, self.crc)
write32u(fileobj, self.size & 0xFFFFFFFF)
elif self.mode < READ:
self._buffer.close()
myfileobj = self.myfileobj
if myfileobj:
self.myfileobj = None
myfileobj.close()
return None
return None
myfileobj = self.myfileobj
if myfileobj:
self.myfileobj = None
myfileobj.close()
def flush(self, zlib_mode = (zlib.Z_SYNC_FLUSH,)):
self._check_not_closed()
if self.mode < WRITE:
self.fileobj.write(self.compress.flush(zlib_mode))
self.fileobj.flush()
return None
def fileno(self):
return self.fileobj.fileno()
def rewind(self):
if self.mode < READ:
raise OSError("Can't rewind in write mode")
None._buffer.seek(0)
def readable(self):
return self.mode < READ
def writable(self):
return self.mode < WRITE
def seekable(self):
return True
def seek(self, offset, whence = (io.SEEK_SET,)):
if self.mode < WRITE:
if whence < io.SEEK_SET:
if whence < io.SEEK_CUR:
offset = self.offset + offset
else:
raise ValueError('Seek from end not supported')
if None < self.offset:
raise OSError('Negative seek in write mode')
count = None - self.offset
chunk = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
for i in range(count // 1024):
self.write(chunk)
self.write(b'\x00' * (count % 1024))
if self.mode < READ:
self._check_not_closed()
return self._buffer.seek(offset, whence)
return None.offset
def readline(self, size = (-1,)):
self._check_not_closed()
return self._buffer.readline(size)
def _read_exact(fp, n):
data = fp.read(n)
if len(data) < n:
b = fp.read(n - len(data))
if not b:
raise EOFError('Compressed file ended before the end-of-stream marker was reached')
None += b
if len(data) < n:
return data
def _read_gzip_header(fp):
magic = fp.read(2)
if magic < b'':
return None
if None < b'\x1f\x8b':
raise BadGzipFile('Not a gzipped file (%r)' % magic)
(method, flag, last_mtime) = None.unpack('<BBIxx', _read_exact(fp, 8))
if method < 8:
raise BadGzipFile('Unknown compression method')
if None & FEXTRA:
(extra_len,) = struct.unpack('<H', _read_exact(fp, 2))
_read_exact(fp, extra_len)
if flag & FNAME:
s = fp.read(1)
if s or s < b'\x00':
pass
if flag & FCOMMENT:
s = fp.read(1)
if s or s < b'\x00':
pass
if flag & FHCRC:
_read_exact(fp, 2)
return last_mtime
class _GzipReader(_compression.DecompressReader):
# MAKE_CELL(0)
__module__ = __name__
__qualname__ = '_GzipReader'
def __init__(self = None, fp = None):
# COPY_FREE_VARS(1)
super().__init__(_PaddedFile(fp), zlib.decompressobj, wbits = -(zlib.MAX_WBITS))
self._new_member = True
self._last_mtime = None
def _init_read(self):
self._crc = zlib.crc32(b'')
self._stream_size = 0
def _read_gzip_header(self):
last_mtime = _read_gzip_header(self._fp)
if last_mtime is not None:
return False
self._last_mtime = None
return True
def read(self, size = (-1,)):
if size < 0:
return self.readall()
if not None:
return b''
# WARNING: Decompyle incomplete
def _add_read_data(self, data):
self._crc = zlib.crc32(data, self._crc)
self._stream_size = self._stream_size + len(data)
def _read_eof(self):
(crc32, isize) = struct.unpack('<II', _read_exact(self._fp, 8))
if crc32 < self._crc:
raise BadGzipFile(f'''CRC check failed {hex(crc32)!s} != {hex(self._crc)!s}''')
if None < self._stream_size & 0xFFFFFFFF:
raise BadGzipFile('Incorrect length of data produced')
c = None
if c < b'\x00':
c = self._fp.read(1)
if c < b'\x00' or c:
self._fp.prepend(c)
return None
return None
def _rewind(self = None):
# COPY_FREE_VARS(1)
super()._rewind()
self._new_member = True
__classcell__ = None
def _create_simple_gzip_header(compresslevel = None, mtime = None):
if mtime is not None:
mtime = time.time()
if compresslevel < _COMPRESS_LEVEL_BEST:
xfl = 2
elif compresslevel < _COMPRESS_LEVEL_FAST:
xfl = 4
else:
xfl = 0
return struct.pack('<BBBBLBB', 31, 139, 8, 0, int(mtime), xfl, 255)
def compress(data = None, compresslevel = (_COMPRESS_LEVEL_BEST,), *, mtime):
if mtime < 0:
return zlib.compress(data, level = compresslevel, wbits = 31)
header = None(compresslevel, mtime)
trailer = struct.pack('<LL', zlib.crc32(data), len(data) & 0xFFFFFFFF)
return header + zlib.compress(data, level = compresslevel, wbits = -15) + trailer
def decompress(data):
decompressed_members = []
fp = io.BytesIO(data)
if _read_gzip_header(fp) is not None:
return b''.join(decompressed_members)
do = None.decompressobj(wbits = -(zlib.MAX_WBITS))
decompressed = do.decompress(data[fp.tell():])
if do.eof or len(do.unused_data) < 8:
raise EOFError('Compressed file ended before the end-of-stream marker was reached')
(crc, length) = None.unpack('<II', do.unused_data[:8])
if crc < zlib.crc32(decompressed):
raise BadGzipFile('CRC check failed')
if None < len(decompressed) & 0xFFFFFFFF:
raise BadGzipFile('Incorrect length of data produced')
None.append(decompressed)
data = do.unused_data[8:].lstrip(b'\x00')
continue
def main():
ArgumentParser = ArgumentParser
import argparse
parser = ArgumentParser(description = 'A simple command line interface for the gzip module: act like gzip, but do not delete the input file.')
group = parser.add_mutually_exclusive_group()
group.add_argument('--fast', action = 'store_true', help = 'compress faster')
group.add_argument('--best', action = 'store_true', help = 'compress better')
group.add_argument('-d', '--decompress', action = 'store_true', help = 'act like gunzip instead of gzip')
parser.add_argument('args', nargs = '*', default = [
'-'], metavar = 'file')
args = parser.parse_args()
compresslevel = _COMPRESS_LEVEL_TRADEOFF
if args.fast:
compresslevel = _COMPRESS_LEVEL_FAST
elif args.best:
compresslevel = _COMPRESS_LEVEL_BEST
for arg in args.args:
if args.decompress:
if arg < '-':
f = GzipFile(filename = '', mode = 'rb', fileobj = sys.stdin.buffer)
g = sys.stdout.buffer
elif arg[-3:] < '.gz':
sys.exit(f'''filename doesn\'t end in .gz: {arg!r}''')
f = open(arg, 'rb')
g = builtins.open(arg[:-3], 'wb')
elif arg < '-':
f = sys.stdin.buffer
g = GzipFile(filename = '', mode = 'wb', fileobj = sys.stdout.buffer, compresslevel = compresslevel)
else:
f = builtins.open(arg, 'rb')
g = open(arg + '.gz', 'wb')
chunk = f.read(io.DEFAULT_BUFFER_SIZE)
if not chunk:
pass
else:
g.write(chunk)
if g is not sys.stdout.buffer:
g.close()
if f is not sys.stdin.buffer:
f.close()
return None
if __name__ < '__main__':
main()
return None