feat: god help me

This commit is contained in:
Benexl
2025-07-16 00:54:55 +03:00
parent 49cdd440df
commit 27b1f3f792
6 changed files with 1595 additions and 1 deletions

1
.gitignore vendored
View File

@@ -20,7 +20,6 @@ build/
develop-eggs/
dist/
bin/
downloads/
eggs/
.eggs/
lib/

View File

@@ -0,0 +1,28 @@
"""
Download tracking services for FastAnime.
This module provides comprehensive download tracking and management capabilities
including progress monitoring, queue management, and integration with watch history.
"""
from .manager import DownloadManager, get_download_manager
from .models import (
DownloadIndex,
DownloadQueueItem,
EpisodeDownload,
MediaDownloadRecord,
MediaIndexEntry,
)
from .tracker import DownloadTracker, get_download_tracker
# Public API of the download tracking package: managers/trackers (with their
# module-level singleton accessors) and the Pydantic record/queue models.
__all__ = [
    "DownloadManager",
    "get_download_manager",
    "DownloadTracker",
    "get_download_tracker",
    "EpisodeDownload",
    "MediaDownloadRecord",
    "DownloadIndex",
    "MediaIndexEntry",
    "DownloadQueueItem",
]

View File

@@ -0,0 +1,506 @@
"""
Core download manager for tracking and managing anime downloads.
This module provides the central DownloadManager class that handles all download
tracking operations, integrates with the existing downloader infrastructure,
and manages the storage of download records.
"""
from __future__ import annotations
import json
import logging
import threading
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional
from ....core.config.model import DownloadsConfig
from ....core.constants import APP_CACHE_DIR, APP_DATA_DIR
from ....core.downloader import create_downloader
from ....libs.api.types import MediaItem
from .models import (
DownloadIndex,
DownloadQueue,
DownloadQueueItem,
EpisodeDownload,
MediaDownloadRecord,
MediaIndexEntry,
)
logger = logging.getLogger(__name__)
class DownloadManager:
    """
    Core download manager using Pydantic models and integrating with existing infrastructure.

    Manages download tracking, queue operations, and storage with atomic operations
    and thread safety. Integrates with the existing downloader infrastructure.
    """

    def __init__(self, config: DownloadsConfig):
        self.config = config
        self.downloads_dir = config.downloads_dir

        # Storage directories
        self.tracking_dir = APP_DATA_DIR / "downloads"
        self.cache_dir = APP_CACHE_DIR / "downloads"
        self.media_dir = self.tracking_dir / "media"

        # File paths
        self.index_file = self.tracking_dir / "index.json"
        self.queue_file = self.tracking_dir / "queue.json"

        # Thread safety: RLock because public methods re-enter helpers that
        # also take the lock (e.g. save_download_record -> get_download_record).
        self._lock = threading.RLock()
        self._loaded_records: Dict[int, MediaDownloadRecord] = {}
        self._index: Optional[DownloadIndex] = None
        self._queue: Optional[DownloadQueue] = None

        # Initialize storage and downloader
        self._initialize_storage()

        # Use existing downloader infrastructure. Tracking still works without
        # a downloader, so failure here is non-fatal by design.
        try:
            self.downloader = create_downloader(config)
        except Exception as e:
            logger.warning(f"Failed to initialize downloader: {e}")
            self.downloader = None

    def _initialize_storage(self) -> None:
        """Initialize storage directories and files.

        Raises:
            Exception: re-raised if directories or initial files cannot be created.
        """
        try:
            # Create directories
            self.tracking_dir.mkdir(parents=True, exist_ok=True)
            self.media_dir.mkdir(parents=True, exist_ok=True)
            self.cache_dir.mkdir(parents=True, exist_ok=True)

            # Create subdirectories for cache
            (self.cache_dir / "thumbnails").mkdir(exist_ok=True)
            (self.cache_dir / "metadata").mkdir(exist_ok=True)
            (self.cache_dir / "temp").mkdir(exist_ok=True)

            # Initialize index if it doesn't exist
            if not self.index_file.exists():
                self._create_empty_index()

            # Initialize queue if it doesn't exist
            if not self.queue_file.exists():
                self._create_empty_queue()
        except Exception as e:
            logger.error(f"Failed to initialize download storage: {e}")
            raise

    def _create_empty_index(self) -> None:
        """Create an empty download index."""
        empty_index = DownloadIndex()
        self._save_index(empty_index)

    def _create_empty_queue(self) -> None:
        """Create an empty download queue."""
        empty_queue = DownloadQueue(max_size=self.config.queue_max_size)
        self._save_queue(empty_queue)

    def _load_index(self) -> DownloadIndex:
        """Load the download index with Pydantic validation."""
        if self._index is not None:
            return self._index
        try:
            if not self.index_file.exists():
                self._create_empty_index()
            with open(self.index_file, "r", encoding="utf-8") as f:
                data = json.load(f)
            self._index = DownloadIndex.model_validate(data)
            return self._index
        except Exception as e:
            logger.error(f"Failed to load download index: {e}")
            # Create new empty index as fallback. _save_index caches the fresh
            # index on self._index, so the recursive call returns immediately.
            self._create_empty_index()
            return self._load_index()

    def _save_index(self, index: DownloadIndex) -> None:
        """Save index with atomic write operation (write temp file, then rename)."""
        temp_file = self.index_file.with_suffix(".tmp")
        try:
            with open(temp_file, "w", encoding="utf-8") as f:
                json.dump(index.model_dump(), f, indent=2, ensure_ascii=False, default=str)
            # Atomic replace
            temp_file.replace(self.index_file)
            self._index = index
        except Exception as e:
            logger.error(f"Failed to save download index: {e}")
            if temp_file.exists():
                temp_file.unlink()
            raise

    def _load_queue(self) -> DownloadQueue:
        """Load the download queue with Pydantic validation."""
        if self._queue is not None:
            return self._queue
        try:
            if not self.queue_file.exists():
                self._create_empty_queue()
            with open(self.queue_file, "r", encoding="utf-8") as f:
                data = json.load(f)
            self._queue = DownloadQueue.model_validate(data)
            return self._queue
        except Exception as e:
            logger.error(f"Failed to load download queue: {e}")
            # Create new empty queue as fallback (cached by _save_queue).
            self._create_empty_queue()
            return self._load_queue()

    def _save_queue(self, queue: DownloadQueue) -> None:
        """Save queue with atomic write operation (write temp file, then rename)."""
        temp_file = self.queue_file.with_suffix(".tmp")
        try:
            with open(temp_file, "w", encoding="utf-8") as f:
                json.dump(queue.model_dump(), f, indent=2, ensure_ascii=False, default=str)
            # Atomic replace
            temp_file.replace(self.queue_file)
            self._queue = queue
        except Exception as e:
            logger.error(f"Failed to save download queue: {e}")
            if temp_file.exists():
                temp_file.unlink()
            raise

    def get_download_record(self, media_id: int) -> Optional[MediaDownloadRecord]:
        """Get download record for an anime with caching.

        Returns None if no record exists or it fails to load/validate.
        """
        with self._lock:
            # Check cache first
            if media_id in self._loaded_records:
                return self._loaded_records[media_id]
            try:
                record_file = self.media_dir / f"{media_id}.json"
                if not record_file.exists():
                    return None
                with open(record_file, "r", encoding="utf-8") as f:
                    data = json.load(f)
                record = MediaDownloadRecord.model_validate(data)
                # Cache the record
                self._loaded_records[media_id] = record
                return record
            except Exception as e:
                logger.error(f"Failed to load download record for media {media_id}: {e}")
                return None

    def save_download_record(self, record: MediaDownloadRecord) -> bool:
        """Save a download record with atomic operation.

        Returns True on success, False on failure.
        """
        with self._lock:
            # Fix: resolve paths before the try-block. Previously temp_file was
            # assigned inside the try but referenced in the except cleanup, so
            # an early failure raised NameError instead of returning False.
            media_id = record.media_item.id
            record_file = self.media_dir / f"{media_id}.json"
            temp_file = record_file.with_suffix(".tmp")
            try:
                # Update last_modified timestamp
                record.update_last_modified()

                # Write to temp file first
                with open(temp_file, "w", encoding="utf-8") as f:
                    json.dump(record.model_dump(), f, indent=2, ensure_ascii=False, default=str)

                # Atomic replace
                temp_file.replace(record_file)

                # Update cache
                self._loaded_records[media_id] = record

                # Update index
                index = self._load_index()
                index.add_media_entry(record)
                self._save_index(index)

                logger.debug(f"Saved download record for media {media_id}")
                return True
            except Exception as e:
                logger.error(f"Failed to save download record: {e}")
                if temp_file.exists():
                    temp_file.unlink()
                return False

    def add_to_queue(self, media_item: MediaItem, episodes: List[int],
                     quality: Optional[str] = None, priority: int = 0) -> bool:
        """Add episodes to download queue.

        Returns True if at least one episode was queued.
        """
        with self._lock:
            try:
                queue = self._load_queue()
                quality = quality or self.config.preferred_quality
                success_count = 0
                for episode in episodes:
                    queue_item = DownloadQueueItem(
                        media_id=media_item.id,
                        episode_number=episode,
                        priority=priority,
                        quality_preference=quality,
                        max_retries=self.config.retry_attempts
                    )
                    # add_item rejects duplicates and a full queue.
                    if queue.add_item(queue_item):
                        success_count += 1
                        logger.info(f"Added episode {episode} of {media_item.title.english or media_item.title.romaji} to download queue")
                if success_count > 0:
                    self._save_queue(queue)
                    # Create download record if it doesn't exist
                    if not self.get_download_record(media_item.id):
                        download_path = self.downloads_dir / self._sanitize_filename(
                            media_item.title.english or media_item.title.romaji or f"Anime_{media_item.id}"
                        )
                        record = MediaDownloadRecord(
                            media_item=media_item,
                            download_path=download_path,
                            preferred_quality=quality
                        )
                        self.save_download_record(record)
                return success_count > 0
            except Exception as e:
                logger.error(f"Failed to add episodes to queue: {e}")
                return False

    def get_next_download(self) -> Optional[DownloadQueueItem]:
        """Get the next item from the download queue (highest priority first)."""
        with self._lock:
            try:
                queue = self._load_queue()
                return queue.get_next_item()
            except Exception as e:
                logger.error(f"Failed to get next download: {e}")
                return None

    def mark_download_started(self, media_id: int, episode: int) -> bool:
        """Mark an episode download as started."""
        with self._lock:
            try:
                record = self.get_download_record(media_id)
                if not record:
                    return False

                # Create episode download entry
                download_path = record.download_path / f"Episode_{episode:02d}.mkv"
                episode_download = EpisodeDownload(
                    episode_number=episode,
                    file_path=download_path,
                    file_size=0,
                    quality=record.preferred_quality,
                    source_provider="unknown",  # Will be updated by actual downloader
                    status="downloading"
                )

                # EpisodeDownload entries are frozen, so replace the episodes
                # mapping with a copy rather than mutating in place.
                new_episodes = record.episodes.copy()
                new_episodes[episode] = episode_download
                updated_record = record.model_copy(update={"episodes": new_episodes})
                self.save_download_record(updated_record)
                return True
            except Exception as e:
                logger.error(f"Failed to mark download started: {e}")
                return False

    def mark_download_completed(self, media_id: int, episode: int,
                                file_path: Path, file_size: int,
                                checksum: Optional[str] = None) -> bool:
        """Mark an episode download as completed and drop it from the queue."""
        with self._lock:
            try:
                record = self.get_download_record(media_id)
                if not record or episode not in record.episodes:
                    return False

                # Update episode download
                episode_download = record.episodes[episode]
                updated_episode = episode_download.model_copy(update={
                    "file_path": file_path,
                    "file_size": file_size,
                    "status": "completed",
                    "download_progress": 1.0,
                    "checksum": checksum
                })

                # Update record
                new_episodes = record.episodes.copy()
                new_episodes[episode] = updated_episode
                updated_record = record.model_copy(update={"episodes": new_episodes})
                self.save_download_record(updated_record)

                # Remove from queue
                queue = self._load_queue()
                queue.remove_item(media_id, episode)
                self._save_queue(queue)

                logger.info(f"Marked episode {episode} of media {media_id} as completed")
                return True
            except Exception as e:
                logger.error(f"Failed to mark download completed: {e}")
                return False

    def mark_download_failed(self, media_id: int, episode: int, error_message: str) -> bool:
        """Mark an episode download as failed, storing the error message."""
        with self._lock:
            try:
                record = self.get_download_record(media_id)
                if not record or episode not in record.episodes:
                    return False

                # Update episode download
                episode_download = record.episodes[episode]
                updated_episode = episode_download.model_copy(update={
                    "status": "failed",
                    "error_message": error_message
                })

                # Update record
                new_episodes = record.episodes.copy()
                new_episodes[episode] = updated_episode
                updated_record = record.model_copy(update={"episodes": new_episodes})
                self.save_download_record(updated_record)

                logger.warning(f"Marked episode {episode} of media {media_id} as failed: {error_message}")
                return True
            except Exception as e:
                logger.error(f"Failed to mark download failed: {e}")
                return False

    def list_downloads(self, status_filter: Optional[str] = None,
                       limit: Optional[int] = None) -> List[MediaDownloadRecord]:
        """List download records with optional filtering.

        Fix: the status filter and the most-recent-first sort are now applied
        BEFORE the limit. Previously the limit truncated the raw id list
        first, so a filtered listing could return fewer than `limit` matches
        and not necessarily the most recent ones.
        """
        try:
            index = self._load_index()
            records = []
            for media_id in index.media_index.keys():
                record = self.get_download_record(media_id)
                if record is None:
                    continue
                if status_filter and record.status != status_filter:
                    continue
                records.append(record)

            # Sort by last updated (most recent first), then apply the limit.
            records.sort(key=lambda x: x.last_updated, reverse=True)
            if limit:
                records = records[:limit]
            return records
        except Exception as e:
            logger.error(f"Failed to list downloads: {e}")
            return []

    def cleanup_failed_downloads(self) -> int:
        """Clean up old failed downloads based on retention policy.

        Returns the number of failed episode entries removed.
        """
        try:
            cutoff_date = datetime.now() - timedelta(days=self.config.retention_days)
            cleaned_count = 0
            for record in self.list_downloads():
                episodes_to_remove = []
                for episode_num, episode_download in record.episodes.items():
                    if (episode_download.status == "failed" and
                            episode_download.download_date < cutoff_date):
                        episodes_to_remove.append(episode_num)
                if episodes_to_remove:
                    new_episodes = record.episodes.copy()
                    for episode_num in episodes_to_remove:
                        del new_episodes[episode_num]
                        cleaned_count += 1
                    updated_record = record.model_copy(update={"episodes": new_episodes})
                    self.save_download_record(updated_record)
            logger.info(f"Cleaned up {cleaned_count} failed downloads")
            return cleaned_count
        except Exception as e:
            logger.error(f"Failed to cleanup failed downloads: {e}")
            return 0

    def get_download_stats(self) -> Dict:
        """Get download statistics derived from the index and queue."""
        try:
            index = self._load_index()
            stats = {
                "total_anime": index.media_count,
                "total_episodes": index.total_episodes,
                "total_size_gb": round(index.total_size_gb, 2),
                "completion_stats": index.completion_stats,
                "queue_size": len(self._load_queue().items)
            }
            return stats
        except Exception as e:
            logger.error(f"Failed to get download stats: {e}")
            return {}

    def _sanitize_filename(self, filename: str) -> str:
        """Sanitize filename for filesystem compatibility."""
        # Remove or replace invalid characters
        invalid_chars = '<>:"/\\|?*'
        for char in invalid_chars:
            filename = filename.replace(char, '_')
        # Limit length
        if len(filename) > 100:
            filename = filename[:100]
        return filename.strip()
