feat: implement unified media registry and tracking system for anime

This commit is contained in:
Benexl
2025-07-16 01:16:38 +03:00
parent 27b1f3f792
commit ac3c6801d7
5 changed files with 1053 additions and 10 deletions

View File

@@ -84,13 +84,14 @@ def player_controls(ctx: Context, state: State) -> State | ControlFlow:
ctx, anilist_anime.id, int(current_episode_num)
)
# Also update local watch history if enabled
# Update unified media registry with actual PlayerResult data
if config.stream.continue_from_watch_history and config.stream.preferred_watch_history == "local":
from ...utils.watch_history_tracker import update_episode_progress
from ...services.media_registry.tracker import get_media_tracker
try:
update_episode_progress(anilist_anime.id, int(current_episode_num), completion_pct)
except (ValueError, AttributeError):
pass # Skip if episode number conversion fails
tracker = get_media_tracker()
tracker.track_from_player_result(anilist_anime, int(current_episode_num), player_result)
except (ValueError, AttributeError) as e:
logger.warning(f"Failed to update media registry: {e}")
# --- Auto-Next Logic ---
available_episodes = getattr(
@@ -102,13 +103,14 @@ def player_controls(ctx: Context, state: State) -> State | ControlFlow:
console.print("[cyan]Auto-playing next episode...[/cyan]")
next_episode_num = available_episodes[current_index + 1]
# Track next episode in watch history
# Track next episode in unified media registry
if config.stream.continue_from_watch_history and config.stream.preferred_watch_history == "local" and anilist_anime:
from ...utils.watch_history_tracker import track_episode_viewing
from ...services.media_registry.tracker import get_media_tracker
try:
track_episode_viewing(anilist_anime, int(next_episode_num), start_tracking=True)
except (ValueError, AttributeError):
pass
tracker = get_media_tracker()
tracker.track_episode_start(anilist_anime, int(next_episode_num))
except (ValueError, AttributeError) as e:
logger.warning(f"Failed to track episode start: {e}")
return State(
menu_name="SERVERS",

View File

@@ -0,0 +1,26 @@
"""
Unified Media Registry for FastAnime.
This module provides a unified system for tracking both watch history and downloads
for anime, eliminating data duplication between separate systems.
"""
from .manager import MediaRegistryManager, get_media_registry
from .models import (
EpisodeStatus,
MediaRecord,
MediaRegistryIndex,
UserMediaData,
)
from .tracker import MediaTracker, get_media_tracker
__all__ = [
"MediaRegistryManager",
"get_media_registry",
"EpisodeStatus",
"MediaRecord",
"MediaRegistryIndex",
"UserMediaData",
"MediaTracker",
"get_media_tracker",
]

View File

@@ -0,0 +1,380 @@
"""
Unified Media Registry Manager.
Provides centralized management of anime metadata, downloads, and watch history
through a single interface, eliminating data duplication.
"""
from __future__ import annotations
import json
import logging
import threading
from pathlib import Path
from typing import Dict, List, Optional
from ....core.constants import APP_DATA_DIR
from ....libs.api.types import MediaItem
from .models import MediaRecord, MediaRegistryIndex, EpisodeStatus, UserMediaData
logger = logging.getLogger(__name__)
class MediaRegistryManager:
"""
Unified manager for anime data, downloads, and watch history.
Provides a single interface for all media-related operations,
eliminating duplication between download and watch systems.
"""
def __init__(self, registry_path: Path = None):
self.registry_path = registry_path or APP_DATA_DIR / "media_registry"
self.media_dir = self.registry_path / "media"
self.cache_dir = self.registry_path / "cache"
self.index_file = self.registry_path / "index.json"
# Thread safety
self._lock = threading.RLock()
# Cached data
self._index: Optional[MediaRegistryIndex] = None
self._loaded_records: Dict[int, MediaRecord] = {}
self._ensure_directories()
def _ensure_directories(self) -> None:
"""Ensure registry directories exist."""
try:
self.registry_path.mkdir(parents=True, exist_ok=True)
self.media_dir.mkdir(exist_ok=True)
self.cache_dir.mkdir(exist_ok=True)
except Exception as e:
logger.error(f"Failed to create registry directories: {e}")
def _load_index(self) -> MediaRegistryIndex:
"""Load or create the registry index."""
if self._index is not None:
return self._index
try:
if self.index_file.exists():
with open(self.index_file, 'r', encoding='utf-8') as f:
data = json.load(f)
self._index = MediaRegistryIndex.model_validate(data)
else:
self._index = MediaRegistryIndex()
self._save_index()
logger.debug(f"Loaded registry index with {self._index.media_count} entries")
return self._index
except Exception as e:
logger.error(f"Failed to load registry index: {e}")
self._index = MediaRegistryIndex()
return self._index
def _save_index(self) -> bool:
"""Save the registry index."""
try:
# Atomic write
temp_file = self.index_file.with_suffix('.tmp')
with open(temp_file, 'w', encoding='utf-8') as f:
json.dump(self._index.model_dump(), f, indent=2, ensure_ascii=False, default=str)
temp_file.replace(self.index_file)
logger.debug("Saved registry index")
return True
except Exception as e:
logger.error(f"Failed to save registry index: {e}")
return False
def _get_media_file_path(self, media_id: int) -> Path:
"""Get file path for media record."""
return self.media_dir / str(media_id) / "record.json"
def get_media_record(self, media_id: int) -> Optional[MediaRecord]:
"""Get media record by ID."""
with self._lock:
# Check cache first
if media_id in self._loaded_records:
return self._loaded_records[media_id]
try:
record_file = self._get_media_file_path(media_id)
if not record_file.exists():
return None
with open(record_file, 'r', encoding='utf-8') as f:
data = json.load(f)
record = MediaRecord.model_validate(data)
self._loaded_records[media_id] = record
logger.debug(f"Loaded media record for {media_id}")
return record
except Exception as e:
logger.error(f"Failed to load media record {media_id}: {e}")
return None
def save_media_record(self, record: MediaRecord) -> bool:
"""Save media record to storage."""
with self._lock:
try:
media_id = record.media_item.id
record_file = self._get_media_file_path(media_id)
# Ensure directory exists
record_file.parent.mkdir(parents=True, exist_ok=True)
# Atomic write
temp_file = record_file.with_suffix('.tmp')
with open(temp_file, 'w', encoding='utf-8') as f:
json.dump(record.model_dump(), f, indent=2, ensure_ascii=False, default=str)
temp_file.replace(record_file)
# Update cache and index
self._loaded_records[media_id] = record
index = self._load_index()
index.add_media_entry(record)
self._save_index()
logger.debug(f"Saved media record for {media_id}")
return True
except Exception as e:
logger.error(f"Failed to save media record: {e}")
return False
def get_or_create_record(self, media_item: MediaItem) -> MediaRecord:
"""Get existing record or create new one."""
record = self.get_media_record(media_item.id)
if record is None:
record = MediaRecord(media_item=media_item)
self.save_media_record(record)
else:
# Update media_item in case metadata changed
record.media_item = media_item
record.user_data.update_timestamp()
self.save_media_record(record)
return record
def update_download_completion(self, media_item: MediaItem, episode_number: int,
file_path: Path, file_size: int, quality: str,
checksum: Optional[str] = None) -> bool:
"""Update record when download completes."""
try:
record = self.get_or_create_record(media_item)
record.update_from_download_completion(
episode_number, file_path, file_size, quality, checksum
)
return self.save_media_record(record)
except Exception as e:
logger.error(f"Failed to update download completion: {e}")
return False
def update_from_player_result(self, media_item: MediaItem, episode_number: int,
stop_time: str, total_time: str) -> bool:
"""Update record from player feedback."""
try:
record = self.get_or_create_record(media_item)
record.update_from_player_result(episode_number, stop_time, total_time)
return self.save_media_record(record)
except Exception as e:
logger.error(f"Failed to update from player result: {e}")
return False
def mark_episode_watched(self, media_id: int, episode_number: int,
progress: float = 1.0) -> bool:
"""Mark episode as watched."""
try:
record = self.get_media_record(media_id)
if not record:
return False
episode = record.get_episode_status(episode_number)
episode.watch_status = "completed" if progress >= 0.8 else "watching"
episode.watch_progress = progress
episode.watch_date = datetime.now()
episode.watch_count += 1
record.user_data.update_timestamp()
return self.save_media_record(record)
except Exception as e:
logger.error(f"Failed to mark episode watched: {e}")
return False
def get_currently_watching(self) -> List[MediaRecord]:
"""Get anime currently being watched."""
try:
index = self._load_index()
watching_records = []
for entry in index.media_index.values():
if entry.user_status == "watching":
record = self.get_media_record(entry.media_id)
if record:
watching_records.append(record)
return watching_records
except Exception as e:
logger.error(f"Failed to get currently watching: {e}")
return []
def get_recently_watched(self, limit: int = 10) -> List[MediaRecord]:
"""Get recently watched anime."""
try:
index = self._load_index()
# Sort by last updated
sorted_entries = sorted(
index.media_index.values(),
key=lambda x: x.last_updated,
reverse=True
)
recent_records = []
for entry in sorted_entries[:limit]:
if entry.episodes_watched > 0: # Only include if actually watched
record = self.get_media_record(entry.media_id)
if record:
recent_records.append(record)
return recent_records
except Exception as e:
logger.error(f"Failed to get recently watched: {e}")
return []
def get_download_queue_candidates(self) -> List[MediaRecord]:
"""Get anime that have downloads queued or in progress."""
try:
index = self._load_index()
download_records = []
for entry in index.media_index.values():
if entry.episodes_downloaded < entry.total_episodes:
record = self.get_media_record(entry.media_id)
if record:
# Check if any episodes are queued/downloading
has_active_downloads = any(
ep.download_status in ["queued", "downloading"]
for ep in record.episodes.values()
)
if has_active_downloads:
download_records.append(record)
return download_records
except Exception as e:
logger.error(f"Failed to get download queue candidates: {e}")
return []
def get_continue_episode(self, media_id: int, available_episodes: List[str]) -> Optional[str]:
"""Get episode to continue from based on watch history."""
try:
record = self.get_media_record(media_id)
if not record:
return None
next_episode = record.next_episode_to_watch
if next_episode and str(next_episode) in available_episodes:
return str(next_episode)
return None
except Exception as e:
logger.error(f"Failed to get continue episode: {e}")
return None
def get_registry_stats(self) -> Dict:
"""Get comprehensive registry statistics."""
try:
index = self._load_index()
total_downloaded = sum(entry.episodes_downloaded for entry in index.media_index.values())
total_watched = sum(entry.episodes_watched for entry in index.media_index.values())
return {
"total_anime": index.media_count,
"status_breakdown": index.status_breakdown,
"total_episodes_downloaded": total_downloaded,
"total_episodes_watched": total_watched,
"last_updated": index.last_updated.strftime("%Y-%m-%d %H:%M:%S"),
}
except Exception as e:
logger.error(f"Failed to get registry stats: {e}")
return {}
def search_media(self, query: str) -> List[MediaRecord]:
"""Search media by title."""
try:
index = self._load_index()
query_lower = query.lower()
results = []
for entry in index.media_index.values():
if query_lower in entry.title.lower():
record = self.get_media_record(entry.media_id)
if record:
results.append(record)
return results
except Exception as e:
logger.error(f"Failed to search media: {e}")
return []
def remove_media_record(self, media_id: int) -> bool:
"""Remove media record completely."""
with self._lock:
try:
# Remove from cache
if media_id in self._loaded_records:
del self._loaded_records[media_id]
# Remove file
record_file = self._get_media_file_path(media_id)
if record_file.exists():
record_file.unlink()
# Remove directory if empty
try:
record_file.parent.rmdir()
except OSError:
pass # Directory not empty
# Update index
index = self._load_index()
if media_id in index.media_index:
del index.media_index[media_id]
index.media_count = len(index.media_index)
self._save_index()
logger.debug(f"Removed media record {media_id}")
return True
except Exception as e:
logger.error(f"Failed to remove media record {media_id}: {e}")
return False
# Global instance
_media_registry: Optional[MediaRegistryManager] = None
def get_media_registry() -> MediaRegistryManager:
"""Get or create the global media registry instance."""
global _media_registry
if _media_registry is None:
_media_registry = MediaRegistryManager()
return _media_registry

View File

@@ -0,0 +1,346 @@
"""
Unified data models for Media Registry.
Provides single source of truth for anime metadata, episode tracking,
and user data, eliminating duplication between download and watch systems.
"""
from __future__ import annotations
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Literal, Optional
from pydantic import BaseModel, ConfigDict, Field, computed_field
from ....libs.api.types import MediaItem
logger = logging.getLogger(__name__)
# Type aliases
DownloadStatus = Literal["not_downloaded", "queued", "downloading", "completed", "failed", "paused"]
WatchStatus = Literal["not_watched", "watching", "completed", "dropped", "paused"]
MediaUserStatus = Literal["planning", "watching", "completed", "dropped", "paused"]
class EpisodeStatus(BaseModel):
"""
Unified episode status tracking both download and watch state.
Single source of truth for episode-level data.
"""
model_config = ConfigDict(
str_strip_whitespace=True,
validate_assignment=True,
)
episode_number: int = Field(gt=0)
# Download tracking
download_status: DownloadStatus = "not_downloaded"
file_path: Optional[Path] = None
file_size: Optional[int] = None
download_date: Optional[datetime] = None
download_quality: Optional[str] = None
checksum: Optional[str] = None
# Watch tracking (from player feedback)
watch_status: WatchStatus = "not_watched"
watch_progress: float = Field(default=0.0, ge=0.0, le=1.0)
last_watch_position: Optional[str] = None # "HH:MM:SS" from PlayerResult
total_duration: Optional[str] = None # "HH:MM:SS" from PlayerResult
watch_date: Optional[datetime] = None
watch_count: int = Field(default=0, ge=0)
# Integration fields
auto_marked_watched: bool = Field(default=False, description="Auto-marked watched from download")
@computed_field
@property
def is_available_locally(self) -> bool:
"""Check if episode is downloaded and file exists."""
return (
self.download_status == "completed"
and self.file_path is not None
and self.file_path.exists()
)
@computed_field
@property
def completion_percentage(self) -> float:
"""Calculate actual watch completion from player data."""
if self.last_watch_position and self.total_duration:
try:
last_seconds = self._time_to_seconds(self.last_watch_position)
total_seconds = self._time_to_seconds(self.total_duration)
if total_seconds > 0:
return min(100.0, (last_seconds / total_seconds) * 100)
except (ValueError, AttributeError):
pass
return self.watch_progress * 100
@computed_field
@property
def should_auto_mark_watched(self) -> bool:
"""Check if episode should be auto-marked as watched."""
return self.completion_percentage >= 80.0 and self.watch_status != "completed"
def _time_to_seconds(self, time_str: str) -> int:
"""Convert HH:MM:SS to seconds."""
try:
parts = time_str.split(':')
if len(parts) == 3:
h, m, s = map(int, parts)
return h * 3600 + m * 60 + s
except (ValueError, AttributeError):
pass
return 0
def update_from_player_result(self, stop_time: str, total_time: str) -> None:
"""Update watch status from PlayerResult."""
self.last_watch_position = stop_time
self.total_duration = total_time
self.watch_date = datetime.now()
self.watch_count += 1
# Auto-mark as completed if 80%+ watched
if self.should_auto_mark_watched:
self.watch_status = "completed"
self.watch_progress = 1.0
class UserMediaData(BaseModel):
"""
User-specific data for a media item.
Consolidates user preferences from both download and watch systems.
"""
model_config = ConfigDict(
str_strip_whitespace=True,
validate_assignment=True,
)
# User status and preferences
status: MediaUserStatus = "planning"
notes: str = ""
tags: List[str] = Field(default_factory=list)
rating: Optional[int] = Field(None, ge=1, le=10)
favorite: bool = False
priority: int = Field(default=0, ge=0)
# Download preferences
preferred_quality: str = "1080"
auto_download_new: bool = False
download_path: Optional[Path] = None
# Watch preferences
continue_from_history: bool = True
auto_mark_watched_on_download: bool = False
# Timestamps
created_date: datetime = Field(default_factory=datetime.now)
last_updated: datetime = Field(default_factory=datetime.now)
def update_timestamp(self) -> None:
"""Update last_updated timestamp."""
self.last_updated = datetime.now()
class MediaRecord(BaseModel):
"""
Unified media record - single source of truth for anime data.
Replaces both MediaDownloadRecord and WatchHistoryEntry.
"""
model_config = ConfigDict(
str_strip_whitespace=True,
validate_assignment=True,
)
media_item: MediaItem
episodes: Dict[int, EpisodeStatus] = Field(default_factory=dict)
user_data: UserMediaData = Field(default_factory=UserMediaData)
@computed_field
@property
def display_title(self) -> str:
"""Get display title for the anime."""
return (
self.media_item.title.english
or self.media_item.title.romaji
or self.media_item.title.native
or f"Anime #{self.media_item.id}"
)
@computed_field
@property
def total_episodes_downloaded(self) -> int:
"""Count of successfully downloaded episodes."""
return len([ep for ep in self.episodes.values() if ep.is_available_locally])
@computed_field
@property
def total_episodes_watched(self) -> int:
"""Count of completed episodes."""
return len([ep for ep in self.episodes.values() if ep.watch_status == "completed"])
@computed_field
@property
def last_watched_episode(self) -> int:
"""Get highest watched episode number."""
watched_episodes = [
ep.episode_number for ep in self.episodes.values()
if ep.watch_status == "completed"
]
return max(watched_episodes) if watched_episodes else 0
@computed_field
@property
def next_episode_to_watch(self) -> Optional[int]:
"""Get next episode to watch based on progress."""
if not self.episodes:
return 1
# Find highest completed episode
last_watched = self.last_watched_episode
if last_watched == 0:
return 1
next_ep = last_watched + 1
total_eps = self.media_item.episodes or float('inf')
return next_ep if next_ep <= total_eps else None
@computed_field
@property
def download_completion_percentage(self) -> float:
"""Download completion percentage."""
if not self.media_item.episodes or self.media_item.episodes == 0:
return 0.0
return (self.total_episodes_downloaded / self.media_item.episodes) * 100
@computed_field
@property
def watch_completion_percentage(self) -> float:
"""Watch completion percentage."""
if not self.media_item.episodes or self.media_item.episodes == 0:
return 0.0
return (self.total_episodes_watched / self.media_item.episodes) * 100
def get_episode_status(self, episode_number: int) -> EpisodeStatus:
"""Get or create episode status."""
if episode_number not in self.episodes:
self.episodes[episode_number] = EpisodeStatus(episode_number=episode_number)
return self.episodes[episode_number]
def update_from_download_completion(self, episode_number: int, file_path: Path,
file_size: int, quality: str, checksum: Optional[str] = None) -> None:
"""Update episode status when download completes."""
episode = self.get_episode_status(episode_number)
episode.download_status = "completed"
episode.file_path = file_path
episode.file_size = file_size
episode.download_quality = quality
episode.checksum = checksum
episode.download_date = datetime.now()
# Auto-mark as watched if enabled
if self.user_data.auto_mark_watched_on_download and episode.watch_status == "not_watched":
episode.watch_status = "completed"
episode.watch_progress = 1.0
episode.auto_marked_watched = True
episode.watch_date = datetime.now()
self.user_data.update_timestamp()
def update_from_player_result(self, episode_number: int, stop_time: str, total_time: str) -> None:
"""Update episode status from player feedback."""
episode = self.get_episode_status(episode_number)
episode.update_from_player_result(stop_time, total_time)
self.user_data.update_timestamp()
# Update overall status based on progress
if episode.watch_status == "completed":
if self.user_data.status == "planning":
self.user_data.status = "watching"
# Check if anime is completed
if self.media_item.episodes and self.total_episodes_watched >= self.media_item.episodes:
self.user_data.status = "completed"
class MediaRegistryIndex(BaseModel):
"""
Lightweight index for fast media registry operations.
Provides quick access without loading full MediaRecord files.
"""
model_config = ConfigDict(validate_assignment=True)
version: str = Field(default="1.0")
last_updated: datetime = Field(default_factory=datetime.now)
media_count: int = Field(default=0, ge=0)
# Quick access index
media_index: Dict[int, "MediaIndexEntry"] = Field(default_factory=dict)
@computed_field
@property
def status_breakdown(self) -> Dict[str, int]:
"""Get breakdown by user status."""
breakdown = {"planning": 0, "watching": 0, "completed": 0, "dropped": 0, "paused": 0}
for entry in self.media_index.values():
breakdown[entry.user_status] = breakdown.get(entry.user_status, 0) + 1
return breakdown
def add_media_entry(self, media_record: MediaRecord) -> None:
"""Add or update media entry in index."""
entry = MediaIndexEntry(
media_id=media_record.media_item.id,
title=media_record.display_title,
user_status=media_record.user_data.status,
episodes_downloaded=media_record.total_episodes_downloaded,
episodes_watched=media_record.total_episodes_watched,
total_episodes=media_record.media_item.episodes or 0,
last_updated=media_record.user_data.last_updated,
last_watched_episode=media_record.last_watched_episode,
next_episode=media_record.next_episode_to_watch
)
self.media_index[media_record.media_item.id] = entry
self.media_count = len(self.media_index)
self.last_updated = datetime.now()
class MediaIndexEntry(BaseModel):
"""Lightweight index entry for a media item."""
model_config = ConfigDict(validate_assignment=True)
media_id: int
title: str
user_status: MediaUserStatus
episodes_downloaded: int = 0
episodes_watched: int = 0
total_episodes: int = 0
last_updated: datetime
last_watched_episode: int = 0
next_episode: Optional[int] = None
@computed_field
@property
def download_progress(self) -> float:
"""Download progress percentage."""
if self.total_episodes == 0:
return 0.0
return (self.episodes_downloaded / self.total_episodes) * 100
@computed_field
@property
def watch_progress(self) -> float:
"""Watch progress percentage."""
if self.total_episodes == 0:
return 0.0
return (self.episodes_watched / self.total_episodes) * 100

View File

@@ -0,0 +1,289 @@
"""
Unified Media Tracker for player integration and real-time updates.
Provides automatic tracking of watch progress and download completion
through a single interface.
"""
from __future__ import annotations
import logging
from typing import Optional
from ....libs.api.types import MediaItem
from ....libs.players.types import PlayerResult
from .manager import MediaRegistryManager, get_media_registry
logger = logging.getLogger(__name__)
class MediaTracker:
"""
Unified tracker for media interactions.
Handles automatic updates from player results and download completion,
providing seamless integration between watching and downloading.
"""
def __init__(self, registry_manager: MediaRegistryManager = None):
self.registry = registry_manager or get_media_registry()
def track_episode_start(self, media_item: MediaItem, episode: int) -> bool:
"""
Track when episode playback starts.
Args:
media_item: The anime being watched
episode: Episode number being started
Returns:
True if tracking was successful
"""
try:
record = self.registry.get_or_create_record(media_item)
episode_status = record.get_episode_status(episode)
# Only update to "watching" if not already completed
if episode_status.watch_status not in ["completed"]:
episode_status.watch_status = "watching"
# Update overall user status if still planning
if record.user_data.status == "planning":
record.user_data.status = "watching"
return self.registry.save_media_record(record)
except Exception as e:
logger.error(f"Failed to track episode start: {e}")
return False
def track_from_player_result(self, media_item: MediaItem, episode: int,
player_result: PlayerResult) -> bool:
"""
Update watch status based on actual player feedback.
Args:
media_item: The anime that was watched
episode: Episode number that was watched
player_result: Result from the player session
Returns:
True if tracking was successful
"""
try:
if not player_result.stop_time or not player_result.total_time:
logger.warning("PlayerResult missing timing data - cannot track accurately")
return False
return self.registry.update_from_player_result(
media_item, episode, player_result.stop_time, player_result.total_time
)
except Exception as e:
logger.error(f"Failed to track from player result: {e}")
return False
def track_download_completion(self, media_item: MediaItem, episode: int,
file_path, file_size: int, quality: str,
checksum: Optional[str] = None) -> bool:
"""
Update status when download completes.
Args:
media_item: The anime that was downloaded
episode: Episode number that was downloaded
file_path: Path to downloaded file
file_size: File size in bytes
quality: Download quality
checksum: Optional file checksum
Returns:
True if tracking was successful
"""
try:
from pathlib import Path
file_path = Path(file_path) if not isinstance(file_path, Path) else file_path
return self.registry.update_download_completion(
media_item, episode, file_path, file_size, quality, checksum
)
except Exception as e:
logger.error(f"Failed to track download completion: {e}")
return False
def get_continue_episode(self, media_item: MediaItem,
available_episodes: list) -> Optional[str]:
"""
Get episode to continue watching based on history.
Args:
media_item: The anime
available_episodes: List of available episode numbers
Returns:
Episode number to continue from or None
"""
try:
return self.registry.get_continue_episode(
media_item.id, [str(ep) for ep in available_episodes]
)
except Exception as e:
logger.error(f"Failed to get continue episode: {e}")
return None
def get_watch_progress(self, media_id: int) -> Optional[dict]:
"""
Get current watch progress for an anime.
Args:
media_id: ID of the anime
Returns:
Dictionary with progress info or None if not found
"""
try:
record = self.registry.get_media_record(media_id)
if not record:
return None
return {
"last_episode": record.last_watched_episode,
"next_episode": record.next_episode_to_watch,
"status": record.user_data.status,
"title": record.display_title,
"watch_percentage": record.watch_completion_percentage,
"download_percentage": record.download_completion_percentage,
"episodes_watched": record.total_episodes_watched,
"episodes_downloaded": record.total_episodes_downloaded,
}
except Exception as e:
logger.error(f"Failed to get watch progress: {e}")
return None
def update_anime_status(self, media_id: int, status: str) -> bool:
"""
Update overall anime status.
Args:
media_id: ID of the anime
status: New status (planning, watching, completed, dropped, paused)
Returns:
True if update was successful
"""
try:
record = self.registry.get_media_record(media_id)
if not record:
return False
if status in ["planning", "watching", "completed", "dropped", "paused"]:
record.user_data.status = status
record.user_data.update_timestamp()
return self.registry.save_media_record(record)
return False
except Exception as e:
logger.error(f"Failed to update anime status: {e}")
return False
def add_anime_to_registry(self, media_item: MediaItem, status: str = "planning") -> bool:
"""
Add anime to registry with initial status.
Args:
media_item: The anime to add
status: Initial status
Returns:
True if added successfully
"""
try:
record = self.registry.get_or_create_record(media_item)
if status in ["planning", "watching", "completed", "dropped", "paused"]:
record.user_data.status = status
record.user_data.update_timestamp()
return self.registry.save_media_record(record)
return False
except Exception as e:
logger.error(f"Failed to add anime to registry: {e}")
return False
def should_auto_download_next(self, media_id: int) -> Optional[int]:
"""
Check if next episode should be auto-downloaded based on watch progress.
Args:
media_id: ID of the anime
Returns:
Episode number to download or None
"""
try:
record = self.registry.get_media_record(media_id)
if not record or not record.user_data.auto_download_new:
return None
# Only if currently watching
if record.user_data.status != "watching":
return None
next_episode = record.next_episode_to_watch
if not next_episode:
return None
# Check if already downloaded
episode_status = record.episodes.get(next_episode)
if episode_status and episode_status.is_available_locally:
return None
return next_episode
except Exception as e:
logger.error(f"Failed to check auto download: {e}")
return None
# Global tracker instance
_media_tracker: Optional[MediaTracker] = None
def get_media_tracker() -> MediaTracker:
"""Get or create the global media tracker instance."""
global _media_tracker
if _media_tracker is None:
_media_tracker = MediaTracker()
return _media_tracker
# Convenience functions for backward compatibility
def track_episode_viewing(media_item: MediaItem, episode: int, start_tracking: bool = True) -> bool:
"""Track episode viewing (backward compatibility)."""
tracker = get_media_tracker()
return tracker.track_episode_start(media_item, episode)
def get_continue_episode(media_item: MediaItem, available_episodes: list,
prefer_history: bool = True) -> Optional[str]:
"""Get continue episode (backward compatibility)."""
if not prefer_history:
return None
tracker = get_media_tracker()
return tracker.get_continue_episode(media_item, available_episodes)
def update_episode_progress(media_id: int, episode: int, completion_percentage: float) -> bool:
"""Update episode progress (backward compatibility)."""
# This would need more context to implement properly with PlayerResult
# For now, just mark as watched if 80%+
if completion_percentage >= 80:
tracker = get_media_tracker()
registry = get_media_registry()
return registry.mark_episode_watched(media_id, episode, completion_percentage / 100)
return True