mirror of
https://github.com/Benexl/FastAnime.git
synced 2025-12-12 15:50:01 -08:00
fix: logical issues with registry
This commit is contained in:
@@ -6,8 +6,8 @@ from typing import Dict, Literal, Optional
|
||||
|
||||
from pydantic import BaseModel, Field, computed_field
|
||||
|
||||
from ....libs.media_api.types import MediaItem, UserMediaListStatus
|
||||
from ....core.utils import converter
|
||||
from ....libs.media_api.types import MediaItem, UserMediaListStatus
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -30,15 +30,15 @@ class MediaEpisode(BaseModel):
|
||||
download_status: DownloadStatus = DownloadStatus.NOT_DOWNLOADED
|
||||
file_path: Path
|
||||
download_date: datetime = Field(default_factory=datetime.now)
|
||||
|
||||
|
||||
# Additional download metadata
|
||||
file_size: Optional[int] = None # File size in bytes
|
||||
quality: Optional[str] = None # Download quality (e.g., "1080p", "720p")
|
||||
quality: Optional[str] = None # Download quality (e.g., "1080p", "720p")
|
||||
provider_name: Optional[str] = None # Name of the provider used
|
||||
server_name: Optional[str] = None # Name of the server used
|
||||
server_name: Optional[str] = None # Name of the server used
|
||||
subtitle_paths: list[Path] = Field(default_factory=list) # Paths to subtitle files
|
||||
download_attempts: int = 0 # Number of download attempts
|
||||
last_error: Optional[str] = None # Last error message if failed
|
||||
download_attempts: int = 0 # Number of download attempts
|
||||
last_error: Optional[str] = None # Last error message if failed
|
||||
|
||||
|
||||
class MediaRecord(BaseModel):
|
||||
@@ -91,7 +91,7 @@ class MediaRegistryIndex(BaseModel):
|
||||
"""Get breakdown by user status."""
|
||||
breakdown = {}
|
||||
for entry in self.media_index.values():
|
||||
breakdown[entry.status] = breakdown.get(entry.status, 0) + 1
|
||||
breakdown[entry.status.value] = breakdown.get(entry.status.value, 0) + 1
|
||||
return breakdown
|
||||
|
||||
@computed_field
|
||||
|
||||
@@ -2,7 +2,7 @@ import json
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Dict, Generator, List, Optional
|
||||
from typing import TYPE_CHECKING, Dict, Generator, List, Optional, TypedDict
|
||||
|
||||
from ....core.config.model import MediaRegistryConfig
|
||||
from ....core.exceptions import FastAnimeError
|
||||
@@ -17,12 +17,19 @@ from ....libs.media_api.types import (
|
||||
)
|
||||
from .models import (
|
||||
REGISTRY_VERSION,
|
||||
DownloadStatus,
|
||||
MediaRecord,
|
||||
MediaRegistryIndex,
|
||||
MediaRegistryIndexEntry,
|
||||
DownloadStatus
|
||||
)
|
||||
|
||||
|
||||
class StatBreakdown(TypedDict):
|
||||
total_media_breakdown: Dict[int, int]
|
||||
status_breakdown: Dict[int, int]
|
||||
last_updated: str
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -205,26 +212,7 @@ class MediaRegistryService:
|
||||
for entry in sorted_entries:
|
||||
record = self.get_media_record(entry.media_id)
|
||||
if record:
|
||||
# Create UserListItem from index entry
|
||||
user_list_item = UserListItem(
|
||||
status=entry.status,
|
||||
progress=int(entry.progress) if entry.progress.isdigit() else 0,
|
||||
score=entry.score,
|
||||
repeat=entry.repeat,
|
||||
notes=entry.notes,
|
||||
start_date=entry.start_date,
|
||||
completed_at=entry.completed_at
|
||||
)
|
||||
|
||||
# Create new MediaItem with user status
|
||||
media_with_status = MediaItem(
|
||||
**record.media_item.model_dump(),
|
||||
user_status=user_list_item
|
||||
)
|
||||
recent_media.append(media_with_status)
|
||||
# if len(recent_media) == limit:
|
||||
# break
|
||||
|
||||
recent_media.append(record.media_item)
|
||||
page_info = PageInfo(
|
||||
total=len(sorted_entries),
|
||||
)
|
||||
@@ -233,143 +221,185 @@ class MediaRegistryService:
|
||||
def search_for_media(self, params: MediaSearchParams) -> MediaSearchResult:
|
||||
"""Search for media in the local registry based on search parameters."""
|
||||
from ....libs.media_api.types import MediaSearchResult, PageInfo
|
||||
|
||||
|
||||
index = self._load_index()
|
||||
all_media: List[MediaItem] = []
|
||||
|
||||
|
||||
# Get all media records and attach user status
|
||||
for entry in index.media_index.values():
|
||||
record = self.get_media_record(entry.media_id)
|
||||
if record:
|
||||
# Create UserListItem from index entry
|
||||
user_list_item = UserListItem(
|
||||
status=entry.status,
|
||||
progress=int(entry.progress) if entry.progress.isdigit() else 0,
|
||||
score=entry.score,
|
||||
repeat=entry.repeat,
|
||||
notes=entry.notes,
|
||||
start_date=entry.start_date,
|
||||
completed_at=entry.completed_at
|
||||
)
|
||||
|
||||
# Create new MediaItem with user status
|
||||
media_with_status = MediaItem(
|
||||
**record.media_item.model_dump(),
|
||||
user_status=user_list_item
|
||||
)
|
||||
all_media.append(media_with_status)
|
||||
|
||||
all_media.append(record.media_item)
|
||||
|
||||
# Apply filters based on search parameters
|
||||
filtered_media = self._apply_search_filters(all_media, params, index)
|
||||
|
||||
|
||||
# Apply sorting
|
||||
sorted_media = self._apply_sorting(filtered_media, params, index)
|
||||
|
||||
|
||||
# Apply pagination
|
||||
page = params.page or 1
|
||||
per_page = params.per_page or 15
|
||||
start_idx = (page - 1) * per_page
|
||||
end_idx = start_idx + per_page
|
||||
|
||||
|
||||
paginated_media = sorted_media[start_idx:end_idx]
|
||||
|
||||
|
||||
page_info = PageInfo(
|
||||
total=len(sorted_media),
|
||||
current_page=page,
|
||||
has_next_page=end_idx < len(sorted_media),
|
||||
per_page=per_page
|
||||
per_page=per_page,
|
||||
)
|
||||
|
||||
|
||||
return MediaSearchResult(page_info=page_info, media=paginated_media)
|
||||
|
||||
def _apply_search_filters(self, media_list: List[MediaItem], params: MediaSearchParams, index) -> List[MediaItem]:
|
||||
def _apply_search_filters(
|
||||
self, media_list: List[MediaItem], params: MediaSearchParams, index
|
||||
) -> List[MediaItem]:
|
||||
"""Apply search filters to media list."""
|
||||
filtered = media_list.copy()
|
||||
|
||||
|
||||
# Query filter (search in title)
|
||||
if params.query:
|
||||
query_lower = params.query.lower()
|
||||
filtered = [
|
||||
media for media in filtered
|
||||
if (query_lower in media.title.english.lower() if media.title.english else False) or
|
||||
(query_lower in media.title.romaji.lower() if media.title.romaji else False) or
|
||||
(query_lower in media.title.native.lower() if media.title.native else False) or
|
||||
any(query_lower in synonym.lower() for synonym in media.synonymns)
|
||||
media
|
||||
for media in filtered
|
||||
if (
|
||||
query_lower in media.title.english.lower()
|
||||
if media.title.english
|
||||
else False
|
||||
)
|
||||
or (
|
||||
query_lower in media.title.romaji.lower()
|
||||
if media.title.romaji
|
||||
else False
|
||||
)
|
||||
or (
|
||||
query_lower in media.title.native.lower()
|
||||
if media.title.native
|
||||
else False
|
||||
)
|
||||
or any(query_lower in synonym.lower() for synonym in media.synonymns)
|
||||
]
|
||||
|
||||
|
||||
# Status filters
|
||||
if params.status:
|
||||
filtered = [media for media in filtered if media.status == params.status]
|
||||
if params.status_in:
|
||||
filtered = [media for media in filtered if media.status in params.status_in]
|
||||
if params.status_not_in:
|
||||
filtered = [media for media in filtered if media.status not in params.status_not_in]
|
||||
|
||||
filtered = [
|
||||
media for media in filtered if media.status not in params.status_not_in
|
||||
]
|
||||
|
||||
# Genre filters
|
||||
if params.genre_in:
|
||||
filtered = [media for media in filtered if any(genre in media.genres for genre in params.genre_in)]
|
||||
filtered = [
|
||||
media
|
||||
for media in filtered
|
||||
if any(genre in media.genres for genre in params.genre_in)
|
||||
]
|
||||
if params.genre_not_in:
|
||||
filtered = [media for media in filtered if not any(genre in media.genres for genre in params.genre_not_in)]
|
||||
|
||||
filtered = [
|
||||
media
|
||||
for media in filtered
|
||||
if not any(genre in media.genres for genre in params.genre_not_in)
|
||||
]
|
||||
|
||||
# Tag filters
|
||||
if params.tag_in:
|
||||
media_tags = [tag.name for media in filtered for tag in media.tags]
|
||||
filtered = [media for media in filtered if any(tag in media_tags for tag in params.tag_in)]
|
||||
filtered = [
|
||||
media
|
||||
for media in filtered
|
||||
if any(tag in media_tags for tag in params.tag_in)
|
||||
]
|
||||
if params.tag_not_in:
|
||||
media_tags = [tag.name for media in filtered for tag in media.tags]
|
||||
filtered = [media for media in filtered if not any(tag in media_tags for tag in params.tag_not_in)]
|
||||
|
||||
filtered = [
|
||||
media
|
||||
for media in filtered
|
||||
if not any(tag in media_tags for tag in params.tag_not_in)
|
||||
]
|
||||
|
||||
# Format filter
|
||||
if params.format_in:
|
||||
filtered = [media for media in filtered if media.format in params.format_in]
|
||||
|
||||
|
||||
# Type filter
|
||||
if params.type:
|
||||
filtered = [media for media in filtered if media.type == params.type]
|
||||
|
||||
|
||||
# Score filters
|
||||
if params.averageScore_greater is not None:
|
||||
filtered = [media for media in filtered if media.average_score and media.average_score >= params.averageScore_greater]
|
||||
filtered = [
|
||||
media
|
||||
for media in filtered
|
||||
if media.average_score
|
||||
and media.average_score >= params.averageScore_greater
|
||||
]
|
||||
if params.averageScore_lesser is not None:
|
||||
filtered = [media for media in filtered if media.average_score and media.average_score <= params.averageScore_lesser]
|
||||
|
||||
filtered = [
|
||||
media
|
||||
for media in filtered
|
||||
if media.average_score
|
||||
and media.average_score <= params.averageScore_lesser
|
||||
]
|
||||
|
||||
# Popularity filters
|
||||
if params.popularity_greater is not None:
|
||||
filtered = [media for media in filtered if media.popularity and media.popularity >= params.popularity_greater]
|
||||
filtered = [
|
||||
media
|
||||
for media in filtered
|
||||
if media.popularity and media.popularity >= params.popularity_greater
|
||||
]
|
||||
if params.popularity_lesser is not None:
|
||||
filtered = [media for media in filtered if media.popularity and media.popularity <= params.popularity_lesser]
|
||||
|
||||
filtered = [
|
||||
media
|
||||
for media in filtered
|
||||
if media.popularity and media.popularity <= params.popularity_lesser
|
||||
]
|
||||
|
||||
# ID filter
|
||||
if params.id_in:
|
||||
filtered = [media for media in filtered if media.id in params.id_in]
|
||||
|
||||
|
||||
# User list filter
|
||||
if params.on_list is not None:
|
||||
if params.on_list:
|
||||
# Only show media that has user status (is on list)
|
||||
filtered = [media for media in filtered if media.user_status is not None]
|
||||
filtered = [
|
||||
media for media in filtered if media.user_status is not None
|
||||
]
|
||||
else:
|
||||
# Only show media that doesn't have user status (not on list)
|
||||
filtered = [media for media in filtered if media.user_status is None]
|
||||
|
||||
|
||||
return filtered
|
||||
|
||||
def _apply_sorting(self, media_list: List[MediaItem], params: MediaSearchParams, index) -> List[MediaItem]:
|
||||
def _apply_sorting(
|
||||
self, media_list: List[MediaItem], params: MediaSearchParams, index
|
||||
) -> List[MediaItem]:
|
||||
"""Apply sorting to media list."""
|
||||
if not params.sort:
|
||||
return media_list
|
||||
|
||||
|
||||
# Get the MediaSort value
|
||||
sort = params.sort
|
||||
if isinstance(sort, list):
|
||||
sort = sort[0] # Use first sort if multiple provided
|
||||
|
||||
|
||||
# Apply sorting based on MediaSort enum
|
||||
try:
|
||||
if sort.value == "POPULARITY_DESC":
|
||||
return sorted(media_list, key=lambda x: x.popularity or 0, reverse=True)
|
||||
elif sort.value == "SCORE_DESC":
|
||||
return sorted(media_list, key=lambda x: x.average_score or 0, reverse=True)
|
||||
return sorted(
|
||||
media_list, key=lambda x: x.average_score or 0, reverse=True
|
||||
)
|
||||
elif sort.value == "FAVOURITES_DESC":
|
||||
return sorted(media_list, key=lambda x: x.favourites or 0, reverse=True)
|
||||
elif sort.value == "TRENDING_DESC":
|
||||
@@ -380,10 +410,13 @@ class MediaRegistryService:
|
||||
def get_last_watched(media):
|
||||
entry = index.media_index.get(f"{self._media_api}_{media.id}")
|
||||
return entry.last_watched if entry else datetime.min
|
||||
|
||||
return sorted(media_list, key=get_last_watched, reverse=True)
|
||||
else:
|
||||
# Default to title sorting
|
||||
return sorted(media_list, key=lambda x: x.title.english or x.title.romaji or "")
|
||||
return sorted(
|
||||
media_list, key=lambda x: x.title.english or x.title.romaji or ""
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to apply sorting {sort}: {e}")
|
||||
return media_list
|
||||
@@ -391,63 +424,58 @@ class MediaRegistryService:
|
||||
def get_media_by_status(self, status: UserMediaListStatus) -> MediaSearchResult:
|
||||
"""Get media filtered by user status from registry."""
|
||||
index = self._load_index()
|
||||
|
||||
|
||||
# Filter entries by status
|
||||
status_entries = [
|
||||
entry for entry in index.media_index.values()
|
||||
if entry.status == status
|
||||
entry for entry in index.media_index.values() if entry.status == status
|
||||
]
|
||||
|
||||
|
||||
# Get media items for these entries
|
||||
media_list: List[MediaItem] = []
|
||||
for entry in status_entries:
|
||||
record = self.get_media_record(entry.media_id)
|
||||
if record:
|
||||
# Create UserListItem from index entry
|
||||
user_status = UserListItem(
|
||||
status=entry.status,
|
||||
progress=int(entry.progress) if entry.progress.isdigit() else 0,
|
||||
score=entry.score,
|
||||
repeat=entry.repeat,
|
||||
notes=entry.notes,
|
||||
start_date=entry.start_date,
|
||||
completed_at=entry.completed_at
|
||||
)
|
||||
|
||||
media_with_status = MediaItem(
|
||||
**record.media_item.model_dump(),
|
||||
user_status=user_status
|
||||
)
|
||||
media_list.append(media_with_status)
|
||||
|
||||
media_list.append(record.media_item)
|
||||
|
||||
# Sort by last watched
|
||||
sorted_media = sorted(
|
||||
media_list,
|
||||
key=lambda media: next(
|
||||
(entry.last_watched for entry in index.media_index.values()
|
||||
if entry.media_id == media.id),
|
||||
datetime.min
|
||||
),
|
||||
reverse=True
|
||||
media_list,
|
||||
key=lambda media_item: next(
|
||||
(
|
||||
entry.last_watched
|
||||
for entry in index.media_index.values()
|
||||
if entry.media_id == media_item.id
|
||||
),
|
||||
datetime.min,
|
||||
),
|
||||
reverse=True,
|
||||
)
|
||||
|
||||
|
||||
page_info = PageInfo(total=len(sorted_media))
|
||||
return MediaSearchResult(page_info=page_info, media=sorted_media)
|
||||
|
||||
def get_registry_stats(self) -> Dict:
|
||||
def get_registry_stats(self) -> "StatBreakdown":
|
||||
"""Get comprehensive registry statistics."""
|
||||
stats: "StatBreakdown" = {} # type: ignore
|
||||
try:
|
||||
index = self._load_index()
|
||||
|
||||
return {
|
||||
"total_media_breakdown": index.media_count_breakdown,
|
||||
"status_breakdown": index.status_breakdown,
|
||||
"last_updated": index.last_updated.strftime("%Y-%m-%d %H:%M:%S"),
|
||||
}
|
||||
stats.update(
|
||||
StatBreakdown(
|
||||
**{
|
||||
"total_media_breakdown": index.media_count_breakdown,
|
||||
"status_breakdown": index.status_breakdown,
|
||||
"last_updated": index.last_updated.strftime(
|
||||
"%Y-%m-%d %H:%M:%S"
|
||||
),
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get registry stats: {e}")
|
||||
return {}
|
||||
return stats
|
||||
|
||||
def get_all_media_records(self) -> Generator[MediaRecord, None, List[MediaRecord]]:
|
||||
records = []
|
||||
@@ -466,7 +494,6 @@ class MediaRegistryService:
|
||||
logger.warning(f"{self.media_registry_dir} is impure which caused: {e}")
|
||||
return records
|
||||
|
||||
|
||||
def remove_media_record(self, media_id: int):
|
||||
with self._lock:
|
||||
record_file = self._get_media_file_path(media_id)
|
||||
@@ -501,19 +528,19 @@ class MediaRegistryService:
|
||||
"""Update the download status and metadata for a specific episode."""
|
||||
try:
|
||||
from .models import DownloadStatus, MediaEpisode
|
||||
|
||||
|
||||
record = self.get_media_record(media_id)
|
||||
if not record:
|
||||
logger.error(f"No media record found for ID {media_id}")
|
||||
return False
|
||||
|
||||
|
||||
# Find existing episode or create new one
|
||||
episode_record = None
|
||||
for episode in record.media_episodes:
|
||||
if episode.episode_number == episode_number:
|
||||
episode_record = episode
|
||||
break
|
||||
|
||||
|
||||
if not episode_record:
|
||||
if not file_path:
|
||||
logger.error(f"File path required for new episode {episode_number}")
|
||||
@@ -524,7 +551,7 @@ class MediaRegistryService:
|
||||
download_status=status,
|
||||
)
|
||||
record.media_episodes.append(episode_record)
|
||||
|
||||
|
||||
# Update episode metadata
|
||||
episode_record.download_status = status
|
||||
if file_path:
|
||||
@@ -541,14 +568,14 @@ class MediaRegistryService:
|
||||
episode_record.subtitle_paths = subtitle_paths
|
||||
if error_message:
|
||||
episode_record.last_error = error_message
|
||||
|
||||
|
||||
# Increment download attempts if this is a failure
|
||||
if status == DownloadStatus.FAILED:
|
||||
episode_record.download_attempts += 1
|
||||
|
||||
|
||||
# Save the updated record
|
||||
return self.save_media_record(record)
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to update episode download status: {e}")
|
||||
return False
|
||||
@@ -559,14 +586,14 @@ class MediaRegistryService:
|
||||
"""Get all episodes with a specific download status."""
|
||||
try:
|
||||
from .models import DownloadStatus
|
||||
|
||||
|
||||
episodes = []
|
||||
for record in self.get_all_media_records():
|
||||
for episode in record.media_episodes:
|
||||
if episode.download_status == status:
|
||||
episodes.append((record.media_item.id, episode.episode_number))
|
||||
return episodes
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get episodes by status: {e}")
|
||||
return []
|
||||
@@ -575,7 +602,7 @@ class MediaRegistryService:
|
||||
"""Get comprehensive download statistics."""
|
||||
try:
|
||||
from .models import DownloadStatus
|
||||
|
||||
|
||||
stats = {
|
||||
"total_episodes": 0,
|
||||
"downloaded": 0,
|
||||
@@ -587,11 +614,11 @@ class MediaRegistryService:
|
||||
"by_quality": {},
|
||||
"by_provider": {},
|
||||
}
|
||||
|
||||
|
||||
for record in self.get_all_media_records():
|
||||
for episode in record.media_episodes:
|
||||
stats["total_episodes"] += 1
|
||||
|
||||
|
||||
# Count by status
|
||||
status_key = episode.download_status.value.lower()
|
||||
if status_key == "completed":
|
||||
@@ -604,25 +631,25 @@ class MediaRegistryService:
|
||||
stats["downloading"] += 1
|
||||
elif status_key == "paused":
|
||||
stats["paused"] += 1
|
||||
|
||||
|
||||
# Aggregate file sizes
|
||||
if episode.file_size:
|
||||
stats["total_size_bytes"] += episode.file_size
|
||||
|
||||
|
||||
# Count by quality
|
||||
if episode.quality:
|
||||
stats["by_quality"][episode.quality] = (
|
||||
stats["by_quality"].get(episode.quality, 0) + 1
|
||||
)
|
||||
|
||||
|
||||
# Count by provider
|
||||
if episode.provider_name:
|
||||
stats["by_provider"][episode.provider_name] = (
|
||||
stats["by_provider"].get(episode.provider_name, 0) + 1
|
||||
)
|
||||
|
||||
|
||||
return stats
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get download statistics: {e}")
|
||||
return {}
|
||||
|
||||
Reference in New Issue
Block a user