chore: cleanup

This commit is contained in:
Benexl
2025-07-28 19:48:20 +03:00
parent 2717d0b012
commit 65aa8fcb4e
10 changed files with 1 additions and 3594 deletions

View File

@@ -7,7 +7,7 @@ commands = {
# "trending": "trending.trending",
# "recent": "recent.recent",
"search": "search.search",
# "downloads": "downloads.downloads",
"download": "download.download",
"auth": "auth.auth",
"stats": "stats.stats",
}

View File

@@ -1,981 +0,0 @@
"""AniList download command using the modern download service."""
from typing import TYPE_CHECKING
import click
from .....core.config import AppConfig
from .....core.exceptions import FastAnimeError
from .....libs.media_api.api import create_api_client
from .....libs.media_api.params import MediaSearchParams
from .....libs.media_api.types import (
MediaFormat,
MediaGenre,
MediaSeason,
MediaSort,
MediaStatus,
MediaTag,
MediaType,
MediaYear,
)
from .....libs.provider.anime.provider import create_provider
from .....libs.provider.anime.params import SearchParams, AnimeParams
from .....libs.selectors import create_selector
from ....service.download import DownloadService
from ....service.feedback import FeedbackService
from ....service.registry import MediaRegistryService
from ....utils.completion import anime_titles_shell_complete
from ....utils import parse_episode_range
from .. import examples
if TYPE_CHECKING:
from typing import TypedDict
from typing_extensions import Unpack
class DownloadOptions(TypedDict, total=False):
title: str | None
episode_range: str | None
quality: str | None
force_redownload: bool
page: int
per_page: int | None
season: str | None
status: tuple[str, ...]
status_not: tuple[str, ...]
sort: str | None
genres: tuple[str, ...]
genres_not: tuple[str, ...]
tags: tuple[str, ...]
tags_not: tuple[str, ...]
media_format: tuple[str, ...]
media_type: str | None
year: str | None
popularity_greater: int | None
popularity_lesser: int | None
score_greater: int | None
score_lesser: int | None
start_date_greater: int | None
start_date_lesser: int | None
end_date_greater: int | None
end_date_lesser: int | None
on_list: bool | None
max_concurrent: int | None
@click.command(
help="Download anime episodes using AniList API for search and provider integration",
short_help="Download anime episodes",
epilog=examples.download,
)
@click.option(
"--title",
"-t",
shell_complete=anime_titles_shell_complete,
help="Title of the anime to search for",
)
@click.option(
"--episode-range",
"-r",
help="Range of episodes to download (e.g., '1:5', '3:', ':5', '1:10:2')",
)
@click.option(
"--quality",
"-q",
type=click.Choice(["360", "480", "720", "1080", "best"]),
help="Preferred download quality",
)
@click.option(
"--force-redownload",
"-f",
is_flag=True,
help="Force redownload even if episode already exists",
)
@click.option(
"--page",
"-p",
type=click.IntRange(min=1),
default=1,
help="Page number for search pagination",
)
@click.option(
"--per-page",
type=click.IntRange(min=1, max=50),
help="Number of results per page (max 50)",
)
@click.option(
"--season",
help="The season the media was released",
type=click.Choice([season.value for season in MediaSeason]),
)
@click.option(
"--status",
"-S",
help="The media status of the anime",
multiple=True,
type=click.Choice([status.value for status in MediaStatus]),
)
@click.option(
"--status-not",
help="Exclude media with these statuses",
multiple=True,
type=click.Choice([status.value for status in MediaStatus]),
)
@click.option(
"--sort",
"-s",
help="What to sort the search results on",
type=click.Choice([sort.value for sort in MediaSort]),
)
@click.option(
"--genres",
"-g",
multiple=True,
help="the genres to filter by",
type=click.Choice([genre.value for genre in MediaGenre]),
)
@click.option(
"--genres-not",
multiple=True,
help="Exclude these genres",
type=click.Choice([genre.value for genre in MediaGenre]),
)
@click.option(
"--tags",
"-T",
multiple=True,
help="the tags to filter by",
type=click.Choice([tag.value for tag in MediaTag]),
)
@click.option(
"--tags-not",
multiple=True,
help="Exclude these tags",
type=click.Choice([tag.value for tag in MediaTag]),
)
@click.option(
"--media-format",
"-F",
multiple=True,
help="Media format",
type=click.Choice([format.value for format in MediaFormat]),
)
@click.option(
"--media-type",
help="Media type (ANIME or MANGA)",
type=click.Choice([media_type.value for media_type in MediaType]),
)
@click.option(
"--year",
"-y",
type=click.Choice([year.value for year in MediaYear]),
help="the year the media was released",
)
@click.option(
"--popularity-greater",
type=click.IntRange(min=0),
help="Minimum popularity score",
)
@click.option(
"--popularity-lesser",
type=click.IntRange(min=0),
help="Maximum popularity score",
)
@click.option(
"--score-greater",
type=click.IntRange(min=0, max=100),
help="Minimum average score (0-100)",
)
@click.option(
"--score-lesser",
type=click.IntRange(min=0, max=100),
help="Maximum average score (0-100)",
)
@click.option(
"--start-date-greater",
type=click.IntRange(min=10000101, max=99991231),
help="Minimum start date (YYYYMMDD format, e.g., 20240101)",
)
@click.option(
"--start-date-lesser",
type=click.IntRange(min=10000101, max=99991231),
help="Maximum start date (YYYYMMDD format, e.g., 20241231)",
)
@click.option(
"--end-date-greater",
type=click.IntRange(min=10000101, max=99991231),
help="Minimum end date (YYYYMMDD format, e.g., 20240101)",
)
@click.option(
"--end-date-lesser",
type=click.IntRange(min=10000101, max=99991231),
help="Maximum end date (YYYYMMDD format, e.g., 20241231)",
)
@click.option(
"--on-list/--not-on-list",
"-L/-no-L",
help="Whether the anime should be in your list or not",
type=bool,
)
@click.option(
"--max-concurrent",
"-c",
type=click.IntRange(min=1, max=10),
help="Maximum number of concurrent downloads",
)
@click.pass_obj
def download(config: AppConfig, **options: "Unpack[DownloadOptions]"):
"""Download anime episodes using AniList search and provider integration."""
feedback = FeedbackService(config.general.icons)
try:
# Extract and validate options
title = options.get("title")
episode_range = options.get("episode_range")
quality = options.get("quality")
force_redownload = options.get("force_redownload", False)
max_concurrent = options.get("max_concurrent", config.downloads.max_concurrent)
_validate_options(options)
# Initialize services
feedback.info("Initializing services...")
api_client, provider, selector, media_registry, download_service = (
_initialize_services(config)
)
feedback.info(f"Using provider: {provider.__class__.__name__}")
feedback.info(f"Using media API: {config.general.media_api}")
feedback.info(f"Translation type: {config.stream.translation_type}")
# Search for anime
search_params = _build_search_params(options, config)
search_result = _search_anime(api_client, search_params, feedback)
# Let user select anime (single or multiple)
selected_anime_list = _select_anime(search_result, selector, feedback)
if not selected_anime_list:
feedback.info("No anime selected. Exiting.")
return
# Process each selected anime
for selected_anime in selected_anime_list:
feedback.info(
f"Processing: {selected_anime.title.english or selected_anime.title.romaji}"
)
feedback.info(f"AniList ID: {selected_anime.id}")
# Get available episodes from provider
episodes_result = _get_available_episodes(
provider, selected_anime, config, feedback
)
if not episodes_result:
feedback.warning(
f"No episodes found for {selected_anime.title.english or selected_anime.title.romaji}"
)
_suggest_alternatives(selected_anime, provider, config, feedback)
continue
# Unpack the result
if len(episodes_result) == 2:
available_episodes, provider_anime_data = episodes_result
else:
# Fallback for backwards compatibility
available_episodes = episodes_result
provider_anime_data = None
# Determine episodes to download
episodes_to_download = _determine_episodes_to_download(
episode_range, available_episodes, selector, feedback
)
if not episodes_to_download:
feedback.warning("No episodes selected for download")
continue
feedback.info(
f"About to download {len(episodes_to_download)} episodes: {', '.join(episodes_to_download)}"
)
# Test stream availability before attempting download (using provider anime data)
if episodes_to_download and provider_anime_data:
test_episode = episodes_to_download[0]
feedback.info(
f"Testing stream availability for episode {test_episode}..."
)
success = _test_episode_stream_availability(
provider, provider_anime_data, test_episode, config, feedback
)
if not success:
feedback.warning(f"Stream test failed for episode {test_episode}.")
feedback.info("Possible solutions:")
feedback.info("1. Try a different provider (check your config)")
feedback.info("2. Check if the episode number is correct")
feedback.info("3. Try a different translation type (sub/dub)")
feedback.info(
"4. The anime might not be available on this provider"
)
# Ask user if they want to continue anyway
continue_anyway = (
input("\nContinue with download anyway? (y/N): ")
.strip()
.lower()
)
if continue_anyway not in ["y", "yes"]:
feedback.info("Download cancelled by user")
continue
# Download episodes (using provider anime data if available, otherwise AniList data)
anime_for_download = (
provider_anime_data if provider_anime_data else selected_anime
)
_download_episodes(
download_service,
anime_for_download,
episodes_to_download,
quality,
force_redownload,
max_concurrent,
feedback,
)
# Show final statistics
_show_final_statistics(download_service, feedback)
except FastAnimeError as e:
feedback.error("Download failed", str(e))
raise click.Abort()
except Exception as e:
feedback.error("Unexpected error occurred", str(e))
raise click.Abort()
def _validate_options(options: "DownloadOptions") -> None:
"""Validate command line options."""
score_greater = options.get("score_greater")
score_lesser = options.get("score_lesser")
popularity_greater = options.get("popularity_greater")
popularity_lesser = options.get("popularity_lesser")
start_date_greater = options.get("start_date_greater")
start_date_lesser = options.get("start_date_lesser")
end_date_greater = options.get("end_date_greater")
end_date_lesser = options.get("end_date_lesser")
# Score validation
if (
score_greater is not None
and score_lesser is not None
and score_greater > score_lesser
):
raise FastAnimeError("Minimum score cannot be higher than maximum score")
# Popularity validation
if (
popularity_greater is not None
and popularity_lesser is not None
and popularity_greater > popularity_lesser
):
raise FastAnimeError(
"Minimum popularity cannot be higher than maximum popularity"
)
# Date validation
if (
start_date_greater is not None
and start_date_lesser is not None
and start_date_greater > start_date_lesser
):
raise FastAnimeError("Minimum start date cannot be after maximum start date")
if (
end_date_greater is not None
and end_date_lesser is not None
and end_date_greater > end_date_lesser
):
raise FastAnimeError("Minimum end date cannot be after maximum end date")
def _initialize_services(config: AppConfig) -> tuple:
"""Initialize all required services."""
api_client = create_api_client(config.general.media_api, config)
provider = create_provider(config.general.provider)
selector = create_selector(config)
media_registry = MediaRegistryService(
config.general.media_api, config.media_registry
)
download_service = DownloadService(config, media_registry, provider)
return api_client, provider, selector, media_registry, download_service
def _build_search_params(
options: "DownloadOptions", config: AppConfig
) -> MediaSearchParams:
"""Build MediaSearchParams from command options."""
return MediaSearchParams(
query=options.get("title"),
page=options.get("page", 1),
per_page=options.get("per_page") or config.anilist.per_page or 50,
sort=MediaSort(options.get("sort")) if options.get("sort") else None,
status_in=[MediaStatus(s) for s in options.get("status", ())]
if options.get("status")
else None,
status_not_in=[MediaStatus(s) for s in options.get("status_not", ())]
if options.get("status_not")
else None,
genre_in=[MediaGenre(g) for g in options.get("genres", ())]
if options.get("genres")
else None,
genre_not_in=[MediaGenre(g) for g in options.get("genres_not", ())]
if options.get("genres_not")
else None,
tag_in=[MediaTag(t) for t in options.get("tags", ())]
if options.get("tags")
else None,
tag_not_in=[MediaTag(t) for t in options.get("tags_not", ())]
if options.get("tags_not")
else None,
format_in=[MediaFormat(f) for f in options.get("media_format", ())]
if options.get("media_format")
else None,
type=MediaType(options.get("media_type"))
if options.get("media_type")
else None,
season=MediaSeason(options.get("season")) if options.get("season") else None,
seasonYear=int(year) if (year := options.get("year")) else None,
popularity_greater=options.get("popularity_greater"),
popularity_lesser=options.get("popularity_lesser"),
averageScore_greater=options.get("score_greater"),
averageScore_lesser=options.get("score_lesser"),
startDate_greater=options.get("start_date_greater"),
startDate_lesser=options.get("start_date_lesser"),
endDate_greater=options.get("end_date_greater"),
endDate_lesser=options.get("end_date_lesser"),
on_list=options.get("on_list"),
)
def _search_anime(api_client, search_params, feedback):
"""Search for anime using the API client."""
from rich.progress import Progress, SpinnerColumn, TextColumn
# Check if we have any search criteria at all
has_criteria = any(
[
search_params.query,
search_params.genre_in,
search_params.tag_in,
search_params.status_in,
search_params.season,
search_params.seasonYear,
search_params.format_in,
search_params.popularity_greater,
search_params.averageScore_greater,
]
)
if not has_criteria:
raise FastAnimeError(
"Please provide at least one search criterion (title, genre, tag, status, etc.)"
)
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
transient=True,
) as progress:
progress.add_task("Searching for anime...", total=None)
search_result = api_client.search_media(search_params)
if not search_result or not search_result.media:
raise FastAnimeError("No anime found matching your search criteria")
return search_result
def _select_anime(search_result, selector, feedback):
"""Let user select anime from search results."""
if len(search_result.media) == 1:
selected_anime = search_result.media[0]
feedback.info(
f"Auto-selected: {selected_anime.title.english or selected_anime.title.romaji}"
)
return [selected_anime]
# Create choice strings with additional info
choices = []
for i, anime in enumerate(search_result.media, 1):
title = anime.title.english or anime.title.romaji or "Unknown"
year = str(anime.start_date.year) if anime.start_date else "N/A"
score = f"{anime.average_score}%" if anime.average_score else "N/A"
status = anime.status.value if anime.status else "N/A"
choices.append(f"{i:2d}. {title} ({year}) [Score: {score}, Status: {status}]")
# Use multi-selection
selected_choices = selector.choose_multiple(
prompt="Select anime to download",
choices=choices,
header="Use TAB to select multiple anime, ENTER to confirm",
)
if not selected_choices:
return []
# Extract anime objects from selections
selected_anime_list = []
for choice in selected_choices:
# Extract index from choice string (format: "XX. Title...")
try:
index = int(choice.split(".")[0].strip()) - 1
selected_anime_list.append(search_result.media[index])
except (ValueError, IndexError):
feedback.error(f"Invalid selection: {choice}")
continue
return selected_anime_list
def _get_available_episodes(provider, anime, config, feedback):
"""Get available episodes from provider."""
try:
# Search for anime in provider first
media_title = anime.title.english or anime.title.romaji
feedback.info(
f"Searching provider '{provider.__class__.__name__}' for: '{media_title}'"
)
feedback.info(f"Using translation type: '{config.stream.translation_type}'")
provider_search_results = provider.search(
SearchParams(
query=media_title, translation_type=config.stream.translation_type
)
)
if not provider_search_results or not provider_search_results.results:
feedback.warning(
f"Could not find '{media_title}' on provider '{provider.__class__.__name__}'"
)
return []
feedback.info(
f"Found {len(provider_search_results.results)} results on provider"
)
# Show the first few results for debugging
for i, result in enumerate(provider_search_results.results[:3]):
feedback.info(
f"Result {i + 1}: ID={result.id}, Title='{getattr(result, 'title', 'Unknown')}'"
)
# Get the first result (could be enhanced with fuzzy matching)
first_result = provider_search_results.results[0]
feedback.info(f"Using first result: ID={first_result.id}")
# Now get the full anime data using the PROVIDER'S ID, not AniList ID
provider_anime_data = provider.get(
AnimeParams(id=first_result.id, query=media_title)
)
if not provider_anime_data:
feedback.warning("Failed to get anime details from provider")
return []
# Check all available translation types
translation_types = ["sub", "dub"]
for trans_type in translation_types:
episodes = getattr(provider_anime_data.episodes, trans_type, [])
feedback.info(
f"Translation '{trans_type}': {len(episodes)} episodes available"
)
available_episodes = getattr(
provider_anime_data.episodes, config.stream.translation_type, []
)
if not available_episodes:
feedback.warning(f"No '{config.stream.translation_type}' episodes found")
# Suggest alternative translation type if available
for trans_type in translation_types:
if trans_type != config.stream.translation_type:
other_episodes = getattr(
provider_anime_data.episodes, trans_type, []
)
if other_episodes:
feedback.info(
f"Suggestion: Try using translation type '{trans_type}' (has {len(other_episodes)} episodes)"
)
return []
feedback.info(
f"Found {len(available_episodes)} episodes available for download"
)
# Return both episodes and the provider anime data for later use
return available_episodes, provider_anime_data
except Exception as e:
feedback.error(f"Error getting episodes from provider: {e}")
import traceback
feedback.error("Full traceback", traceback.format_exc())
return []
def _determine_episodes_to_download(
episode_range, available_episodes, selector, feedback
):
"""Determine which episodes to download based on range or user selection."""
if not available_episodes:
feedback.warning("No episodes available to download")
return []
if episode_range:
try:
episodes_to_download = list(
parse_episode_range(episode_range, available_episodes)
)
feedback.info(
f"Episodes from range '{episode_range}': {', '.join(episodes_to_download)}"
)
return episodes_to_download
except (ValueError, IndexError) as e:
feedback.error(f"Invalid episode range '{episode_range}': {e}")
return []
else:
# Let user select episodes
selected_episodes = selector.choose_multiple(
prompt="Select episodes to download",
choices=available_episodes,
header="Use TAB to select multiple episodes, ENTER to confirm",
)
if selected_episodes:
feedback.info(f"Selected episodes: {', '.join(selected_episodes)}")
return selected_episodes
def _suggest_alternatives(anime, provider, config, feedback):
"""Suggest alternatives when episodes are not found."""
feedback.info("Troubleshooting suggestions:")
feedback.info(f"1. Current provider: {provider.__class__.__name__}")
feedback.info(f"2. AniList ID being used: {anime.id}")
feedback.info(f"3. Translation type: {config.stream.translation_type}")
# Special message for AllAnime provider
if provider.__class__.__name__ == "AllAnimeProvider":
feedback.info(
"4. AllAnime ID mismatch: AllAnime uses different IDs than AniList"
)
feedback.info(" The provider searches by title, but episodes use AniList ID")
feedback.info(
" This can cause episodes to not be found even if the anime exists"
)
# Check if provider has different ID mapping
anime_titles = []
if anime.title.english:
anime_titles.append(anime.title.english)
if anime.title.romaji:
anime_titles.append(anime.title.romaji)
if anime.title.native:
anime_titles.append(anime.title.native)
feedback.info(f"5. Available titles: {', '.join(anime_titles)}")
feedback.info("6. Possible solutions:")
feedback.info(" - Try a different provider (GogoAnime, 9anime, etc.)")
feedback.info(" - Check provider configuration")
feedback.info(" - Try different translation type (sub/dub)")
feedback.info(" - Manual search on the provider website")
feedback.info(" - Check if anime is available in your region")
def _download_episodes(
download_service,
anime,
episodes,
quality,
force_redownload,
max_concurrent,
feedback,
):
"""Download the specified episodes."""
from concurrent.futures import ThreadPoolExecutor, as_completed
from rich.console import Console
from rich.progress import (
BarColumn,
MofNCompleteColumn,
Progress,
SpinnerColumn,
TaskProgressColumn,
TextColumn,
TimeElapsedColumn,
)
import logging
console = Console()
anime_title = anime.title.english or anime.title.romaji
console.print(f"\n[bold green]Starting downloads for: {anime_title}[/bold green]")
# Set up logging capture to get download errors
log_messages = []
class ListHandler(logging.Handler):
def emit(self, record):
log_messages.append(self.format(record))
handler = ListHandler()
handler.setLevel(logging.ERROR)
logger = logging.getLogger("fastanime")
logger.addHandler(handler)
try:
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
MofNCompleteColumn(),
TaskProgressColumn(),
TimeElapsedColumn(),
) as progress:
task = progress.add_task("Downloading episodes...", total=len(episodes))
if max_concurrent == 1:
# Sequential downloads
results = {}
for episode in episodes:
progress.update(
task, description=f"Downloading episode {episode}..."
)
# Clear previous log messages for this episode
log_messages.clear()
try:
success = download_service.download_episode(
media_item=anime,
episode_number=episode,
quality=quality,
force_redownload=force_redownload,
)
results[episode] = success
if not success:
# Try to get more detailed error from registry
error_msg = _get_episode_error_details(
download_service, anime, episode
)
if error_msg:
feedback.error(f"Episode {episode}", error_msg)
elif log_messages:
# Show any log messages that were captured
for msg in log_messages[
-3:
]: # Show last 3 error messages
feedback.error(f"Episode {episode}", msg)
else:
feedback.error(
f"Episode {episode}",
"Download failed - check logs for details",
)
except Exception as e:
results[episode] = False
feedback.error(f"Episode {episode} failed", str(e))
progress.advance(task)
else:
# Concurrent downloads
results = {}
with ThreadPoolExecutor(max_workers=max_concurrent) as executor:
# Submit all download tasks
future_to_episode = {
executor.submit(
download_service.download_episode,
media_item=anime,
episode_number=episode,
server=None,
quality=quality,
force_redownload=force_redownload,
): episode
for episode in episodes
}
# Process completed downloads
for future in as_completed(future_to_episode):
episode = future_to_episode[future]
try:
success = future.result()
results[episode] = success
if not success:
# Try to get more detailed error from registry
error_msg = _get_episode_error_details(
download_service, anime, episode
)
if error_msg:
feedback.error(f"Episode {episode}", error_msg)
else:
feedback.error(
f"Episode {episode}",
"Download failed - check logs for details",
)
except Exception as e:
results[episode] = False
feedback.error(
f"Download failed for episode {episode}", str(e)
)
progress.advance(task)
finally:
# Remove the log handler
logger.removeHandler(handler)
# Display results
_display_download_results(console, results, anime)
def _get_episode_error_details(download_service, anime, episode_number):
"""Get detailed error information from the registry for a failed episode."""
try:
# Get the media record from registry
media_record = download_service.media_registry.get_record(anime.id)
if not media_record:
return None
# Find the episode in the record
for episode_record in media_record.episodes:
if episode_record.episode_number == episode_number:
if episode_record.error_message:
error_msg = episode_record.error_message
# Provide more helpful error messages for common issues
if "Failed to get server for episode" in error_msg:
return f"Episode {episode_number} not available on current provider. Try a different provider or check episode number."
elif "NoneType" in error_msg or "not subscriptable" in error_msg:
return f"Episode {episode_number} data not found on provider (API returned null). Episode may not exist or be accessible."
else:
return error_msg
elif episode_record.download_status:
return f"Download status: {episode_record.download_status.value}"
break
return None
except Exception:
return None
def _test_episode_stream_availability(
provider, anime, episode_number, config, feedback
):
"""Test if streams are available for a specific episode."""
try:
from .....libs.provider.anime.params import EpisodeStreamsParams
media_title = anime.title.english or anime.title.romaji
feedback.info(
f"Testing stream availability for '{media_title}' episode {episode_number}"
)
# Test episode streams
streams = provider.episode_streams(
EpisodeStreamsParams(
anime_id=str(anime.id),
query=media_title,
episode=episode_number,
translation_type=config.stream.translation_type,
)
)
if not streams:
feedback.warning(f"No streams found for episode {episode_number}")
return False
# Convert to list to check actual availability
stream_list = list(streams)
if not stream_list:
feedback.warning(
f"No stream servers available for episode {episode_number}"
)
return False
feedback.info(
f"Found {len(stream_list)} stream server(s) for episode {episode_number}"
)
# Show details about the first server for debugging
first_server = stream_list[0]
feedback.info(
f"First server: name='{first_server.name}', type='{type(first_server).__name__}'"
)
return True
except TypeError as e:
if "'NoneType' object is not subscriptable" in str(e):
feedback.warning(
f"Episode {episode_number} not available on provider (API returned null)"
)
feedback.info(
"This usually means the episode doesn't exist on this provider or isn't accessible"
)
return False
else:
feedback.error(f"Type error testing stream availability: {e}")
return False
except Exception as e:
feedback.error(f"Error testing stream availability: {e}")
import traceback
feedback.error("Stream test traceback", traceback.format_exc())
return False
def _display_download_results(console, results: dict[str, bool], anime):
"""Display download results in a formatted table."""
from rich.table import Table
table = Table(
title=f"Download Results for {anime.title.english or anime.title.romaji}"
)
table.add_column("Episode", justify="center", style="cyan")
table.add_column("Status", justify="center")
for episode, success in sorted(results.items(), key=lambda x: float(x[0])):
status = "[green]✓ Success[/green]" if success else "[red]✗ Failed[/red]"
table.add_row(episode, status)
console.print(table)
# Summary
total = len(results)
successful = sum(results.values())
failed = total - successful
if failed == 0:
console.print(
f"\n[bold green]All {total} episodes downloaded successfully![/bold green]"
)
else:
console.print(
f"\n[yellow]Download complete: {successful}/{total} successful, {failed} failed[/yellow]"
)
def _show_final_statistics(download_service, feedback):
"""Show final download statistics."""
from rich.console import Console
console = Console()
stats = download_service.get_download_statistics()
if stats:
console.print("\n[bold blue]Overall Download Statistics:[/bold blue]")
console.print(f"Total episodes tracked: {stats.get('total_episodes', 0)}")
console.print(f"Successfully downloaded: {stats.get('downloaded', 0)}")
console.print(f"Failed downloads: {stats.get('failed', 0)}")
console.print(f"Queued downloads: {stats.get('queued', 0)}")
if stats.get("total_size_bytes", 0) > 0:
size_mb = stats["total_size_bytes"] / (1024 * 1024)
if size_mb > 1024:
console.print(f"Total size: {size_mb / 1024:.2f} GB")
else:
console.print(f"Total size: {size_mb:.2f} MB")

