199 lines
5.9 KiB
Python
Executable File
199 lines
5.9 KiB
Python
Executable File
"""Stille- und Szenen-Erkennung sowie Schnitt-Logik."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
import subprocess
|
|
import tempfile
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
|
|
@dataclass
class TimeRange:
    """A span on the media timeline, described by its two end points.

    Both values are offsets in seconds from the beginning of the media.
    """

    start: float  # offset at which the span begins
    end: float  # offset at which the span ends

    @property
    def duration(self) -> float:
        """Return the length of the span, i.e. ``end`` minus ``start``."""
        length = self.end - self.start
        return length
|
|
|
|
|
|
def _run(cmd: list[str]) -> subprocess.CompletedProcess:
    """Run an external command and capture its output as text.

    Args:
        cmd: Program name followed by its arguments. The list is passed to
            the OS directly; no shell is involved.

    Returns:
        The completed process with ``stdout`` and ``stderr`` as strings.
        A non-zero exit status does not raise; callers are expected to
        inspect ``returncode`` themselves.
    """
    return subprocess.run(
        cmd,
        capture_output=True,
        check=False,
        # Decode explicitly rather than with the locale codec in strict mode:
        # the tools may echo arbitrary bytes (for example media metadata),
        # which would otherwise abort the call with UnicodeDecodeError.
        encoding="utf-8",
        errors="replace",
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stille-Erkennung
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def detect_silence(
    input_path: Path,
    threshold_db: float = -40,
    min_duration: float = 0.5,
) -> list[TimeRange]:
    """Detect silent sections with ffmpeg's ``silencedetect`` filter.

    Args:
        input_path: Media file to analyse.
        threshold_db: Noise threshold in dB, passed as the filter's ``n``
            option.
        min_duration: Minimum silence length, passed as the filter's ``d``
            option.

    Returns:
        One ``TimeRange`` per reported ``silence_start``/``silence_end``
        pair, in the order ffmpeg reported them.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    cmd = [
        "ffmpeg", "-i", str(input_path),
        "-af", f"silencedetect=n={threshold_db}dB:d={min_duration}",
        "-f", "null", "-",
    ]
    result = _run(cmd)
    if result.returncode != 0:
        # Without this check a failed run looks like "no silence found".
        raise RuntimeError(
            f"ffmpeg Fehler bei der Stille-Erkennung: {result.stderr}"
        )

    # The filter results are parsed from ffmpeg's stderr stream.
    output = result.stderr

    # Timestamps may be negative (e.g. "-0.00133333") or use exponent
    # notation (e.g. "1.5e-05"). A digits-and-dots pattern would skip the
    # former, shifting every later start/end pairing, and truncate the latter.
    number = r"(-?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?)"
    starts = [float(m) for m in re.findall(rf"silence_start: {number}", output)]
    ends = [float(m) for m in re.findall(rf"silence_end: {number}", output)]

    # Clamp slightly negative starts to the beginning of the timeline.
    return [TimeRange(max(0.0, s), e) for s, e in zip(starts, ends)]
