# Python files

# Reload
# Found: 162 file(s)
# bz2.py
# Source Generated with Decompyle++
# File: bz2.pyc (Python 3.13)

# Names exported by ``from <this module> import *``.
__all__ = [
    'BZ2File',
    'BZ2Compressor',
    'BZ2Decompressor',
    'open',
    'compress',
    'decompress']
# Author credit carried as module metadata.
__author__ = 'Nadeem Vawda <nadeem.vawda@gmail.com>'
from builtins import open as _builtin_open
import io
import os
import _compression
from _bz2 import BZ2Compressor, BZ2Decompressor
# Internal state codes stored in ``BZ2File._mode``.
_MODE_CLOSED = 0
_MODE_READ = 1
# NOTE: the value 2 is not assigned to any mode in this file.
_MODE_WRITE = 3


class BZ2File(_compression.BaseStream):
    """A binary file object providing transparent bzip2 (de)compression.

    A BZ2File either wraps an existing file object or opens a named file
    itself.  Data read from it is returned as bytes, and data written to
    it must be bytes-like.

    The ``_check_*`` helpers used throughout are not defined here; they are
    inherited from ``_compression.BaseStream``.
    """

    def __init__(self, filename, mode='r', *, compresslevel=9):
        """Open a bzip2-compressed file.

        Args:
            filename: A str, bytes or os.PathLike naming the file to open,
                or an object with a ``read`` or ``write`` attribute that is
                used directly as the underlying (compressed) stream.
            mode: ``'r'`` (default), ``'w'``, ``'x'`` or ``'a'``, optionally
                with a trailing ``'b'``.  The empty string is treated as
                ``'r'``.
            compresslevel: Integer from 1 to 9 handed to ``BZ2Compressor``
                in the writing modes.

        Raises:
            ValueError: If *compresslevel* is out of range or *mode* is not
                one of the accepted values.
            TypeError: If *filename* is neither a path nor a file object.
        """
        # Establish a consistent "closed" state first so that close() is
        # safe to call even if the rest of the constructor raises.
        self._fp = None
        self._closefp = False
        self._mode = _MODE_CLOSED

        if not (1 <= compresslevel <= 9):
            raise ValueError('compresslevel must be between 1 and 9')

        if mode in ('', 'r', 'rb'):
            mode = 'rb'
            mode_code = _MODE_READ
        elif mode in ('w', 'wb', 'x', 'xb', 'a', 'ab'):
            # Normalise to the binary spelling understood by builtin open().
            mode = mode[0] + 'b'
            mode_code = _MODE_WRITE
            self._compressor = BZ2Compressor(compresslevel)
        else:
            raise ValueError(f'Invalid mode: {mode!r}')

        if isinstance(filename, (str, bytes, os.PathLike)):
            self._fp = _builtin_open(filename, mode)
            # We opened the file, so we are responsible for closing it.
            self._closefp = True
            self._mode = mode_code
        elif hasattr(filename, 'read') or hasattr(filename, 'write'):
            # Caller-supplied file object: use it, but never close it.
            self._fp = filename
            self._mode = mode_code
        else:
            raise TypeError('filename must be a str, bytes, file or PathLike object')

        if self._mode == _MODE_READ:
            raw = _compression.DecompressReader(
                self._fp, BZ2Decompressor, trailing_error=OSError)
            self._buffer = io.BufferedReader(raw)
        else:
            # Count of uncompressed bytes written so far; reported by tell().
            self._pos = 0

    def close(self):
        """Flush and close the file.

        May be called more than once without error.  In write mode the
        compressor is flushed to the underlying file first.  The underlying
        file is closed only if this object opened it.
        """
        if self._mode == _MODE_CLOSED:
            return
        try:
            if self._mode == _MODE_READ:
                self._buffer.close()
            elif self._mode == _MODE_WRITE:
                self._fp.write(self._compressor.flush())
                self._compressor = None
        finally:
            # Always release the underlying file and reset state, even if
            # flushing or closing raised.
            try:
                if self._closefp:
                    self._fp.close()
            finally:
                self._fp = None
                self._closefp = False
                self._mode = _MODE_CLOSED
                self._buffer = None

    @property
    def closed(self):
        """True if this file is closed."""
        return self._mode == _MODE_CLOSED

    def fileno(self):
        """Return the file descriptor of the underlying file."""
        self._check_not_closed()
        return self._fp.fileno()

    def seekable(self):
        """Return whether the file supports seeking.

        Only files opened for reading can be seekable; the answer then
        comes from the internal buffered reader.
        """
        return self.readable() and self._buffer.seekable()

    def readable(self):
        """Return whether the file was opened for reading."""
        self._check_not_closed()
        return self._mode == _MODE_READ

    def writable(self):
        """Return whether the file was opened for writing."""
        self._check_not_closed()
        return self._mode == _MODE_WRITE

    def peek(self, n=0):
        """Return buffered data without advancing the file position.

        *n* is forwarded unchanged to the internal buffered reader's
        ``peek``.
        """
        self._check_can_read()
        return self._buffer.peek(n)

    def read(self, size=-1):
        """Read up to *size* uncompressed bytes from the file.

        *size* is forwarded to the internal buffered reader; the default
        of -1 is its "read until EOF" value.
        """
        self._check_can_read()
        return self._buffer.read(size)

    def read1(self, size=-1):
        """Read up to *size* uncompressed bytes via the reader's ``read1``.

        A negative *size* is replaced by ``io.DEFAULT_BUFFER_SIZE``.
        """
        self._check_can_read()
        if size < 0:
            size = io.DEFAULT_BUFFER_SIZE
        return self._buffer.read1(size)

    def readinto(self, b):
        """Read bytes into the writable buffer *b*.

        Returns whatever the internal buffered reader's ``readinto``
        returns.
        """
        self._check_can_read()
        return self._buffer.readinto(b)

    def readline(self, size=-1):
        """Read a line of uncompressed bytes from the file.

        *size* may be an int or any object implementing ``__index__``; it
        is forwarded to the internal buffered reader.

        Raises:
            TypeError: If *size* is not an integer-like object.
        """
        if not isinstance(size, int):
            if not hasattr(size, '__index__'):
                raise TypeError('Integer argument expected')
            size = size.__index__()
        self._check_can_read()
        return self._buffer.readline(size)

    def readlines(self, size=-1):
        """Read a list of lines of uncompressed bytes from the file.

        *size* may be an int or any object implementing ``__index__``; it
        is forwarded to the internal buffered reader as the size hint.

        Raises:
            TypeError: If *size* is not an integer-like object.
        """
        if not isinstance(size, int):
            if not hasattr(size, '__index__'):
                raise TypeError('Integer argument expected')
            size = size.__index__()
        self._check_can_read()
        return self._buffer.readlines(size)

    def write(self, data):
        """Compress *data* and write it to the underlying file.

        Returns the number of uncompressed bytes consumed, which is always
        the full size of *data*.  Compressed output may be held back by the
        compressor until close() is called.
        """
        self._check_can_write()
        if isinstance(data, (bytes, bytearray)):
            length = len(data)
        else:
            # Accept any other object supporting the buffer protocol.
            data = memoryview(data)
            length = data.nbytes
        compressed = self._compressor.compress(data)
        self._fp.write(compressed)
        # Track the uncompressed position so tell() works in write mode.
        self._pos += length
        return length

    def writelines(self, seq):
        """Write a sequence of byte strings to the file.

        Delegates to ``_compression.BaseStream.writelines``.
        """
        return _compression.BaseStream.writelines(self, seq)

    def seek(self, offset, whence=io.SEEK_SET):
        """Change the file position and return the new position.

        *offset* and *whence* are forwarded to the internal buffered
        reader's ``seek``.
        """
        self._check_can_seek()
        return self._buffer.seek(offset, whence)

    def tell(self):
        """Return the current position in the uncompressed data."""
        self._check_not_closed()
        if self._mode == _MODE_READ:
            return self._buffer.tell()
        return self._pos