View File

@@ -1,283 +0,0 @@
"""
Interactive authentication menu for AniList OAuth login/logout and user profile management.
Implements Step 5: AniList Authentication Flow
"""
import webbrowser
from typing import Optional
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from ....libs.media_api.types import UserProfile
from ...auth.manager import AuthManager
from ...utils.feedback import create_feedback_manager, execute_with_feedback
from ..session import Context, session
from ..state import InternalDirective, State
@session.menu
def auth(ctx: Context, state: State) -> State | InternalDirective:
"""
Interactive authentication menu for managing AniList login/logout and viewing user profile.
"""
icons = ctx.config.general.icons
feedback = create_feedback_manager(icons)
console = Console()
console.clear()
# Get current authentication status
user_profile = getattr(ctx.media_api, "user_profile", None)
auth_manager = AuthManager()
# Display current authentication status
_display_auth_status(console, user_profile, icons)
# Menu options based on authentication status
if user_profile:
options = [
f"{'👤 ' if icons else ''}View Profile Details",
f"{'🔓 ' if icons else ''}Logout",
f"{'↩️ ' if icons else ''}Back to Main Menu",
]
else:
options = [
f"{'🔐 ' if icons else ''}Login to AniList",
f"{'' if icons else ''}How to Get Token",
f"{'↩️ ' if icons else ''}Back to Main Menu",
]
choice = ctx.selector.choose(
prompt="Select Authentication Action",
choices=options,
header="AniList Authentication Menu",
)
if not choice:
return InternalDirective.BACK
# Handle menu choices
if "Login to AniList" in choice:
return _handle_login(ctx, auth_manager, feedback, icons)
elif "Logout" in choice:
return _handle_logout(ctx, auth_manager, feedback, icons)
elif "View Profile Details" in choice:
_display_user_profile_details(console, user_profile, icons)
feedback.pause_for_user("Press Enter to continue")
return InternalDirective.RELOAD
elif "How to Get Token" in choice:
_display_token_help(console, icons)
feedback.pause_for_user("Press Enter to continue")
return InternalDirective.RELOAD
else: # Back to Main Menu
return InternalDirective.BACK
def _display_auth_status(
console: Console, user_profile: Optional[UserProfile], icons: bool
):
"""Display current authentication status in a nice panel."""
if user_profile:
status_icon = "🟢" if icons else "[green]●[/green]"
status_text = f"{status_icon} Authenticated"
user_info = f"Logged in as: [bold cyan]{user_profile.name}[/bold cyan]\nUser ID: {user_profile.id}"
else:
status_icon = "🔴" if icons else "[red]○[/red]"
status_text = f"{status_icon} Not Authenticated"
user_info = "Log in to access personalized features like:\n• Your anime lists (Watching, Completed, etc.)\n• Progress tracking\n• List management"
panel = Panel(
user_info,
title=f"Authentication Status: {status_text}",
border_style="green" if user_profile else "red",
)
console.print(panel)
console.print()
def _handle_login(
ctx: Context, auth_manager: AuthManager, feedback, icons: bool
) -> State | InternalDirective:
"""Handle the interactive login process."""
def perform_login():
# Open browser to AniList OAuth page
oauth_url = "https://anilist.co/api/v2/oauth/authorize?client_id=20148&response_type=token"
if feedback.confirm(
"Open AniList authorization page in browser?", default=True
):
try:
webbrowser.open(oauth_url)
feedback.info(
"Browser opened",
"Complete the authorization process in your browser",
)
except Exception:
feedback.warning(
"Could not open browser automatically",
f"Please manually visit: {oauth_url}",
)
else:
feedback.info("Manual authorization", f"Please visit: {oauth_url}")
# Get token from user
feedback.info(
"Token Input", "Paste the token from the browser URL after '#access_token='"
)
token = ctx.selector.ask("Enter your AniList Access Token")
if not token or not token.strip():
feedback.error("Login cancelled", "No token provided")
return None
# Authenticate with the API
profile = ctx.media_api.authenticate(token.strip())
if not profile:
feedback.error(
"Authentication failed", "The token may be invalid or expired"
)
return None
# Save credentials using the auth manager
auth_manager.save_user_profile(profile, token.strip())
return profile
success, profile = execute_with_feedback(
perform_login,
feedback,
"authenticate",
loading_msg="Validating token with AniList",
success_msg="Successfully logged in! 🎉"
if icons
else "Successfully logged in!",
error_msg="Login failed",
show_loading=True,
)
if success and profile:
feedback.success(
f"Logged in as {profile.name}" if profile else "Successfully logged in"
)
feedback.pause_for_user("Press Enter to continue")
return InternalDirective.RELOAD
def _handle_logout(
ctx: Context, auth_manager: AuthManager, feedback, icons: bool
) -> State | InternalDirective:
"""Handle the logout process with confirmation."""
if not feedback.confirm(
"Are you sure you want to logout?",
"This will remove your saved AniList token and log you out",
default=False,
):
return InternalDirective.RELOAD
def perform_logout():
# Clear from auth manager
if hasattr(auth_manager, "logout"):
auth_manager.logout()
else:
auth_manager.clear_user_profile()
# Clear from API client
ctx.media_api.token = None
ctx.media_api.user_profile = None
if hasattr(ctx.media_api, "http_client"):
ctx.media_api.http_client.headers.pop("Authorization", None)
return True
success, _ = execute_with_feedback(
perform_logout,
feedback,
"logout",
loading_msg="Logging out",
success_msg="Successfully logged out 👋"
if icons
else "Successfully logged out",
error_msg="Logout failed",
show_loading=False,
)
if success:
feedback.pause_for_user("Press Enter to continue")
return InternalDirective.CONFIG_EDIT
def _display_user_profile_details(
console: Console, user_profile: UserProfile, icons: bool
):
"""Display detailed user profile information."""
if not user_profile:
console.print("[red]No user profile available[/red]")
return
# Create a detailed profile table
table = Table(title=f"{'👤 ' if icons else ''}User Profile: {user_profile.name}")
table.add_column("Property", style="cyan", no_wrap=True)
table.add_column("Value", style="green")
table.add_row("Name", user_profile.name)
table.add_row("User ID", str(user_profile.id))
if user_profile.avatar_url:
table.add_row("Avatar URL", user_profile.avatar_url)
if user_profile.banner_url:
table.add_row("Banner URL", user_profile.banner_url)
console.print()
console.print(table)
console.print()
# Show available features
features_panel = Panel(
"Available Features:\n"
f"{'📺 ' if icons else ''}Access your anime lists (Watching, Completed, etc.)\n"
f"{'✏️ ' if icons else ''}Update watch progress and scores\n"
f"{' ' if icons else ''}Add/remove anime from your lists\n"
f"{'🔄 ' if icons else ''}Sync progress with AniList\n"
f"{'🔔 ' if icons else ''}Access AniList notifications",
title="Available with Authentication",
border_style="green",
)
console.print(features_panel)
def _display_token_help(console: Console, icons: bool):
"""Display help information about getting an AniList token."""
help_text = """
[bold cyan]How to get your AniList Access Token:[/bold cyan]
[bold]Step 1:[/bold] Visit the AniList authorization page
https://anilist.co/api/v2/oauth/authorize?client_id=20148&response_type=token
[bold]Step 2:[/bold] Log in to your AniList account if prompted
[bold]Step 3:[/bold] Click "Authorize" to grant FastAnime access
[bold]Step 4:[/bold] Copy the token from the browser URL
Look for the part after "#access_token=" in the address bar
[bold]Step 5:[/bold] Paste the token when prompted in FastAnime
[yellow]Note:[/yellow] The token will be stored securely and used for all AniList features.
You only need to do this once unless you revoke access or the token expires.
[yellow]Privacy:[/yellow] FastAnime only requests minimal permissions needed for
list management and does not access sensitive account information.
"""
panel = Panel(
help_text,
title=f"{'' if icons else ''}AniList Token Help",
border_style="blue",
)
console.print()
console.print(panel)