|
|
|
|
|
|
def _get_duration(input_path: Path) -> float:
    """Determine the length of a media file in seconds via ffprobe.

    Args:
        input_path: Media file to inspect.

    Returns:
        The container duration reported by ffprobe.

    Raises:
        RuntimeError: If ffprobe exits with a non-zero status or prints
            something that is not a number (for example ``N/A``).
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        # Print only the bare value, without section wrappers or key names.
        "-of", "default=noprint_wrappers=1:nokey=1",
        str(input_path),
    ]
    result = _run(cmd)
    if result.returncode != 0:
        raise RuntimeError(f"ffprobe Fehler: {result.stderr}")

    raw = result.stdout.strip()
    try:
        return float(raw)
    except ValueError as exc:
        # Report what ffprobe actually printed instead of a bare float() error.
        raise RuntimeError(
            f"ffprobe lieferte keine gültige Dauer: {raw!r}"
        ) from exc
|
|
|
|
|
|
def invert_ranges(silence_ranges: list[TimeRange], total_duration: float) -> list[TimeRange]:
    """Invert silent sections into the sections that contain sound.

    Args:
        silence_ranges: Silent spans in any order; they may overlap or be
            nested inside one another.
        total_duration: Length of the whole timeline in seconds.

    Returns:
        The gaps between the silent spans within ``[0, total_duration]``,
        in ascending order. Empty gaps are omitted.
    """
    speech: list[TimeRange] = []
    cursor = 0.0
    for silence in sorted(silence_ranges, key=lambda r: r.start):
        if cursor >= total_duration:
            # Everything up to the end is already covered by silence.
            break
        # Never let a speech segment extend past the end of the timeline.
        gap_end = min(silence.start, total_duration)
        if gap_end > cursor:
            speech.append(TimeRange(cursor, gap_end))
        # Only move forward: a silence nested inside an earlier, longer one
        # must not pull the cursor back into time that is already silent.
        cursor = max(cursor, silence.end)
    if cursor < total_duration:
        speech.append(TimeRange(cursor, total_duration))
    return speech
|
|
|
|
|
|
def remove_silence(
    input_path: Path,
    output_path: Path,
    threshold_db: float = -40,
    min_duration: float = 0.5,
) -> Path:
    """Cut the silent sections out of a video and write the result.

    The non-silent sections are extracted one by one into a temporary
    directory using stream copy and then joined into ``output_path``.

    Args:
        input_path: Source video.
        output_path: Destination file; missing parent directories are created.
        threshold_db: Noise threshold forwarded to ``detect_silence``.
        min_duration: Minimum silence length forwarded to ``detect_silence``.

    Returns:
        ``output_path``.

    Raises:
        RuntimeError: If nothing but silence remains or an ffmpeg cut fails.
    """
    silent_parts = detect_silence(input_path, threshold_db, min_duration)
    keep = invert_ranges(silent_parts, _get_duration(input_path))
    if len(keep) == 0:
        raise RuntimeError("Keine Ton-Abschnitte gefunden — Stille-Schwelle zu hoch?")

    output_path.parent.mkdir(parents=True, exist_ok=True)

    with tempfile.TemporaryDirectory() as workdir:
        pieces: list[Path] = []
        for index, part in enumerate(keep):
            piece = Path(workdir) / f"seg_{index:04d}.mp4"
            proc = _run([
                "ffmpeg", "-y",
                "-ss", str(part.start),
                "-to", str(part.end),
                "-i", str(input_path),
                "-c", "copy",
                str(piece),
            ])
            if proc.returncode != 0:
                raise RuntimeError(f"ffmpeg Fehler beim Ausschneiden: {proc.stderr}")
            pieces.append(piece)

        # Join while the temporary directory (and the pieces in it) still exists.
        _concat_clips(pieces, output_path)

    return output_path
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Szenen-Erkennung
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def detect_scenes(input_path: Path, threshold: float = 27.0) -> list[TimeRange]:
    """Detect scene boundaries with PySceneDetect's ``ContentDetector``.

    Args:
        input_path: Video file to analyse.
        threshold: Detection threshold handed to ``ContentDetector``.

    Returns:
        One ``TimeRange`` (in seconds) per scene in the detected scene list.

    Raises:
        ImportError: If PySceneDetect is not installed.
    """
    # Imported lazily so the rest of the module works without PySceneDetect.
    try:
        from scenedetect import VideoManager, SceneManager
        from scenedetect.detectors import ContentDetector
    except ImportError as exc:
        raise ImportError(
            "PySceneDetect nicht installiert: pip install scenedetect[opencv]"
        ) from exc

    video_manager = VideoManager([str(input_path)])
    scene_manager = SceneManager()
    scene_manager.add_detector(ContentDetector(threshold=threshold))

    try:
        # No explicit factor is passed; the choice is left to PySceneDetect.
        video_manager.set_downscale_factor()
        video_manager.start()
        scene_manager.detect_scenes(frame_source=video_manager)
        scene_list = scene_manager.get_scene_list()
    finally:
        # Release the video even when detection fails part-way through.
        video_manager.release()

    return [
        TimeRange(start.get_seconds(), end.get_seconds())
        for start, end in scene_list
    ]
|
|
|
|
|
|
def split_scenes(
    input_path: Path,
    output_folder: Path,
    threshold: float = 27.0,
) -> list[Path]:
    """Split a video into one file per detected scene.

    Args:
        input_path: Source video.
        output_folder: Directory receiving ``scene_NNNN.mp4`` files; it is
            created if missing.
        threshold: Detection threshold forwarded to ``detect_scenes``.

    Returns:
        Paths of the written clips, in scene order.

    Raises:
        RuntimeError: If ffmpeg fails while cutting a scene.
    """
    detected = detect_scenes(input_path, threshold)
    output_folder.mkdir(parents=True, exist_ok=True)

    written: list[Path] = []
    for index, span in enumerate(detected):
        target = output_folder / f"scene_{index:04d}.mp4"
        proc = _run([
            "ffmpeg", "-y",
            "-ss", str(span.start),
            "-to", str(span.end),
            "-i", str(input_path),
            "-c", "copy",
            str(target),
        ])
        if proc.returncode != 0:
            raise RuntimeError(f"ffmpeg Fehler: {proc.stderr}")
        written.append(target)

    return written
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Hilfsfunktion concat
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _concat_clips(clips: list[Path], output: Path) -> None:
    """Join clips into one file with ffmpeg's concat demuxer (stream copy).

    Args:
        clips: Clip files in the order they should appear in the result.
        output: Destination file; missing parent directories are created.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    output.parent.mkdir(parents=True, exist_ok=True)

    # delete=False: the list file is closed before ffmpeg reads it and is
    # removed explicitly in the ``finally`` clause below.
    fh = tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False, encoding="utf-8"
    )
    list_file = Path(fh.name)
    try:
        with fh:
            for clip in clips:
                # Inside a single-quoted concat entry a literal quote has to
                # be written as '\'' (close quote, escaped quote, reopen).
                escaped = str(clip.resolve()).replace("'", "'\\''")
                fh.write(f"file '{escaped}'\n")

        cmd = [
            "ffmpeg", "-y",
            "-f", "concat", "-safe", "0",
            "-i", str(list_file),
            "-c", "copy",
            str(output),
        ]
        result = _run(cmd)
    finally:
        # Remove the list file even if writing it or starting ffmpeg failed.
        list_file.unlink(missing_ok=True)

    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg concat Fehler: {result.stderr}")
|