# Module-level singleton holder for the download manager.
_download_manager: Optional[DownloadManager] = None


def get_download_manager(config: DownloadsConfig) -> DownloadManager:
    """Get or create the global download manager instance.

    The config is only consulted on first call; subsequent calls return the
    already-constructed singleton unchanged.
    """
    global _download_manager
    if _download_manager is not None:
        return _download_manager
    _download_manager = DownloadManager(config)
    return _download_manager

View File

@@ -0,0 +1,419 @@
"""
Pydantic models for download tracking system.
This module defines the data models used throughout the download tracking system,
providing type safety and validation using Pydantic v2.
"""
from __future__ import annotations
import hashlib
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Literal, Optional
from pydantic import BaseModel, ConfigDict, Field, computed_field, field_validator
from ....core.constants import APP_DATA_DIR
from ....libs.api.types import MediaItem
logger = logging.getLogger(__name__)
# Type aliases for better readability.
# DownloadStatus: per-episode lifecycle states used by EpisodeDownload.status.
# QualityOption: vertical-resolution labels (strings, not ints) plus "best".
# MediaStatus: aggregate per-series state computed by MediaDownloadRecord.status.
DownloadStatus = Literal["completed", "failed", "downloading", "queued", "paused"]
QualityOption = Literal["360", "480", "720", "1080", "best"]
MediaStatus = Literal["active", "completed", "paused", "failed"]
class EpisodeDownload(BaseModel):
    """
    Pydantic model for individual episode download tracking.

    Tracks all information related to a single episode download including
    file location, download progress, quality, and integrity information.
    Instances are frozen; updates are made via ``model_copy(update=...)``.
    """

    model_config = ConfigDict(
        str_strip_whitespace=True,
        validate_assignment=True,
        frozen=True,  # Immutable after creation for data integrity
    )

    episode_number: int = Field(gt=0, description="Episode number")
    file_path: Path = Field(description="Path to downloaded file")
    file_size: int = Field(ge=0, description="File size in bytes")
    download_date: datetime = Field(default_factory=datetime.now)
    quality: QualityOption = Field(default="1080")
    source_provider: str = Field(description="Provider used for download")
    status: DownloadStatus = Field(default="queued")
    checksum: Optional[str] = Field(None, description="SHA256 checksum for integrity")
    subtitle_files: List[Path] = Field(default_factory=list)
    download_progress: float = Field(default=0.0, ge=0.0, le=1.0)
    error_message: Optional[str] = Field(None, description="Error message if failed")
    download_speed: Optional[float] = Field(None, description="Download speed in bytes/sec")

    @field_validator("file_path")
    @classmethod
    def validate_file_path(cls, v: Path) -> Path:
        """Ensure file path is absolute and within allowed directories."""
        # NOTE(review): only absoluteness is enforced here; "within allowed
        # directories" is not actually checked by this validator.
        if not v.is_absolute():
            raise ValueError("File path must be absolute")
        return v

    @computed_field
    @property
    def is_completed(self) -> bool:
        """Check if download is completed and file exists."""
        # Requires the file to still be present on disk, not just the status flag.
        return self.status == "completed" and self.file_path.exists()

    @computed_field
    @property
    def file_size_mb(self) -> float:
        """Get file size in megabytes."""
        return self.file_size / (1024 * 1024)

    @computed_field
    @property
    def display_status(self) -> str:
        """Get human-readable status."""
        status_map = {
            "completed": "✓ Completed",
            "failed": "✗ Failed",
            "downloading": "⬇ Downloading",
            "queued": "⏳ Queued",
            "paused": "⏸ Paused"
        }
        # Falls back to the raw status string for any unmapped value.
        return status_map.get(self.status, self.status)

    def generate_checksum(self) -> Optional[str]:
        """Generate SHA256 checksum for the downloaded file.

        Returns None if the file is missing or cannot be read.
        """
        if not self.file_path.exists():
            return None
        try:
            sha256_hash = hashlib.sha256()
            with open(self.file_path, "rb") as f:
                # Stream in 4 KiB chunks to avoid loading large video files
                # into memory at once.
                for chunk in iter(lambda: f.read(4096), b""):
                    sha256_hash.update(chunk)
            return sha256_hash.hexdigest()
        except Exception as e:
            logger.error(f"Failed to generate checksum for {self.file_path}: {e}")
            return None

    def verify_integrity(self) -> bool:
        """Verify file integrity using stored checksum.

        Returns False when no checksum was stored or the file is missing.
        """
        if not self.checksum or not self.file_path.exists():
            return False
        current_checksum = self.generate_checksum()
        return current_checksum == self.checksum