View File

@@ -1,254 +0,0 @@
"""
Session management menu for the interactive CLI.
Provides options to save, load, and manage session state.
"""
from datetime import datetime
from pathlib import Path
from typing import Callable, Dict
from rich.console import Console
from rich.table import Table
from ....core.constants import APP_DIR
from ...utils.feedback import create_feedback_manager
from ..session import Context, session
from ..state import ControlFlow, State
MenuAction = Callable[[], str]
@session.menu
def session_management(ctx: Context, state: State) -> State | ControlFlow:
"""
Session management menu for saving, loading, and managing session state.
"""
icons = ctx.config.general.icons
feedback = create_feedback_manager(icons)
console = Console()
console.clear()
# Show current session stats
_display_session_info(console, icons)
options: Dict[str, MenuAction] = {
f"{'💾 ' if icons else ''}Save Current Session": lambda: _save_session(
ctx, feedback
),
f"{'📂 ' if icons else ''}Load Session": lambda: _load_session(ctx, feedback),
f"{'📋 ' if icons else ''}List Saved Sessions": lambda: _list_sessions(
ctx, feedback
),
f"{'🗑️ ' if icons else ''}Cleanup Old Sessions": lambda: _cleanup_sessions(
ctx, feedback
),
f"{'💾 ' if icons else ''}Create Manual Backup": lambda: _create_backup(
ctx, feedback
),
f"{'⚙️ ' if icons else ''}Session Settings": lambda: _session_settings(
ctx, feedback
),
f"{'🔙 ' if icons else ''}Back to Main Menu": lambda: "BACK",
}
choice_str = ctx.selector.choose(
prompt="Select Session Action",
choices=list(options.keys()),
header="Session Management",
)
if not choice_str:
return ControlFlow.BACK
result = options[choice_str]()
if result == "BACK":
return ControlFlow.BACK
else:
return ControlFlow.CONTINUE
def _display_session_info(console: Console, icons: bool):
"""Display current session information."""
session_stats = session.get_session_stats()
table = Table(title=f"{'📊 ' if icons else ''}Current Session Info")
table.add_column("Property", style="cyan")
table.add_column("Value", style="green")
table.add_row("Current States", str(session_stats["current_states"]))
table.add_row("Current Menu", session_stats["current_menu"] or "None")
table.add_row(
"Auto-Save", "Enabled" if session_stats["auto_save_enabled"] else "Disabled"
)
table.add_row("Has Auto-Save", "Yes" if session_stats["has_auto_save"] else "No")
table.add_row(
"Has Crash Backup", "Yes" if session_stats["has_crash_backup"] else "No"
)
console.print(table)
console.print()
def _save_session(ctx: Context, feedback) -> str:
"""Save the current session."""
session_name = ctx.selector.ask("Enter session name (optional):")
description = ctx.selector.ask("Enter session description (optional):")
if not session_name:
session_name = f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
sessions_dir = APP_DIR / "sessions"
file_path = sessions_dir / f"{session_name}.json"
if file_path.exists():
if not feedback.confirm(f"Session '{session_name}' already exists. Overwrite?"):
feedback.info("Save cancelled")
return "CONTINUE"
success = session.save(file_path, session_name, description or "")
if success:
feedback.success(f"Session saved as '{session_name}'")
return "CONTINUE"
def _load_session(ctx: Context, feedback) -> str:
"""Load a saved session."""
sessions = session.list_saved_sessions()
if not sessions:
feedback.warning("No saved sessions found")
return "CONTINUE"
# Create choices with session info
choices = []
session_map = {}
for sess in sessions:
choice_text = f"{sess['name']} - {sess['description'][:50]}{'...' if len(sess['description']) > 50 else ''}"
choices.append(choice_text)
session_map[choice_text] = sess
choices.append("Cancel")
choice = ctx.selector.choose(
"Select session to load:", choices=choices, header="Available Sessions"
)
if not choice or choice == "Cancel":
return "CONTINUE"
selected_session = session_map[choice]
file_path = Path(selected_session["path"])
if feedback.confirm(
f"Load session '{selected_session['name']}'? This will replace your current session."
):
success = session.resume(file_path, feedback)
if success:
feedback.info("Session loaded successfully. Returning to main menu.")
# Return to main menu after loading
return "MAIN"
return "CONTINUE"
def _list_sessions(ctx: Context, feedback) -> str:
"""List all saved sessions."""
sessions = session.list_saved_sessions()
if not sessions:
feedback.info("No saved sessions found")
return "CONTINUE"
console = Console()
table = Table(title="Saved Sessions")
table.add_column("Name", style="cyan")
table.add_column("Description", style="yellow")
table.add_column("States", style="green")
table.add_column("Created", style="blue")
for sess in sessions:
# Format the created date
created = sess["created"]
if "T" in created:
created = created.split("T")[0] # Just show the date part
table.add_row(
sess["name"],
sess["description"][:40] + "..."
if len(sess["description"]) > 40
else sess["description"],
str(sess["state_count"]),
created,
)
console.print(table)
feedback.pause_for_user()
return "CONTINUE"
def _cleanup_sessions(ctx: Context, feedback) -> str:
"""Clean up old sessions."""
sessions = session.list_saved_sessions()
if len(sessions) <= 5:
feedback.info("No cleanup needed. You have 5 or fewer sessions.")
return "CONTINUE"
max_sessions_str = ctx.selector.ask("How many sessions to keep? (default: 10)")
try:
max_sessions = int(max_sessions_str) if max_sessions_str else 10
except ValueError:
feedback.error("Invalid number entered")
return "CONTINUE"
if feedback.confirm(f"Delete sessions older than the {max_sessions} most recent?"):
deleted_count = session.cleanup_old_sessions(max_sessions)
feedback.success(f"Deleted {deleted_count} old sessions")
return "CONTINUE"
def _create_backup(ctx: Context, feedback) -> str:
"""Create a manual backup."""
backup_name = ctx.selector.ask("Enter backup name (optional):")
success = session.create_manual_backup(backup_name or "")
if success:
feedback.success("Manual backup created successfully")
return "CONTINUE"
def _session_settings(ctx: Context, feedback) -> str:
"""Configure session settings."""
current_auto_save = session._auto_save_enabled
choices = [
f"Auto-Save: {'Enabled' if current_auto_save else 'Disabled'}",
"Clear Auto-Save File",
"Clear Crash Backup",
"Back",
]
choice = ctx.selector.choose("Session Settings:", choices=choices)
if choice and choice.startswith("Auto-Save"):
new_setting = not current_auto_save
session.enable_auto_save(new_setting)
feedback.success(f"Auto-save {'enabled' if new_setting else 'disabled'}")
elif choice == "Clear Auto-Save File":
if feedback.confirm("Clear the auto-save file?"):
session._session_manager.clear_auto_save()
feedback.success("Auto-save file cleared")
elif choice == "Clear Crash Backup":
if feedback.confirm("Clear the crash backup file?"):
session._session_manager.clear_crash_backup()
feedback.success("Crash backup cleared")
return "CONTINUE"

