# Python files
#
# Reload
# Found: 162 file(s)
# wave.py
# Source Generated with Decompyle++
# File: wave.pyc (Python 3.13)

from collections import namedtuple
import builtins
import struct
import sys
# Names exported by ``from <this module> import *``.
__all__ = ['open', 'Error', 'Wave_read', 'Wave_write']

class Error(Exception):
    """Exception raised by this module for WAVE-specific failures."""

# Format tag value for PCM audio (the value 1 in a WAVE "fmt " chunk).
WAVE_FORMAT_PCM = 0x0001
# NOTE(review): presumably array-module type codes indexed by sample width
# in bytes; not referenced by the visible code -- confirm before relying on it.
_array_fmts = (None, 'b', 'h', None, 'i')
# Immutable record describing the parameters of a WAVE stream.
_wave_params = namedtuple(
    '_wave_params',
    ['nchannels', 'sampwidth', 'framerate', 'nframes', 'comptype', 'compname'],
)

def _byteswap(data, width):
    """Return a copy of *data* with the bytes of every sample reversed.

    *data* is a bytes-like sequence made of consecutive samples, each
    *width* bytes long.  The result is a ``bytes`` object of the same
    length in which every *width*-byte group has its byte order flipped
    (little-endian <-> big-endian).  Empty input yields ``b''``.
    """
    swapped_data = bytearray(len(data))

    for i in range(0, len(data), width):
        for j in range(width):
            # Byte j of the sample lands at the mirrored position.
            swapped_data[i + width - 1 - j] = data[i + j]

    # Return only after every sample has been processed.
    return bytes(swapped_data)


class _Chunk:
    """File-like reader for a single chunk of an IFF/RIFF-style stream.

    A chunk is a 4-byte name, a 4-byte unsigned size and then that many
    bytes of payload.  The object reads the header on construction and
    afterwards exposes ``read``/``seek``/``tell``/``skip`` restricted to the
    payload.
    """

    def __init__(self, file, align=True, bigendian=True, inclheader=False):
        """Read the chunk header from *file*.

        file       -- object with ``read()`` (and optionally ``seek()`` /
                      ``tell()``), positioned at the start of a chunk.
        align      -- if true, an odd-sized payload is followed by one pad
                      byte that is consumed transparently.
        bigendian  -- byte order of the size field (true: '>', false: '<').
        inclheader -- if true, the stored size includes the 8-byte header,
                      which is subtracted.

        Raises EOFError if a complete header cannot be read.
        """
        import struct
        self.closed = False
        self.align = align
        strflag = '>' if bigendian else '<'
        self.file = file
        self.chunkname = file.read(4)
        if len(self.chunkname) < 4:
            raise EOFError
        try:
            self.chunksize = struct.unpack_from(strflag + 'L', file.read(4))[0]
        except struct.error:
            # Fewer than four size bytes available: treat as end of file.
            raise EOFError from None
        if inclheader:
            self.chunksize = self.chunksize - 8
        # Number of payload bytes consumed so far (including any pad byte).
        self.size_read = 0
        try:
            # Absolute position of the payload start, needed by seek().
            self.offset = self.file.tell()
        except (AttributeError, OSError):
            self.seekable = False
        else:
            self.seekable = True

    def getname(self):
        """Return the 4-byte name (ID) of the chunk."""
        return self.chunkname

    def close(self):
        """Skip to the end of the chunk and mark it closed (idempotent)."""
        if not self.closed:
            try:
                self.skip()
            finally:
                # Mark closed even when skipping fails.
                self.closed = True

    def seek(self, pos, whence=0):
        """Move to *pos* inside the payload.

        *whence* is 0 (from payload start), 1 (relative to the current
        position) or 2 (relative to the payload end).

        Raises ValueError on a closed chunk, OSError if the underlying file
        is not seekable and RuntimeError if the target lies outside the
        payload.
        """
        if self.closed:
            raise ValueError('I/O operation on closed file')
        if not self.seekable:
            raise OSError('cannot seek')
        if whence == 1:
            pos = pos + self.size_read
        elif whence == 2:
            pos = pos + self.chunksize
        if pos < 0 or pos > self.chunksize:
            raise RuntimeError
        self.file.seek(self.offset + pos, 0)
        self.size_read = pos

    def tell(self):
        """Return the current position within the payload."""
        if self.closed:
            raise ValueError('I/O operation on closed file')
        return self.size_read

    def read(self, size=-1):
        """Read at most *size* bytes from the payload.

        A negative *size* reads everything that remains.  Returns ``b''``
        once the payload is exhausted.
        """
        if self.closed:
            raise ValueError('I/O operation on closed file')
        if self.size_read >= self.chunksize:
            return b''
        remaining = self.chunksize - self.size_read
        if size < 0 or size > remaining:
            size = remaining
        data = self.file.read(size)
        self.size_read = self.size_read + len(data)
        if (self.size_read == self.chunksize
                and self.align and (self.chunksize & 1)):
            # Consume the pad byte that follows an odd-sized payload.
            dummy = self.file.read(1)
            self.size_read = self.size_read + len(dummy)
        return data

    def skip(self):
        """Advance the underlying file to the end of this chunk.

        Seeks when possible, otherwise reads and discards the remaining
        payload.  Raises EOFError if the file ends before the chunk does.
        """
        if self.closed:
            raise ValueError('I/O operation on closed file')
        if self.seekable:
            try:
                n = self.chunksize - self.size_read
                # Account for the alignment pad byte.
                if self.align and (self.chunksize & 1):
                    n = n + 1
                self.file.seek(n, 1)
                self.size_read = self.size_read + n
                return
            except OSError:
                # Fall back to reading if the relative seek is refused.
                pass
        while self.size_read < self.chunksize:
            n = min(8192, self.chunksize - self.size_read)
            dummy = self.read(n)
            if not dummy:
                raise EOFError