class MediaDownloadRecord(BaseModel):
    """
    Pydantic model for anime series download tracking.

    Manages download information for an entire anime series including
    individual episodes, metadata, and organization preferences.
    """

    model_config = ConfigDict(
        str_strip_whitespace=True,
        validate_assignment=True,
    )

    media_item: MediaItem = Field(description="The anime media item")
    # Keyed by episode number; values are frozen EpisodeDownload snapshots.
    episodes: Dict[int, EpisodeDownload] = Field(default_factory=dict)
    download_path: Path = Field(description="Base download directory for this anime")
    created_date: datetime = Field(default_factory=datetime.now)
    last_updated: datetime = Field(default_factory=datetime.now)
    preferred_quality: QualityOption = Field(default="1080")
    auto_download_new: bool = Field(default=False, description="Auto-download new episodes")
    tags: List[str] = Field(default_factory=list, description="User-defined tags")
    notes: Optional[str] = Field(None, description="User notes")

    # Organization preferences
    naming_template: str = Field(
        default="{title}/Season {season:02d}/{episode:02d} - {episode_title}.{ext}",
        description="File naming template"
    )

    @field_validator("download_path")
    @classmethod
    def validate_download_path(cls, v: Path) -> Path:
        """Ensure download path is absolute."""
        if not v.is_absolute():
            raise ValueError("Download path must be absolute")
        return v

    @computed_field
    @property
    def total_episodes_downloaded(self) -> int:
        """Get count of successfully downloaded episodes."""
        # is_completed also checks file existence, so this touches the filesystem.
        return len([ep for ep in self.episodes.values() if ep.is_completed])

    @computed_field
    @property
    def total_size_bytes(self) -> int:
        """Get total size of all downloaded episodes in bytes."""
        return sum(ep.file_size for ep in self.episodes.values() if ep.is_completed)

    @computed_field
    @property
    def total_size_gb(self) -> float:
        """Get total size in gigabytes."""
        return self.total_size_bytes / (1024 * 1024 * 1024)

    @computed_field
    @property
    def completion_percentage(self) -> float:
        """Get completion percentage based on total episodes."""
        # media_item.episodes may be None for ongoing/unknown-length series.
        if not self.media_item.episodes or self.media_item.episodes == 0:
            return 0.0
        return (self.total_episodes_downloaded / self.media_item.episodes) * 100

    @computed_field
    @property
    def display_title(self) -> str:
        """Get display title for the anime."""
        return (
            self.media_item.title.english
            or self.media_item.title.romaji
            or f"Anime {self.media_item.id}"
        )

    @computed_field
    @property
    def status(self) -> MediaStatus:
        """Determine overall download status for this anime."""
        if not self.episodes:
            return "active"
        statuses = [ep.status for ep in self.episodes.values()]
        if all(s == "completed" for s in statuses):
            # Only "completed" when every known episode of the series is
            # tracked; otherwise falls through to the checks below.
            if self.media_item.episodes and len(self.episodes) >= self.media_item.episodes:
                return "completed"
        if any(s == "failed" for s in statuses):
            return "failed"
        if any(s in ["downloading", "queued"] for s in statuses):
            return "active"
        return "paused"

    def get_next_episode_to_download(self) -> Optional[int]:
        """Get the next episode number that should be downloaded.

        Returns None when the series length is unknown or all episodes are done.
        """
        if not self.media_item.episodes:
            return None
        downloaded_episodes = set(ep.episode_number for ep in self.episodes.values() if ep.is_completed)
        # Lowest missing episode number wins (fills gaps before the tail).
        for episode_num in range(1, self.media_item.episodes + 1):
            if episode_num not in downloaded_episodes:
                return episode_num
        return None

    def get_failed_episodes(self) -> List[int]:
        """Get list of episode numbers that failed to download."""
        return [
            ep.episode_number for ep in self.episodes.values()
            if ep.status == "failed"
        ]

    def update_last_modified(self) -> None:
        """Update the last_updated timestamp."""
        # object.__setattr__ bypasses pydantic assignment validation/frozen
        # checks on purpose, since the model might be frozen.
        object.__setattr__(self, "last_updated", datetime.now())