View File

@@ -1,827 +0,0 @@
"""
AniList Watch List Operations Menu
Implements Step 8: Remote Watch List Operations
Provides comprehensive AniList list management including:
- Viewing user lists (Watching, Completed, Planning, etc.)
- Interactive list selection and navigation
- Adding/removing anime from lists
- List statistics and overview
"""
import logging
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from ....libs.media_api.params import UpdateUserMediaListEntryParams, UserListParams
from ....libs.media_api.types import MediaItem, MediaSearchResult, UserListItem
from ...utils.feedback import create_feedback_manager, execute_with_feedback
from ..session import Context, session
from ..state import ControlFlow, MediaApiState, State
logger = logging.getLogger(__name__)
@session.menu
def anilist_lists(ctx: Context, state: State) -> State | ControlFlow:
"""
Main AniList lists management menu.
Shows all user lists with statistics and navigation options.
"""
icons = ctx.config.general.icons
feedback = create_feedback_manager(icons)
console = Console()
console.clear()
# Check authentication
if not ctx.media_api.user_profile:
feedback.error(
"Authentication Required",
"You must be logged in to access your AniList lists. Please authenticate first.",
)
feedback.pause_for_user("Press Enter to continue")
return State(menu_name="AUTH")
# Display user profile and lists overview
_display_lists_overview(console, ctx, icons)
# Menu options
options = [
f"{'📺 ' if icons else ''}Currently Watching",
f"{'📋 ' if icons else ''}Planning to Watch",
f"{'' if icons else ''}Completed",
f"{'⏸️ ' if icons else ''}Paused",
f"{'🚮 ' if icons else ''}Dropped",
f"{'🔁 ' if icons else ''}Rewatching",
f"{'📊 ' if icons else ''}View All Lists Statistics",
f"{'🔍 ' if icons else ''}Search Across All Lists",
f"{' ' if icons else ''}Add Anime to List",
f"{'↩️ ' if icons else ''}Back to Main Menu",
]
choice = ctx.selector.choose(
prompt="Select List Action",
choices=options,
header=f"AniList Lists - {ctx.media_api.user_profile.name}",
)
if not choice:
return ControlFlow.BACK
# Handle menu choices
if "Currently Watching" in choice:
return _navigate_to_list(ctx, "CURRENT")
elif "Planning to Watch" in choice:
return _navigate_to_list(ctx, "PLANNING")
elif "Completed" in choice:
return _navigate_to_list(ctx, "COMPLETED")
elif "Paused" in choice:
return _navigate_to_list(ctx, "PAUSED")
elif "Dropped" in choice:
return _navigate_to_list(ctx, "DROPPED")
elif "Rewatching" in choice:
return _navigate_to_list(ctx, "REPEATING")
elif "View All Lists Statistics" in choice:
return _show_all_lists_stats(ctx, feedback, icons)
elif "Search Across All Lists" in choice:
return _search_all_lists(ctx, feedback, icons)
elif "Add Anime to List" in choice:
return _add_anime_to_list(ctx, feedback, icons)
else: # Back to Main Menu
return ControlFlow.BACK
@session.menu
def anilist_list_view(ctx: Context, state: State) -> State | ControlFlow:
"""
View and manage a specific AniList list (e.g., Watching, Completed).
"""
icons = ctx.config.general.icons
feedback = create_feedback_manager(icons)
console = Console()
console.clear()
# Get list status from state data
list_status = state.data.get("list_status") if state.data else "CURRENT"
page = state.data.get("page", 1) if state.data else 1
# Fetch list data
def fetch_list():
return ctx.media_api.search_media_list(
UserListParams(status=list_status, page=page, per_page=20)
)
success, result = execute_with_feedback(
fetch_list,
feedback,
f"fetch {_status_to_display_name(list_status)} list",
loading_msg=f"Loading {_status_to_display_name(list_status)} list...",
success_msg=f"Loaded {_status_to_display_name(list_status)} list",
error_msg=f"Failed to load {_status_to_display_name(list_status)} list",
)
if not success or not result:
feedback.pause_for_user("Press Enter to continue")
return ControlFlow.BACK
# Display list contents
_display_list_contents(console, result, list_status, page, icons)
# Menu options
options = [
f"{'👁️ ' if icons else ''}View/Edit Anime Details",
f"{'🔄 ' if icons else ''}Refresh List",
f"{' ' if icons else ''}Add New Anime",
f"{'🗑️ ' if icons else ''}Remove from List",
]
# Add pagination options
if result.page_info.has_next_page:
options.append(f"{'➡️ ' if icons else ''}Next Page")
if page > 1:
options.append(f"{'⬅️ ' if icons else ''}Previous Page")
options.extend(
[
f"{'📊 ' if icons else ''}List Statistics",
f"{'↩️ ' if icons else ''}Back to Lists Menu",
]
)
choice = ctx.selector.choose(
prompt="Select Action",
choices=options,
header=f"{_status_to_display_name(list_status)} - Page {page}",
)
if not choice:
return ControlFlow.BACK
# Handle menu choices
if "View/Edit Anime Details" in choice:
return _select_anime_for_details(ctx, result, list_status, page)
elif "Refresh List" in choice:
return ControlFlow.CONTINUE
elif "Add New Anime" in choice:
return _add_anime_to_specific_list(ctx, list_status, feedback, icons)
elif "Remove from List" in choice:
return _remove_anime_from_list(ctx, result, list_status, page, feedback, icons)
elif "Next Page" in choice:
return State(
menu_name="ANILIST_LIST_VIEW",
data={"list_status": list_status, "page": page + 1},
)
elif "Previous Page" in choice:
return State(
menu_name="ANILIST_LIST_VIEW",
data={"list_status": list_status, "page": page - 1},
)
elif "List Statistics" in choice:
return _show_list_statistics(ctx, list_status, feedback, icons)
else: # Back to Lists Menu
return State(menu_name="ANILIST_LISTS")
@session.menu
def anilist_anime_details(ctx: Context, state: State) -> State | ControlFlow:
"""
View and edit details for a specific anime in a user's list.
"""
icons = ctx.config.general.icons
feedback = create_feedback_manager(icons)
console = Console()
console.clear()
# Get anime and list info from state
if not state.data:
return ControlFlow.BACK
anime = state.data.get("anime")
list_status = state.data.get("list_status")
return_page = state.data.get("return_page", 1)
from_media_actions = state.data.get("from_media_actions", False)
if not anime:
return ControlFlow.BACK
# Display anime details
_display_anime_list_details(console, anime, icons)
# Menu options
options = [
f"{'✏️ ' if icons else ''}Edit Progress",
f"{'' if icons else ''}Edit Rating",
f"{'📝 ' if icons else ''}Edit Status",
f"{'🎬 ' if icons else ''}Watch/Stream",
f"{'🗑️ ' if icons else ''}Remove from List",
f"{'↩️ ' if icons else ''}Back to List",
]
choice = ctx.selector.choose(
prompt="Select Action",
choices=options,
header=f"{anime.title.english or anime.title.romaji}",
)
if not choice:
# Return to appropriate menu based on how we got here
if from_media_actions:
return ControlFlow.BACK
elif list_status:
return State(
menu_name="ANILIST_LIST_VIEW",
data={"list_status": list_status, "page": return_page},
)
else:
return State(menu_name="ANILIST_LISTS")
# Handle menu choices
if "Edit Progress" in choice:
return _edit_anime_progress(
ctx, anime, list_status, return_page, feedback, from_media_actions
)
elif "Edit Rating" in choice:
return _edit_anime_rating(
ctx, anime, list_status, return_page, feedback, from_media_actions
)
elif "Edit Status" in choice:
return _edit_anime_status(
ctx, anime, list_status, return_page, feedback, from_media_actions
)
elif "Watch/Stream" in choice:
return _stream_anime(ctx, anime)
elif "Remove from List" in choice:
return _confirm_remove_anime(
ctx, anime, list_status, return_page, feedback, icons, from_media_actions
)
else: # Back to List/Media Actions
# Return to appropriate menu based on how we got here
if from_media_actions:
return ControlFlow.BACK
elif list_status:
return State(
menu_name="ANILIST_LIST_VIEW",
data={"list_status": list_status, "page": return_page},
)
else:
return State(menu_name="ANILIST_LISTS")
def _display_lists_overview(console: Console, ctx: Context, icons: bool):
"""Display overview of all user lists with counts."""
user = ctx.media_api.user_profile
# Create overview panel
overview_text = f"[bold cyan]{user.name}[/bold cyan]'s AniList Management\n"
overview_text += f"User ID: {user.id}\n\n"
overview_text += "Manage your anime lists, track progress, and sync with AniList"
panel = Panel(
overview_text,
title=f"{'📚 ' if icons else ''}AniList Lists Overview",
border_style="cyan",
)
console.print(panel)
console.print()
def _display_list_contents(
console: Console,
result: MediaSearchResult,
list_status: str,
page: int,
icons: bool,
):
"""Display the contents of a specific list in a table."""
if not result.media:
console.print(
f"[yellow]No anime found in {_status_to_display_name(list_status)} list[/yellow]"
)
return
table = Table(title=f"{_status_to_display_name(list_status)} - Page {page}")
table.add_column("Title", style="cyan", no_wrap=False, width=40)
table.add_column("Episodes", justify="center", width=10)
table.add_column("Progress", justify="center", width=10)
table.add_column("Score", justify="center", width=8)
table.add_column("Status", justify="center", width=12)
for i, anime in enumerate(result.media, 1):
title = anime.title.english or anime.title.romaji or "Unknown Title"
episodes = str(anime.episodes or "?")
# Get list entry details if available
progress = "?"
score = "?"
status = _status_to_display_name(list_status)
# Note: In a real implementation, you'd get these from the MediaList entry
# For now, we'll show placeholders
if hasattr(anime, "media_list_entry") and anime.media_list_entry:
progress = str(anime.media_list_entry.progress or 0)
score = str(anime.media_list_entry.score or "-")
table.add_row(f"{i}. {title}", episodes, progress, score, status)
console.print(table)
console.print(
f"\nShowing {len(result.media)} anime from {_status_to_display_name(list_status)} list"
)
# Show pagination info
if result.page_info.has_next_page:
console.print("[dim]More results available on next page[/dim]")
def _display_anime_list_details(console: Console, anime: MediaItem, icons: bool):
"""Display detailed information about an anime in the user's list."""
title = anime.title.english or anime.title.romaji or "Unknown Title"
details_text = f"[bold]{title}[/bold]\n\n"
details_text += f"Episodes: {anime.episodes or 'Unknown'}\n"
details_text += f"Status: {anime.status or 'Unknown'}\n"
details_text += (
f"Genres: {', '.join(anime.genres) if anime.genres else 'Unknown'}\n"
)
if anime.description:
# Truncate description for display
desc = (
anime.description[:300] + "..."
if len(anime.description) > 300
else anime.description
)
details_text += f"\nDescription:\n{desc}"
# Add list-specific information if available
if hasattr(anime, "media_list_entry") and anime.media_list_entry:
entry = anime.media_list_entry
details_text += "\n\n[bold cyan]Your List Info:[/bold cyan]\n"
details_text += f"Progress: {entry.progress or 0} episodes\n"
details_text += f"Score: {entry.score or 'Not rated'}\n"
details_text += f"Status: {_status_to_display_name(entry.status) if hasattr(entry, 'status') else 'Unknown'}\n"
panel = Panel(
details_text,
title=f"{'📺 ' if icons else ''}Anime Details",
border_style="blue",
)
console.print(panel)
def _navigate_to_list(ctx: Context, list_status: UserListItem) -> State:
"""Navigate to a specific list view."""
return State(
menu_name="ANILIST_LIST_VIEW", data={"list_status": list_status, "page": 1}
)
def _select_anime_for_details(
ctx: Context, result: MediaSearchResult, list_status: str, page: int
) -> State | ControlFlow:
"""Let user select an anime from the list to view/edit details."""
if not result.media:
return ControlFlow.CONTINUE
# Create choices from anime list
choices = []
for i, anime in enumerate(result.media, 1):
title = anime.title.english or anime.title.romaji or "Unknown Title"
choices.append(f"{i}. {title}")
choice = ctx.selector.choose(
prompt="Select anime to view/edit",
choices=choices,
header="Select Anime",
)
if not choice:
return ControlFlow.CONTINUE
# Extract index and get selected anime
try:
index = int(choice.split(".")[0]) - 1
selected_anime = result.media[index]
return State(
menu_name="ANILIST_ANIME_DETAILS",
data={
"anime": selected_anime,
"list_status": list_status,
"return_page": page,
},
)
except (ValueError, IndexError):
return ControlFlow.CONTINUE
def _edit_anime_progress(
ctx: Context,
anime: MediaItem,
list_status: str,
return_page: int,
feedback,
from_media_actions: bool = False,
) -> State | ControlFlow:
"""Edit the progress (episodes watched) for an anime."""
current_progress = 0
if hasattr(anime, "media_list_entry") and anime.media_list_entry:
current_progress = anime.media_list_entry.progress or 0
max_episodes = anime.episodes or 999
try:
new_progress = click.prompt(
f"Enter new progress (0-{max_episodes}, current: {current_progress})",
type=int,
default=current_progress,
)
if new_progress < 0 or new_progress > max_episodes:
feedback.error(
"Invalid progress", f"Progress must be between 0 and {max_episodes}"
)
feedback.pause_for_user("Press Enter to continue")
return ControlFlow.CONTINUE
# Update via API
def update_progress():
return ctx.media_api.update_list_entry(
UpdateUserMediaListEntryParams(media_id=anime.id, progress=new_progress)
)
success, _ = execute_with_feedback(
update_progress,
feedback,
"update progress",
loading_msg="Updating progress...",
success_msg=f"Progress updated to {new_progress} episodes",
error_msg="Failed to update progress",
)
if success:
feedback.pause_for_user("Press Enter to continue")
except click.Abort:
pass
# Return to appropriate menu based on how we got here
if from_media_actions:
return ControlFlow.BACK
elif list_status:
return State(
menu_name="ANILIST_LIST_VIEW",
data={"list_status": list_status, "page": return_page},
)
else:
return State(menu_name="ANILIST_LISTS")
def _edit_anime_rating(
ctx: Context,
anime: MediaItem,
list_status: str,
return_page: int,
feedback,
from_media_actions: bool = False,
) -> State | ControlFlow:
"""Edit the rating/score for an anime."""
current_score = 0.0
if hasattr(anime, "media_list_entry") and anime.media_list_entry:
current_score = anime.media_list_entry.score or 0.0
try:
new_score = click.prompt(
f"Enter new rating (0.0-10.0, current: {current_score})",
type=float,
default=current_score,
)
if new_score < 0.0 or new_score > 10.0:
feedback.error("Invalid rating", "Rating must be between 0.0 and 10.0")
feedback.pause_for_user("Press Enter to continue")
return ControlFlow.CONTINUE
# Update via API
def update_score():
return ctx.media_api.update_list_entry(
UpdateUserMediaListEntryParams(media_id=anime.id, score=new_score)
)
success, _ = execute_with_feedback(
update_score,
feedback,
"update rating",
loading_msg="Updating rating...",
success_msg=f"Rating updated to {new_score}/10",
error_msg="Failed to update rating",
)
if success:
feedback.pause_for_user("Press Enter to continue")
except click.Abort:
pass
# Return to appropriate menu based on how we got here
if from_media_actions:
return ControlFlow.BACK
elif list_status:
return State(
menu_name="ANILIST_LIST_VIEW",
data={"list_status": list_status, "page": return_page},
)
else:
return State(menu_name="ANILIST_LISTS")
def _edit_anime_status(
ctx: Context,
anime: MediaItem,
list_status: str,
return_page: int,
feedback,
from_media_actions: bool = False,
) -> State | ControlFlow:
"""Edit the list status for an anime."""
status_options = [
"CURRENT (Currently Watching)",
"PLANNING (Plan to Watch)",
"COMPLETED (Completed)",
"PAUSED (Paused)",
"DROPPED (Dropped)",
"REPEATING (Rewatching)",
]
choice = ctx.selector.choose(
prompt="Select new status",
choices=status_options,
header="Change List Status",
)
if not choice:
return ControlFlow.CONTINUE
new_status = choice.split(" ")[0]
# Update via API
def update_status():
return ctx.media_api.update_list_entry(
UpdateUserMediaListEntryParams(media_id=anime.id, status=new_status)
)
success, _ = execute_with_feedback(
update_status,
feedback,
"update status",
loading_msg="Updating status...",
success_msg=f"Status updated to {_status_to_display_name(new_status)}",
error_msg="Failed to update status",
)
if success:
feedback.pause_for_user("Press Enter to continue")
# If status changed, return to main lists menu since the anime
# is no longer in the current list
if new_status != list_status:
if from_media_actions:
return ControlFlow.BACK
else:
return State(menu_name="ANILIST_LISTS")
# Return to appropriate menu based on how we got here
if from_media_actions:
return ControlFlow.BACK
elif list_status:
return State(
menu_name="ANILIST_LIST_VIEW",
data={"list_status": list_status, "page": return_page},
)
else:
return State(menu_name="ANILIST_LISTS")
def _confirm_remove_anime(
ctx: Context,
anime: MediaItem,
list_status: str,
return_page: int,
feedback,
icons: bool,
from_media_actions: bool = False,
) -> State | ControlFlow:
"""Confirm and remove an anime from the user's list."""
title = anime.title.english or anime.title.romaji or "Unknown Title"
if not feedback.confirm(
f"Remove '{title}' from your {_status_to_display_name(list_status)} list?",
default=False,
):
return ControlFlow.CONTINUE
# Remove via API
def remove_anime():
return ctx.media_api.delete_list_entry(anime.id)
success, _ = execute_with_feedback(
remove_anime,
feedback,
"remove anime",
loading_msg="Removing anime from list...",
success_msg=f"'{title}' removed from list",
error_msg="Failed to remove anime from list",
)
if success:
feedback.pause_for_user("Press Enter to continue")
# Return to appropriate menu based on how we got here
if from_media_actions:
return ControlFlow.BACK
elif list_status:
return State(
menu_name="ANILIST_LIST_VIEW",
data={"list_status": list_status, "page": return_page},
)
else:
return State(menu_name="ANILIST_LISTS")
def _stream_anime(ctx: Context, anime: MediaItem) -> State:
"""Navigate to streaming interface for the selected anime."""
return State(
menu_name="RESULTS",
data=MediaApiState(
results=[anime], # Pass as single-item list
query=anime.title.english or anime.title.romaji or "Unknown",
page=1,
api_params=None,
user_list_params=None,
),
)
def _show_all_lists_stats(ctx: Context, feedback, icons: bool) -> State | ControlFlow:
"""Show comprehensive statistics across all user lists."""
console = Console()
console.clear()
# This would require fetching data from all lists
# For now, show a placeholder implementation
stats_text = "[bold cyan]📊 Your AniList Statistics[/bold cyan]\n\n"
stats_text += "[dim]Loading comprehensive list statistics...[/dim]\n"
stats_text += "[dim]This feature requires fetching data from all lists.[/dim]"
panel = Panel(
stats_text,
title=f"{'📊 ' if icons else ''}AniList Statistics",
border_style="green",
)
console.print(panel)
feedback.pause_for_user("Press Enter to continue")
return ControlFlow.CONTINUE
def _search_all_lists(ctx: Context, feedback, icons: bool) -> State | ControlFlow:
"""Search across all user lists."""
try:
query = click.prompt("Enter search query", type=str)
if not query.strip():
return ControlFlow.CONTINUE
# This would require implementing search across all lists
feedback.info(
"Search functionality",
"Cross-list search will be implemented in a future update",
)
feedback.pause_for_user("Press Enter to continue")
except click.Abort:
pass
return ControlFlow.CONTINUE
def _add_anime_to_list(ctx: Context, feedback, icons: bool) -> State | ControlFlow:
"""Add a new anime to one of the user's lists."""
try:
query = click.prompt("Enter anime name to search", type=str)
if not query.strip():
return ControlFlow.CONTINUE
# Navigate to search with intent to add to list
return State(
menu_name="PROVIDER_SEARCH", data={"query": query, "add_to_list_mode": True}
)
except click.Abort:
pass
return ControlFlow.CONTINUE
def _add_anime_to_specific_list(
ctx: Context, list_status: str, feedback, icons: bool
) -> State | ControlFlow:
"""Add a new anime to a specific list."""
try:
query = click.prompt("Enter anime name to search", type=str)
if not query.strip():
return ControlFlow.CONTINUE
# Navigate to search with specific list target
return State(
menu_name="PROVIDER_SEARCH",
data={"query": query, "target_list": list_status},
)
except click.Abort:
pass
return ControlFlow.CONTINUE
def _remove_anime_from_list(
ctx: Context,
result: MediaSearchResult,
list_status: str,
page: int,
feedback,
icons: bool,
) -> State | ControlFlow:
"""Select and remove an anime from the current list."""
if not result.media:
feedback.info("Empty list", "No anime to remove from this list")
feedback.pause_for_user("Press Enter to continue")
return ControlFlow.CONTINUE
# Create choices from anime list
choices = []
for i, anime in enumerate(result.media, 1):
title = anime.title.english or anime.title.romaji or "Unknown Title"
choices.append(f"{i}. {title}")
choice = ctx.selector.choose(
prompt="Select anime to remove",
choices=choices,
header="Remove Anime from List",
)
if not choice:
return ControlFlow.CONTINUE
# Extract index and get selected anime
try:
index = int(choice.split(".")[0]) - 1
selected_anime = result.media[index]
return _confirm_remove_anime(
ctx, selected_anime, list_status, page, feedback, icons
)
except (ValueError, IndexError):
return ControlFlow.CONTINUE
def _show_list_statistics(
ctx: Context, list_status: str, feedback, icons: bool
) -> State | ControlFlow:
"""Show statistics for a specific list."""
console = Console()
console.clear()
list_name = _status_to_display_name(list_status)
stats_text = f"[bold cyan]📊 {list_name} Statistics[/bold cyan]\n\n"
stats_text += "[dim]Loading list statistics...[/dim]\n"
stats_text += "[dim]This feature requires comprehensive list analysis.[/dim]"
panel = Panel(
stats_text,
title=f"{'📊 ' if icons else ''}{list_name} Stats",
border_style="blue",
)
console.print(panel)
feedback.pause_for_user("Press Enter to continue")
return ControlFlow.CONTINUE
def _status_to_display_name(status: str) -> str:
"""Convert API status to human-readable display name."""
status_map = {
"CURRENT": "Currently Watching",
"PLANNING": "Planning to Watch",
"COMPLETED": "Completed",
"PAUSED": "Paused",
"DROPPED": "Dropped",
"REPEATING": "Rewatching",
}
return status_map.get(status, status)
# Import click for user input
import click

