Spaces:
Running
on
Zero
Running
on
Zero
import os | |
from .whisper import load_model | |
import numpy as np | |
import time | |
import sys | |
sys.path.append("..") | |
class Audio2Feature():
    """Extract Whisper encoder embeddings from audio and window them per video frame.

    The slicing helpers convert a video frame index to an audio feature index
    with ``int(vid_idx * 50 / fps)``, i.e. they treat ``feature_array`` as being
    sampled at 50 steps per second.
    """

    def __init__(self,
                 whisper_model_type="tiny",
                 model_path="./models/whisper/tiny.pt",
                 device="cuda"):
        """
        Load the Whisper checkpoint used for feature extraction
        :param whisper_model_type: label of the Whisper variant; only stored on the instance
        :param model_path: checkpoint path handed to ``load_model``
        :param device: device string handed to ``load_model``
        """
        self.whisper_model_type = whisper_model_type
        self.model = load_model(model_path, device=device)

    @staticmethod
    def _stack_rows(pieces):
        """
        Concatenate the pieces along axis 0 and flatten them to 2-D rows
        :param pieces: non-empty list of arrays sharing the same trailing axes
        :return: array of shape (-1, embed_dim)
        """
        stacked = np.concatenate(pieces, axis=0)
        # Row width comes from the data instead of a hard-coded 384, so
        # embeddings of any width keep one embedding per row. For 384-wide
        # input the result is identical to reshape(-1, 384).
        embed_dim = np.shape(pieces[0])[-1]
        return stacked.reshape(-1, embed_dim)

    def get_sliced_feature(self,
                           feature_array,
                           vid_idx,
                           audio_feat_length=(2, 2),
                           fps=25):
        """
        Get the contiguous window of audio features around one video frame
        :param feature_array: indexable sequence of per-step feature arrays
        :param vid_idx: index of the video frame the window is centred on
        :param audio_feat_length: (left, right) context measured in video frames;
            each context frame contributes two audio steps
        :param fps: video frame rate used to map vid_idx to an audio index
        :return: (selected_feature, selected_idx) - the stacked features as
            (-1, embed_dim) rows, and the (clamped) audio indices that were read
        """
        last_idx = len(feature_array) - 1
        center_idx = int(vid_idx*50/fps)
        left_idx = center_idx - audio_feat_length[0]*2
        right_idx = center_idx + (audio_feat_length[1]+1)*2
        # Indices that fall outside the array are clamped to the first/last
        # step, so windows at the edges repeat the boundary feature.
        selected_idx = [min(last_idx, max(0, idx))
                        for idx in range(left_idx, right_idx)]
        selected_feature = self._stack_rows(
            [feature_array[idx] for idx in selected_idx])
        return selected_feature, selected_idx

    def get_sliced_feature_sparse(self, feature_array, vid_idx, audio_feat_length=(2, 2), fps=25):
        """
        Get a sparse window of audio features: one pair of steps per context frame
        :param feature_array: array of per-step features; each step must be 2-D
            (the clamped branch indexes it with ``[np.newaxis, :, :]``)
        :param vid_idx: index of the video frame the window is centred on
        :param audio_feat_length: (left, right) context measured in video frames
        :param fps: video frame rate used to map frame indices to audio indices
        :return: (selected_feature, selected_idx) - the stacked features as
            (-1, embed_dim) rows, and the audio indices that were read
        """
        length = len(feature_array)
        pieces = []
        selected_idx = []
        for dt in range(-audio_feat_length[0], audio_feat_length[1]+1):
            left_idx = int((vid_idx+dt)*50/fps)
            if left_idx < 1 or left_idx > length-1:
                # No valid (left_idx-1, left_idx) pair here: clamp into range
                # and use the same step twice.
                left_idx = min(length-1, max(0, left_idx))
                x = feature_array[left_idx]
                x = np.repeat(x[np.newaxis, :, :], 2, axis=0)
                selected_idx.extend((left_idx, left_idx))
            else:
                x = feature_array[left_idx-1:left_idx+1]
                selected_idx.extend((left_idx-1, left_idx))
            pieces.append(x)
        return self._stack_rows(pieces), selected_idx

    def feature2chunks(self, feature_array, fps, audio_feat_length=(2, 2)):
        """
        Slice the whole feature sequence into one window per video frame
        :param feature_array: indexable sequence of per-step feature arrays
        :param fps: video frame rate
        :param audio_feat_length: (left, right) context passed to get_sliced_feature
        :return: np.array built from the per-frame windows, in frame order
        """
        whisper_chunks = []
        whisper_idx_multiplier = 50./fps
        num_steps = len(feature_array)
        print(f"video in {fps} FPS, audio idx in 50FPS")
        vid_idx = 0
        while True:
            selected_feature, _ = self.get_sliced_feature(
                feature_array=feature_array,
                vid_idx=vid_idx,
                audio_feat_length=audio_feat_length,
                fps=fps)
            whisper_chunks.append(selected_feature)
            # NOTE(review): the stop test runs after the append and uses ">",
            # so trailing frames whose start index is >= len(feature_array)
            # are still emitted (built entirely from clamped features). Kept
            # as-is because callers may rely on the resulting frame count.
            if int(vid_idx * whisper_idx_multiplier) > num_steps:
                break
            vid_idx += 1
        return np.array(whisper_chunks)

    def audio2feat(self, audio_path):
        """
        Transcribe an audio file and collect the encoder embeddings of every segment
        :param audio_path: path handed to ``self.model.transcribe``
        :return: the per-segment embeddings concatenated along axis 0
        :raises ValueError: if the transcription result contains no segments
        """
        result = self.model.transcribe(audio_path)
        embed_list = []
        for emb in result['segments']:
            # 4-axis array: swap axes 1 and 2, then drop the leading axis.
            encoder_embeddings = emb['encoder_embeddings'].transpose(0, 2, 1, 3).squeeze(0)
            start_idx = int(emb['start'])
            end_idx = int(emb['end'])
            # Keep only the first (end - start) / 2 steps of the segment.
            # NOTE(review): presumably start/end are frame positions at twice
            # the embedding rate - confirm against the whisper wrapper.
            emb_end_idx = int((end_idx - start_idx)/2)
            embed_list.append(encoder_embeddings[:emb_end_idx])
        if not embed_list:
            # np.concatenate([]) would raise an opaque ValueError; say why.
            raise ValueError(f"no segments found while transcribing {audio_path!r}")
        return np.concatenate(embed_list, axis=0)