class MediaIndexEntry(BaseModel):
    """
    Lightweight entry in the download index for fast operations.

    Provides quick access to basic information about a download record
    without loading the full MediaDownloadRecord.
    """

    model_config = ConfigDict(validate_assignment=True)

    media_id: int = Field(description="AniList media ID")
    title: str = Field(description="Display title")
    episode_count: int = Field(default=0, ge=0)
    completed_episodes: int = Field(default=0, ge=0)
    last_download: Optional[datetime] = None
    status: MediaStatus = Field(default="active")
    total_size: int = Field(default=0, ge=0)  # bytes
    file_path: Path = Field(description="Path to the media record file")

    @computed_field
    @property
    def completion_percentage(self) -> float:
        """Get completion percentage."""
        # Guard against division by zero for unknown-length series.
        if self.episode_count == 0:
            return 0.0
        return (self.completed_episodes / self.episode_count) * 100

    @computed_field
    @property
    def total_size_mb(self) -> float:
        """Get total size in megabytes."""
        return self.total_size / (1024 * 1024)
class DownloadIndex(BaseModel):
    """
    Lightweight index for fast download operations.

    Maintains an overview of all download records without loading
    the full data, enabling fast searches and filtering.
    """

    model_config = ConfigDict(validate_assignment=True)

    version: str = Field(default="1.0")
    last_updated: datetime = Field(default_factory=datetime.now)
    # The three aggregate counters below are recomputed on every
    # add/remove rather than incrementally updated.
    media_count: int = Field(default=0, ge=0)
    total_episodes: int = Field(default=0, ge=0)
    total_size_bytes: int = Field(default=0, ge=0)
    media_index: Dict[int, MediaIndexEntry] = Field(default_factory=dict)

    @computed_field
    @property
    def total_size_gb(self) -> float:
        """Get total size across all downloads in gigabytes."""
        return self.total_size_bytes / (1024 * 1024 * 1024)

    @computed_field
    @property
    def completion_stats(self) -> Dict[str, int]:
        """Get completion statistics (count of entries per MediaStatus)."""
        stats = {"completed": 0, "active": 0, "failed": 0, "paused": 0}
        for entry in self.media_index.values():
            stats[entry.status] = stats.get(entry.status, 0) + 1
        return stats

    def add_media_entry(self, media_record: MediaDownloadRecord) -> None:
        """Add or update a media entry in the index and refresh the aggregates."""
        entry = MediaIndexEntry(
            media_id=media_record.media_item.id,
            title=media_record.display_title,
            episode_count=media_record.media_item.episodes or 0,
            completed_episodes=media_record.total_episodes_downloaded,
            last_download=media_record.last_updated,
            status=media_record.status,
            total_size=media_record.total_size_bytes,
            # Mirrors DownloadManager.media_dir layout — keep the two in sync.
            file_path=APP_DATA_DIR / "downloads" / "media" / f"{media_record.media_item.id}.json"
        )
        self.media_index[media_record.media_item.id] = entry
        self.media_count = len(self.media_index)
        self.total_episodes = sum(entry.completed_episodes for entry in self.media_index.values())
        self.total_size_bytes = sum(entry.total_size for entry in self.media_index.values())
        self.last_updated = datetime.now()

    def remove_media_entry(self, media_id: int) -> bool:
        """Remove a media entry from the index.

        Returns True if an entry was removed, False if the id was unknown.
        """
        if media_id in self.media_index:
            del self.media_index[media_id]
            self.media_count = len(self.media_index)
            self.total_episodes = sum(entry.completed_episodes for entry in self.media_index.values())
            self.total_size_bytes = sum(entry.total_size for entry in self.media_index.values())
            self.last_updated = datetime.now()
            return True
        return False