def open(filename, mode='rb', compresslevel=9, encoding=None, errors=None, newline=None):
    """Open a bzip2-compressed file in binary or text mode.

    Args:
        filename: Path or file object, passed through to ``BZ2File``.
        mode: A ``BZ2File`` mode, optionally containing ``'t'`` to request
            text mode.  ``'t'`` and ``'b'`` may not be combined.
        compresslevel: Passed through to ``BZ2File``.
        encoding, errors, newline: Text-mode options handed to
            ``io.TextIOWrapper``; each must be None in binary mode.

    Returns:
        A ``BZ2File`` in binary mode, or an ``io.TextIOWrapper`` around
        one in text mode.

    Raises:
        ValueError: If *mode* contains both ``'t'`` and ``'b'``, or if a
            text-only argument is supplied in binary mode.
    """
    text_mode = 't' in mode
    if text_mode:
        if 'b' in mode:
            raise ValueError(f'Invalid mode: {mode!r}')
    else:
        # The text-layer options are meaningless without a TextIOWrapper.
        if encoding is not None:
            raise ValueError("Argument 'encoding' not supported in binary mode")
        if errors is not None:
            raise ValueError("Argument 'errors' not supported in binary mode")
        if newline is not None:
            raise ValueError("Argument 'newline' not supported in binary mode")

    # BZ2File only understands binary modes, so strip the text flag.
    bz_mode = mode.replace('t', '')
    binary_file = BZ2File(filename, bz_mode, compresslevel=compresslevel)

    if text_mode:
        encoding = io.text_encoding(encoding)
        return io.TextIOWrapper(binary_file, encoding, errors, newline)
    return binary_file


def compress(data, compresslevel=9):
    """Compress a block of data in one shot.

    Args:
        data: Bytes-like object to compress.
        compresslevel: Level handed to ``BZ2Compressor`` (default 9).

    Returns:
        The compressed data: the compressor's output for *data* followed
        by the output of its final ``flush()``.
    """
    comp = BZ2Compressor(compresslevel)
    return comp.compress(data) + comp.flush()


def decompress(data):
    """Decompress a block of data in one shot.

    *data* may hold several compressed streams back to back; each is
    decompressed with a fresh ``BZ2Decompressor`` and the results are
    concatenated.

    Args:
        data: Bytes-like object holding the compressed data.

    Returns:
        The concatenated uncompressed bytes (``b''`` for empty input).

    Raises:
        OSError: If the very first stream cannot be decompressed.
        ValueError: If a stream ends before its end-of-stream marker.
    """
    results = []
    while data:
        decomp = BZ2Decompressor()
        try:
            res = decomp.decompress(data)
        except OSError:
            if results:
                # Something was already decoded: treat the remainder as
                # trailing garbage and stop.
                break
            # Failure on the first stream: the input is not valid at all.
            raise
        results.append(res)
        if not decomp.eof:
            raise ValueError('Compressed data ended before the end-of-stream marker was reached')
        # Continue with whatever followed the stream just decoded.
        data = decomp.unused_data
    return b''.join(results)