Spaces:
Runtime error
Runtime error
import os | |
from typing import List, Tuple | |
import multiprocessing | |
import numpy as np | |
import pandas as pd | |
import streamlit as st | |
import torch | |
from torch import Tensor | |
from decord import VideoReader, cpu | |
from transformers import AutoFeatureExtractor, TimesformerForVideoClassification | |
# Seed NumPy's global RNG so the random window chosen by
# sample_frame_indices() is reproducible from one script run to the next.
np.random.seed(0)

# Page-level Streamlit settings, gathered in one mapping and unpacked into
# set_page_config() below.
_page_settings = {
    "page_title": "TimeSFormer",
    "page_icon": "🧊",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
    "menu_items": {
        "Get Help": "https://www.extremelycoolapp.com/help",
        "Report a bug": "https://www.extremelycoolapp.com/bug",
        "About": "# This is a header. This is an *extremely* cool app!",
    },
}
st.set_page_config(**_page_settings)
def sample_frame_indices(
    clip_len: int, frame_sample_rate: float, seg_len: int
) -> np.ndarray:
    """Choose ``clip_len`` frame indices from a video of ``seg_len`` frames.

    A window of ``int(clip_len * frame_sample_rate)`` frames is placed at a
    random position (drawn from NumPy's global RNG) and ``clip_len`` indices
    are spread evenly across it. If the video has no more frames than the
    window, the whole video is used as the window instead, so neighbouring
    indices may repeat.

    Args:
        clip_len: Number of frame indices to return.
        frame_sample_rate: Stride, in frames, between consecutive samples;
            together with ``clip_len`` it determines the window length.
        seg_len: Total number of frames available in the video.

    Returns:
        A sorted ``int64`` array of ``clip_len`` indices. For a window of at
        least one frame, every index lies in ``[0, seg_len - 1]``.

    Raises:
        ValueError: If ``seg_len`` is not positive (there is nothing to
            sample from).
    """
    if seg_len <= 0:
        raise ValueError(f"seg_len must be positive, got {seg_len}")

    converted_len = int(clip_len * frame_sample_rate)
    if converted_len < seg_len:
        end_idx = np.random.randint(converted_len, seg_len)
        start_idx = end_idx - converted_len
    else:
        # np.random.randint(low, high) raises ValueError when low >= high,
        # which is what happened for videos shorter than the window. Cover
        # the entire video instead of crashing.
        start_idx, end_idx = 0, seg_len

    indices = np.linspace(start_idx, end_idx, num=clip_len)
    # end_idx is exclusive, so clamp the final sample back onto a real frame.
    return np.clip(indices, start_idx, end_idx - 1).astype(np.int64)
def load_model():
    """Load the pretrained preprocessing pipeline and video classifier.

    Returns:
        A ``(feature_extractor, model)`` tuple: the feature extractor loaded
        from the VideoMAE Kinetics checkpoint and the TimeSformer
        classification model loaded from the K400 checkpoint.
    """
    # NOTE(review): this runs on every execution of the script; wrapping it
    # in Streamlit's resource cache may avoid reloading the weights on each
    # rerun -- confirm the installed Streamlit version supports it.
    extractor_checkpoint = "MCG-NJU/videomae-base-finetuned-kinetics"
    classifier_checkpoint = "facebook/timesformer-base-finetuned-k400"

    extractor = AutoFeatureExtractor.from_pretrained(extractor_checkpoint)
    classifier = TimesformerForVideoClassification.from_pretrained(
        classifier_checkpoint
    )
    return extractor, classifier


# Module-level handles that inference() reads as globals.
feature_extractor, model = load_model()
def inference(file_path: str, top_k: int = 5):
    """Classify the video at ``file_path`` and return its top predictions.

    Eight frames are sampled from the video, preprocessed with the
    module-level ``feature_extractor`` and scored by the module-level
    ``model``. The best label and each of the ``top_k`` entries are also
    printed to stdout.

    Args:
        file_path: Path of the video file to decode.
        top_k: How many of the highest-scoring labels to return.

    Returns:
        A ``pandas.DataFrame`` with columns ``"Label"`` and ``"Confidence"``,
        ordered from most to least likely. ``"Confidence"`` is the softmax
        probability expressed in percent (0-100).
    """
    # Decode the file the caller asked for rather than a module-level path.
    videoreader = VideoReader(file_path, num_threads=1, ctx=cpu(0))

    # sample 8 frames
    videoreader.seek(0)
    indices = sample_frame_indices(
        clip_len=8, frame_sample_rate=4, seg_len=len(videoreader)
    )
    video = videoreader.get_batch(indices).asnumpy()

    inputs = feature_extractor(list(video), return_tensors="pt")
    with torch.no_grad():
        outputs = model(**inputs)
    logits: Tensor = outputs.logits

    # model predicts one of the 400 Kinetics-400 classes
    predicted_label = logits.argmax(-1).item()
    print(model.config.id2label[predicted_label])

    # Raw logits are unbounded scores, not percentages. Softmax turns them
    # into probabilities (the ranking is unchanged), scaled to percent so the
    # "%" in the log line and the "Confidence" column are truthful.
    confidences = (torch.softmax(logits, dim=-1).squeeze() * 100).numpy()
    top_indices = np.argsort(confidences)[::-1][:top_k]

    results: List[Tuple[str, float]] = []
    for index in top_indices:
        # Plain Python scalars keep the id2label lookup and the DataFrame
        # free of NumPy scalar types.
        label = model.config.id2label[int(index)]
        value = float(confidences[index])
        print(f"Label: {label} - {value:.2f}%")
        results.append((label, value))
    return pd.DataFrame(results, columns=("Label", "Confidence"))
# ---- Page header and introduction -----------------------------------------
st.title("TimeSFormer")
with st.expander("INTRODUCTION"):
    st.text(
        f"""Streamlit demo for TimeSFormer.
Author: Hiep Phuoc Secondary High School
Number of CPU(s): {multiprocessing.cpu_count()}
"""
    )

# ---- Upload and classify ---------------------------------------------------
# The uploaded bytes are written to disk so that both the video decoder and
# st.video can read them by path.
# NOTE(review): every upload reuses this one path, so concurrent sessions
# would presumably overwrite each other's file -- confirm whether that matters.
VIDEO_TMP_PATH = os.path.join("tmp", "tmp.mp4")
# open(..., "wb") does not create missing parent directories; make sure the
# target directory exists before the first upload is written.
os.makedirs(os.path.dirname(VIDEO_TMP_PATH), exist_ok=True)

uploadedfile = st.file_uploader("Upload file", type=["mp4"])
if uploadedfile is not None:
    with st.spinner():
        with open(VIDEO_TMP_PATH, "wb") as f:
            f.write(uploadedfile.getbuffer())
    with st.spinner("Processing..."):
        df = inference(VIDEO_TMP_PATH)
    st.dataframe(df)
    st.video(VIDEO_TMP_PATH)