class DownloadQueueItem(BaseModel):
    """
    Item in the download queue.

    Represents a single episode queued for download with priority
    and scheduling information. Frozen: retry bookkeeping must create
    a new instance via ``model_copy``.
    """

    model_config = ConfigDict(frozen=True)

    media_id: int
    episode_number: int
    priority: int = Field(default=0, description="Higher number = higher priority")
    added_date: datetime = Field(default_factory=datetime.now)
    estimated_size: Optional[int] = Field(None, description="Estimated file size")
    quality_preference: QualityOption = Field(default="1080")
    retry_count: int = Field(default=0, ge=0)
    max_retries: int = Field(default=3, gt=0)

    @computed_field
    @property
    def can_retry(self) -> bool:
        """Check if this item can be retried."""
        return self.retry_count < self.max_retries

    @computed_field
    @property
    def estimated_size_mb(self) -> Optional[float]:
        """Get estimated size in megabytes, or None if unknown."""
        if self.estimated_size is None:
            return None
        return self.estimated_size / (1024 * 1024)
class DownloadQueue(BaseModel):
    """
    Download queue management.

    Manages the queue of episodes waiting to be downloaded with
    priority handling and scheduling. The items list is kept sorted
    by (priority desc, added_date asc) by add_item.
    """

    model_config = ConfigDict(validate_assignment=True)

    items: List[DownloadQueueItem] = Field(default_factory=list)
    max_size: int = Field(default=100, gt=0)
    last_updated: datetime = Field(default_factory=datetime.now)

    def add_item(self, item: DownloadQueueItem) -> bool:
        """Add an item to the queue.

        Returns False when the queue is full or the (media_id, episode)
        pair is already queued.
        """
        if len(self.items) >= self.max_size:
            return False
        # Check for duplicates
        for existing_item in self.items:
            if (existing_item.media_id == item.media_id and
                    existing_item.episode_number == item.episode_number):
                return False
        self.items.append(item)
        # Sort by priority (highest first), then by added date
        self.items.sort(key=lambda x: (-x.priority, x.added_date))
        self.last_updated = datetime.now()
        return True

    def get_next_item(self) -> Optional[DownloadQueueItem]:
        """Get the next item to download without removing it (peek)."""
        if not self.items:
            return None
        return self.items[0]

    def remove_item(self, media_id: int, episode_number: int) -> bool:
        """Remove an item from the queue.

        Returns True if a matching item was found and removed.
        """
        for i, item in enumerate(self.items):
            if item.media_id == media_id and item.episode_number == episode_number:
                del self.items[i]
                self.last_updated = datetime.now()
                return True
        return False

    def clear(self) -> None:
        """Clear all items from the queue."""
        self.items.clear()
        self.last_updated = datetime.now()

    @computed_field
    @property
    def total_estimated_size(self) -> int:
        """Get total estimated size of all queued items (unknown sizes count as 0)."""
        return sum(item.estimated_size or 0 for item in self.items)

View File