def load_audio_model(model_path, device):
    """
    Build an Audio2Feature processor for the given checkpoint
    :param model_path: checkpoint path forwarded to Audio2Feature
    :param device: device string forwarded to Audio2Feature
    :return: the constructed Audio2Feature instance
    """
    return Audio2Feature(model_path=model_path, device=device)
if __name__ == "__main__":
    # Manual smoke test: extract features from a sample file, then print the
    # audio window selected for every video frame.
    audio_processor = Audio2Feature(model_path="../../models/whisper/whisper_tiny.pt")
    audio_path = "./test.mp3"
    array = audio_processor.audio2feat(audio_path)
    print(array.shape)

    fps = 25
    whisper_idx_multiplier = 50./fps
    print(f"video in {fps} FPS, audio idx in 50FPS")

    i = 0
    past_end = False
    # Same iteration count as feature2chunks: the frame that first steps past
    # the end of the features is still processed before the loop stops.
    while not past_end:
        past_end = int(i * whisper_idx_multiplier) > len(array)
        selected_feature, selected_idx = audio_processor.get_sliced_feature(
            feature_array=array,
            vid_idx=i,
            audio_feat_length=[2,2],
            fps=fps,
        )
        print(f"video idx {i},\t audio idx {selected_idx},\t shape {selected_feature.shape}")
        i += 1