View File

@@ -1,572 +0,0 @@
"""
Watch History Management Menu for the interactive CLI.
Provides comprehensive watch history viewing, editing, and management capabilities.
"""
import logging
from typing import Callable, Dict, List
from rich.console import Console
from rich.table import Table
from ....core.constants import APP_DATA_DIR
from ...utils.feedback import create_feedback_manager
from ...utils.watch_history_manager import WatchHistoryManager
from ...utils.watch_history_types import WatchHistoryEntry
from ..session import Context, session
from ..state import InternalDirective, State
logger = logging.getLogger(__name__)
MenuAction = Callable[[], str]
@session.menu
def watch_history(ctx: Context, state: State) -> State | InternalDirective:
"""
Watch history management menu for viewing and managing local watch history.
"""
icons = ctx.config.general.icons
feedback = create_feedback_manager(icons)
console = Console()
console.clear()
# Initialize watch history manager
history_manager = WatchHistoryManager()
# Show watch history stats
_display_history_stats(console, history_manager, icons)
options: Dict[str, MenuAction] = {
f"{'📺 ' if icons else ''}Currently Watching": lambda: _view_watching(
ctx, history_manager, feedback
),
f"{'' if icons else ''}Completed Anime": lambda: _view_completed(
ctx, history_manager, feedback
),
f"{'🕒 ' if icons else ''}Recently Watched": lambda: _view_recent(
ctx, history_manager, feedback
),
f"{'📋 ' if icons else ''}View All History": lambda: _view_all_history(
ctx, history_manager, feedback
),
f"{'🔍 ' if icons else ''}Search History": lambda: _search_history(
ctx, history_manager, feedback
),
f"{'✏️ ' if icons else ''}Edit Entry": lambda: _edit_entry(
ctx, history_manager, feedback
),
f"{'🗑️ ' if icons else ''}Remove Entry": lambda: _remove_entry(
ctx, history_manager, feedback
),
f"{'📊 ' if icons else ''}View Statistics": lambda: _view_stats(
ctx, history_manager, feedback
),
f"{'💾 ' if icons else ''}Export History": lambda: _export_history(
ctx, history_manager, feedback
),
f"{'📥 ' if icons else ''}Import History": lambda: _import_history(
ctx, history_manager, feedback
),
f"{'🧹 ' if icons else ''}Clear All History": lambda: _clear_history(
ctx, history_manager, feedback
),
f"{'🔙 ' if icons else ''}Back to Main Menu": lambda: "BACK",
}
choice_str = ctx.selector.choose(
prompt="Select Watch History Action",
choices=list(options.keys()),
header="Watch History Management",
)
if not choice_str:
return InternalDirective.BACK
result = options[choice_str]()
if result == "BACK":
return InternalDirective.BACK
else:
return InternalDirective.RELOAD
def _display_history_stats(
console: Console, history_manager: WatchHistoryManager, icons: bool
):
"""Display current watch history statistics."""
stats = history_manager.get_stats()
# Create a stats table
table = Table(title=f"{'📊 ' if icons else ''}Watch History Overview")
table.add_column("Metric", style="cyan")
table.add_column("Count", style="green")
table.add_row("Total Anime", str(stats["total_entries"]))
table.add_row("Currently Watching", str(stats["watching"]))
table.add_row("Completed", str(stats["completed"]))
table.add_row("Dropped", str(stats["dropped"]))
table.add_row("Paused", str(stats["paused"]))
table.add_row("Total Episodes", str(stats["total_episodes_watched"]))
table.add_row("Last Updated", stats["last_updated"])
console.print(table)
console.print()
def _view_watching(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""View currently watching anime."""
entries = history_manager.get_watching_entries()
if not entries:
feedback.info("No anime currently being watched")
return "CONTINUE"
return _display_entries_list(ctx, entries, "Currently Watching", feedback)
def _view_completed(
ctx: Context, history_manager: WatchHistoryManager, feedback
) -> str:
"""View completed anime."""
entries = history_manager.get_completed_entries()
if not entries:
feedback.info("No completed anime found")
return "CONTINUE"
return _display_entries_list(ctx, entries, "Completed Anime", feedback)
def _view_recent(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""View recently watched anime."""
entries = history_manager.get_recently_watched(20)
if not entries:
feedback.info("No recent watch history found")
return "CONTINUE"
return _display_entries_list(ctx, entries, "Recently Watched", feedback)
def _view_all_history(
ctx: Context, history_manager: WatchHistoryManager, feedback
) -> str:
"""View all watch history entries."""
entries = history_manager.get_all_entries()
if not entries:
feedback.info("No watch history found")
return "CONTINUE"
# Sort by last watched date
entries.sort(key=lambda x: x.last_watched, reverse=True)
return _display_entries_list(ctx, entries, "All Watch History", feedback)
def _search_history(
ctx: Context, history_manager: WatchHistoryManager, feedback
) -> str:
"""Search watch history by title."""
query = ctx.selector.ask("Enter search query:")
if not query:
return "CONTINUE"
entries = history_manager.search_entries(query)
if not entries:
feedback.info(f"No anime found matching '{query}'")
return "CONTINUE"
return _display_entries_list(
ctx, entries, f"Search Results for '{query}'", feedback
)
def _display_entries_list(
ctx: Context, entries: List[WatchHistoryEntry], title: str, feedback
) -> str:
"""Display a list of watch history entries and allow selection."""
console = Console()
console.clear()
# Create table for entries
table = Table(title=title)
table.add_column("Status", style="yellow", width=6)
table.add_column("Title", style="cyan")
table.add_column("Progress", style="green", width=12)
table.add_column("Last Watched", style="blue", width=12)
choices = []
entry_map = {}
for i, entry in enumerate(entries):
# Format last watched date
last_watched = entry.last_watched.strftime("%Y-%m-%d")
# Add to table
table.add_row(
entry.get_status_emoji(),
entry.get_display_title(),
entry.get_progress_display(),
last_watched,
)
# Create choice for selector
choice_text = f"{entry.get_status_emoji()} {entry.get_display_title()} - {entry.get_progress_display()}"
choices.append(choice_text)
entry_map[choice_text] = entry
console.print(table)
console.print()
if not choices:
feedback.info("No entries to display")
feedback.pause_for_user()
return "CONTINUE"
choices.append("Back")
choice = ctx.selector.choose("Select an anime for details:", choices=choices)
if not choice or choice == "Back":
return "CONTINUE"
selected_entry = entry_map[choice]
return _show_entry_details(ctx, selected_entry, feedback)
def _show_entry_details(ctx: Context, entry: WatchHistoryEntry, feedback) -> str:
"""Show detailed information about a watch history entry."""
console = Console()
console.clear()
# Display detailed entry information
console.print(f"[bold cyan]{entry.get_display_title()}[/bold cyan]")
console.print(f"Status: {entry.get_status_emoji()} {entry.status.title()}")
console.print(f"Progress: {entry.get_progress_display()}")
console.print(f"Times Watched: {entry.times_watched}")
console.print(f"First Watched: {entry.first_watched.strftime('%Y-%m-%d %H:%M')}")
console.print(f"Last Watched: {entry.last_watched.strftime('%Y-%m-%d %H:%M')}")
if entry.notes:
console.print(f"Notes: {entry.notes}")
# Show media details if available
media = entry.media_item
if media.description:
console.print(
f"\nDescription: {media.description[:200]}{'...' if len(media.description) > 200 else ''}"
)
if media.genres:
console.print(f"Genres: {', '.join(media.genres)}")
if media.average_score:
console.print(f"Score: {media.average_score}/100")
console.print()
# Action options
actions = [
"Mark Episode as Watched",
"Change Status",
"Edit Notes",
"Remove from History",
"Back to List",
]
choice = ctx.selector.choose("Select action:", choices=actions)
if choice == "Mark Episode as Watched":
return _mark_episode_watched(ctx, entry, feedback)
elif choice == "Change Status":
return _change_entry_status(ctx, entry, feedback)
elif choice == "Edit Notes":
return _edit_entry_notes(ctx, entry, feedback)
elif choice == "Remove from History":
return _confirm_remove_entry(ctx, entry, feedback)
else:
return "CONTINUE"
def _mark_episode_watched(ctx: Context, entry: WatchHistoryEntry, feedback) -> str:
"""Mark a specific episode as watched."""
current_episode = entry.last_watched_episode
max_episodes = entry.media_item.episodes or 999
episode_str = ctx.selector.ask(
f"Enter episode number (current: {current_episode}, max: {max_episodes}):"
)
try:
episode = int(episode_str)
if episode < 1 or (max_episodes and episode > max_episodes):
feedback.error(
f"Invalid episode number. Must be between 1 and {max_episodes}"
)
return "CONTINUE"
history_manager = WatchHistoryManager()
success = history_manager.mark_episode_watched(entry.media_item.id, episode)
if success:
feedback.success(f"Marked episode {episode} as watched")
else:
feedback.error("Failed to update watch progress")
except ValueError:
feedback.error("Invalid episode number entered")
return "CONTINUE"
def _change_entry_status(ctx: Context, entry: WatchHistoryEntry, feedback) -> str:
"""Change the status of a watch history entry."""
statuses = ["watching", "completed", "paused", "dropped", "planning"]
current_status = entry.status
choices = [
f"{status.title()} {'(current)' if status == current_status else ''}"
for status in statuses
]
choices.append("Cancel")
choice = ctx.selector.choose(
f"Select new status (current: {current_status}):", choices=choices
)
if not choice or choice == "Cancel":
return "CONTINUE"
new_status = choice.split()[0].lower()
history_manager = WatchHistoryManager()
success = history_manager.change_status(entry.media_item.id, new_status)
if success:
feedback.success(f"Changed status to {new_status}")
else:
feedback.error("Failed to update status")
return "CONTINUE"
def _edit_entry_notes(ctx: Context, entry: WatchHistoryEntry, feedback) -> str:
"""Edit notes for a watch history entry."""
current_notes = entry.notes or ""
new_notes = ctx.selector.ask(f"Enter notes (current: '{current_notes}'):")
if new_notes is None: # User cancelled
return "CONTINUE"
history_manager = WatchHistoryManager()
success = history_manager.update_notes(entry.media_item.id, new_notes)
if success:
feedback.success("Notes updated successfully")
else:
feedback.error("Failed to update notes")
return "CONTINUE"
def _confirm_remove_entry(ctx: Context, entry: WatchHistoryEntry, feedback) -> str:
"""Confirm and remove a watch history entry."""
if feedback.confirm(f"Remove '{entry.get_display_title()}' from watch history?"):
history_manager = WatchHistoryManager()
success = history_manager.remove_entry(entry.media_item.id)
if success:
feedback.success("Entry removed from watch history")
else:
feedback.error("Failed to remove entry")
return "CONTINUE"
def _edit_entry(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""Edit a watch history entry (select first)."""
entries = history_manager.get_all_entries()
if not entries:
feedback.info("No watch history entries to edit")
return "CONTINUE"
# Sort by title for easier selection
entries.sort(key=lambda x: x.get_display_title())
choices = [
f"{entry.get_display_title()} - {entry.get_progress_display()}"
for entry in entries
]
choices.append("Cancel")
choice = ctx.selector.choose("Select anime to edit:", choices=choices)
if not choice or choice == "Cancel":
return "CONTINUE"
# Find the selected entry
choice_title = choice.split(" - ")[0]
selected_entry = next(
(entry for entry in entries if entry.get_display_title() == choice_title), None
)
if selected_entry:
return _show_entry_details(ctx, selected_entry, feedback)
return "CONTINUE"
def _remove_entry(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""Remove a watch history entry (select first)."""
entries = history_manager.get_all_entries()
if not entries:
feedback.info("No watch history entries to remove")
return "CONTINUE"
# Sort by title for easier selection
entries.sort(key=lambda x: x.get_display_title())
choices = [
f"{entry.get_display_title()} - {entry.get_progress_display()}"
for entry in entries
]
choices.append("Cancel")
choice = ctx.selector.choose("Select anime to remove:", choices=choices)
if not choice or choice == "Cancel":
return "CONTINUE"
# Find the selected entry
choice_title = choice.split(" - ")[0]
selected_entry = next(
(entry for entry in entries if entry.get_display_title() == choice_title), None
)
if selected_entry:
return _confirm_remove_entry(ctx, selected_entry, feedback)
return "CONTINUE"
def _view_stats(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""View detailed watch history statistics."""
console = Console()
console.clear()
stats = history_manager.get_stats()
# Create detailed stats table
table = Table(title="Detailed Watch History Statistics")
table.add_column("Metric", style="cyan")
table.add_column("Value", style="green")
table.add_row("Total Anime Entries", str(stats["total_entries"]))
table.add_row("Currently Watching", str(stats["watching"]))
table.add_row("Completed", str(stats["completed"]))
table.add_row("Dropped", str(stats["dropped"]))
table.add_row("Paused", str(stats["paused"]))
table.add_row("Total Episodes Watched", str(stats["total_episodes_watched"]))
table.add_row("Last Updated", stats["last_updated"])
# Calculate additional stats
if stats["total_entries"] > 0:
completion_rate = (stats["completed"] / stats["total_entries"]) * 100
table.add_row("Completion Rate", f"{completion_rate:.1f}%")
avg_episodes = stats["total_episodes_watched"] / stats["total_entries"]
table.add_row("Avg Episodes per Anime", f"{avg_episodes:.1f}")
console.print(table)
feedback.pause_for_user()
return "CONTINUE"
def _export_history(
ctx: Context, history_manager: WatchHistoryManager, feedback
) -> str:
"""Export watch history to a file."""
export_name = ctx.selector.ask("Enter export filename (without extension):")
if not export_name:
return "CONTINUE"
export_path = APP_DATA_DIR / f"{export_name}.json"
if export_path.exists():
if not feedback.confirm(
f"File '{export_name}.json' already exists. Overwrite?"
):
return "CONTINUE"
success = history_manager.export_history(export_path)
if success:
feedback.success(f"Watch history exported to {export_path}")
else:
feedback.error("Failed to export watch history")
return "CONTINUE"
def _import_history(
ctx: Context, history_manager: WatchHistoryManager, feedback
) -> str:
"""Import watch history from a file."""
import_name = ctx.selector.ask("Enter import filename (without extension):")
if not import_name:
return "CONTINUE"
import_path = APP_DATA_DIR / f"{import_name}.json"
if not import_path.exists():
feedback.error(f"File '{import_name}.json' not found in {APP_DATA_DIR}")
return "CONTINUE"
merge = feedback.confirm(
"Merge with existing history? (No = Replace existing history)"
)
success = history_manager.import_history(import_path, merge=merge)
if success:
action = "merged with" if merge else "replaced"
feedback.success(f"Watch history imported and {action} existing data")
else:
feedback.error("Failed to import watch history")
return "CONTINUE"
def _clear_history(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""Clear all watch history with confirmation."""
if not feedback.confirm(
"Are you sure you want to clear ALL watch history? This cannot be undone."
):
return "CONTINUE"
if not feedback.confirm("Final confirmation: Clear all watch history?"):
return "CONTINUE"
# Create backup before clearing
backup_success = history_manager.backup_history()
if backup_success:
feedback.info("Backup created before clearing")
success = history_manager.clear_history()
if success:
feedback.success("All watch history cleared")
else:
feedback.error("Failed to clear watch history")
return "CONTINUE"

View File

@@ -1,3 +0,0 @@
from .service import DownloadService
__all__ = ["DownloadService"]

View File

@@ -1,530 +0,0 @@
"""Download service that integrates with the media registry."""
import logging
from pathlib import Path
from typing import Optional
from ....core.config.model import AppConfig
from ....core.downloader.base import BaseDownloader
from ....core.downloader.downloader import create_downloader
from ....core.downloader.params import DownloadParams
from ....core.exceptions import FastAnimeError
from ....libs.media_api.types import MediaItem
from ....libs.provider.anime.base import BaseAnimeProvider
from ....libs.provider.anime.params import EpisodeStreamsParams
from ....libs.provider.anime.types import Server
from ..registry import MediaRegistryService
from ..registry.models import DownloadStatus, MediaEpisode
logger = logging.getLogger(__name__)
class DownloadService:
"""Service for downloading episodes and tracking them in the registry."""
def __init__(
self,
config: AppConfig,
media_registry: MediaRegistryService,
provider: BaseAnimeProvider,
):
self.config = config
self.downloads_config = config.downloads
self.media_registry = media_registry
self.provider = provider
self._downloader: Optional[BaseDownloader] = None
@property
def downloader(self) -> BaseDownloader:
"""Lazy initialization of downloader."""
if self._downloader is None:
self._downloader = create_downloader(self.downloads_config)
return self._downloader
def download_episode(
self,
media_item: MediaItem,
episode_number: str,
server: Optional[Server] = None,
quality: Optional[str] = None,
force_redownload: bool = False,
) -> bool:
"""
Download a specific episode and record it in the registry.
Args:
media_item: The media item to download
episode_number: The episode number to download
server: Optional specific server to use for download
quality: Optional quality preference
force_redownload: Whether to redownload if already exists
Returns:
bool: True if download was successful, False otherwise
"""
try:
# Get or create media record
media_record = self.media_registry.get_or_create_record(media_item)
# Check if episode already exists and is completed
existing_episode = self._find_episode_in_record(
media_record, episode_number
)
if (
existing_episode
and existing_episode.download_status == DownloadStatus.COMPLETED
and not force_redownload
and existing_episode.file_path.exists()
):
logger.info(
f"Episode {episode_number} already downloaded at {existing_episode.file_path}"
)
return True
# Generate file path
file_path = self._generate_episode_file_path(media_item, episode_number)
# Update status to QUEUED
self.media_registry.update_episode_download_status(
media_id=media_item.id,
episode_number=episode_number,
status=DownloadStatus.QUEUED,
file_path=file_path,
)
# Get episode stream server if not provided
if server is None:
server = self._get_episode_server(media_item, episode_number, quality)
if not server:
self.media_registry.update_episode_download_status(
media_id=media_item.id,
episode_number=episode_number,
status=DownloadStatus.FAILED,
error_message="Failed to get server for episode",
)
return False
# Update status to DOWNLOADING
self.media_registry.update_episode_download_status(
media_id=media_item.id,
episode_number=episode_number,
status=DownloadStatus.DOWNLOADING,
provider_name=self.provider.__class__.__name__,
server_name=server.name,
quality=quality or self.downloads_config.preferred_quality,
)
# Perform the download
download_result = self._download_from_server(
media_item, episode_number, server, file_path
)
if download_result.success and download_result.video_path:
# Get file size if available
file_size = None
if download_result.video_path.exists():
file_size = download_result.video_path.stat().st_size
# Update episode record with success
self.media_registry.update_episode_download_status(
media_id=media_item.id,
episode_number=episode_number,
status=DownloadStatus.COMPLETED,
file_path=download_result.video_path,
file_size=file_size,
subtitle_paths=download_result.subtitle_paths,
)
logger.info(
f"Successfully downloaded episode {episode_number} to {download_result.video_path}"
)
else:
# Update episode record with failure
self.media_registry.update_episode_download_status(
media_id=media_item.id,
episode_number=episode_number,
status=DownloadStatus.FAILED,
error_message=download_result.error_message,
)
logger.error(
f"Failed to download episode {episode_number}: {download_result.error_message}"
)
return download_result.success
except Exception as e:
logger.error(f"Error downloading episode {episode_number}: {e}")
# Update status to FAILED
try:
self.media_registry.update_episode_download_status(
media_id=media_item.id,
episode_number=episode_number,
status=DownloadStatus.FAILED,
error_message=str(e),
)
except Exception as cleanup_error:
logger.error(f"Failed to update failed status: {cleanup_error}")
return False
def download_multiple_episodes(
self,
media_item: MediaItem,
episode_numbers: list[str],
quality: Optional[str] = None,
force_redownload: bool = False,
) -> dict[str, bool]:
"""
Download multiple episodes and return success status for each.
Args:
media_item: The media item to download
episode_numbers: List of episode numbers to download
quality: Optional quality preference
force_redownload: Whether to redownload if already exists
Returns:
dict: Mapping of episode_number -> success status
"""
results = {}
for episode_number in episode_numbers:
success = self.download_episode(
media_item=media_item,
episode_number=episode_number,
quality=quality,
force_redownload=force_redownload,
)
results[episode_number] = success
# Log progress
logger.info(
f"Download progress: {episode_number} - {'' if success else ''}"
)
return results
def get_download_status(
self, media_item: MediaItem, episode_number: str
) -> Optional[DownloadStatus]:
"""Get the download status for a specific episode."""
media_record = self.media_registry.get_media_record(media_item.id)
if not media_record:
return None
episode_record = self._find_episode_in_record(media_record, episode_number)
return episode_record.download_status if episode_record else None
def get_downloaded_episodes(self, media_item: MediaItem) -> list[str]:
"""Get list of successfully downloaded episode numbers for a media item."""
media_record = self.media_registry.get_media_record(media_item.id)
if not media_record:
return []
return [
episode.episode_number
for episode in media_record.media_episodes
if episode.download_status == DownloadStatus.COMPLETED
and episode.file_path.exists()
]
def remove_downloaded_episode(
self, media_item: MediaItem, episode_number: str
) -> bool:
"""Remove a downloaded episode file and update registry."""
try:
media_record = self.media_registry.get_media_record(media_item.id)
if not media_record:
return False
episode_record = self._find_episode_in_record(media_record, episode_number)
if not episode_record:
return False
# Remove file if it exists
if episode_record.file_path.exists():
episode_record.file_path.unlink()
# Remove episode from record
media_record.media_episodes = [
ep
for ep in media_record.media_episodes
if ep.episode_number != episode_number
]
# Save updated record
self.media_registry.save_media_record(media_record)
logger.info(f"Removed downloaded episode {episode_number}")
return True
except Exception as e:
logger.error(f"Error removing episode {episode_number}: {e}")
return False
def _find_episode_in_record(
self, media_record, episode_number: str
) -> Optional[MediaEpisode]:
"""Find an episode record by episode number."""
for episode in media_record.media_episodes:
if episode.episode_number == episode_number:
return episode
return None
def _get_episode_server(
self, media_item: MediaItem, episode_number: str, quality: Optional[str] = None
) -> Optional[Server]:
"""Get a server for downloading the episode."""
try:
# Use media title for provider search
media_title = media_item.title.english or media_item.title.romaji
if not media_title:
logger.error("Media item has no searchable title")
return None
# Get episode streams from provider
streams = self.provider.episode_streams(
EpisodeStreamsParams(
anime_id=str(media_item.id),
query=media_title,
episode=episode_number,
translation_type=self.config.stream.translation_type,
)
)
if not streams:
logger.error(f"No streams found for episode {episode_number}")
return None
# Convert iterator to list and get first available server
stream_list = list(streams)
if not stream_list:
logger.error(f"No servers available for episode {episode_number}")
return None
# Return the first server (could be enhanced with quality/preference logic)
return stream_list[0]
except Exception as e:
logger.error(f"Error getting episode server: {e}")
return None
def _download_from_server(
self,
media_item: MediaItem,
episode_number: str,
server: Server,
output_path: Path,
):
"""Download episode from a specific server."""
anime_title = media_item.title.english or media_item.title.romaji or "Unknown"
episode_title = server.episode_title or f"Episode {episode_number}"
try:
# Get the best quality link from server
if not server.links:
raise FastAnimeError("Server has no available links")
# Use the first link (could be enhanced with quality filtering)
stream_link = server.links[0]
# Prepare download parameters
download_params = DownloadParams(
url=stream_link.link,
anime_title=anime_title,
episode_title=episode_title,
silent=True, # Use True by default since there's no verbose in config
headers=server.headers,
subtitles=[sub.url for sub in server.subtitles]
if server.subtitles
else [],
vid_format=self.downloads_config.preferred_quality,
force_unknown_ext=True,
)
# Ensure output directory exists
output_path.parent.mkdir(parents=True, exist_ok=True)
# Perform download
return self.downloader.download(download_params)
except Exception as e:
logger.error(f"Error during download: {e}")
from ....core.downloader.model import DownloadResult
return DownloadResult(
success=False,
error_message=str(e),
anime_title=anime_title,
episode_title=episode_title,
)
def get_download_statistics(self) -> dict:
"""Get comprehensive download statistics from the registry."""
return self.media_registry.get_download_statistics()
def get_failed_downloads(self) -> list[tuple[int, str]]:
"""Get all episodes that failed to download."""
return self.media_registry.get_episodes_by_download_status(
DownloadStatus.FAILED
)
def get_queued_downloads(self) -> list[tuple[int, str]]:
"""Get all episodes queued for download."""
return self.media_registry.get_episodes_by_download_status(
DownloadStatus.QUEUED
)
def retry_failed_downloads(self, max_retries: int = 3) -> dict[str, bool]:
"""Retry all failed downloads up to max_retries."""
failed_episodes = self.get_failed_downloads()
results = {}
for media_id, episode_number in failed_episodes:
# Get the media record to check retry attempts
media_record = self.media_registry.get_media_record(media_id)
if not media_record:
continue
episode_record = self._find_episode_in_record(media_record, episode_number)
if not episode_record or episode_record.download_attempts >= max_retries:
logger.info(
f"Skipping {media_id}:{episode_number} - max retries exceeded"
)
continue
logger.info(f"Retrying download for {media_id}:{episode_number}")
success = self.download_episode(
media_item=media_record.media_item,
episode_number=episode_number,
force_redownload=True,
)
results[f"{media_id}:{episode_number}"] = success
return results
def cleanup_failed_downloads(self, older_than_days: int = 7) -> int:
"""Clean up failed download records older than specified days."""
from datetime import datetime, timedelta
cleanup_count = 0
cutoff_date = datetime.now() - timedelta(days=older_than_days)
try:
for record in self.media_registry.get_all_media_records():
episodes_to_remove = []
for episode in record.media_episodes:
if (
episode.download_status == DownloadStatus.FAILED
and episode.download_date < cutoff_date
):
episodes_to_remove.append(episode.episode_number)
for episode_number in episodes_to_remove:
record.media_episodes = [
ep
for ep in record.media_episodes
if ep.episode_number != episode_number
]
cleanup_count += 1
if episodes_to_remove:
self.media_registry.save_media_record(record)
logger.info(f"Cleaned up {cleanup_count} failed download records")
return cleanup_count
except Exception as e:
logger.error(f"Error during cleanup: {e}")
return 0
def pause_download(self, media_item: MediaItem, episode_number: str) -> bool:
"""Pause a download (change status from DOWNLOADING to PAUSED)."""
try:
return self.media_registry.update_episode_download_status(
media_id=media_item.id,
episode_number=episode_number,
status=DownloadStatus.PAUSED,
)
except Exception as e:
logger.error(f"Error pausing download: {e}")
return False
def resume_download(self, media_item: MediaItem, episode_number: str) -> bool:
"""Resume a paused download."""
return self.download_episode(
media_item=media_item,
episode_number=episode_number,
force_redownload=True,
)
def get_media_download_progress(self, media_item: MediaItem) -> dict:
"""Get download progress for a specific media item."""
try:
media_record = self.media_registry.get_media_record(media_item.id)
if not media_record:
return {
"total": 0,
"downloaded": 0,
"failed": 0,
"queued": 0,
"downloading": 0,
}
stats = {
"total": 0,
"downloaded": 0,
"failed": 0,
"queued": 0,
"downloading": 0,
"paused": 0,
}
for episode in media_record.media_episodes:
stats["total"] += 1
status = episode.download_status.value.lower()
if status == "completed":
stats["downloaded"] += 1
elif status == "failed":
stats["failed"] += 1
elif status == "queued":
stats["queued"] += 1
elif status == "downloading":
stats["downloading"] += 1
elif status == "paused":
stats["paused"] += 1
return stats
except Exception as e:
logger.error(f"Error getting download progress: {e}")
return {
"total": 0,
"downloaded": 0,
"failed": 0,
"queued": 0,
"downloading": 0,
}
def _generate_episode_file_path(
self, media_item: MediaItem, episode_number: str
) -> Path:
"""Generate the file path for a downloaded episode."""
# Use the download directory from config
base_dir = self.downloads_config.downloads_dir
# Create anime-specific directory
anime_title = media_item.title.english or media_item.title.romaji or "Unknown"
# Sanitize title for filesystem
safe_title = "".join(
c for c in anime_title if c.isalnum() or c in (" ", "-", "_")
).rstrip()
anime_dir = base_dir / safe_title
# Generate filename (could use template from config in the future)
filename = f"Episode_{episode_number:0>2}.mp4"
return anime_dir / filename

View File

@@ -39,15 +39,6 @@ STREAM_USE_IPC = (
lambda: True if PLATFORM != "win32" and not detect.is_running_in_termux() else False
)
# ServiceConfig
SERVICE_ENABLED = False
SERVICE_WATCHLIST_CHECK_INTERVAL = 30
SERVICE_QUEUE_PROCESS_INTERVAL = 1
SERVICE_MAX_CONCURRENT_DOWNLOADS = 3
SERVICE_AUTO_RETRY_COUNT = 3
SERVICE_CLEANUP_COMPLETED_DAYS = 7
SERVICE_NOTIFICATION_ENABLED = True
# FzfConfig
FZF_OPTS = DEFAULTS_DIR / "fzf-opts"
FZF_HEADER_COLOR = "95,135,175"
@@ -78,22 +69,6 @@ ANILIST_PREFERRED_LANGUAGE = "english"
DOWNLOADS_DOWNLOADER = "auto"
DOWNLOADS_DOWNLOADS_DIR = USER_VIDEOS_DIR
DOWNLOADS_ENABLE_TRACKING = True
DOWNLOADS_AUTO_ORGANIZE = True
DOWNLOADS_MAX_CONCURRENT = 3
DOWNLOADS_AUTO_CLEANUP_FAILED = True
DOWNLOADS_RETENTION_DAYS = 30
DOWNLOADS_SYNC_WITH_WATCH_HISTORY = True
DOWNLOADS_AUTO_MARK_OFFLINE = True
DOWNLOADS_NAMING_TEMPLATE = (
"{title}/Season {season:02d}/{episode:02d} - {episode_title}.{ext}"
)
DOWNLOADS_PREFERRED_QUALITY = "1080"
DOWNLOADS_DOWNLOAD_SUBTITLES = True
DOWNLOADS_SUBTITLE_LANGUAGES = ["en"]
DOWNLOADS_QUEUE_MAX_SIZE = 100
DOWNLOADS_AUTO_START_DOWNLOADS = True
DOWNLOADS_RETRY_ATTEMPTS = 3
DOWNLOADS_RETRY_DELAY = 300
# RegistryConfig
MEDIA_REGISTRY_DIR = USER_VIDEOS_DIR / ".registry"

View File

@@ -140,49 +140,6 @@ class StreamConfig(BaseModel):
)
class ServiceConfig(BaseModel):
"""Configuration for the background download service."""
enabled: bool = Field(
default=defaults.SERVICE_ENABLED,
description=desc.SERVICE_ENABLED,
)
watchlist_check_interval: int = Field(
default=defaults.SERVICE_WATCHLIST_CHECK_INTERVAL,
ge=5,
le=180,
description=desc.SERVICE_WATCHLIST_CHECK_INTERVAL,
)
queue_process_interval: int = Field(
default=defaults.SERVICE_QUEUE_PROCESS_INTERVAL,
ge=1,
le=60,
description=desc.SERVICE_QUEUE_PROCESS_INTERVAL,
)
max_concurrent_downloads: int = Field(
default=defaults.SERVICE_MAX_CONCURRENT_DOWNLOADS,
ge=1,
le=10,
description=desc.SERVICE_MAX_CONCURRENT_DOWNLOADS,
)
auto_retry_count: int = Field(
default=defaults.SERVICE_AUTO_RETRY_COUNT,
ge=0,
le=10,
description=desc.SERVICE_AUTO_RETRY_COUNT,
)
cleanup_completed_days: int = Field(
default=defaults.SERVICE_CLEANUP_COMPLETED_DAYS,
ge=1,
le=30,
description=desc.SERVICE_CLEANUP_COMPLETED_DAYS,
)
notification_enabled: bool = Field(
default=defaults.SERVICE_NOTIFICATION_ENABLED,
description=desc.SERVICE_NOTIFICATION_ENABLED,
)
class OtherConfig(BaseModel):
pass
@@ -338,72 +295,6 @@ class DownloadsConfig(OtherConfig):
default=defaults.DOWNLOADS_ENABLE_TRACKING,
description=desc.DOWNLOADS_ENABLE_TRACKING,
)
auto_organize: bool = Field(
default=defaults.DOWNLOADS_AUTO_ORGANIZE,
description=desc.DOWNLOADS_AUTO_ORGANIZE,
)
max_concurrent: int = Field(
default=defaults.DOWNLOADS_MAX_CONCURRENT,
gt=0,
le=10,
description=desc.DOWNLOADS_MAX_CONCURRENT,
)
auto_cleanup_failed: bool = Field(
default=defaults.DOWNLOADS_AUTO_CLEANUP_FAILED,
description=desc.DOWNLOADS_AUTO_CLEANUP_FAILED,
)
retention_days: int = Field(
default=defaults.DOWNLOADS_RETENTION_DAYS,
gt=0,
description=desc.DOWNLOADS_RETENTION_DAYS,
)
# Integration with watch history
sync_with_watch_history: bool = Field(
default=defaults.DOWNLOADS_SYNC_WITH_WATCH_HISTORY,
description=desc.DOWNLOADS_SYNC_WITH_WATCH_HISTORY,
)
auto_mark_offline: bool = Field(
default=defaults.DOWNLOADS_AUTO_MARK_OFFLINE,
description=desc.DOWNLOADS_AUTO_MARK_OFFLINE,
)
# File organization
naming_template: str = Field(
default=defaults.DOWNLOADS_NAMING_TEMPLATE,
description=desc.DOWNLOADS_NAMING_TEMPLATE,
)
# Quality and subtitles
preferred_quality: Literal["360", "480", "720", "1080", "best"] = Field(
default=defaults.DOWNLOADS_PREFERRED_QUALITY,
description=desc.DOWNLOADS_PREFERRED_QUALITY,
)
download_subtitles: bool = Field(
default=defaults.DOWNLOADS_DOWNLOAD_SUBTITLES,
description=desc.DOWNLOADS_DOWNLOAD_SUBTITLES,
)
# Queue management
queue_max_size: int = Field(
default=defaults.DOWNLOADS_QUEUE_MAX_SIZE,
gt=0,
description=desc.DOWNLOADS_QUEUE_MAX_SIZE,
)
auto_start_downloads: bool = Field(
default=defaults.DOWNLOADS_AUTO_START_DOWNLOADS,
description=desc.DOWNLOADS_AUTO_START_DOWNLOADS,
)
retry_attempts: int = Field(
default=defaults.DOWNLOADS_RETRY_ATTEMPTS,
ge=0,
description=desc.DOWNLOADS_RETRY_ATTEMPTS,
)
retry_delay: int = Field(
default=defaults.DOWNLOADS_RETRY_DELAY,
ge=0,
description=desc.DOWNLOADS_RETRY_DELAY,
)
class MediaRegistryConfig(OtherConfig):
@@ -442,11 +333,6 @@ class AppConfig(BaseModel):
default_factory=JikanConfig,
description=desc.APP_JIKAN,
)
service: ServiceConfig = Field(
default_factory=ServiceConfig,
description=desc.APP_SERVICE,
)
fzf: FzfConfig = Field(
default_factory=FzfConfig,
description=desc.APP_FZF,
@@ -456,10 +342,6 @@ class AppConfig(BaseModel):
description=desc.APP_ROFI,
)
mpv: MpvConfig = Field(default_factory=MpvConfig, description=desc.APP_MPV)
service: ServiceConfig = Field(
default_factory=ServiceConfig,
description=desc.APP_SERVICE,
)
media_registry: MediaRegistryConfig = Field(
default_factory=MediaRegistryConfig, description=desc.APP_MEDIA_REGISTRY
)