@@ -0,0 +1,302 @@
"""
Download progress tracking and integration with the download system.
This module provides real-time tracking of download progress and integrates
with the existing download infrastructure to provide progress updates.
"""
from __future__ import annotations
import logging
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Callable, Dict, Optional
from ....core.config.model import DownloadsConfig
from .manager import DownloadManager, get_download_manager
from .models import DownloadQueueItem
logger = logging.getLogger(__name__)
# Type alias for progress callback.
# Signature: (media_id, episode_number, progress in [0, 1], speed in bytes/sec).
ProgressCallback = Callable[[int, int, float, float], None]  # media_id, episode, progress, speed
class DownloadTracker:
"""
Tracks download progress and integrates with the download manager.
Provides real-time progress updates and handles integration between
the actual download process and the tracking system.
"""
def __init__(self, config: DownloadsConfig):
self.config = config
self.download_manager = get_download_manager(config)
# Track active downloads
self._active_downloads: Dict[str, DownloadSession] = {}
self._lock = threading.RLock()
# Progress callbacks
self._progress_callbacks: list[ProgressCallback] = []
def add_progress_callback(self, callback: ProgressCallback) -> None:
"""Add a callback function to receive progress updates."""
with self._lock:
self._progress_callbacks.append(callback)
def remove_progress_callback(self, callback: ProgressCallback) -> None:
"""Remove a progress callback."""
with self._lock:
if callback in self._progress_callbacks:
self._progress_callbacks.remove(callback)
def start_download(self, queue_item: DownloadQueueItem) -> str:
"""Start tracking a download and return session ID."""
with self._lock:
session_id = f"{queue_item.media_id}_{queue_item.episode_number}_{int(time.time())}"
session = DownloadSession(
session_id=session_id,
queue_item=queue_item,
tracker=self
)
self._active_downloads[session_id] = session
# Mark download as started in manager
self.download_manager.mark_download_started(
queue_item.media_id,
queue_item.episode_number
)
logger.info(f"Started download tracking for session {session_id}")
return session_id
def update_progress(self, session_id: str, progress: float,
speed: Optional[float] = None) -> None:
"""Update download progress for a session."""
with self._lock:
if session_id not in self._active_downloads:
logger.warning(f"Unknown download session: {session_id}")
return
session = self._active_downloads[session_id]
session.update_progress(progress, speed)
# Notify callbacks
for callback in self._progress_callbacks:
try:
callback(
session.queue_item.media_id,
session.queue_item.episode_number,
progress,
speed or 0.0
)
except Exception as e:
logger.error(f"Error in progress callback: {e}")
def complete_download(self, session_id: str, file_path: Path,
file_size: int, checksum: Optional[str] = None) -> bool:
"""Mark a download as completed."""
with self._lock:
if session_id not in self._active_downloads:
logger.warning(f"Unknown download session: {session_id}")
return False
session = self._active_downloads[session_id]
session.mark_completed(file_path, file_size, checksum)
# Update download manager
success = self.download_manager.mark_download_completed(
session.queue_item.media_id,
session.queue_item.episode_number,
file_path,
file_size,
checksum
)
# Remove from active downloads
del self._active_downloads[session_id]
logger.info(f"Completed download session {session_id}")
return success
def fail_download(self, session_id: str, error_message: str) -> bool:
"""Mark a download as failed."""
with self._lock:
if session_id not in self._active_downloads:
logger.warning(f"Unknown download session: {session_id}")
return False
session = self._active_downloads[session_id]
session.mark_failed(error_message)
# Update download manager
success = self.download_manager.mark_download_failed(
session.queue_item.media_id,
session.queue_item.episode_number,
error_message
)
# Remove from active downloads
del self._active_downloads[session_id]
logger.warning(f"Failed download session {session_id}: {error_message}")
return success
def get_active_downloads(self) -> Dict[str, 'DownloadSession']:
"""Get all currently active download sessions."""
with self._lock:
return self._active_downloads.copy()
def cancel_download(self, session_id: str) -> bool:
"""Cancel an active download."""
with self._lock:
if session_id not in self._active_downloads:
return False
session = self._active_downloads[session_id]
session.cancel()
# Mark as failed with cancellation message
self.download_manager.mark_download_failed(
session.queue_item.media_id,
session.queue_item.episode_number,
"Download cancelled by user"
)
del self._active_downloads[session_id]
logger.info(f"Cancelled download session {session_id}")
return True
def cleanup_stale_sessions(self, max_age_hours: int = 24) -> int:
    """Clean up stale download sessions that may have been orphaned.

    Any session older than ``max_age_hours`` is failed with a timeout
    message.

    Args:
        max_age_hours: Age threshold in hours beyond which a session is
            considered stale.

    Returns:
        The number of sessions that were cleaned up.
    """
    # Snapshot the stale ids while holding the lock, but fail them only
    # AFTER releasing it: fail_download() re-acquires self._lock, and
    # threading.Lock is not reentrant, so calling it from inside this
    # `with` block would deadlock as soon as a stale session existed.
    with self._lock:
        now = datetime.now()
        stale_sessions = [
            session_id
            for session_id, session in self._active_downloads.items()
            if (now - session.start_time).total_seconds() / 3600 > max_age_hours
        ]
    for session_id in stale_sessions:
        self.fail_download(session_id, "Session timed out")
    return len(stale_sessions)
class DownloadSession:
    """An in-flight download with thread-safe progress bookkeeping.

    Tracks progress, speed, and byte counts for a single queued episode
    download, along with its cancellation/completion/failure state.
    """

    def __init__(self, session_id: str, queue_item: DownloadQueueItem, tracker: DownloadTracker):
        self.session_id = session_id
        self.queue_item = queue_item
        self.tracker = tracker
        self.start_time = datetime.now()
        # Progress bookkeeping.
        self.progress = 0.0
        self.download_speed = 0.0
        self.bytes_downloaded = 0
        self.total_bytes = queue_item.estimated_size or 0
        # Terminal-state flags.
        self.is_cancelled = False
        self.is_completed = False
        self.error_message: Optional[str] = None
        # Guards every mutable field above.
        self._lock = threading.Lock()

    def update_progress(self, progress: float, speed: Optional[float] = None) -> None:
        """Record new progress (clamped to [0, 1]) and, optionally, speed."""
        with self._lock:
            if self.is_cancelled or self.is_completed:
                # Terminal sessions ignore further progress updates.
                return
            self.progress = max(0.0, min(1.0, progress))
            if speed is not None:
                self.download_speed = speed
            if self.total_bytes > 0:
                # Derive the byte count from the fraction when a total is known.
                self.bytes_downloaded = int(self.total_bytes * self.progress)
            logger.debug(f"Session {self.session_id} progress: {self.progress:.2%}")

    def mark_completed(self, file_path: Path, file_size: int, checksum: Optional[str] = None) -> None:
        """Finalize the session as successfully completed."""
        with self._lock:
            if self.is_cancelled:
                return
            self.is_completed = True
            self.progress = 1.0
            self.bytes_downloaded = file_size
            self.total_bytes = file_size

    def mark_failed(self, error_message: str) -> None:
        """Record a failure reason, unless already cancelled or completed."""
        with self._lock:
            if not (self.is_cancelled or self.is_completed):
                self.error_message = error_message

    def cancel(self) -> None:
        """Cancel the session; a completed session cannot be cancelled."""
        with self._lock:
            if not self.is_completed:
                self.is_cancelled = True

    @property
    def elapsed_time(self) -> float:
        """Seconds elapsed since the session started."""
        return (datetime.now() - self.start_time).total_seconds()

    @property
    def estimated_time_remaining(self) -> Optional[float]:
        """Estimated seconds left, or None when it cannot be computed."""
        if self.progress <= 0 or self.download_speed <= 0:
            return None
        remaining = self.total_bytes - self.bytes_downloaded
        return remaining / self.download_speed if remaining > 0 else 0.0

    @property
    def status_text(self) -> str:
        """Human-readable status string for display."""
        if self.is_cancelled:
            return "Cancelled"
        if self.is_completed:
            return "Completed"
        if self.error_message:
            return f"Failed: {self.error_message}"
        return f"Downloading ({self.progress:.1%})"
