Implement Recovery Mode: Use yt-dlp to recover missing metadata for unindexed files
This commit is contained in:
parent
180e0632e5
commit
07a7dd3c07
4 changed files with 291 additions and 7 deletions
166
ta_symlink.py
166
ta_symlink.py
|
|
@ -6,6 +6,7 @@ import sys
|
|||
import threading
|
||||
import time
|
||||
import ipaddress
|
||||
import shutil
|
||||
from functools import wraps
|
||||
from flask import Flask, jsonify, render_template, request, abort, Response
|
||||
|
||||
|
|
@ -19,6 +20,7 @@ UI_USERNAME = os.getenv("UI_USERNAME", "admin")
|
|||
UI_PASSWORD = os.getenv("UI_PASSWORD", "password")
|
||||
SOURCE_DIR = Path("/app/source")
|
||||
TARGET_DIR = Path("/app/target")
|
||||
IMPORT_DIR = Path("/app/import")
|
||||
HEADERS = {"Authorization": f"Token {API_TOKEN}"}
|
||||
|
||||
app = Flask(__name__)
|
||||
|
|
@ -439,6 +441,148 @@ def check_orphaned_links():
|
|||
log(f"✅ Check complete. Scanned {total_checked} files, found {len(orphaned)} orphaned symlinks.")
|
||||
return orphaned
|
||||
|
||||
def extract_id_from_filename(filename):
|
||||
"""
|
||||
Extracts YouTube ID from filename.
|
||||
Expects format: 'Title [VIDEO_ID].ext' or just '[VIDEO_ID].ext'
|
||||
"""
|
||||
# Regex for [VIDEO_ID] at end of stem
|
||||
match = re.search(r'\[([a-zA-Z0-9_-]{11})\]$', Path(filename).stem)
|
||||
if match:
|
||||
return match.group(1)
|
||||
|
||||
# Fallback: maybe the whole filename is the ID?
|
||||
if re.match(r'^[a-zA-Z0-9_-]{11}$', Path(filename).stem):
|
||||
return Path(filename).stem
|
||||
|
||||
return None
|
||||
|
||||
def scan_for_unindexed_videos():
|
||||
"""
|
||||
Scans SOURCE_DIR for files that are NOT in the TubeArchivist database/metadata.
|
||||
Returns a list of candidate files for recovery.
|
||||
"""
|
||||
log("🔍 Scanning for unindexed files...")
|
||||
|
||||
# 1. Fetch current known IDs
|
||||
video_map = fetch_all_metadata()
|
||||
known_ids = set(video_map.keys())
|
||||
|
||||
unindexed = []
|
||||
|
||||
if not SOURCE_DIR.exists():
|
||||
return []
|
||||
|
||||
for channel_path in SOURCE_DIR.iterdir():
|
||||
if not channel_path.is_dir():
|
||||
continue
|
||||
|
||||
for video_file in channel_path.glob("*.*"):
|
||||
# Skip non-video files broadly (adjust extensions if needed)
|
||||
if video_file.suffix.lower() not in ['.mp4', '.mkv', '.webm', '.mov']:
|
||||
continue
|
||||
|
||||
# Try to identify
|
||||
vid_id = extract_id_from_filename(video_file.name)
|
||||
|
||||
# If we found an ID and it's NOT in known_ids
|
||||
if vid_id and vid_id not in known_ids:
|
||||
unindexed.append({
|
||||
"path": str(video_file),
|
||||
"filename": video_file.name,
|
||||
"video_id": vid_id,
|
||||
"channel_folder": channel_path.name,
|
||||
"size_mb": round(video_file.stat().st_size / (1024 * 1024), 2)
|
||||
})
|
||||
elif not vid_id:
|
||||
# File without ID? Maybe worth listing too
|
||||
pass
|
||||
|
||||
log(f"✅ Found {len(unindexed)} unindexed video files.")
|
||||
return unindexed
|
||||
|
||||
def recover_video_metadata(filepath):
|
||||
"""
|
||||
Uses yt-dlp to fetch metadata for a video file and prepares it for import.
|
||||
"""
|
||||
import subprocess
|
||||
import shutil
|
||||
import json
|
||||
|
||||
src_path = Path(filepath)
|
||||
if not src_path.exists():
|
||||
return False, "File not found"
|
||||
|
||||
vid_id = extract_id_from_filename(src_path.name)
|
||||
if not vid_id:
|
||||
return False, "Could not extract Video ID from filename"
|
||||
|
||||
# Ensure import dir exists
|
||||
IMPORT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Target paths
|
||||
dest_video = IMPORT_DIR / src_path.name
|
||||
dest_json = IMPORT_DIR / f"{src_path.stem}.info.json"
|
||||
|
||||
log(f"🚑 Recovering: {vid_id} ...")
|
||||
|
||||
# 1. Fetch Metadata using yt-dlp
|
||||
cmd = [
|
||||
"yt-dlp",
|
||||
"--write-info-json",
|
||||
"--skip-download",
|
||||
"--id",
|
||||
f"https://www.youtube.com/watch?v={vid_id}",
|
||||
"-o", f"{IMPORT_DIR}/{src_path.stem}"
|
||||
]
|
||||
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||||
|
||||
if result.returncode != 0:
|
||||
log(f" ⚠️ yt-dlp failed (Video likely deleted). Generating offline metadata...")
|
||||
# START OFFLINE GENERATION
|
||||
# Create a minimal .info.json manually
|
||||
offline_meta = {
|
||||
"id": vid_id,
|
||||
"title": src_path.stem.replace(f" [{vid_id}]", ""),
|
||||
"description": "Recovered by TA-Organizerr (Offline Mode)",
|
||||
"uploader": src_path.parent.name, # Guess channel from folder name
|
||||
"channel_id": "UC_UNKNOWN", # We can't know this without online check
|
||||
"upload_date": "20000101", # Unknown
|
||||
"thumbnail": "", # No thumbnail
|
||||
"webpage_url": f"https://www.youtube.com/watch?v={vid_id}",
|
||||
}
|
||||
with open(dest_json, 'w') as f:
|
||||
json.dump(offline_meta, f, indent=4)
|
||||
log(" ✅ Generated offline metadata.")
|
||||
else:
|
||||
log(" ✅ Fetched online metadata.")
|
||||
|
||||
# 2. Copy/Symlink Video File
|
||||
try:
|
||||
# We hardlink if possible to save space/time, otherwise copy
|
||||
if dest_video.exists():
|
||||
dest_video.unlink()
|
||||
|
||||
# Try symlink first? No, TA import consumes files. Copying is safer or hardlink.
|
||||
# Let's try hardlink (link)
|
||||
try:
|
||||
os.link(src_path, dest_video)
|
||||
log(" 🔗 Hardlinked video file.")
|
||||
except OSError:
|
||||
shutil.copy2(src_path, dest_video)
|
||||
log(" 📂 Copied video file.")
|
||||
|
||||
except Exception as e:
|
||||
return False, f"Failed to move video: {e}"
|
||||
|
||||
return True, "Ready for import"
|
||||
|
||||
except Exception as e:
|
||||
log(f" ❌ Recovery failed: {e}")
|
||||
return False, str(e)
|
||||
|
||||
# Main logic
|
||||
|
||||
def process_videos():
|
||||
|
|
@ -711,6 +855,28 @@ def api_transcode_logs():
|
|||
"next_index": len(transcode_log_buffer)
|
||||
})
|
||||
|
||||
@app.route("/api/recovery/scan", methods=["POST"])
|
||||
@requires_auth
|
||||
def api_recovery_scan():
|
||||
files = scan_for_unindexed_videos()
|
||||
return jsonify({"files": files, "count": len(files)})
|
||||
|
||||
@app.route("/api/recovery/start", methods=["POST"])
|
||||
@requires_auth
|
||||
def api_recovery_start():
|
||||
data = request.get_json()
|
||||
filepath = data.get('filepath')
|
||||
|
||||
if not filepath:
|
||||
return jsonify({"error": "No filepath provided"}), 400
|
||||
|
||||
def run_recovery():
|
||||
success, msg = recover_video_metadata(filepath)
|
||||
log(f"Recovery Result for {filepath}: {msg}")
|
||||
|
||||
threading.Thread(target=run_recovery).start()
|
||||
return jsonify({"message": "Recovery started", "status": "started"})
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Start scheduler in background thread
|
||||
thread = threading.Thread(target=scheduler, daemon=True)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue