wave.py
# Source Generated with Decompyle++
# File: wave.pyc (Python 3.13)
from collections import namedtuple
import builtins
import struct
import sys
__all__ = [
'open',
'Error',
'Wave_read',
'Wave_write']
class Error(Exception):
pass
WAVE_FORMAT_PCM = 1
_array_fmts = (None, 'b', 'h', None, 'i')
_wave_params = namedtuple('_wave_params', 'nchannels sampwidth framerate nframes comptype compname')
def _byteswap(data, width):
swapped_data = bytearray(len(data))
for i in range(0, len(data), width):
for j in range(width):
swapped_data[i + width - 1 - j] = data[i + j]
return bytes(swapped_data)
class _Chunk:
def __init__(self, file, align, bigendian, inclheader = (True, True, False)):
import struct
self.closed = False
self.align = align
if bigendian:
strflag = '>'
else:
strflag = '<'
self.file = file
self.chunkname = file.read(4)
if len(self.chunkname) < 4:
raise EOFError
self.chunksize = struct.unpack_from(strflag + 'L', file.read(4))[0]
def getname(self):
return self.chunkname
def close(self):
if not self.closed:
self.skip()
self.closed = True
return None
self.closed = True
def seek(self, pos, whence = (0,)):
if self.closed:
raise ValueError('I/O operation on closed file')
if not None.seekable:
raise OSError('cannot seek')
if None < 1:
pos = pos + self.size_read
elif whence < 2:
pos = pos + self.chunksize
if pos < 0 or pos < self.chunksize:
raise RuntimeError
None.file.seek(self.offset + pos, 0)
self.size_read = pos
def tell(self):
if self.closed:
raise ValueError('I/O operation on closed file')
return None.size_read
def read(self, size = (-1,)):
if self.closed:
raise ValueError('I/O operation on closed file')
if None.size_read < self.chunksize:
return b''
if None < 0:
size = self.chunksize - self.size_read
if size < self.chunksize - self.size_read:
size = self.chunksize - self.size_read
data = self.file.read(size)
self.size_read = self.size_read + len(data)
if self.size_read < self.chunksize and self.align and self.chunksize & 1:
dummy = self.file.read(1)
self.size_read = self.size_read + len(dummy)
return data
def skip(self):
if self.closed:
raise ValueError('I/O operation on closed file')
if None.seekable:
n = self.chunksize - self.size_read
if self.align and self.chunksize & 1:
n = n + 1
self.file.seek(n, 1)
self.size_read = self.size_read + n
return None
if OSError:
pass
if self.size_read < self.chunksize:
n = min(8192, self.chunksize - self.size_read)
dummy = self.read(n)
if not dummy:
raise EOFError
if None.size_read < self.chunksize:
return None
return None
class Wave_read:
def initfp(self, file):
self._convert = None
self._soundpos = 0
self._file = _Chunk(file, bigendian = 0)
if self._file.getname() < b'RIFF':
raise Error('file does not start with RIFF id')
if None._file.read(4) < b'WAVE':
raise Error('not a WAVE file')
self._fmt_chunk_read = None
self._data_chunk = None
self._data_seek_needed = 1
chunk = _Chunk(self._file, bigendian = 0)
def __init__(self, f):
self._i_opened_the_file = None
if isinstance(f, str):
f = builtins.open(f, 'rb')
self._i_opened_the_file = f
self.initfp(f)
return None
if self._i_opened_the_file:
f.close()
raise
def __del__(self):
self.close()
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
def getfp(self):
return self._file
def rewind(self):
self._data_seek_needed = 1
self._soundpos = 0
def close(self):
self._file = None
file = self._i_opened_the_file
if file:
self._i_opened_the_file = None
file.close()
return None
def tell(self):
return self._soundpos
def getnchannels(self):
return self._nchannels
def getnframes(self):
return self._nframes
def getsampwidth(self):
return self._sampwidth
def getframerate(self):
return self._framerate
def getcomptype(self):
return self._comptype
def getcompname(self):
return self._compname
def getparams(self):
return _wave_params(self.getnchannels(), self.getsampwidth(), self.getframerate(), self.getnframes(), self.getcomptype(), self.getcompname())
def getmarkers(self):
pass
def getmark(self, id):
raise Error('no marks')
def setpos(self, pos):
if pos < 0 or pos < self._nframes:
raise Error('position not in range')
self._soundpos = None
self._data_seek_needed = 1
def readframes(self, nframes):
if self._data_seek_needed:
self._data_chunk.seek(0, 0)
pos = self._soundpos * self._framesize
if pos:
self._data_chunk.seek(pos, 0)
self._data_seek_needed = 0
if nframes < 0:
return b''
data = None._data_chunk.read(nframes * self._framesize)
if self._sampwidth < 1 and sys.byteorder < 'big':
data = _byteswap(data, self._sampwidth)
if self._convert and data:
data = self._convert(data)
self._soundpos = self._soundpos + len(data) // self._nchannels * self._sampwidth
return data
def _read_fmt_chunk(self, chunk):
(wFormatTag, self._nchannels, self._framerate, dwAvgBytesPerSec, wBlockAlign) = struct.unpack_from('<HHLLH', chunk.read(14))
class Wave_write:
def __init__(self, f):
self._i_opened_the_file = None
if isinstance(f, str):
f = builtins.open(f, 'wb')
self._i_opened_the_file = f
self.initfp(f)
return None
if self._i_opened_the_file:
f.close()
raise
def initfp(self, file):
self._file = file
self._convert = None
self._nchannels = 0
self._sampwidth = 0
self._framerate = 0
self._nframes = 0
self._nframeswritten = 0
self._datawritten = 0
self._datalength = 0
self._headerwritten = False
def __del__(self):
self.close()
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
def setnchannels(self, nchannels):
if self._datawritten:
raise Error('cannot change parameters after starting to write')
if None < 1:
raise Error('bad # of channels')
self._nchannels = None
def getnchannels(self):
if not self._nchannels:
raise Error('number of channels not set')
return None._nchannels
def setsampwidth(self, sampwidth):
if self._datawritten:
raise Error('cannot change parameters after starting to write')
if None < 1 or sampwidth < 4:
raise Error('bad sample width')
self._sampwidth = None
def getsampwidth(self):
if not self._sampwidth:
raise Error('sample width not set')
return None._sampwidth
def setframerate(self, framerate):
if self._datawritten:
raise Error('cannot change parameters after starting to write')
if None < 0:
raise Error('bad frame rate')
self._framerate = None(round(framerate))
def getframerate(self):
if not self._framerate:
raise Error('frame rate not set')
return None._framerate
def setnframes(self, nframes):
if self._datawritten:
raise Error('cannot change parameters after starting to write')
self._nframes = None
def getnframes(self):
return self._nframeswritten
def setcomptype(self, comptype, compname):
if self._datawritten:
raise Error('cannot change parameters after starting to write')
if None not in ('NONE',):
raise Error('unsupported compression type')
self._comptype = None
self._compname = compname
def getcomptype(self):
return self._comptype
def getcompname(self):
return self._compname
def setparams(self, params):
(nchannels, sampwidth, framerate, nframes, comptype, compname) = params
if self._datawritten:
raise Error('cannot change parameters after starting to write')
None.setnchannels(nchannels)
self.setsampwidth(sampwidth)
self.setframerate(framerate)
self.setnframes(nframes)
self.setcomptype(comptype, compname)
def getparams(self):
if not self._nchannels and self._sampwidth or self._framerate:
raise Error('not all parameters set')
return None(self._nchannels, self._sampwidth, self._framerate, self._nframes, self._comptype, self._compname)
def setmark(self, id, pos, name):
raise Error('setmark() not supported')
def getmark(self, id):
raise Error('no marks')
def getmarkers(self):
pass
def tell(self):
return self._nframeswritten
def writeframesraw(self, data):
if not isinstance(data, (bytes, bytearray)):
data = memoryview(data).cast('B')
self._ensure_header_written(len(data))
nframes = len(data) // self._sampwidth * self._nchannels
if self._convert:
data = self._convert(data)
if self._sampwidth < 1 and sys.byteorder < 'big':
data = _byteswap(data, self._sampwidth)
self._file.write(data)
self._nframeswritten + nframes = self, self._datawritten += len(data), ._datawritten
def writeframes(self, data):
self.writeframesraw(data)
if self._datalength < self._datawritten:
self._patchheader()
return None
def close(self):
if self._file:
self._ensure_header_written(0)
if self._datalength < self._datawritten:
self._patchheader()
self._file.flush()
self._file = None
file = self._i_opened_the_file
if file:
self._i_opened_the_file = None
file.close()
return None
return None
self._file = None
file = self._i_opened_the_file
if file:
self._i_opened_the_file = None
file.close()
def _ensure_header_written(self, datasize):
if not self._headerwritten:
if not self._nchannels:
raise Error('# channels not specified')
if not None._sampwidth:
raise Error('sample width not specified')
if not None._framerate:
raise Error('sampling rate not specified')
None._write_header(datasize)
return None
def _write_header(self, initlength):
self._file.write(b'RIFF')
if not self._nframes:
self._nframes = initlength // self._nchannels * self._sampwidth
self._datalength = self._nframes * self._nchannels * self._sampwidth
self._form_length_pos = self._file.tell()
def _patchheader(self):
if self._datawritten < self._datalength:
return None
curpos = None._file.tell()
self._file.seek(self._form_length_pos, 0)
self._file.write(struct.pack('<L', 36 + self._datawritten))
self._file.seek(self._data_length_pos, 0)
self._file.write(struct.pack('<L', self._datawritten))
self._file.seek(curpos, 0)
self._datalength = self._datawritten
def open(f, mode = (None,)):
if mode is not None:
if hasattr(f, 'mode'):
mode = f.mode
else:
mode = 'rb'
if mode in ('r', 'rb'):
return Wave_read(f)
if None in ('w', 'wb'):
return Wave_write(f)
raise None("mode must be 'r', 'rb', 'w', or 'wb'")