# Global tracker instance
_download_tracker: Optional[DownloadTracker] = None


def get_download_tracker(config: DownloadsConfig) -> DownloadTracker:
    """Return the process-wide DownloadTracker, creating it on first use.

    The config is only consulted on the first call; subsequent calls
    return the already-created singleton unchanged.
    """
    global _download_tracker
    tracker = _download_tracker
    if tracker is None:
        tracker = DownloadTracker(config)
        _download_tracker = tracker
    return tracker

View File

@@ -0,0 +1,340 @@
"""
Download validation and integrity checking utilities.
This module provides functionality to validate downloaded files, verify
integrity, and repair corrupted download records.
"""
from __future__ import annotations
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from pydantic import ValidationError
from ....core.constants import APP_DATA_DIR
from .manager import DownloadManager
from .models import DownloadIndex, MediaDownloadRecord
logger = logging.getLogger(__name__)
class DownloadValidator:
    """
    Validator for download records and file integrity using Pydantic models.

    Provides functionality to validate, repair, and maintain the integrity
    of download tracking data and associated files.
    """

    def __init__(self, download_manager: DownloadManager):
        self.download_manager = download_manager
        # Mirrors the on-disk layout used by the download manager.
        self.tracking_dir = APP_DATA_DIR / "downloads"
        self.media_dir = self.tracking_dir / "media"

    def validate_download_record(self, file_path: Path) -> Optional[MediaDownloadRecord]:
        """Load and validate a download record with Pydantic.

        Returns the parsed record, or None when the file is unreadable or
        fails model validation.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            record = MediaDownloadRecord.model_validate(data)
            logger.debug(f"Successfully validated download record: {file_path}")
            return record
        except ValidationError as e:
            logger.error(f"Invalid download record {file_path}: {e}")
            return None
        except Exception as e:
            logger.error(f"Failed to load download record {file_path}: {e}")
            return None

    def validate_all_records(self) -> Tuple[List[MediaDownloadRecord], List[Path]]:
        """Validate all download records on disk.

        Returns:
            A tuple of (valid records, paths of files that failed to parse
            or validate).
        """
        valid_records: List[MediaDownloadRecord] = []
        invalid_files: List[Path] = []
        if not self.media_dir.exists():
            logger.warning("Media directory does not exist")
            return valid_records, invalid_files
        for record_file in self.media_dir.glob("*.json"):
            record = self.validate_download_record(record_file)
            if record:
                valid_records.append(record)
            else:
                invalid_files.append(record_file)
        logger.info(f"Validated {len(valid_records)} records, found {len(invalid_files)} invalid files")
        return valid_records, invalid_files

    def verify_file_integrity(self, record: MediaDownloadRecord) -> Dict[int, bool]:
        """Verify file integrity for all episodes in a download record.

        Checks existence, size, and (when available) checksum for each
        completed episode. Non-completed episodes pass trivially.

        Returns:
            Mapping of episode number -> True when the episode passed all
            applicable checks.
        """
        integrity_results: Dict[int, bool] = {}
        for episode_num, episode_download in record.episodes.items():
            if episode_download.status != "completed":
                integrity_results[episode_num] = True  # Skip non-completed downloads
                continue
            # The file must exist on disk.
            if not episode_download.file_path.exists():
                logger.warning(f"Missing file for episode {episode_num}: {episode_download.file_path}")
                integrity_results[episode_num] = False
                continue
            # The on-disk size must match the recorded size.
            actual_size = episode_download.file_path.stat().st_size
            if actual_size != episode_download.file_size:
                logger.warning(f"Size mismatch for episode {episode_num}: expected {episode_download.file_size}, got {actual_size}")
                integrity_results[episode_num] = False
                continue
            # Checksum verification is only possible when one was recorded.
            if episode_download.checksum:
                if not episode_download.verify_integrity():
                    logger.warning(f"Checksum mismatch for episode {episode_num}")
                    integrity_results[episode_num] = False
                    continue
            integrity_results[episode_num] = True
        return integrity_results

    def repair_download_record(self, record_file: Path) -> bool:
        """Attempt to repair a corrupted download record.

        Loads the raw JSON, applies basic field-level repairs, and saves
        the record back if the repaired data validates.

        Returns:
            True when the record was repaired and persisted.
        """
        try:
            # The file must at least be parseable JSON to be repairable.
            with open(record_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            repaired_data = self._attempt_basic_repairs(data)
            try:
                repaired_record = MediaDownloadRecord.model_validate(repaired_data)
                self.download_manager.save_download_record(repaired_record)
                logger.info(f"Successfully repaired download record: {record_file}")
                return True
            except ValidationError as e:
                logger.error(f"Could not repair download record {record_file}: {e}")
                return False
        except Exception as e:
            logger.error(f"Failed to repair download record {record_file}: {e}")
            return False

    def _attempt_basic_repairs(self, data: Dict) -> Dict:
        """Attempt basic repairs on download record data.

        Fills in missing required fields with safe defaults. Operates on
        copies so the caller's dict (and its nested episode dicts) is
        never mutated.
        """
        repaired = dict(data)
        # Top-level fields required by the model, with safe defaults.
        repaired.setdefault("episodes", {})
        repaired.setdefault("created_date", "2024-01-01T00:00:00")
        repaired.setdefault("last_updated", "2024-01-01T00:00:00")
        repaired.setdefault("tags", [])
        repaired.setdefault("preferred_quality", "1080")
        repaired.setdefault("auto_download_new", False)
        # Per-episode fields; non-dict entries are dropped.
        if isinstance(repaired["episodes"], dict):
            fixed_episodes = {}
            for ep_num, ep_data in repaired["episodes"].items():
                if isinstance(ep_data, dict):
                    # Copy before mutating so the caller's nested dicts
                    # are left untouched.
                    ep = dict(ep_data)
                    ep.setdefault("episode_number", int(ep_num) if ep_num.isdigit() else 1)
                    ep.setdefault("status", "queued")
                    ep.setdefault("download_progress", 0.0)
                    ep.setdefault("file_size", 0)
                    ep.setdefault("subtitle_files", [])
                    fixed_episodes[ep_num] = ep
            repaired["episodes"] = fixed_episodes
        return repaired

    def rebuild_index_from_records(self) -> bool:
        """Rebuild the download index from individual record files.

        Returns:
            True when the index was rebuilt and saved successfully.
        """
        try:
            valid_records, _ = self.validate_all_records()
            new_index = DownloadIndex()
            for record in valid_records:
                new_index.add_media_entry(record)
            self.download_manager._save_index(new_index)
            logger.info(f"Rebuilt download index with {len(valid_records)} records")
            return True
        except Exception as e:
            logger.error(f"Failed to rebuild index: {e}")
            return False

    def cleanup_orphaned_files(self) -> int:
        """Clean up orphaned files and inconsistent records.

        Re-adds valid-but-unindexed record files to the index, deletes
        invalid record files, and drops index entries whose record file
        is missing.

        Returns:
            The number of files/entries removed (0 on error).
        """
        cleanup_count = 0
        index_dirty = False  # Track whether the index needs re-saving.
        try:
            index = self.download_manager._load_index()
            if self.media_dir.exists():
                for record_file in self.media_dir.glob("*.json"):
                    # Skip files whose names are not media ids instead of
                    # letting int() raise and abort the whole cleanup.
                    try:
                        media_id = int(record_file.stem)
                    except ValueError:
                        logger.warning(f"Ignoring unexpected file in media dir: {record_file}")
                        continue
                    if media_id not in index.media_index:
                        record = self.validate_download_record(record_file)
                        if record:
                            # Valid record missing from the index: re-add it.
                            index.add_media_entry(record)
                            index_dirty = True
                            logger.info(f"Re-added orphaned record to index: {media_id}")
                        else:
                            # Invalid record file: remove it from disk.
                            record_file.unlink()
                            cleanup_count += 1
                            logger.info(f"Removed invalid record file: {record_file}")
            # Drop index entries whose backing record file is gone.
            missing_records = [
                media_id
                for media_id, index_entry in index.media_index.items()
                if not index_entry.file_path.exists()
            ]
            for media_id in missing_records:
                index.remove_media_entry(media_id)
                cleanup_count += 1
                index_dirty = True
                logger.info(f"Removed missing record from index: {media_id}")
            # Save whenever the index itself changed (re-adds alone do not
            # increment cleanup_count, so gate on the dirty flag instead).
            if index_dirty:
                self.download_manager._save_index(index)
            return cleanup_count
        except Exception as e:
            logger.error(f"Failed to cleanup orphaned files: {e}")
            return 0

    def validate_file_paths(self, record: MediaDownloadRecord) -> List[str]:
        """Validate file paths in a download record.

        Returns:
            Human-readable descriptions of every path issue found (empty
            list when the record is clean).
        """
        issues: List[str] = []
        # The media-level download directory must be absolute.
        if not record.download_path.is_absolute():
            issues.append(f"Download path is not absolute: {record.download_path}")
        for episode_num, episode_download in record.episodes.items():
            if not episode_download.file_path.is_absolute():
                issues.append(f"Episode {episode_num} file path is not absolute: {episode_download.file_path}")
            # A completed download must still exist on disk.
            if episode_download.status == "completed" and not episode_download.file_path.exists():
                issues.append(f"Episode {episode_num} file does not exist: {episode_download.file_path}")
            for subtitle_file in episode_download.subtitle_files:
                if not subtitle_file.exists():
                    issues.append(f"Episode {episode_num} subtitle file does not exist: {subtitle_file}")
        return issues

    def generate_validation_report(self) -> Dict:
        """Generate a comprehensive validation report.

        NOTE(review): this also runs cleanup_orphaned_files(), so
        generating a report has the side effect of deleting invalid
        record files and rewriting the index.

        Returns:
            A dict with summary counts and per-record failure details;
            an "error" key is added when report generation itself failed.
        """
        report = {
            "timestamp": str(datetime.now()),
            "total_records": 0,
            "valid_records": 0,
            "invalid_records": 0,
            "integrity_issues": 0,
            "orphaned_files": 0,
            "path_issues": 0,
            "details": {
                "invalid_files": [],
                "integrity_failures": [],
                "path_issues": []
            }
        }
        try:
            valid_records, invalid_files = self.validate_all_records()
            report["total_records"] = len(valid_records) + len(invalid_files)
            report["valid_records"] = len(valid_records)
            report["invalid_records"] = len(invalid_files)
            report["details"]["invalid_files"] = [str(f) for f in invalid_files]
            for record in valid_records:
                # File integrity per episode.
                integrity_results = self.verify_file_integrity(record)
                failed_episodes = [ep for ep, result in integrity_results.items() if not result]
                if failed_episodes:
                    report["integrity_issues"] += len(failed_episodes)
                    report["details"]["integrity_failures"].append({
                        "media_id": record.media_item.id,
                        "title": record.display_title,
                        "failed_episodes": failed_episodes
                    })
                # Path sanity checks.
                path_issues = self.validate_file_paths(record)
                if path_issues:
                    report["path_issues"] += len(path_issues)
                    report["details"]["path_issues"].append({
                        "media_id": record.media_item.id,
                        "title": record.display_title,
                        "issues": path_issues
                    })
            orphaned_count = self.cleanup_orphaned_files()
            report["orphaned_files"] = orphaned_count
        except Exception as e:
            logger.error(f"Failed to generate validation report: {e}")
            report["error"] = str(e)
        return report
def validate_downloads(download_manager: DownloadManager) -> Dict:
    """Run a full validation pass over all downloads and return the report."""
    return DownloadValidator(download_manager).generate_validation_report()