mirror of
https://github.com/Benexl/FastAnime.git
synced 2025-12-12 15:50:01 -08:00
feat: Add downloads menu and related actions for local media management
This commit is contained in:
232
fastanime/cli/interactive/menu/media/downloads.py
Normal file
232
fastanime/cli/interactive/menu/media/downloads.py
Normal file
@@ -0,0 +1,232 @@
|
||||
import logging
|
||||
import random
|
||||
from typing import Callable, Dict
|
||||
|
||||
from .....libs.media_api.params import MediaSearchParams
|
||||
from .....libs.media_api.types import (
|
||||
MediaSort,
|
||||
MediaStatus,
|
||||
UserMediaListStatus,
|
||||
)
|
||||
from ...session import Context, session
|
||||
from ...state import InternalDirective, MediaApiState, MenuName, State
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
MenuAction = Callable[[], State | InternalDirective]
|
||||
|
||||
|
||||
@session.menu
|
||||
def downloads(ctx: Context, state: State) -> State | InternalDirective:
|
||||
"""Downloads menu showing locally stored media from registry."""
|
||||
icons = ctx.config.general.icons
|
||||
feedback = ctx.service.feedback
|
||||
feedback.clear_console()
|
||||
|
||||
options: Dict[str, MenuAction] = {
|
||||
f"{'🔥 ' if icons else ''}Trending (Local)": _create_local_media_list_action(
|
||||
ctx, state, MediaSort.TRENDING_DESC
|
||||
),
|
||||
f"{'🎞️ ' if icons else ''}Recent (Local)": _create_local_recent_media_action(ctx, state),
|
||||
f"{'📺 ' if icons else ''}Watching (Local)": _create_local_status_action(
|
||||
ctx, state, UserMediaListStatus.WATCHING
|
||||
),
|
||||
f"{'🔁 ' if icons else ''}Rewatching (Local)": _create_local_status_action(
|
||||
ctx, state, UserMediaListStatus.REPEATING
|
||||
),
|
||||
f"{'⏸️ ' if icons else ''}Paused (Local)": _create_local_status_action(
|
||||
ctx, state, UserMediaListStatus.PAUSED
|
||||
),
|
||||
f"{'📑 ' if icons else ''}Planned (Local)": _create_local_status_action(
|
||||
ctx, state, UserMediaListStatus.PLANNING
|
||||
),
|
||||
f"{'🔎 ' if icons else ''}Search (Local)": _create_local_search_media_list(ctx, state),
|
||||
f"{'🔔 ' if icons else ''}Recently Updated (Local)": _create_local_media_list_action(
|
||||
ctx, state, MediaSort.UPDATED_AT_DESC
|
||||
),
|
||||
f"{'✨ ' if icons else ''}Popular (Local)": _create_local_media_list_action(
|
||||
ctx, state, MediaSort.POPULARITY_DESC
|
||||
),
|
||||
f"{'💯 ' if icons else ''}Top Scored (Local)": _create_local_media_list_action(
|
||||
ctx, state, MediaSort.SCORE_DESC
|
||||
),
|
||||
f"{'💖 ' if icons else ''}Favourites (Local)": _create_local_media_list_action(
|
||||
ctx, state, MediaSort.FAVOURITES_DESC
|
||||
),
|
||||
f"{'🎲 ' if icons else ''}Random (Local)": _create_local_random_media_list(ctx, state),
|
||||
f"{'🎬 ' if icons else ''}Upcoming (Local)": _create_local_media_list_action(
|
||||
ctx, state, MediaSort.POPULARITY_DESC, MediaStatus.NOT_YET_RELEASED
|
||||
),
|
||||
f"{'✅ ' if icons else ''}Completed (Local)": _create_local_status_action(
|
||||
ctx, state, UserMediaListStatus.COMPLETED
|
||||
),
|
||||
f"{'🚮 ' if icons else ''}Dropped (Local)": _create_local_status_action(
|
||||
ctx, state, UserMediaListStatus.DROPPED
|
||||
),
|
||||
f"{'↩️ ' if icons else ''}Back to Main": lambda: InternalDirective.BACK,
|
||||
f"{'❌ ' if icons else ''}Exit": lambda: InternalDirective.EXIT,
|
||||
}
|
||||
|
||||
choice = ctx.selector.choose(
|
||||
prompt="Select Downloads Category",
|
||||
choices=list(options.keys()),
|
||||
)
|
||||
if not choice:
|
||||
return InternalDirective.BACK
|
||||
|
||||
selected_action = options[choice]
|
||||
next_step = selected_action()
|
||||
return next_step
|
||||
|
||||
|
||||
def _create_local_media_list_action(
|
||||
ctx: Context, state: State, sort: MediaSort, status: MediaStatus | None = None
|
||||
) -> MenuAction:
|
||||
"""Create action for searching local media with sorting and optional status filter."""
|
||||
def action():
|
||||
feedback = ctx.service.feedback
|
||||
search_params = MediaSearchParams(sort=sort, status=status)
|
||||
|
||||
loading_message = "Searching local media registry"
|
||||
result = None
|
||||
with feedback.progress(loading_message):
|
||||
result = ctx.service.media_registry.search_for_media(search_params)
|
||||
|
||||
if result and result.media:
|
||||
return State(
|
||||
menu_name=MenuName.RESULTS,
|
||||
media_api=MediaApiState(
|
||||
search_result={
|
||||
media_item.id: media_item for media_item in result.media
|
||||
},
|
||||
search_params=search_params,
|
||||
page_info=result.page_info,
|
||||
),
|
||||
)
|
||||
else:
|
||||
feedback.info("No media found in local registry")
|
||||
return InternalDirective.BACK
|
||||
|
||||
return action
|
||||
|
||||
|
||||
def _create_local_random_media_list(ctx: Context, state: State) -> MenuAction:
|
||||
"""Create action for getting random local media."""
|
||||
def action():
|
||||
feedback = ctx.service.feedback
|
||||
|
||||
loading_message = "Getting random local media"
|
||||
with feedback.progress(loading_message):
|
||||
# Get all records and pick random ones
|
||||
all_records = list(ctx.service.media_registry.get_all_media_records())
|
||||
|
||||
if not all_records:
|
||||
feedback.info("No media found in local registry")
|
||||
return InternalDirective.BACK
|
||||
|
||||
# Get up to 50 random records
|
||||
random_records = random.sample(all_records, min(50, len(all_records)))
|
||||
random_ids = [record.media_item.id for record in random_records]
|
||||
|
||||
search_params = MediaSearchParams(id_in=random_ids)
|
||||
result = ctx.service.media_registry.search_for_media(search_params)
|
||||
|
||||
if result and result.media:
|
||||
return State(
|
||||
menu_name=MenuName.RESULTS,
|
||||
media_api=MediaApiState(
|
||||
search_result={
|
||||
media_item.id: media_item for media_item in result.media
|
||||
},
|
||||
search_params=search_params,
|
||||
page_info=result.page_info,
|
||||
),
|
||||
)
|
||||
else:
|
||||
feedback.info("No media found in local registry")
|
||||
return InternalDirective.BACK
|
||||
|
||||
return action
|
||||
|
||||
|
||||
def _create_local_search_media_list(ctx: Context, state: State) -> MenuAction:
|
||||
"""Create action for searching local media by query."""
|
||||
def action():
|
||||
feedback = ctx.service.feedback
|
||||
|
||||
query = ctx.selector.ask("Search Local Anime")
|
||||
if not query:
|
||||
return InternalDirective.BACK
|
||||
|
||||
search_params = MediaSearchParams(query=query)
|
||||
|
||||
loading_message = "Searching local media registry"
|
||||
result = None
|
||||
with feedback.progress(loading_message):
|
||||
result = ctx.service.media_registry.search_for_media(search_params)
|
||||
|
||||
if result and result.media:
|
||||
return State(
|
||||
menu_name=MenuName.RESULTS,
|
||||
media_api=MediaApiState(
|
||||
search_result={
|
||||
media_item.id: media_item for media_item in result.media
|
||||
},
|
||||
search_params=search_params,
|
||||
page_info=result.page_info,
|
||||
),
|
||||
)
|
||||
else:
|
||||
feedback.info("No media found in local registry")
|
||||
return InternalDirective.BACK
|
||||
|
||||
return action
|
||||
|
||||
|
||||
def _create_local_status_action(
|
||||
ctx: Context, state: State, status: UserMediaListStatus
|
||||
) -> MenuAction:
|
||||
"""Create action for getting local media by user status."""
|
||||
def action():
|
||||
feedback = ctx.service.feedback
|
||||
|
||||
loading_message = f"Getting {status.value} media from local registry"
|
||||
result = None
|
||||
with feedback.progress(loading_message):
|
||||
result = ctx.service.media_registry.get_media_by_status(status)
|
||||
|
||||
if result and result.media:
|
||||
return State(
|
||||
menu_name=MenuName.RESULTS,
|
||||
media_api=MediaApiState(
|
||||
search_result={
|
||||
media_item.id: media_item for media_item in result.media
|
||||
},
|
||||
page_info=result.page_info,
|
||||
),
|
||||
)
|
||||
else:
|
||||
feedback.info(f"No {status.value} media found in local registry")
|
||||
return InternalDirective.BACK
|
||||
|
||||
return action
|
||||
|
||||
|
||||
def _create_local_recent_media_action(ctx: Context, state: State) -> MenuAction:
|
||||
"""Create action for getting recently watched local media."""
|
||||
def action():
|
||||
result = ctx.service.media_registry.get_recently_watched()
|
||||
if result and result.media:
|
||||
return State(
|
||||
menu_name=MenuName.RESULTS,
|
||||
media_api=MediaApiState(
|
||||
search_result={
|
||||
media_item.id: media_item for media_item in result.media
|
||||
},
|
||||
page_info=result.page_info,
|
||||
),
|
||||
)
|
||||
else:
|
||||
ctx.service.feedback.info("No recently watched media found in local registry")
|
||||
return InternalDirective.BACK
|
||||
|
||||
return action
|
||||
@@ -39,7 +39,8 @@ def main(ctx: Context, state: State) -> State | InternalDirective:
|
||||
ctx, state, UserMediaListStatus.PLANNING
|
||||
),
|
||||
f"{'🔎 ' if icons else ''}Search": _create_search_media_list(ctx, state),
|
||||
f"{'🔔 ' if icons else ''}Recently Updated": _create_media_list_action(
|
||||
f"{'<EFBFBD> ' if icons else ''}Downloads": _create_downloads_action(ctx, state),
|
||||
f"{'<EFBFBD>🔔 ' if icons else ''}Recently Updated": _create_media_list_action(
|
||||
ctx, state, MediaSort.UPDATED_AT_DESC
|
||||
),
|
||||
f"{'✨ ' if icons else ''}Popular": _create_media_list_action(
|
||||
@@ -218,3 +219,11 @@ def _create_recent_media_action(ctx: Context, state: State) -> MenuAction:
|
||||
return InternalDirective.MAIN
|
||||
|
||||
return action
|
||||
|
||||
|
||||
def _create_downloads_action(ctx: Context, state: State) -> MenuAction:
|
||||
"""Create action to navigate to the downloads menu."""
|
||||
def action():
|
||||
return State(menu_name=MenuName.DOWNLOADS)
|
||||
|
||||
return action
|
||||
|
||||
@@ -33,7 +33,6 @@ class Services:
|
||||
watch_history: WatchHistoryService
|
||||
session: SessionsService
|
||||
auth: AuthService
|
||||
download: "DownloadService"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
|
||||
@@ -39,6 +39,7 @@ class MenuName(Enum):
|
||||
USER_MEDIA_LIST = "USER_MEDIA_LIST"
|
||||
SESSION_MANAGEMENT = "SESSION_MANAGEMENT"
|
||||
MEDIA_ACTIONS = "MEDIA_ACTIONS"
|
||||
DOWNLOADS = "DOWNLOADS"
|
||||
|
||||
|
||||
class StateModel(BaseModel):
|
||||
|
||||
@@ -1,277 +0,0 @@
|
||||
from typing import List
|
||||
|
||||
from ....libs.media_api.params import MediaSearchParams
|
||||
from ....libs.media_api.types import MediaItem
|
||||
|
||||
|
||||
class MediaFilter:
|
||||
"""
|
||||
A class to filter, sort, and paginate a list of MediaItem objects
|
||||
based on ApiSearchParams.
|
||||
"""
|
||||
|
||||
# Mapping for season to month range (MMDD format)
|
||||
_SEASON_MONTH_RANGES = {
|
||||
"WINTER": (101, 331), # Jan 1 - Mar 31
|
||||
"SPRING": (401, 630), # Apr 1 - Jun 30
|
||||
"SUMMER": (701, 930), # Jul 1 - Sep 30
|
||||
"FALL": (1001, 1231), # Oct 1 - Dec 31
|
||||
}
|
||||
|
||||
# Mapping for sort parameters to MediaItem attributes and order
|
||||
# (attribute_name, is_descending, is_nested_title_field)
|
||||
_SORT_MAPPING = {
|
||||
"ID": ("id", False, False),
|
||||
"ID_DESC": ("id", True, False),
|
||||
"POPULARITY": ("popularity", False, False),
|
||||
"POPULARITY_DESC": ("popularity", True, False),
|
||||
"SCORE": ("average_score", False, False),
|
||||
"SCORE_DESC": ("average_score", True, False),
|
||||
"TITLE_ROMAJI": ("romaji", False, True), # Nested under title
|
||||
"TITLE_ROMAJI_DESC": ("romaji", True, True),
|
||||
"TITLE_ENGLISH": ("english", False, True),
|
||||
"TITLE_ENGLISH_DESC": ("english", True, True),
|
||||
"START_DATE": ("start_date", False, False),
|
||||
"START_DATE_DESC": ("start_date", True, False),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def apply(
|
||||
cls, media_items: List[MediaItem], filters: MediaSearchParams
|
||||
) -> List[MediaItem]:
|
||||
"""
|
||||
Applies filtering, sorting, and pagination to a list of MediaItem objects.
|
||||
|
||||
Args:
|
||||
media_items: The initial list of MediaItem objects to filter.
|
||||
params: An ApiSearchParams object containing the filter, sort, and pagination criteria.
|
||||
|
||||
Returns:
|
||||
A new list of MediaItem objects, filtered, sorted, and paginated.
|
||||
"""
|
||||
filtered_items = list(media_items) # Create a mutable copy
|
||||
|
||||
if filters.query:
|
||||
query_lower = filters.query.lower()
|
||||
filtered_items = [
|
||||
item
|
||||
for item in filtered_items
|
||||
if (
|
||||
item.title
|
||||
and (
|
||||
(item.title.romaji and query_lower in item.title.romaji.lower())
|
||||
or (
|
||||
item.title.english
|
||||
and query_lower in item.title.english.lower()
|
||||
)
|
||||
or (
|
||||
item.title.native
|
||||
and query_lower in item.title.native.lower()
|
||||
)
|
||||
)
|
||||
)
|
||||
or (item.description and query_lower in item.description.lower())
|
||||
or any(query_lower in syn.lower() for syn in item.synonymns)
|
||||
]
|
||||
|
||||
# IDs
|
||||
if filters.id_in:
|
||||
id_set = set(filters.id_in)
|
||||
filtered_items = [item for item in filtered_items if item.id in id_set]
|
||||
|
||||
# Genres
|
||||
if filters.genre_in:
|
||||
genre_in_set = set(g.lower() for g in filters.genre_in)
|
||||
filtered_items = [
|
||||
item
|
||||
for item in filtered_items
|
||||
if any(g.lower() in genre_in_set for g in item.genres)
|
||||
]
|
||||
if filters.genre_not_in:
|
||||
genre_not_in_set = set(g.lower() for g in filters.genre_not_in)
|
||||
filtered_items = [
|
||||
item
|
||||
for item in filtered_items
|
||||
if not any(g.lower() in genre_not_in_set for g in item.genres)
|
||||
]
|
||||
|
||||
# Tags
|
||||
if filters.tag_in:
|
||||
tag_in_set = set(t.lower() for t in filters.tag_in)
|
||||
filtered_items = [
|
||||
item
|
||||
for item in filtered_items
|
||||
if any(tag.name and tag.name.lower() in tag_in_set for tag in item.tags)
|
||||
]
|
||||
if filters.tag_not_in:
|
||||
tag_not_in_set = set(t.lower() for t in filters.tag_not_in)
|
||||
filtered_items = [
|
||||
item
|
||||
for item in filtered_items
|
||||
if not any(
|
||||
tag.name and tag.name.lower() in tag_not_in_set for tag in item.tags
|
||||
)
|
||||
]
|
||||
|
||||
# Status
|
||||
combined_status_in = set()
|
||||
if filters.status_in:
|
||||
combined_status_in.update(s.upper() for s in filters.status_in)
|
||||
if filters.status:
|
||||
combined_status_in.add(filters.status.upper())
|
||||
|
||||
if combined_status_in:
|
||||
filtered_items = [
|
||||
item
|
||||
for item in filtered_items
|
||||
if item.status and item.status.upper() in combined_status_in
|
||||
]
|
||||
if filters.status_not_in:
|
||||
status_not_in_set = set(s.upper() for s in filters.status_not_in)
|
||||
filtered_items = [
|
||||
item
|
||||
for item in filtered_items
|
||||
if item.status and item.status.upper() not in status_not_in_set
|
||||
]
|
||||
|
||||
# Popularity
|
||||
if filters.popularity_greater is not None:
|
||||
filtered_items = [
|
||||
item
|
||||
for item in filtered_items
|
||||
if item.popularity is not None
|
||||
and item.popularity > filters.popularity_greater
|
||||
]
|
||||
if filters.popularity_lesser is not None:
|
||||
filtered_items = [
|
||||
item
|
||||
for item in filtered_items
|
||||
if item.popularity is not None
|
||||
and item.popularity < filters.popularity_lesser
|
||||
]
|
||||
|
||||
# Average Score
|
||||
if filters.averageScore_greater is not None:
|
||||
filtered_items = [
|
||||
item
|
||||
for item in filtered_items
|
||||
if item.average_score is not None
|
||||
and item.average_score > filters.averageScore_greater
|
||||
]
|
||||
if filters.averageScore_lesser is not None:
|
||||
filtered_items = [
|
||||
item
|
||||
for item in filtered_items
|
||||
if item.average_score is not None
|
||||
and item.average_score < filters.averageScore_lesser
|
||||
]
|
||||
|
||||
# Date Filtering (combining season/year with startDate parameters)
|
||||
effective_start_date_greater = filters.startDate_greater
|
||||
effective_start_date_lesser = filters.startDate_lesser
|
||||
|
||||
if filters.seasonYear is not None and filters.season is not None:
|
||||
season_range = cls._SEASON_MONTH_RANGES.get(filters.season.upper())
|
||||
if season_range:
|
||||
# Calculate start and end of the season in YYYYMMDD format
|
||||
season_start_date = filters.seasonYear * 10000 + season_range[0]
|
||||
season_end_date = filters.seasonYear * 10000 + season_range[1]
|
||||
|
||||
# Combine with existing startDate_greater/lesser, taking the stricter boundary
|
||||
effective_start_date_greater = max(
|
||||
effective_start_date_greater or 0, season_start_date
|
||||
)
|
||||
effective_start_date_lesser = min(
|
||||
effective_start_date_lesser or 99999999, season_end_date
|
||||
)
|
||||
|
||||
# TODO: re enable date filtering since date is a datetime
|
||||
|
||||
# if filters.startDate is not None:
|
||||
# # If a specific start date is given, it overrides ranges for exact match
|
||||
# filtered_items = [
|
||||
# item for item in filtered_items if item.start_date == filters.startDate
|
||||
# ]
|
||||
# else:
|
||||
# if effective_start_date_greater is not None:
|
||||
# filtered_items = [
|
||||
# item
|
||||
# for item in filtered_items
|
||||
# if item.start_date is not None
|
||||
# and item.start_date >= datetime(y,m,d)
|
||||
# ]
|
||||
# if effective_start_date_lesser is not None:
|
||||
# filtered_items = [
|
||||
# item
|
||||
# for item in filtered_items
|
||||
# if item.start_date is not None
|
||||
# and item.start_date <= effective_start_date_lesser
|
||||
# ]
|
||||
|
||||
# if filters.endDate_greater is not None:
|
||||
# filtered_items = [
|
||||
# item
|
||||
# for item in filtered_items
|
||||
# if item.end_date is not None
|
||||
# and item.end_date >= filters.endDate_greater
|
||||
# ]
|
||||
# if filters.endDate_lesser is not None:
|
||||
# filtered_items = [
|
||||
# item
|
||||
# for item in filtered_items
|
||||
# if item.end_date is not None and item.end_date <= filters.endDate_lesser
|
||||
# ]
|
||||
|
||||
# Format and Type
|
||||
if filters.format_in:
|
||||
format_in_set = set(f.upper() for f in filters.format_in)
|
||||
filtered_items = [
|
||||
item
|
||||
for item in filtered_items
|
||||
if item.format and item.format.upper() in format_in_set
|
||||
]
|
||||
if filters.type:
|
||||
filtered_items = [
|
||||
item
|
||||
for item in filtered_items
|
||||
if item.type and item.type.upper() == filters.type.upper()
|
||||
]
|
||||
|
||||
# --- 2. Apply Sorting ---
|
||||
if filters.sort:
|
||||
sort_criteria = (
|
||||
[filters.sort] if isinstance(filters.sort, str) else filters.sort
|
||||
)
|
||||
|
||||
# Sort in reverse order of criteria so the first criterion is primary
|
||||
for sort_param in reversed(sort_criteria):
|
||||
sort_info = cls._SORT_MAPPING.get(sort_param.upper())
|
||||
if sort_info:
|
||||
attr_name, is_descending, is_nested_title = sort_info
|
||||
|
||||
def sort_key(item: MediaItem):
|
||||
if is_nested_title:
|
||||
# Handle nested title attributes
|
||||
title_obj = item.title
|
||||
if title_obj and hasattr(title_obj, attr_name):
|
||||
val = getattr(title_obj, attr_name)
|
||||
return val.lower() if isinstance(val, str) else val
|
||||
return None # Handle missing title or attribute gracefully
|
||||
else:
|
||||
# Handle direct attributes
|
||||
return getattr(item, attr_name)
|
||||
|
||||
# Sort, handling None values (None typically sorts first in ascending)
|
||||
filtered_items.sort(
|
||||
key=lambda item: (sort_key(item) is None, sort_key(item)),
|
||||
reverse=is_descending,
|
||||
)
|
||||
else:
|
||||
print(f"Warning: Unknown sort parameter '{sort_param}'. Skipping.")
|
||||
|
||||
# --- 3. Apply Pagination ---
|
||||
start_index = (filters.page - 1) * filters.per_page
|
||||
end_index = start_index + filters.per_page
|
||||
paginated_items = filtered_items[start_index:end_index]
|
||||
|
||||
return paginated_items
|
||||
@@ -12,14 +12,15 @@ from ....libs.media_api.types import (
|
||||
MediaItem,
|
||||
MediaSearchResult,
|
||||
PageInfo,
|
||||
UserListItem,
|
||||
UserMediaListStatus,
|
||||
)
|
||||
from .filters import MediaFilter
|
||||
from .models import (
|
||||
REGISTRY_VERSION,
|
||||
MediaRecord,
|
||||
MediaRegistryIndex,
|
||||
MediaRegistryIndexEntry,
|
||||
DownloadStatus
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -204,7 +205,23 @@ class MediaRegistryService:
|
||||
for entry in sorted_entries:
|
||||
record = self.get_media_record(entry.media_id)
|
||||
if record:
|
||||
recent_media.append(record.media_item)
|
||||
# Create UserListItem from index entry
|
||||
user_list_item = UserListItem(
|
||||
status=entry.status,
|
||||
progress=int(entry.progress) if entry.progress.isdigit() else 0,
|
||||
score=entry.score,
|
||||
repeat=entry.repeat,
|
||||
notes=entry.notes,
|
||||
start_date=entry.start_date,
|
||||
completed_at=entry.completed_at
|
||||
)
|
||||
|
||||
# Create new MediaItem with user status
|
||||
media_with_status = MediaItem(
|
||||
**record.media_item.model_dump(),
|
||||
user_status=user_list_item
|
||||
)
|
||||
recent_media.append(media_with_status)
|
||||
# if len(recent_media) == limit:
|
||||
# break
|
||||
|
||||
@@ -213,6 +230,210 @@ class MediaRegistryService:
|
||||
)
|
||||
return MediaSearchResult(page_info=page_info, media=recent_media)
|
||||
|
||||
def search_for_media(self, params: MediaSearchParams) -> MediaSearchResult:
|
||||
"""Search for media in the local registry based on search parameters."""
|
||||
from ....libs.media_api.types import MediaSearchResult, PageInfo
|
||||
|
||||
index = self._load_index()
|
||||
all_media: List[MediaItem] = []
|
||||
|
||||
# Get all media records and attach user status
|
||||
for entry in index.media_index.values():
|
||||
record = self.get_media_record(entry.media_id)
|
||||
if record:
|
||||
# Create UserListItem from index entry
|
||||
user_list_item = UserListItem(
|
||||
status=entry.status,
|
||||
progress=int(entry.progress) if entry.progress.isdigit() else 0,
|
||||
score=entry.score,
|
||||
repeat=entry.repeat,
|
||||
notes=entry.notes,
|
||||
start_date=entry.start_date,
|
||||
completed_at=entry.completed_at
|
||||
)
|
||||
|
||||
# Create new MediaItem with user status
|
||||
media_with_status = MediaItem(
|
||||
**record.media_item.model_dump(),
|
||||
user_status=user_list_item
|
||||
)
|
||||
all_media.append(media_with_status)
|
||||
|
||||
# Apply filters based on search parameters
|
||||
filtered_media = self._apply_search_filters(all_media, params, index)
|
||||
|
||||
# Apply sorting
|
||||
sorted_media = self._apply_sorting(filtered_media, params, index)
|
||||
|
||||
# Apply pagination
|
||||
page = params.page or 1
|
||||
per_page = params.per_page or 15
|
||||
start_idx = (page - 1) * per_page
|
||||
end_idx = start_idx + per_page
|
||||
|
||||
paginated_media = sorted_media[start_idx:end_idx]
|
||||
|
||||
page_info = PageInfo(
|
||||
total=len(sorted_media),
|
||||
current_page=page,
|
||||
has_next_page=end_idx < len(sorted_media),
|
||||
per_page=per_page
|
||||
)
|
||||
|
||||
return MediaSearchResult(page_info=page_info, media=paginated_media)
|
||||
|
||||
def _apply_search_filters(self, media_list: List[MediaItem], params: MediaSearchParams, index) -> List[MediaItem]:
|
||||
"""Apply search filters to media list."""
|
||||
filtered = media_list.copy()
|
||||
|
||||
# Query filter (search in title)
|
||||
if params.query:
|
||||
query_lower = params.query.lower()
|
||||
filtered = [
|
||||
media for media in filtered
|
||||
if (query_lower in media.title.english.lower() if media.title.english else False) or
|
||||
(query_lower in media.title.romaji.lower() if media.title.romaji else False) or
|
||||
(query_lower in media.title.native.lower() if media.title.native else False) or
|
||||
any(query_lower in synonym.lower() for synonym in media.synonymns)
|
||||
]
|
||||
|
||||
# Status filters
|
||||
if params.status:
|
||||
filtered = [media for media in filtered if media.status == params.status]
|
||||
if params.status_in:
|
||||
filtered = [media for media in filtered if media.status in params.status_in]
|
||||
if params.status_not_in:
|
||||
filtered = [media for media in filtered if media.status not in params.status_not_in]
|
||||
|
||||
# Genre filters
|
||||
if params.genre_in:
|
||||
filtered = [media for media in filtered if any(genre in media.genres for genre in params.genre_in)]
|
||||
if params.genre_not_in:
|
||||
filtered = [media for media in filtered if not any(genre in media.genres for genre in params.genre_not_in)]
|
||||
|
||||
# Tag filters
|
||||
if params.tag_in:
|
||||
media_tags = [tag.name for tag in media.tags]
|
||||
filtered = [media for media in filtered if any(tag in media_tags for tag in params.tag_in)]
|
||||
if params.tag_not_in:
|
||||
media_tags = [tag.name for tag in media.tags]
|
||||
filtered = [media for media in filtered if not any(tag in media_tags for tag in params.tag_not_in)]
|
||||
|
||||
# Format filter
|
||||
if params.format_in:
|
||||
filtered = [media for media in filtered if media.format in params.format_in]
|
||||
|
||||
# Type filter
|
||||
if params.type:
|
||||
filtered = [media for media in filtered if media.type == params.type]
|
||||
|
||||
# Score filters
|
||||
if params.averageScore_greater is not None:
|
||||
filtered = [media for media in filtered if media.average_score and media.average_score >= params.averageScore_greater]
|
||||
if params.averageScore_lesser is not None:
|
||||
filtered = [media for media in filtered if media.average_score and media.average_score <= params.averageScore_lesser]
|
||||
|
||||
# Popularity filters
|
||||
if params.popularity_greater is not None:
|
||||
filtered = [media for media in filtered if media.popularity and media.popularity >= params.popularity_greater]
|
||||
if params.popularity_lesser is not None:
|
||||
filtered = [media for media in filtered if media.popularity and media.popularity <= params.popularity_lesser]
|
||||
|
||||
# ID filter
|
||||
if params.id_in:
|
||||
filtered = [media for media in filtered if media.id in params.id_in]
|
||||
|
||||
# User list filter
|
||||
if params.on_list is not None:
|
||||
if params.on_list:
|
||||
# Only show media that has user status (is on list)
|
||||
filtered = [media for media in filtered if media.user_status is not None]
|
||||
else:
|
||||
# Only show media that doesn't have user status (not on list)
|
||||
filtered = [media for media in filtered if media.user_status is None]
|
||||
|
||||
return filtered
|
||||
|
||||
def _apply_sorting(self, media_list: List[MediaItem], params: MediaSearchParams, index) -> List[MediaItem]:
|
||||
"""Apply sorting to media list."""
|
||||
if not params.sort:
|
||||
return media_list
|
||||
|
||||
# Get the MediaSort value
|
||||
sort = params.sort
|
||||
if isinstance(sort, list):
|
||||
sort = sort[0] # Use first sort if multiple provided
|
||||
|
||||
# Apply sorting based on MediaSort enum
|
||||
try:
|
||||
if sort.value == "POPULARITY_DESC":
|
||||
return sorted(media_list, key=lambda x: x.popularity or 0, reverse=True)
|
||||
elif sort.value == "SCORE_DESC":
|
||||
return sorted(media_list, key=lambda x: x.average_score or 0, reverse=True)
|
||||
elif sort.value == "FAVOURITES_DESC":
|
||||
return sorted(media_list, key=lambda x: x.favourites or 0, reverse=True)
|
||||
elif sort.value == "TRENDING_DESC":
|
||||
# For local registry, we'll sort by popularity as proxy for trending
|
||||
return sorted(media_list, key=lambda x: x.popularity or 0, reverse=True)
|
||||
elif sort.value == "UPDATED_AT_DESC":
|
||||
# Sort by last watched time from registry
|
||||
def get_last_watched(media):
|
||||
entry = index.media_index.get(f"{self._media_api}_{media.id}")
|
||||
return entry.last_watched if entry else datetime.min
|
||||
return sorted(media_list, key=get_last_watched, reverse=True)
|
||||
else:
|
||||
# Default to title sorting
|
||||
return sorted(media_list, key=lambda x: x.title.english or x.title.romaji or "")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to apply sorting {sort}: {e}")
|
||||
return media_list
|
||||
|
||||
def get_media_by_status(self, status: UserMediaListStatus) -> MediaSearchResult:
|
||||
"""Get media filtered by user status from registry."""
|
||||
index = self._load_index()
|
||||
|
||||
# Filter entries by status
|
||||
status_entries = [
|
||||
entry for entry in index.media_index.values()
|
||||
if entry.status == status
|
||||
]
|
||||
|
||||
# Get media items for these entries
|
||||
media_list: List[MediaItem] = []
|
||||
for entry in status_entries:
|
||||
record = self.get_media_record(entry.media_id)
|
||||
if record:
|
||||
# Create UserListItem from index entry
|
||||
user_status = UserListItem(
|
||||
status=entry.status,
|
||||
progress=int(entry.progress) if entry.progress.isdigit() else 0,
|
||||
score=entry.score,
|
||||
repeat=entry.repeat,
|
||||
notes=entry.notes,
|
||||
start_date=entry.start_date,
|
||||
completed_at=entry.completed_at
|
||||
)
|
||||
|
||||
media_with_status = MediaItem(
|
||||
**record.media_item.model_dump(),
|
||||
user_status=user_status
|
||||
)
|
||||
media_list.append(media_with_status)
|
||||
|
||||
# Sort by last watched
|
||||
sorted_media = sorted(
|
||||
media_list,
|
||||
key=lambda media: next(
|
||||
(entry.last_watched for entry in index.media_index.values()
|
||||
if entry.media_id == media.id),
|
||||
datetime.min
|
||||
),
|
||||
reverse=True
|
||||
)
|
||||
|
||||
page_info = PageInfo(total=len(sorted_media))
|
||||
return MediaSearchResult(page_info=page_info, media=sorted_media)
|
||||
|
||||
def get_registry_stats(self) -> Dict:
|
||||
"""Get comprehensive registry statistics."""
|
||||
try:
|
||||
@@ -245,17 +466,6 @@ class MediaRegistryService:
|
||||
logger.warning(f"{self.media_registry_dir} is impure which caused: {e}")
|
||||
return records
|
||||
|
||||
def search_for_media(self, params: MediaSearchParams) -> List[MediaItem]:
|
||||
"""Search media by title."""
|
||||
try:
|
||||
# TODO: enhance performance
|
||||
media_items = [record.media_item for record in self.get_all_media_records()]
|
||||
|
||||
return MediaFilter.apply(media_items, params)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to search media: {e}")
|
||||
return []
|
||||
|
||||
def remove_media_record(self, media_id: int):
|
||||
with self._lock:
|
||||
|
||||
Reference in New Issue
Block a user