class Wave_read:
    """Reader for PCM WAVE (RIFF) audio files.

    Construct with a file name (``str``) or a binary file object opened
    for reading.  The header is parsed on construction; audio data is
    returned by ``readframes()``.  Usable as a context manager.
    """

    def initfp(self, file):
        """Parse the RIFF/WAVE header of *file* and locate the data chunk.

        Raises Error if the stream is not a RIFF/WAVE file, if the data
        chunk precedes the fmt chunk, or if either chunk is missing.
        """
        self._convert = None
        self._soundpos = 0
        self._file = _Chunk(file, bigendian=0)
        if self._file.getname() != b'RIFF':
            raise Error('file does not start with RIFF id')
        if self._file.read(4) != b'WAVE':
            raise Error('not a WAVE file')
        self._fmt_chunk_read = 0
        self._data_chunk = None
        while True:
            self._data_seek_needed = 1
            try:
                chunk = _Chunk(self._file, bigendian=0)
            except EOFError:
                break
            chunkname = chunk.getname()
            if chunkname == b'fmt ':
                self._read_fmt_chunk(chunk)
                self._fmt_chunk_read = 1
            elif chunkname == b'data':
                if not self._fmt_chunk_read:
                    raise Error('data chunk before fmt chunk')
                self._data_chunk = chunk
                self._nframes = chunk.chunksize // self._framesize
                self._data_seek_needed = 0
                # Stop here so the file stays positioned at the audio data.
                break
            chunk.skip()
        if not self._fmt_chunk_read or not self._data_chunk:
            raise Error('fmt chunk and/or data chunk missing')

    def __init__(self, f):
        """Open *f* (a path string or a readable binary file object)."""
        self._i_opened_the_file = None
        if isinstance(f, str):
            f = builtins.open(f, 'rb')
            self._i_opened_the_file = f
        # Otherwise assume f is already an open file object.
        try:
            self.initfp(f)
        except BaseException:
            # Do not leak a file handle that this object opened itself.
            if self._i_opened_the_file:
                f.close()
            raise

    def __del__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def getfp(self):
        """Return the top-level chunk object wrapping the file (or None)."""
        return self._file

    def rewind(self):
        """Reset the read position to the first frame."""
        self._data_seek_needed = 1
        self._soundpos = 0

    def close(self):
        """Release the reader; close the file only if this object opened it."""
        self._file = None
        file = self._i_opened_the_file
        if file:
            self._i_opened_the_file = None
            file.close()

    def tell(self):
        """Return the current position, in frames."""
        return self._soundpos

    def getnchannels(self):
        """Return the number of audio channels."""
        return self._nchannels

    def getnframes(self):
        """Return the number of audio frames in the data chunk."""
        return self._nframes

    def getsampwidth(self):
        """Return the sample width in bytes."""
        return self._sampwidth

    def getframerate(self):
        """Return the sampling frequency."""
        return self._framerate

    def getcomptype(self):
        """Return the compression type."""
        return self._comptype

    def getcompname(self):
        """Return the human-readable compression name."""
        return self._compname

    def getparams(self):
        """Return all parameters as a ``_wave_params`` named tuple."""
        return _wave_params(self.getnchannels(), self.getsampwidth(),
                            self.getframerate(), self.getnframes(),
                            self.getcomptype(), self.getcompname())

    def getmarkers(self):
        """Return None; markers are not supported."""
        return None

    def getmark(self, id):
        """Always raise Error; markers are not supported."""
        raise Error('no marks')

    def setpos(self, pos):
        """Set the read position to frame *pos* (0 <= pos <= nframes)."""
        if pos < 0 or pos > self._nframes:
            raise Error('position not in range')
        self._soundpos = pos
        self._data_seek_needed = 1

    def readframes(self, nframes):
        """Read and return at most *nframes* frames of audio as bytes."""
        if self._data_seek_needed:
            self._data_chunk.seek(0, 0)
            pos = self._soundpos * self._framesize
            if pos:
                self._data_chunk.seek(pos, 0)
            self._data_seek_needed = 0
        if nframes == 0:
            return b''
        data = self._data_chunk.read(nframes * self._framesize)
        # WAVE data is little-endian; swap multi-byte samples on big-endian
        # hosts.
        if self._sampwidth != 1 and sys.byteorder == 'big':
            data = _byteswap(data, self._sampwidth)
        if self._convert and data:
            data = self._convert(data)
        self._soundpos = (self._soundpos
                          + len(data) // (self._nchannels * self._sampwidth))
        return data

    def _read_fmt_chunk(self, chunk):
        """Parse the ``fmt `` chunk and set the stream parameters.

        Raises EOFError if the chunk is too short and Error for a format
        other than PCM, a zero sample width or zero channels.
        """
        try:
            (wFormatTag, self._nchannels, self._framerate,
             dwAvgBytesPerSec, wBlockAlign) = struct.unpack_from(
                '<HHLLH', chunk.read(14))
        except struct.error:
            raise EOFError from None
        if wFormatTag != WAVE_FORMAT_PCM:
            raise Error('unknown format: %r' % (wFormatTag,))
        try:
            sampwidth = struct.unpack_from('<H', chunk.read(2))[0]
        except struct.error:
            raise EOFError from None
        # Bits per sample, rounded up to whole bytes.
        self._sampwidth = (sampwidth + 7) // 8
        if not self._sampwidth:
            raise Error('bad sample width')
        if not self._nchannels:
            raise Error('bad # of channels')
        self._framesize = self._nchannels * self._sampwidth
        self._comptype = 'NONE'
        self._compname = 'not compressed'



class Wave_write:
    """Writer for PCM WAVE (RIFF) audio files.

    Construct with a file name (``str``) or a binary file object opened
    for writing.  The number of channels, sample width and frame rate
    must be set before the first frames are written; the header is
    emitted lazily on the first write (or on close).  Usable as a context
    manager.
    """

    # Lets close()/__del__ run safely even if __init__ failed early.
    _file = None

    def __init__(self, f):
        """Open *f* (a path string or a writable binary file object)."""
        self._i_opened_the_file = None
        if isinstance(f, str):
            f = builtins.open(f, 'wb')
            self._i_opened_the_file = f
        try:
            self.initfp(f)
        except BaseException:
            # Do not leak a file handle that this object opened itself.
            if self._i_opened_the_file:
                f.close()
            raise

    def initfp(self, file):
        """Attach *file* and reset all stream parameters and counters."""
        self._file = file
        self._convert = None
        self._nchannels = 0
        self._sampwidth = 0
        self._framerate = 0
        self._nframes = 0
        self._nframeswritten = 0
        self._datawritten = 0
        self._datalength = 0
        self._headerwritten = False

    def __del__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def setnchannels(self, nchannels):
        """Set the number of channels (must be >= 1)."""
        if self._datawritten:
            raise Error('cannot change parameters after starting to write')
        if nchannels < 1:
            raise Error('bad # of channels')
        self._nchannels = nchannels

    def getnchannels(self):
        """Return the number of channels; raise Error if unset."""
        if not self._nchannels:
            raise Error('number of channels not set')
        return self._nchannels

    def setsampwidth(self, sampwidth):
        """Set the sample width in bytes (1 to 4)."""
        if self._datawritten:
            raise Error('cannot change parameters after starting to write')
        if sampwidth < 1 or sampwidth > 4:
            raise Error('bad sample width')
        self._sampwidth = sampwidth

    def getsampwidth(self):
        """Return the sample width in bytes; raise Error if unset."""
        if not self._sampwidth:
            raise Error('sample width not set')
        return self._sampwidth

    def setframerate(self, framerate):
        """Set the frame rate (positive; rounded to the nearest integer)."""
        if self._datawritten:
            raise Error('cannot change parameters after starting to write')
        if framerate <= 0:
            raise Error('bad frame rate')
        self._framerate = int(round(framerate))

    def getframerate(self):
        """Return the frame rate; raise Error if unset."""
        if not self._framerate:
            raise Error('frame rate not set')
        return self._framerate

    def setnframes(self, nframes):
        """Declare the expected number of frames (used for the header)."""
        if self._datawritten:
            raise Error('cannot change parameters after starting to write')
        self._nframes = nframes

    def getnframes(self):
        """Return the number of frames written so far."""
        return self._nframeswritten

    def setcomptype(self, comptype, compname):
        """Set the compression type; only ``'NONE'`` is accepted."""
        if self._datawritten:
            raise Error('cannot change parameters after starting to write')
        if comptype not in ('NONE',):
            raise Error('unsupported compression type')
        self._comptype = comptype
        self._compname = compname

    def getcomptype(self):
        """Return the compression type set by setcomptype()."""
        return self._comptype

    def getcompname(self):
        """Return the compression name set by setcomptype()."""
        return self._compname

    def setparams(self, params):
        """Set all parameters from a 6-item sequence.

        *params* is ``(nchannels, sampwidth, framerate, nframes, comptype,
        compname)``.
        """
        (nchannels, sampwidth, framerate,
         nframes, comptype, compname) = params
        if self._datawritten:
            raise Error('cannot change parameters after starting to write')
        self.setnchannels(nchannels)
        self.setsampwidth(sampwidth)
        self.setframerate(framerate)
        self.setnframes(nframes)
        self.setcomptype(comptype, compname)

    def getparams(self):
        """Return all parameters as a ``_wave_params`` named tuple.

        Raises Error unless channels, sample width and frame rate are set.
        """
        if not self._nchannels or not self._sampwidth or not self._framerate:
            raise Error('not all parameters set')
        return _wave_params(self._nchannels, self._sampwidth,
                            self._framerate, self._nframes,
                            self._comptype, self._compname)

    def setmark(self, id, pos, name):
        """Always raise Error; markers are not supported."""
        raise Error('setmark() not supported')

    def getmark(self, id):
        """Always raise Error; markers are not supported."""
        raise Error('no marks')

    def getmarkers(self):
        """Return None; markers are not supported."""
        return None

    def tell(self):
        """Return the current position, in frames written."""
        return self._nframeswritten

    def writeframesraw(self, data):
        """Write audio *data* without updating the header sizes."""
        if not isinstance(data, (bytes, bytearray)):
            # Accept any buffer-protocol object by viewing it as raw bytes.
            data = memoryview(data).cast('B')
        self._ensure_header_written(len(data))
        nframes = len(data) // (self._sampwidth * self._nchannels)
        if self._convert:
            data = self._convert(data)
        # WAVE data is little-endian; swap multi-byte samples on big-endian
        # hosts.
        if self._sampwidth != 1 and sys.byteorder == 'big':
            data = _byteswap(data, self._sampwidth)
        self._file.write(data)
        self._datawritten += len(data)
        self._nframeswritten = self._nframeswritten + nframes

    def writeframes(self, data):
        """Write audio *data* and fix up the header sizes if they changed."""
        self.writeframesraw(data)
        if self._datalength != self._datawritten:
            self._patchheader()

    def close(self):
        """Finish the file: write/patch the header, flush and release it.

        The underlying file is closed only if this object opened it.  The
        release happens even when writing the header fails.
        """
        try:
            if self._file:
                self._ensure_header_written(0)
                if self._datalength != self._datawritten:
                    self._patchheader()
                self._file.flush()
        finally:
            self._file = None
            file = self._i_opened_the_file
            if file:
                self._i_opened_the_file = None
                file.close()

    def _ensure_header_written(self, datasize):
        """Write the header once, after checking the required parameters."""
        if not self._headerwritten:
            if not self._nchannels:
                raise Error('# channels not specified')
            if not self._sampwidth:
                raise Error('sample width not specified')
            if not self._framerate:
                raise Error('sampling rate not specified')
            self._write_header(datasize)

    def _write_header(self, initlength):
        """Write the RIFF/WAVE/fmt/data headers.

        *initlength* is the byte length of the first data block; it is used
        to estimate the frame count when none was declared.
        """
        self._file.write(b'RIFF')
        if not self._nframes:
            self._nframes = initlength // (self._nchannels * self._sampwidth)
        self._datalength = self._nframes * self._nchannels * self._sampwidth
        try:
            # Remember where the RIFF size lives so it can be patched later.
            self._form_length_pos = self._file.tell()
        except (AttributeError, OSError):
            self._form_length_pos = None
        self._file.write(struct.pack(
            '<L4s4sLHHLLHH4s',
            36 + self._datalength, b'WAVE', b'fmt ', 16,
            WAVE_FORMAT_PCM, self._nchannels, self._framerate,
            self._nchannels * self._framerate * self._sampwidth,
            self._nchannels * self._sampwidth,
            self._sampwidth * 8, b'data'))
        if self._form_length_pos is not None:
            self._data_length_pos = self._file.tell()
        self._file.write(struct.pack('<L', self._datalength))
        self._headerwritten = True

    def _patchheader(self):
        """Rewrite the two size fields to match the data actually written."""
        if self._datawritten == self._datalength:
            return
        curpos = self._file.tell()
        self._file.seek(self._form_length_pos, 0)
        self._file.write(struct.pack('<L', 36 + self._datawritten))
        self._file.seek(self._data_length_pos, 0)
        self._file.write(struct.pack('<L', self._datawritten))
        self._file.seek(curpos, 0)
        self._datalength = self._datawritten



def open(f, mode=None):
    """Open a WAVE file for reading or writing.

    f    -- a file name (``str``) or an open binary file object.
    mode -- ``'r'``/``'rb'`` to read, ``'w'``/``'wb'`` to write.  When
            omitted, ``f.mode`` is used if *f* has that attribute,
            otherwise ``'rb'``.

    Returns a Wave_read or Wave_write object.  Raises Error for any other
    mode.
    """
    if mode is None:
        # Infer the mode from the file object when the caller gave none.
        mode = getattr(f, 'mode', 'rb')
    if mode in ('r', 'rb'):
        return Wave_read(f)
    if mode in ('w', 'wb'):
        return Wave_write(f)
    raise Error("mode must be 'r', 'rb', 'w', or 'wb'")