import copy
import functools
import hashlib
import math
import pathlib
import tempfile
import typing
import warnings
from collections import namedtuple
from pathlib import Path

import julius
import numpy as np
import soundfile
import torch

from . import util
from .display import DisplayMixin
from .dsp import DSPMixin
from .effects import EffectMixin
from .effects import ImpulseResponseMixin
from .ffmpeg import FFMPEGMixin
from .loudness import LoudnessMixin
from .playback import PlayMixin
from .whisper import WhisperMixin


STFTParams = namedtuple(
    "STFTParams",
    ["window_length", "hop_length", "window_type", "match_stride", "padding_type"],
)
"""
STFTParams object is a container that holds STFT parameters - window_length,
hop_length, and window_type. Not all parameters need to be specified. Ones that
are not specified will be inferred by the AudioSignal parameters.

Parameters
----------
window_length : int, optional
    Window length of STFT, by default ``0.032 * self.sample_rate``.
hop_length : int, optional
    Hop length of STFT, by default ``window_length // 4``.
window_type : str, optional
    Type of window to use, by default ``sqrt_hann``.
match_stride : bool, optional
    Whether to match the stride of convolutional layers, by default False
padding_type : str, optional
    Type of padding to use, by default 'reflect'
"""
STFTParams.__new__.__defaults__ = (None, None, None, None, None)


class AudioSignal(
    EffectMixin,
    LoudnessMixin,
    PlayMixin,
    ImpulseResponseMixin,
    DSPMixin,
    DisplayMixin,
    FFMPEGMixin,
    WhisperMixin,
):
    """This is the core object of this library. Audio is always
    loaded into an AudioSignal, which then enables all the features
    of this library, including audio augmentations, I/O, playback,
    and more.

    The structure of this object is that the base functionality
    is defined in ``core/audio_signal.py``, while extensions to
    that functionality are defined in the other ``core/*.py``
    files. For example, all the display-based functionality
    (e.g. plot spectrograms, waveforms, write to tensorboard)
    is in ``core/display.py``.

    Parameters
    ----------
    audio_path_or_array : typing.Union[torch.Tensor, str, Path, np.ndarray]
        Object to create AudioSignal from. Can be a tensor, numpy array,
        or a path to a file. The audio is always reshaped to
        (batch_size, num_channels, num_samples).
    sample_rate : int, optional
        Sample rate of the audio. If different from underlying file,
        resampling is performed. If passing in an array or tensor,
        this must be defined, by default None
    stft_params : STFTParams, optional
        Parameters of STFT to use, by default None
    offset : float, optional
        Offset in seconds to read from file, by default 0
    duration : float, optional
        Duration in seconds to read from file, by default None
    device : str, optional
        Device to load audio onto, by default None

    Examples
    --------
    Loading an AudioSignal from an array, at a sample rate of
    44100.

    >>> signal = AudioSignal(torch.randn(5 * 44100), 44100)

    Note, the signal is reshaped to have a batch size, and one
    audio channel:

    >>> print(signal.shape)
    (1, 1, 220500)

    You can treat AudioSignals like tensors, and many of the same
    functions you might use on tensors are defined for AudioSignals
    as well:

    >>> signal.to("cuda")
    >>> signal.cuda()
    >>> signal.clone()
    >>> signal.detach()

    Indexing AudioSignals returns an AudioSignal:

    >>> signal[..., 3 * 44100 : 4 * 44100]

    The above signal is 1 second long, and is also an AudioSignal.
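
    AudioSignals also support arithmetic via the operator overloads
    defined below (a sketch; scaling and mixing operate directly on
    ``audio_data``, and both operands are assumed to share shape and
    sample rate):

    >>> quieter = signal * 0.5
    >>> mixture = quieter + signal * 0.5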
""" def __init__( self, audio_path_or_array: typing.Union[torch.Tensor, str, Path, np.ndarray], sample_rate: int = None, stft_params: STFTParams = None, offset: float = 0, duration: float = None, device: str = None, ): audio_path = None audio_array = None if isinstance(audio_path_or_array, str): audio_path = audio_path_or_array elif isinstance(audio_path_or_array, pathlib.Path): audio_path = audio_path_or_array elif isinstance(audio_path_or_array, np.ndarray): audio_array = audio_path_or_array elif torch.is_tensor(audio_path_or_array): audio_array = audio_path_or_array else: raise ValueError( "audio_path_or_array must be either a Path, " "string, numpy array, or torch Tensor!" ) self.path_to_file = None self.audio_data = None self.sources = None # List of AudioSignal objects. self.stft_data = None if audio_path is not None: self.load_from_file( audio_path, offset=offset, duration=duration, device=device ) elif audio_array is not None: assert sample_rate is not None, "Must set sample rate!" self.load_from_array(audio_array, sample_rate, device=device) self.window = None self.stft_params = stft_params self.metadata = { "offset": offset, "duration": duration, } @property def path_to_input_file( self, ): """ Path to input file, if it exists. Alias to ``path_to_file`` for backwards compatibility """ return self.path_to_file @classmethod def excerpt( cls, audio_path: typing.Union[str, Path], offset: float = None, duration: float = None, state: typing.Union[np.random.RandomState, int] = None, **kwargs, ): """Randomly draw an excerpt of ``duration`` seconds from an audio file specified at ``audio_path``, between ``offset`` seconds and end of file. ``state`` can be used to seed the random draw. Parameters ---------- audio_path : typing.Union[str, Path] Path to audio file to grab excerpt from. offset : float, optional Lower bound for the start time, in seconds drawn from the file, by default None. duration : float, optional Duration of excerpt, in seconds, by default None state : typing.Union[np.random.RandomState, int], optional RandomState or seed of random state, by default None Returns ------- AudioSignal AudioSignal containing excerpt. Examples -------- >>> signal = AudioSignal.excerpt("path/to/audio", duration=5) """ info = util.info(audio_path) total_duration = info.duration state = util.random_state(state) lower_bound = 0 if offset is None else offset upper_bound = max(total_duration - duration, 0) offset = state.uniform(lower_bound, upper_bound) signal = cls(audio_path, offset=offset, duration=duration, **kwargs) signal.metadata["offset"] = offset signal.metadata["duration"] = duration return signal @classmethod def salient_excerpt( cls, audio_path: typing.Union[str, Path], loudness_cutoff: float = None, num_tries: int = 8, state: typing.Union[np.random.RandomState, int] = None, **kwargs, ): """Similar to AudioSignal.excerpt, except it extracts excerpts only if they are above a specified loudness threshold, which is computed via a fast LUFS routine. Parameters ---------- audio_path : typing.Union[str, Path] Path to audio file to grab excerpt from. loudness_cutoff : float, optional Loudness threshold in dB. Typical values are ``-40, -60``, etc, by default None num_tries : int, optional Number of tries to grab an excerpt above the threshold before giving up, by default 8. 
        state : typing.Union[np.random.RandomState, int], optional
            RandomState or seed of random state, by default None
        kwargs : dict
            Keyword arguments to AudioSignal.excerpt

        Returns
        -------
        AudioSignal
            AudioSignal containing excerpt.

        .. warning::
            if ``num_tries`` is set to None, ``salient_excerpt`` may try
            forever, which can result in an infinite loop if ``audio_path``
            does not have any loud enough excerpts.

        Examples
        --------
        >>> signal = AudioSignal.salient_excerpt(
                "path/to/audio", loudness_cutoff=-40, duration=5
            )
        """
        state = util.random_state(state)
        if loudness_cutoff is None:
            excerpt = cls.excerpt(audio_path, state=state, **kwargs)
        else:
            loudness = -np.inf
            num_try = 0
            while loudness <= loudness_cutoff:
                excerpt = cls.excerpt(audio_path, state=state, **kwargs)
                loudness = excerpt.loudness()
                num_try += 1
                if num_tries is not None and num_try >= num_tries:
                    break
        return excerpt

    @classmethod
    def zeros(
        cls,
        duration: float,
        sample_rate: int,
        num_channels: int = 1,
        batch_size: int = 1,
        **kwargs,
    ):
        """Helper function to create an AudioSignal of all zeros.

        Parameters
        ----------
        duration : float
            Duration of AudioSignal
        sample_rate : int
            Sample rate of AudioSignal
        num_channels : int, optional
            Number of channels, by default 1
        batch_size : int, optional
            Batch size, by default 1

        Returns
        -------
        AudioSignal
            AudioSignal containing all zeros.

        Examples
        --------
        Generate 5 seconds of all zeros at a sample rate of 44100.

        >>> signal = AudioSignal.zeros(5.0, 44100)
        """
        n_samples = int(duration * sample_rate)
        return cls(
            torch.zeros(batch_size, num_channels, n_samples), sample_rate, **kwargs
        )

    @classmethod
    def wave(
        cls,
        frequency: float,
        duration: float,
        sample_rate: int,
        num_channels: int = 1,
        shape: str = "sine",
        **kwargs,
    ):
        """
        Generate a waveform of a given frequency and shape.

        Parameters
        ----------
        frequency : float
            Frequency of the waveform
        duration : float
            Duration of the waveform
        sample_rate : int
            Sample rate of the waveform
        num_channels : int, optional
            Number of channels, by default 1
        shape : str, optional
            Shape of the waveform, by default "sine"
            One of "sawtooth", "square", "sine", "triangle"
        kwargs : dict
            Keyword arguments to AudioSignal
        """
        n_samples = int(duration * sample_rate)
        t = torch.linspace(0, duration, n_samples)
        if shape == "sawtooth":
            from scipy.signal import sawtooth

            wave_data = sawtooth(2 * np.pi * frequency * t, 0.5)
        elif shape == "square":
            from scipy.signal import square

            wave_data = square(2 * np.pi * frequency * t)
        elif shape == "sine":
            wave_data = np.sin(2 * np.pi * frequency * t)
        elif shape == "triangle":
            from scipy.signal import sawtooth

            # frequency is doubled by the abs call, so omit the 2 in 2pi
            wave_data = sawtooth(np.pi * frequency * t, 0.5)
            wave_data = -np.abs(wave_data) * 2 + 1
        else:
            raise ValueError(f"Invalid shape {shape}")

        wave_data = torch.tensor(wave_data, dtype=torch.float32)
        wave_data = wave_data.unsqueeze(0).unsqueeze(0).repeat(1, num_channels, 1)
        return cls(wave_data, sample_rate, **kwargs)

    @classmethod
    def batch(
        cls,
        audio_signals: list,
        pad_signals: bool = False,
        truncate_signals: bool = False,
        resample: bool = False,
        dim: int = 0,
    ):
        """Creates a batched AudioSignal from a list of AudioSignals.
        Parameters
        ----------
        audio_signals : list[AudioSignal]
            List of AudioSignal objects
        pad_signals : bool, optional
            Whether to pad signals to the length of the longest
            AudioSignal in the list, by default False
        truncate_signals : bool, optional
            Whether to truncate signals to the length of the shortest
            AudioSignal in the list, by default False
        resample : bool, optional
            Whether to resample AudioSignal to the sample rate of
            the first AudioSignal in the list, by default False
        dim : int, optional
            Dimension along which to batch the signals, by default 0.

        Returns
        -------
        AudioSignal
            Batched AudioSignal.

        Raises
        ------
        RuntimeError
            If not all AudioSignals are the same sample rate, and
            ``resample=False``, an error is raised.
        RuntimeError
            If not all AudioSignals are the same length, and both
            ``pad_signals=False`` and ``truncate_signals=False``, an
            error is raised.

        Examples
        --------
        Batching a bunch of random signals:

        >>> signal_list = [AudioSignal(torch.randn(44100), 44100) for _ in range(10)]
        >>> signal = AudioSignal.batch(signal_list)
        >>> print(signal.shape)
        (10, 1, 44100)

        """
        signal_lengths = [x.signal_length for x in audio_signals]
        sample_rates = [x.sample_rate for x in audio_signals]

        if len(set(sample_rates)) != 1:
            if resample:
                for x in audio_signals:
                    x.resample(sample_rates[0])
            else:
                raise RuntimeError(
                    f"Not all signals had the same sample rate! Got {sample_rates}. "
                    f"All signals must have the same sample rate, or resample must be True. "
                )

        if len(set(signal_lengths)) != 1:
            if pad_signals:
                max_length = max(signal_lengths)
                for x in audio_signals:
                    pad_len = max_length - x.signal_length
                    x.zero_pad(0, pad_len)
            elif truncate_signals:
                min_length = min(signal_lengths)
                for x in audio_signals:
                    x.truncate_samples(min_length)
            else:
                raise RuntimeError(
                    f"Not all signals had the same length! Got {signal_lengths}. "
                    f"All signals must be the same length, or pad_signals/truncate_signals "
                    f"must be True. "
                )
        # Concatenate along the specified dimension (default 0)
        audio_data = torch.cat([x.audio_data for x in audio_signals], dim=dim)
        audio_paths = [x.path_to_file for x in audio_signals]

        batched_signal = cls(
            audio_data,
            sample_rate=audio_signals[0].sample_rate,
        )
        batched_signal.path_to_file = audio_paths
        return batched_signal

    # I/O
    def load_from_file(
        self,
        audio_path: typing.Union[str, Path],
        offset: float,
        duration: float,
        device: str = "cpu",
    ):
        """Loads data from file. Used internally when AudioSignal
        is instantiated with a path to a file.

        Parameters
        ----------
        audio_path : typing.Union[str, Path]
            Path to file
        offset : float
            Offset in seconds
        duration : float
            Duration in seconds
        device : str, optional
            Device to put AudioSignal on, by default "cpu"

        Returns
        -------
        AudioSignal
            AudioSignal loaded from file
        """
        import librosa

        data, sample_rate = librosa.load(
            audio_path,
            offset=offset,
            duration=duration,
            sr=None,
            mono=False,
        )
        data = util.ensure_tensor(data)
        if data.shape[-1] == 0:
            raise RuntimeError(
                f"Audio file {audio_path} with offset {offset} and duration {duration} is empty!"
            )

        if data.ndim < 2:
            data = data.unsqueeze(0)
        if data.ndim < 3:
            data = data.unsqueeze(0)
        self.audio_data = data

        self.original_signal_length = self.signal_length

        self.sample_rate = sample_rate
        self.path_to_file = audio_path
        return self.to(device)

    def load_from_array(
        self,
        audio_array: typing.Union[torch.Tensor, np.ndarray],
        sample_rate: int,
        device: str = "cpu",
    ):
        """Loads data from array, reshaping it to be exactly 3
        dimensions. Used internally when AudioSignal is called
        with a tensor or an array.
        Parameters
        ----------
        audio_array : typing.Union[torch.Tensor, np.ndarray]
            Array/tensor of audio samples.
        sample_rate : int
            Sample rate of audio
        device : str, optional
            Device to move audio onto, by default "cpu"

        Returns
        -------
        AudioSignal
            AudioSignal loaded from array
        """
        audio_data = util.ensure_tensor(audio_array)

        if audio_data.dtype == torch.double:
            audio_data = audio_data.float()

        if audio_data.ndim < 2:
            audio_data = audio_data.unsqueeze(0)
        if audio_data.ndim < 3:
            audio_data = audio_data.unsqueeze(0)
        self.audio_data = audio_data

        self.original_signal_length = self.signal_length

        self.sample_rate = sample_rate
        return self.to(device)

    def write(self, audio_path: typing.Union[str, Path]):
        """Writes audio to a file. Only writes the audio
        that is in the very first item of the batch. To write other items
        in the batch, index the signal along the batch dimension
        before writing. After writing, the signal's ``path_to_file``
        attribute is updated to the new path.

        Parameters
        ----------
        audio_path : typing.Union[str, Path]
            Path to write audio to.

        Returns
        -------
        AudioSignal
            Returns original AudioSignal, so you can use this in a fluent
            interface.

        Examples
        --------
        Creating and writing a signal to disk:

        >>> signal = AudioSignal(torch.randn(10, 1, 44100), 44100)
        >>> signal.write("/tmp/out.wav")

        Writing a different element of the batch:

        >>> signal[5].write("/tmp/out.wav")

        Using this in a fluent interface:

        >>> signal.write("/tmp/original.wav").low_pass(4000).write("/tmp/lowpass.wav")

        """
        if self.audio_data[0].abs().max() > 1:
            warnings.warn("Audio amplitude > 1 clipped when saving")
        # Detach and move to cpu so writing also works for signals that
        # live on the GPU or inside an autograd graph.
        soundfile.write(
            str(audio_path),
            self.audio_data[0].detach().cpu().numpy().T,
            self.sample_rate,
        )

        self.path_to_file = audio_path
        return self

    def deepcopy(self):
        """Copies the signal and all of its attributes.

        Returns
        -------
        AudioSignal
            Deep copy of the audio signal.
        """
        return copy.deepcopy(self)

    def copy(self):
        """Shallow copy of signal.

        Returns
        -------
        AudioSignal
            Shallow copy of the audio signal.
        """
        return copy.copy(self)

    def clone(self):
        """Clones all tensors contained in the AudioSignal,
        and returns a copy of the signal with everything
        cloned. Useful when using AudioSignal within autograd
        computation graphs.

        Relevant attributes are the stft data, the audio data,
        and the loudness of the file.

        Returns
        -------
        AudioSignal
            Clone of AudioSignal.
        """
        clone = type(self)(
            self.audio_data.clone(),
            self.sample_rate,
            stft_params=self.stft_params,
        )
        if self.stft_data is not None:
            clone.stft_data = self.stft_data.clone()
        if self._loudness is not None:
            clone._loudness = self._loudness.clone()
        clone.path_to_file = copy.deepcopy(self.path_to_file)
        clone.metadata = copy.deepcopy(self.metadata)
        return clone

    def detach(self):
        """Detaches tensors contained in AudioSignal.

        Relevant attributes are the stft data, the audio data,
        and the loudness of the file.

        Returns
        -------
        AudioSignal
            Same signal, but with all tensors detached.
        """
        if self._loudness is not None:
            self._loudness = self._loudness.detach()
        if self.stft_data is not None:
            self.stft_data = self.stft_data.detach()

        self.audio_data = self.audio_data.detach()
        return self

    def hash(self):
        """Writes the audio data to a temporary file, and then
        hashes it using hashlib. Useful for creating a file
        name based on the audio content.

        Returns
        -------
        str
            Hash of audio data.
Examples -------- Creating a signal, and writing it to a unique file name: >>> signal = AudioSignal(torch.randn(44100), 44100) >>> hash = signal.hash() >>> signal.write(f"{hash}.wav") """ with tempfile.NamedTemporaryFile(suffix=".wav") as f: self.write(f.name) h = hashlib.sha256() b = bytearray(128 * 1024) mv = memoryview(b) with open(f.name, "rb", buffering=0) as f: for n in iter(lambda: f.readinto(mv), 0): h.update(mv[:n]) file_hash = h.hexdigest() return file_hash # Signal operations def to_mono(self): """Converts audio data to mono audio, by taking the mean along the channels dimension. Returns ------- AudioSignal AudioSignal with mean of channels. """ self.audio_data = self.audio_data.mean(1, keepdim=True) return self def resample(self, sample_rate: int): """Resamples the audio, using sinc interpolation. This works on both cpu and gpu, and is much faster on gpu. Parameters ---------- sample_rate : int Sample rate to resample to. Returns ------- AudioSignal Resampled AudioSignal """ if sample_rate == self.sample_rate: return self self.audio_data = julius.resample_frac( self.audio_data, self.sample_rate, sample_rate ) self.sample_rate = sample_rate return self # Tensor operations def to(self, device: str): """Moves all tensors contained in signal to the specified device. Parameters ---------- device : str Device to move AudioSignal onto. Typical values are "cuda", "cpu", or "cuda:n" to specify the nth gpu. Returns ------- AudioSignal AudioSignal with all tensors moved to specified device. """ if self._loudness is not None: self._loudness = self._loudness.to(device) if self.stft_data is not None: self.stft_data = self.stft_data.to(device) if self.audio_data is not None: self.audio_data = self.audio_data.to(device) return self def float(self): """Calls ``.float()`` on ``self.audio_data``. Returns ------- AudioSignal """ self.audio_data = self.audio_data.float() return self def cpu(self): """Moves AudioSignal to cpu. Returns ------- AudioSignal """ return self.to("cpu") def cuda(self): # pragma: no cover """Moves AudioSignal to cuda. Returns ------- AudioSignal """ return self.to("cuda") def numpy(self): """Detaches ``self.audio_data``, moves to cpu, and converts to numpy. Returns ------- np.ndarray Audio data as a numpy array. """ return self.audio_data.detach().cpu().numpy() def zero_pad(self, before: int, after: int): """Zero pads the audio_data tensor before and after. Parameters ---------- before : int How many zeros to prepend to audio. after : int How many zeros to append to audio. Returns ------- AudioSignal AudioSignal with padding applied. """ self.audio_data = torch.nn.functional.pad(self.audio_data, (before, after)) return self def zero_pad_to(self, length: int, mode: str = "after"): """Pad with zeros to a specified length, either before or after the audio data. Parameters ---------- length : int Length to pad to mode : str, optional Whether to prepend or append zeros to signal, by default "after" Returns ------- AudioSignal AudioSignal with padding applied. """ if mode == "before": self.zero_pad(max(length - self.signal_length, 0), 0) elif mode == "after": self.zero_pad(0, max(length - self.signal_length, 0)) return self def trim(self, before: int, after: int): """Trims the audio_data tensor before and after. Parameters ---------- before : int How many samples to trim from beginning. after : int How many samples to trim from end. Returns ------- AudioSignal AudioSignal with trimming applied. 
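
        Examples
        --------
        A sketch of trimming 100 samples off each end of a 1-second,
        44100 Hz signal:

        >>> signal = AudioSignal(torch.randn(44100), 44100)
        >>> signal.trim(100, 100).signal_length
        43900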
""" if after == 0: self.audio_data = self.audio_data[..., before:] else: self.audio_data = self.audio_data[..., before:-after] return self def truncate_samples(self, length_in_samples: int): """Truncate signal to specified length. Parameters ---------- length_in_samples : int Truncate to this many samples. Returns ------- AudioSignal AudioSignal with truncation applied. """ self.audio_data = self.audio_data[..., :length_in_samples] return self @property def device(self): """Get device that AudioSignal is on. Returns ------- torch.device Device that AudioSignal is on. """ if self.audio_data is not None: device = self.audio_data.device elif self.stft_data is not None: device = self.stft_data.device return device # Properties @property def audio_data(self): """Returns the audio data tensor in the object. Audio data is always of the shape (batch_size, num_channels, num_samples). If value has less than 3 dims (e.g. is (num_channels, num_samples)), then it will be reshaped to (1, num_channels, num_samples) - a batch size of 1. Parameters ---------- data : typing.Union[torch.Tensor, np.ndarray] Audio data to set. Returns ------- torch.Tensor Audio samples. """ return self._audio_data @audio_data.setter def audio_data(self, data: typing.Union[torch.Tensor, np.ndarray]): if data is not None: assert torch.is_tensor(data), "audio_data should be torch.Tensor" assert data.ndim == 3, "audio_data should be 3-dim (B, C, T)" self._audio_data = data # Old loudness value not guaranteed to be right, reset it. self._loudness = None return # alias for audio_data samples = audio_data @property def stft_data(self): """Returns the STFT data inside the signal. Shape is (batch, channels, frequencies, time). Returns ------- torch.Tensor Complex spectrogram data. """ return self._stft_data @stft_data.setter def stft_data(self, data: typing.Union[torch.Tensor, np.ndarray]): if data is not None: assert torch.is_tensor(data) and torch.is_complex(data) if self.stft_data is not None and self.stft_data.shape != data.shape: warnings.warn("stft_data changed shape") self._stft_data = data return @property def batch_size(self): """Batch size of audio signal. Returns ------- int Batch size of signal. """ return self.audio_data.shape[0] @property def signal_length(self): """Length of audio signal. Returns ------- int Length of signal in samples. """ return self.audio_data.shape[-1] # alias for signal_length length = signal_length @property def shape(self): """Shape of audio data. Returns ------- tuple Shape of audio data. """ return self.audio_data.shape @property def signal_duration(self): """Length of audio signal in seconds. Returns ------- float Length of signal in seconds. """ return self.signal_length / self.sample_rate # alias for signal_duration duration = signal_duration @property def num_channels(self): """Number of audio channels. Returns ------- int Number of audio channels. """ return self.audio_data.shape[1] # STFT @staticmethod @functools.lru_cache(None) def get_window(window_type: str, window_length: int, device: str): """Wrapper around scipy.signal.get_window so one can also get the popular sqrt-hann window. This function caches for efficiency using functools.lru\_cache. Parameters ---------- window_type : str Type of window to get window_length : int Length of the window device : str Device to put window onto. Returns ------- torch.Tensor Window returned by scipy.signal.get_window, as a tensor. 
""" from scipy import signal if window_type == "average": window = np.ones(window_length) / window_length elif window_type == "sqrt_hann": window = np.sqrt(signal.get_window("hann", window_length)) else: window = signal.get_window(window_type, window_length) window = torch.from_numpy(window).to(device).float() return window @property def stft_params(self): """Returns STFTParams object, which can be re-used to other AudioSignals. This property can be set as well. If values are not defined in STFTParams, they are inferred automatically from the signal properties. The default is to use 32ms windows, with 8ms hop length, and the square root of the hann window. Returns ------- STFTParams STFT parameters for the AudioSignal. Examples -------- >>> stft_params = STFTParams(128, 32) >>> signal1 = AudioSignal(torch.randn(44100), 44100, stft_params=stft_params) >>> signal2 = AudioSignal(torch.randn(44100), 44100, stft_params=signal1.stft_params) >>> signal1.stft_params = STFTParams() # Defaults """ return self._stft_params @stft_params.setter def stft_params(self, value: STFTParams): default_win_len = int(2 ** (np.ceil(np.log2(0.032 * self.sample_rate)))) default_hop_len = default_win_len // 4 default_win_type = "hann" default_match_stride = False default_padding_type = "reflect" default_stft_params = STFTParams( window_length=default_win_len, hop_length=default_hop_len, window_type=default_win_type, match_stride=default_match_stride, padding_type=default_padding_type, )._asdict() value = value._asdict() if value else default_stft_params for key in default_stft_params: if value[key] is None: value[key] = default_stft_params[key] self._stft_params = STFTParams(**value) self.stft_data = None def compute_stft_padding( self, window_length: int, hop_length: int, match_stride: bool ): """Compute how the STFT should be padded, based on match\_stride. Parameters ---------- window_length : int Window length of STFT. hop_length : int Hop length of STFT. match_stride : bool Whether or not to match stride, making the STFT have the same alignment as convolutional layers. Returns ------- tuple Amount to pad on either side of audio. """ length = self.signal_length if match_stride: assert ( hop_length == window_length // 4 ), "For match_stride, hop must equal n_fft // 4" right_pad = math.ceil(length / hop_length) * hop_length - length pad = (window_length - hop_length) // 2 else: right_pad = 0 pad = 0 return right_pad, pad def stft( self, window_length: int = None, hop_length: int = None, window_type: str = None, match_stride: bool = None, padding_type: str = None, ): """Computes the short-time Fourier transform of the audio data, with specified STFT parameters. Parameters ---------- window_length : int, optional Window length of STFT, by default ``0.032 * self.sample_rate``. hop_length : int, optional Hop length of STFT, by default ``window_length // 4``. window_type : str, optional Type of window to use, by default ``sqrt\_hann``. match_stride : bool, optional Whether to match the stride of convolutional layers, by default False padding_type : str, optional Type of padding to use, by default 'reflect' Returns ------- torch.Tensor STFT of audio data. 
        Examples
        --------
        Compute the STFT of an AudioSignal:

        >>> signal = AudioSignal(torch.randn(44100), 44100)
        >>> signal.stft()

        Vary the window and hop length:

        >>> stft_params = [STFTParams(128, 32), STFTParams(512, 128)]
        >>> for stft_param in stft_params:
        >>>     signal.stft_params = stft_param
        >>>     signal.stft()

        """
        window_length = (
            self.stft_params.window_length
            if window_length is None
            else int(window_length)
        )
        hop_length = (
            self.stft_params.hop_length if hop_length is None else int(hop_length)
        )
        window_type = (
            self.stft_params.window_type if window_type is None else window_type
        )
        match_stride = (
            self.stft_params.match_stride if match_stride is None else match_stride
        )
        padding_type = (
            self.stft_params.padding_type if padding_type is None else padding_type
        )

        window = self.get_window(window_type, window_length, self.audio_data.device)

        audio_data = self.audio_data
        right_pad, pad = self.compute_stft_padding(
            window_length, hop_length, match_stride
        )
        audio_data = torch.nn.functional.pad(
            audio_data, (pad, pad + right_pad), padding_type
        )
        stft_data = torch.stft(
            audio_data.reshape(-1, audio_data.shape[-1]),
            n_fft=window_length,
            hop_length=hop_length,
            window=window,
            return_complex=True,
            center=True,
        )
        _, nf, nt = stft_data.shape
        stft_data = stft_data.reshape(self.batch_size, self.num_channels, nf, nt)

        if match_stride:
            # Drop first two and last two frames, which are added
            # because of padding. Now num_frames * hop_length = num_samples.
            stft_data = stft_data[..., 2:-2]
        self.stft_data = stft_data

        return stft_data

    def istft(
        self,
        window_length: int = None,
        hop_length: int = None,
        window_type: str = None,
        match_stride: bool = None,
        length: int = None,
    ):
        """Computes inverse STFT and sets it to audio_data.

        Parameters
        ----------
        window_length : int, optional
            Window length of STFT, by default ``0.032 * self.sample_rate``.
        hop_length : int, optional
            Hop length of STFT, by default ``window_length // 4``.
        window_type : str, optional
            Type of window to use, by default ``sqrt_hann``.
        match_stride : bool, optional
            Whether to match the stride of convolutional layers, by default False
        length : int, optional
            Original length of signal, by default None

        Returns
        -------
        AudioSignal
            AudioSignal with istft applied.

        Raises
        ------
        RuntimeError
            Raises an error if stft was not called prior to istft on the
            signal, or if stft_data is not set.
        """
        if self.stft_data is None:
            raise RuntimeError("Cannot do inverse STFT without self.stft_data!")

        window_length = (
            self.stft_params.window_length
            if window_length is None
            else int(window_length)
        )
        hop_length = (
            self.stft_params.hop_length if hop_length is None else int(hop_length)
        )
        window_type = (
            self.stft_params.window_type if window_type is None else window_type
        )
        match_stride = (
            self.stft_params.match_stride if match_stride is None else match_stride
        )

        window = self.get_window(window_type, window_length, self.stft_data.device)

        nb, nch, nf, nt = self.stft_data.shape
        stft_data = self.stft_data.reshape(nb * nch, nf, nt)
        right_pad, pad = self.compute_stft_padding(
            window_length, hop_length, match_stride
        )

        if length is None:
            length = self.original_signal_length
            length = length + 2 * pad + right_pad

        if match_stride:
            # Zero-pad the STFT on either side, putting back the frames
            # that were dropped in stft().
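            # (This mirrors the ``stft_data[..., 2:-2]`` crop in stft().
            # torch.istft then synthesizes length + 2 * pad + right_pad
            # samples, and the extra pad/right_pad samples are trimmed
            # off below.)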
            stft_data = torch.nn.functional.pad(stft_data, (2, 2))

        audio_data = torch.istft(
            stft_data,
            n_fft=window_length,
            hop_length=hop_length,
            window=window,
            length=length,
            center=True,
        )
        audio_data = audio_data.reshape(nb, nch, -1)

        if match_stride:
            audio_data = audio_data[..., pad : -(pad + right_pad)]
        self.audio_data = audio_data

        return self

    @staticmethod
    @functools.lru_cache(None)
    def get_mel_filters(
        sr: int, n_fft: int, n_mels: int, fmin: float = 0.0, fmax: float = None
    ):
        """Create a Filterbank matrix to combine FFT bins into Mel-frequency bins.

        Parameters
        ----------
        sr : int
            Sample rate of audio
        n_fft : int
            Number of FFT bins
        n_mels : int
            Number of mels
        fmin : float, optional
            Lowest frequency, in Hz, by default 0.0
        fmax : float, optional
            Highest frequency, by default None

        Returns
        -------
        np.ndarray [shape=(n_mels, 1 + n_fft/2)]
            Mel transform matrix
        """
        from librosa.filters import mel as librosa_mel_fn

        return librosa_mel_fn(
            sr=sr,
            n_fft=n_fft,
            n_mels=n_mels,
            fmin=fmin,
            fmax=fmax,
        )

    def mel_spectrogram(
        self, n_mels: int = 80, mel_fmin: float = 0.0, mel_fmax: float = None, **kwargs
    ):
        """Computes a Mel spectrogram.

        Parameters
        ----------
        n_mels : int, optional
            Number of mels, by default 80
        mel_fmin : float, optional
            Lowest frequency, in Hz, by default 0.0
        mel_fmax : float, optional
            Highest frequency, by default None
        kwargs : dict, optional
            Keyword arguments to self.stft().

        Returns
        -------
        torch.Tensor [shape=(batch, channels, mels, time)]
            Mel spectrogram.
        """
        stft = self.stft(**kwargs)
        magnitude = torch.abs(stft)

        nf = magnitude.shape[2]
        mel_basis = self.get_mel_filters(
            sr=self.sample_rate,
            n_fft=2 * (nf - 1),
            n_mels=n_mels,
            fmin=mel_fmin,
            fmax=mel_fmax,
        )
        mel_basis = torch.from_numpy(mel_basis).to(self.device)

        mel_spectrogram = magnitude.transpose(2, -1) @ mel_basis.T
        mel_spectrogram = mel_spectrogram.transpose(-1, 2)
        return mel_spectrogram

    @staticmethod
    @functools.lru_cache(None)
    def get_dct(n_mfcc: int, n_mels: int, norm: str = "ortho", device: str = None):
        """Create a discrete cosine transform (DCT) transformation matrix
        with shape (``n_mels``, ``n_mfcc``). It can be normalized depending
        on ``norm``. For more information about dct:
        http://en.wikipedia.org/wiki/Discrete_cosine_transform#DCT-II

        Parameters
        ----------
        n_mfcc : int
            Number of mfccs
        n_mels : int
            Number of mels
        norm : str
            Use "ortho" to get an orthogonal matrix, or None, by default "ortho"
        device : str, optional
            Device to load the transformation matrix on, by default None

        Returns
        -------
        torch.Tensor [shape=(n_mels, n_mfcc)]
            The DCT transformation matrix.
        """
        from torchaudio.functional import create_dct

        return create_dct(n_mfcc, n_mels, norm).to(device)

    def mfcc(
        self, n_mfcc: int = 40, n_mels: int = 80, log_offset: float = 1e-6, **kwargs
    ):
        """Computes mel-frequency cepstral coefficients (MFCCs).

        Parameters
        ----------
        n_mfcc : int, optional
            Number of MFCCs, by default 40
        n_mels : int, optional
            Number of mels, by default 80
        log_offset : float, optional
            Small value to prevent numerical issues when trying to compute
            log(0), by default 1e-6
        kwargs : dict, optional
            Keyword arguments to self.mel_spectrogram(), note that some of
            them will be used for self.stft()

        Returns
        -------
        torch.Tensor [shape=(batch, channels, mfccs, time)]
            MFCCs.
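
        Examples
        --------
        A sketch of computing 13 MFCCs from one second of noise:

        >>> signal = AudioSignal(torch.randn(44100), 44100)
        >>> signal.mfcc(n_mfcc=13).shape[2]
        13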
""" mel_spectrogram = self.mel_spectrogram(n_mels, **kwargs) mel_spectrogram = torch.log(mel_spectrogram + log_offset) dct_mat = self.get_dct(n_mfcc, n_mels, "ortho", self.device) mfcc = mel_spectrogram.transpose(-1, -2) @ dct_mat mfcc = mfcc.transpose(-1, -2) return mfcc @property def magnitude(self): """Computes and returns the absolute value of the STFT, which is the magnitude. This value can also be set to some tensor. When set, ``self.stft_data`` is manipulated so that its magnitude matches what this is set to, and modulated by the phase. Returns ------- torch.Tensor Magnitude of STFT. Examples -------- >>> signal = AudioSignal(torch.randn(44100), 44100) >>> magnitude = signal.magnitude # Computes stft if not computed >>> magnitude[magnitude < magnitude.mean()] = 0 >>> signal.magnitude = magnitude >>> signal.istft() """ if self.stft_data is None: self.stft() return torch.abs(self.stft_data) @magnitude.setter def magnitude(self, value): self.stft_data = value * torch.exp(1j * self.phase) return def log_magnitude( self, ref_value: float = 1.0, amin: float = 1e-5, top_db: float = 80.0 ): """Computes the log-magnitude of the spectrogram. Parameters ---------- ref_value : float, optional The magnitude is scaled relative to ``ref``: ``20 * log10(S / ref)``. Zeros in the output correspond to positions where ``S == ref``, by default 1.0 amin : float, optional Minimum threshold for ``S`` and ``ref``, by default 1e-5 top_db : float, optional Threshold the output at ``top_db`` below the peak: ``max(10 * log10(S/ref)) - top_db``, by default -80.0 Returns ------- torch.Tensor Log-magnitude spectrogram """ magnitude = self.magnitude amin = amin**2 log_spec = 10.0 * torch.log10(magnitude.pow(2).clamp(min=amin)) log_spec -= 10.0 * np.log10(np.maximum(amin, ref_value)) if top_db is not None: log_spec = torch.maximum(log_spec, log_spec.max() - top_db) return log_spec @property def phase(self): """Computes and returns the phase of the STFT. This value can also be set to some tensor. When set, ``self.stft_data`` is manipulated so that its phase matches what this is set to, we original magnitudeith th. Returns ------- torch.Tensor Phase of STFT. 
        Examples
        --------
        >>> signal = AudioSignal(torch.randn(44100), 44100)
        >>> phase = signal.phase # Computes stft if not computed
        >>> phase[phase < phase.mean()] = 0
        >>> signal.phase = phase
        >>> signal.istft()
        """
        if self.stft_data is None:
            self.stft()
        return torch.angle(self.stft_data)

    @phase.setter
    def phase(self, value):
        self.stft_data = self.magnitude * torch.exp(1j * value)
        return

    # Operator overloading
    def __add__(self, other):
        new_signal = self.clone()
        new_signal.audio_data += util._get_value(other)
        return new_signal

    def __iadd__(self, other):
        self.audio_data += util._get_value(other)
        return self

    def __radd__(self, other):
        return self + other

    def __sub__(self, other):
        new_signal = self.clone()
        new_signal.audio_data -= util._get_value(other)
        return new_signal

    def __isub__(self, other):
        self.audio_data -= util._get_value(other)
        return self

    def __mul__(self, other):
        new_signal = self.clone()
        new_signal.audio_data *= util._get_value(other)
        return new_signal

    def __imul__(self, other):
        self.audio_data *= util._get_value(other)
        return self

    def __rmul__(self, other):
        return self * other

    # Representation
    def _info(self):
        dur = f"{self.signal_duration:0.3f}" if self.signal_duration else "[unknown]"
        info = {
            "duration": f"{dur} seconds",
            "batch_size": self.batch_size,
            "path": self.path_to_file if self.path_to_file else "path unknown",
            "sample_rate": self.sample_rate,
            "num_channels": self.num_channels if self.num_channels else "[unknown]",
            "audio_data.shape": self.audio_data.shape,
            "stft_params": self.stft_params,
            "device": self.device,
        }
        return info

    def markdown(self):
        """Produces a markdown representation of AudioSignal, in a markdown table.

        Returns
        -------
        str
            Markdown representation of AudioSignal.

        Examples
        --------
        >>> signal = AudioSignal(torch.randn(44100), 44100)
        >>> print(signal.markdown())
        | Key | Value
        |---|---
        | duration | 1.000 seconds |
        | batch_size | 1 |
        | path | path unknown |
        | sample_rate | 44100 |
        | num_channels | 1 |
        | audio_data.shape | torch.Size([1, 1, 44100]) |
        | stft_params | STFTParams(window_length=2048, hop_length=512, window_type='sqrt_hann', match_stride=False, padding_type='reflect') |
        | device | cpu |
        """
        info = self._info()

        FORMAT = "| Key | Value \n" "|---|--- \n"
        for k, v in info.items():
            row = f"| {k} | {v} |\n"
            FORMAT += row
        return FORMAT

    def __str__(self):
        info = self._info()

        desc = ""
        for k, v in info.items():
            desc += f"{k}: {v}\n"
        return desc

    def __rich__(self):
        from rich.table import Table

        info = self._info()

        table = Table(title=f"{self.__class__.__name__}")
        table.add_column("Key", style="green")
        table.add_column("Value", style="cyan")

        for k, v in info.items():
            table.add_row(k, str(v))
        return table

    # Comparison
    def __eq__(self, other):
        for k, v in list(self.__dict__.items()):
            if torch.is_tensor(v):
                if not torch.allclose(v, other.__dict__[k], atol=1e-6):
                    max_error = (v - other.__dict__[k]).abs().max()
                    print(f"Max abs error for {k}: {max_error}")
                    return False
        return True

    # Indexing
    def __getitem__(self, key):
        if torch.is_tensor(key) and key.ndim == 0 and key.item() is True:
            assert self.batch_size == 1
            audio_data = self.audio_data
            _loudness = self._loudness
            stft_data = self.stft_data

        elif isinstance(key, (bool, int, list, slice, tuple)) or (
            torch.is_tensor(key) and key.ndim <= 1
        ):
            # Indexing only on the batch dimension.
            # Then let's copy over relevant stuff.
            # Future work: make this work for time-indexing
            # as well, using the hop length.
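            # ``_loudness`` and ``stft_data`` share their leading batch
            # dimension with ``audio_data``, so indexing them with the
            # same key keeps them consistent with the indexed audio.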
audio_data = self.audio_data[key] _loudness = self._loudness[key] if self._loudness is not None else None stft_data = self.stft_data[key] if self.stft_data is not None else None sources = None copy = type(self)(audio_data, self.sample_rate, stft_params=self.stft_params) copy._loudness = _loudness copy._stft_data = stft_data copy.sources = sources return copy def __setitem__(self, key, value): if not isinstance(value, type(self)): self.audio_data[key] = value return if torch.is_tensor(key) and key.ndim == 0 and key.item() is True: assert self.batch_size == 1 self.audio_data = value.audio_data self._loudness = value._loudness self.stft_data = value.stft_data return elif isinstance(key, (bool, int, list, slice, tuple)) or ( torch.is_tensor(key) and key.ndim <= 1 ): if self.audio_data is not None and value.audio_data is not None: self.audio_data[key] = value.audio_data if self._loudness is not None and value._loudness is not None: self._loudness[key] = value._loudness if self.stft_data is not None and value.stft_data is not None: self.stft_data[key] = value.stft_data return def __ne__(self, other): return not self == other
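

if __name__ == "__main__":  # pragma: no cover
    # A minimal smoke-test sketch (assumes this module is importable as
    # part of the package): round-trip a random signal through stft/istft
    # with the default sqrt-hann window and check the reconstruction.
    signal = AudioSignal(torch.randn(1, 1, 44100), 44100)
    signal.stft()
    recon = signal.clone().istft()
    print(signal)
    print(torch.allclose(signal.audio_data, recon.audio_data, atol=1e-4))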