Merge branch 'master' into feature/ci

This commit is contained in:
Benexl
2025-07-29 10:23:44 +03:00
committed by GitHub
44 changed files with 1630 additions and 3586 deletions

View File

@@ -0,0 +1,16 @@
# values in {NAME} syntax are provided by python using .replace()
#
[Unit]
Description=FastAnime Background Worker
After=network-online.target
[Service]
Type=simple
# Ensure you have the full path to your fastanime executable
# Use `which fastanime` to find it
ExecStart={EXECUTABLE} worker --log
Restart=always
RestartSec=30
[Install]
WantedBy=default.target

View File

@@ -19,6 +19,7 @@ query {
}
coverImage {
medium
large
}
}
}

View File

@@ -36,6 +36,8 @@ commands = {
"download": "download.download",
"update": "update.update",
"registry": "registry.registry",
"worker": "worker.worker",
"queue": "queue.queue",
}

View File

@@ -7,9 +7,10 @@ commands = {
# "trending": "trending.trending",
# "recent": "recent.recent",
"search": "search.search",
# "downloads": "downloads.downloads",
"download": "download.download",
"auth": "auth.auth",
"stats": "stats.stats",
"notifications": "notifications.notifications",
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,56 @@
import click
from fastanime.core.config import AppConfig
from rich.console import Console
from rich.table import Table
@click.command(help="Check for new AniList notifications (e.g., for airing episodes).")
@click.pass_obj
def notifications(config: AppConfig):
"""
Displays unread notifications from AniList.
Running this command will also mark the notifications as read on the AniList website.
"""
from fastanime.cli.service.feedback import FeedbackService
from fastanime.libs.media_api.api import create_api_client
from ....service.auth import AuthService
feedback = FeedbackService(config.general.icons)
console = Console()
auth = AuthService(config.general.media_api)
api_client = create_api_client(config.general.media_api, config)
if profile := auth.get_auth():
api_client.authenticate(profile.token)
if not api_client.is_authenticated():
feedback.error(
"Authentication Required", "Please log in with 'fastanime anilist auth'."
)
return
with feedback.progress("Fetching notifications..."):
notifs = api_client.get_notifications()
if not notifs:
feedback.success("All caught up!", "You have no new notifications.")
return
table = Table(
title="🔔 AniList Notifications", show_header=True, header_style="bold magenta"
)
table.add_column("Date", style="dim", width=12)
table.add_column("Anime Title", style="cyan")
table.add_column("Details", style="green")
for notif in sorted(notifs, key=lambda n: n.created_at, reverse=True):
title = notif.media.title.english or notif.media.title.romaji or "Unknown"
date_str = notif.created_at.strftime("%Y-%m-%d")
details = f"Episode {notif.episode} has aired!"
table.add_row(date_str, title, details)
console.print(table)
feedback.info(
"Notifications have been marked as read on AniList.",
)

View File

@@ -261,11 +261,12 @@ def download_anime(
episode_title=f"{anime.title}; Episode {episode}",
subtitles=[sub.url for sub in server.subtitles],
headers=server.headers,
vid_format=config.stream.ytdlp_format,
vid_format=config.downloads.ytdlp_format,
force_unknown_ext=download_options["force_unknown_ext"],
verbose=download_options["verbose"],
hls_use_mpegts=download_options["hls_use_mpegts"],
hls_use_h264=download_options["hls_use_h264"],
silent=download_options["silent"],
no_check_certificate=config.downloads.no_check_certificate,
)
)

View File

@@ -0,0 +1,51 @@
import click
from fastanime.core.config import AppConfig
from fastanime.libs.media_api.params import MediaSearchParams
from fastanime.core.exceptions import FastAnimeError
@click.command(help="Queue episodes for the background worker to download.")
@click.option("--title", "-t", required=True, multiple=True, help="Anime title to queue.")
@click.option("--episode-range", "-r", required=True, help="Range of episodes (e.g., '1-10').")
@click.pass_obj
def queue(config: AppConfig, title: tuple, episode_range: str):
"""
Searches for an anime and adds the specified episodes to the download queue.
The background worker must be running for the downloads to start.
"""
from fastanime.cli.service.download.service import DownloadService
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.utils.parser import parse_episode_range
from fastanime.libs.media_api.api import create_api_client
from fastanime.libs.provider.anime.provider import create_provider
from fastanime.cli.service.registry import MediaRegistryService
feedback = FeedbackService(config.general.icons)
media_api = create_api_client(config.general.media_api, config)
provider = create_provider(config.general.provider)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
download_service = DownloadService(config, registry, media_api, provider)
for anime_title in title:
try:
feedback.info(f"Searching for '{anime_title}'...")
search_result = media_api.search_media(MediaSearchParams(query=anime_title, per_page=1))
if not search_result or not search_result.media:
feedback.warning(f"Could not find '{anime_title}' on AniList.")
continue
media_item = search_result.media[0]
available_episodes = [str(i + 1) for i in range(media_item.episodes or 0)]
episodes_to_queue = list(parse_episode_range(episode_range, available_episodes))
queued_count = 0
for ep in episodes_to_queue:
if download_service.add_to_queue(media_item, ep):
queued_count += 1
feedback.success(f"Successfully queued {queued_count} episodes for '{media_item.title.english}'.")
except FastAnimeError as e:
feedback.error(f"Failed to queue '{anime_title}'", str(e))
except Exception as e:
feedback.error("An unexpected error occurred", str(e))

View File

@@ -0,0 +1,39 @@
import click
from fastanime.core.config import AppConfig
@click.command(help="Run the background worker for notifications and downloads.")
@click.pass_obj
def worker(config: AppConfig):
"""
Starts the long-running background worker process.
This process will periodically check for AniList notifications and
process any queued downloads. It's recommended to run this in the
background (e.g., 'fastanime worker &') or as a system service.
"""
from fastanime.cli.service.download.service import DownloadService
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.service.notification.service import NotificationService
from fastanime.cli.service.registry.service import MediaRegistryService
from fastanime.cli.service.worker.service import BackgroundWorkerService
from fastanime.libs.media_api.api import create_api_client
from fastanime.libs.provider.anime.provider import create_provider
feedback = FeedbackService(config.general.icons)
if not config.worker.enabled:
feedback.warning("Worker is disabled in the configuration. Exiting.")
return
# Instantiate services
media_api = create_api_client(config.general.media_api, config)
provider = create_provider(config.general.provider)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
notification_service = NotificationService(media_api)
download_service = DownloadService(config, registry, media_api, provider)
worker_service = BackgroundWorkerService(
config.worker, notification_service, download_service
)
feedback.info("Starting background worker...", "Press Ctrl+C to stop.")
worker_service.run()

View File

@@ -0,0 +1,91 @@
from .....core.utils.fuzzy import fuzz
from .....core.utils.normalizer import normalize_title
from .....libs.provider.anime.params import AnimeParams, SearchParams
from ....service.download.service import DownloadService
from ...session import Context, session
from ...state import InternalDirective, State
@session.menu
def download_episodes(ctx: Context, state: State) -> State | InternalDirective:
"""Menu to select and download episodes synchronously."""
feedback = ctx.feedback
selector = ctx.selector
media_item = state.media_api.media_item
config = ctx.config
provider = ctx.provider
if not media_item:
feedback.error("No media item selected for download.")
return InternalDirective.BACK
media_title = media_item.title.english or media_item.title.romaji
if not media_title:
feedback.error("Cannot download: Media item has no title.")
return InternalDirective.BACK
# Step 1: Find the anime on the provider to get a full episode list
with feedback.progress(
f"Searching for '{media_title}' on {provider.__class__.__name__}..."
):
provider_search_results = provider.search(
SearchParams(
query=normalize_title(media_title, config.general.provider.value, True)
)
)
if not provider_search_results or not provider_search_results.results:
feedback.warning(f"Could not find '{media_title}' on provider.")
return InternalDirective.BACK
provider_results_map = {res.title: res for res in provider_search_results.results}
best_match_title = max(
provider_results_map.keys(),
key=lambda p_title: fuzz.ratio(
normalize_title(p_title, config.general.provider.value).lower(),
media_title.lower(),
),
)
selected_provider_anime_ref = provider_results_map[best_match_title]
with feedback.progress(f"Fetching episode list for '{best_match_title}'..."):
full_provider_anime = provider.get(
AnimeParams(id=selected_provider_anime_ref.id, query=media_title)
)
if not full_provider_anime:
feedback.warning(f"Failed to fetch details for '{best_match_title}'.")
return InternalDirective.BACK
available_episodes = getattr(
full_provider_anime.episodes, config.stream.translation_type, []
)
if not available_episodes:
feedback.warning("No episodes found for download.")
return InternalDirective.BACK
# Step 2: Let user select episodes
selected_episodes = selector.choose_multiple(
"Select episodes to download (TAB to select, ENTER to confirm)",
choices=available_episodes,
)
if not selected_episodes:
feedback.info("No episodes selected for download.")
return InternalDirective.BACK
# Step 3: Download episodes synchronously
# TODO: move to main ctx
download_service = DownloadService(
config, ctx.media_registry, ctx.media_api, ctx.provider
)
feedback.info(
f"Starting download of {len(selected_episodes)} episodes. This may take a while..."
)
download_service.download_episodes_sync(media_item, selected_episodes)
feedback.success(f"Finished downloading {len(selected_episodes)} episodes.")
# After downloading, return to the media actions menu
return InternalDirective.BACK

View File

@@ -11,6 +11,7 @@ from .....libs.media_api.types import (
UserMediaListStatus,
)
from .....libs.player.params import PlayerParams
from ....service.registry.service import DownloadStatus
from ...session import Context, session
from ...state import InternalDirective, MediaApiState, MenuName, State
@@ -30,41 +31,70 @@ def media_actions(ctx: Context, state: State) -> State | InternalDirective:
return InternalDirective.BACK
progress = _get_progress_string(ctx, state.media_api.media_item)
# TODO: Add media list management
# TODO: cross reference for none implemented features
# Check for downloaded episodes to conditionally show options
record = ctx.media_registry.get_media_record(media_item.id)
has_downloads = False
if record:
has_downloads = any(
ep.download_status == DownloadStatus.COMPLETED
and ep.file_path
and ep.file_path.exists()
for ep in record.media_episodes
)
options: Dict[str, MenuAction] = {
f"{'▶️ ' if icons else ''}Stream {progress}": _stream(ctx, state),
f"{'📽️ ' if icons else ''}Episodes": _stream(
ctx, state, force_episodes_menu=True
),
f"{'📼 ' if icons else ''}Watch Trailer": _watch_trailer(ctx, state),
f"{'🔗 ' if icons else ''}Recommendations": _view_recommendations(ctx, state),
f"{'🔄 ' if icons else ''}Related Anime": _view_relations(ctx, state),
f"{'👥 ' if icons else ''}Characters": _view_characters(ctx, state),
f"{'📅 ' if icons else ''}Airing Schedule": _view_airing_schedule(ctx, state),
f"{'📝 ' if icons else ''}View Reviews": _view_reviews(ctx, state),
f"{' ' if icons else ''}Add/Update List": _manage_user_media_list(ctx, state),
f"{'' if icons else ''}Score Anime": _score_anime(ctx, state),
f"{' ' if icons else ''}View Info": _view_info(ctx, state),
f"{'📀 ' if icons else ''}Change Provider (Current: {ctx.config.general.provider.value.upper()})": _change_provider(
ctx, state
),
f"{'🔘 ' if icons else ''}Toggle Auto Select Anime (Current: {ctx.config.general.auto_select_anime_result})": _toggle_config_state(
ctx, state, "AUTO_ANIME"
),
f"{'🔘 ' if icons else ''}Toggle Auto Next Episode (Current: {ctx.config.stream.auto_next})": _toggle_config_state(
ctx, state, "AUTO_EPISODE"
),
f"{'🔘 ' if icons else ''}Toggle Continue From History (Current: {ctx.config.stream.continue_from_watch_history})": _toggle_config_state(
ctx, state, "CONTINUE_FROM_HISTORY"
),
f"{'🔘 ' if icons else ''}Toggle Translation Type (Current: {ctx.config.stream.translation_type.upper()})": _toggle_config_state(
ctx, state, "TRANSLATION_TYPE"
),
f"{'🔙 ' if icons else ''}Back to Results": lambda: InternalDirective.BACK,
f"{'' if icons else ''}Exit": lambda: InternalDirective.EXIT,
}
if has_downloads:
options[f"{'💾 ' if icons else ''}Stream (Downloads)"] = _stream_downloads(
ctx, state
)
options[f"{'💿 ' if icons else ''}Episodes (Downloads)"] = _stream_downloads(
ctx, state, force_episodes_menu=True
)
options.update(
{
f"{'📥 ' if icons else ''}Download": _download_episodes(ctx, state),
f"{'📼 ' if icons else ''}Watch Trailer": _watch_trailer(ctx, state),
f"{'🔗 ' if icons else ''}Recommendations": _view_recommendations(
ctx, state
),
f"{'🔄 ' if icons else ''}Related Anime": _view_relations(ctx, state),
f"{'👥 ' if icons else ''}Characters": _view_characters(ctx, state),
f"{'📅 ' if icons else ''}Airing Schedule": _view_airing_schedule(
ctx, state
),
f"{'📝 ' if icons else ''}View Reviews": _view_reviews(ctx, state),
f"{' ' if icons else ''}Add/Update List": _manage_user_media_list(
ctx, state
),
f"{'' if icons else ''}Score Anime": _score_anime(ctx, state),
f"{' ' if icons else ''}View Info": _view_info(ctx, state),
f"{'📀 ' if icons else ''}Change Provider (Current: {ctx.config.general.provider.value.upper()})": _change_provider(
ctx, state
),
f"{'🔘 ' if icons else ''}Toggle Auto Select Anime (Current: {ctx.config.general.auto_select_anime_result})": _toggle_config_state(
ctx, state, "AUTO_ANIME"
),
f"{'🔘 ' if icons else ''}Toggle Auto Next Episode (Current: {ctx.config.stream.auto_next})": _toggle_config_state(
ctx, state, "AUTO_EPISODE"
),
f"{'🔘 ' if icons else ''}Toggle Continue From History (Current: {ctx.config.stream.continue_from_watch_history})": _toggle_config_state(
ctx, state, "CONTINUE_FROM_HISTORY"
),
f"{'🔘 ' if icons else ''}Toggle Translation Type (Current: {ctx.config.stream.translation_type.upper()})": _toggle_config_state(
ctx, state, "TRANSLATION_TYPE"
),
f"{'🔙 ' if icons else ''}Back to Results": lambda: InternalDirective.BACK,
f"{'' if icons else ''}Exit": lambda: InternalDirective.EXIT,
}
)
choice = ctx.selector.choose(
prompt="Select Action",
choices=list(options.keys()),
@@ -114,6 +144,24 @@ def _stream(ctx: Context, state: State, force_episodes_menu=False) -> MenuAction
return action
def _stream_downloads(
ctx: Context, state: State, force_episodes_menu=False
) -> MenuAction:
def action():
if force_episodes_menu:
ctx.switch.force_episodes_menu()
return State(menu_name=MenuName.PLAY_DOWNLOADS, media_api=state.media_api)
return action
def _download_episodes(ctx: Context, state: State) -> MenuAction:
def action():
return State(menu_name=MenuName.DOWNLOAD_EPISODES, media_api=state.media_api)
return action
def _watch_trailer(ctx: Context, state: State) -> MenuAction:
def action():
feedback = ctx.feedback

View File

@@ -0,0 +1,329 @@
from typing import Callable, Dict, Literal, Union
from .....libs.player.params import PlayerParams
from ....service.registry.models import DownloadStatus
from ...session import Context, session
from ...state import InternalDirective, MenuName, State
MenuAction = Callable[[], Union[State, InternalDirective]]
@session.menu
def play_downloads(ctx: Context, state: State) -> State | InternalDirective:
"""Menu to select and play locally downloaded episodes."""
feedback = ctx.feedback
media_item = state.media_api.media_item
current_episode_num = state.provider.episode
if not media_item:
feedback.error("No media item selected.")
return InternalDirective.BACK
record = ctx.media_registry.get_media_record(media_item.id)
if not record or not record.media_episodes:
feedback.warning("No downloaded episodes found for this anime.")
return InternalDirective.BACK
downloaded_episodes = {
ep.episode_number: ep.file_path
for ep in record.media_episodes
if ep.download_status == DownloadStatus.COMPLETED
and ep.file_path
and ep.file_path.exists()
}
if not downloaded_episodes:
feedback.warning("No complete downloaded episodes found.")
return InternalDirective.BACK
chosen_episode: str | None = current_episode_num
start_time: str | None = None
if not chosen_episode and ctx.config.stream.continue_from_watch_history:
_chosen_episode, _start_time = ctx.watch_history.get_episode(media_item)
if _chosen_episode in downloaded_episodes:
chosen_episode = _chosen_episode
start_time = _start_time
if not chosen_episode or ctx.switch.show_episodes_menu:
choices = [*list(sorted(downloaded_episodes.keys(), key=float)), "Back"]
preview_command = None
if ctx.config.general.preview != "none":
from ....utils.preview import create_preview_context
with create_preview_context() as preview_ctx:
preview_command = preview_ctx.get_episode_preview(
list(downloaded_episodes.keys()), media_item, ctx.config
)
chosen_episode_str = ctx.selector.choose(
prompt="Select Episode", choices=choices, preview=preview_command
)
if not chosen_episode_str or chosen_episode_str == "Back":
return InternalDirective.BACK
chosen_episode = chosen_episode_str
# Workers are automatically cleaned up when exiting the context
else:
# No preview mode
chosen_episode_str = ctx.selector.choose(
prompt="Select Episode", choices=choices, preview=None
)
if not chosen_episode_str or chosen_episode_str == "Back":
return InternalDirective.BACK
chosen_episode = chosen_episode_str
if not chosen_episode or chosen_episode == "Back":
return InternalDirective.BACK
return State(
menu_name=MenuName.DOWNLOADS_PLAYER_CONTROLS,
media_api=state.media_api,
provider=state.provider.model_copy(
update={"episode": chosen_episode, "start_time": start_time}
),
)
# TODO: figure out the best way to implement this logic for next episode ...
@session.menu
def downloads_player_controls(
ctx: Context, state: State
) -> Union[State, InternalDirective]:
feedback = ctx.feedback
feedback.clear_console()
config = ctx.config
selector = ctx.selector
media_item = state.media_api.media_item
current_episode_num = state.provider.episode
current_start_time = state.provider.start_time
if not media_item or not current_episode_num:
feedback.error("Player state is incomplete. Returning.")
return InternalDirective.BACK
record = ctx.media_registry.get_media_record(media_item.id)
if not record or not record.media_episodes:
feedback.warning("No downloaded episodes found for this anime.")
return InternalDirective.BACK
downloaded_episodes = {
ep.episode_number: ep.file_path
for ep in record.media_episodes
if ep.download_status == DownloadStatus.COMPLETED
and ep.file_path
and ep.file_path.exists()
}
available_episodes = list(sorted(downloaded_episodes.keys(), key=float))
current_index = available_episodes.index(current_episode_num)
if not ctx.switch.dont_play:
file_path = downloaded_episodes[current_episode_num]
# Use the player service to play the local file
title = f"{media_item.title.english or media_item.title.romaji}; Episode {current_episode_num}"
if media_item.streaming_episodes:
streaming_episode = media_item.streaming_episodes.get(current_episode_num)
title = streaming_episode.title if streaming_episode else title
player_result = ctx.player.play(
PlayerParams(
url=str(file_path),
title=title,
query=media_item.title.english or media_item.title.romaji or "",
episode=current_episode_num,
start_time=current_start_time,
),
media_item=media_item,
local=True,
)
# Track watch history after playing
ctx.watch_history.track(media_item, player_result)
if config.stream.auto_next and current_index < len(available_episodes) - 1:
feedback.info("Auto-playing next episode...")
next_episode_num = available_episodes[current_index + 1]
return State(
menu_name=MenuName.DOWNLOADS_PLAYER_CONTROLS,
media_api=state.media_api,
provider=state.provider.model_copy(
update={"episode": next_episode_num, "start_time": None}
),
)
# --- Menu Options ---
icons = config.general.icons
options: Dict[str, Callable[[], Union[State, InternalDirective]]] = {}
if current_index < len(available_episodes) - 1:
options[f"{'⏭️ ' if icons else ''}Next Episode"] = _next_episode(ctx, state)
if current_index:
options[f"{'' if icons else ''}Previous Episode"] = _previous_episode(
ctx, state
)
options.update(
{
f"{'🔂 ' if icons else ''}Replay": _replay(ctx, state),
f"{'🎞️ ' if icons else ''}Episode List": _episodes_list(ctx, state),
f"{'🔘 ' if icons else ''}Toggle Auto Next Episode (Current: {ctx.config.stream.auto_next})": _toggle_config_state(
ctx, state, "AUTO_EPISODE"
),
f"{'🎥 ' if icons else ''}Media Actions Menu": lambda: InternalDirective.BACKX2,
f"{'🏠 ' if icons else ''}Main Menu": lambda: InternalDirective.MAIN,
f"{'' if icons else ''}Exit": lambda: InternalDirective.EXIT,
}
)
choice = selector.choose(prompt="What's next?", choices=list(options.keys()))
if choice and choice in options:
return options[choice]()
else:
return InternalDirective.RELOAD
def _next_episode(ctx: Context, state: State) -> MenuAction:
def action():
feedback = ctx.feedback
config = ctx.config
media_item = state.media_api.media_item
current_episode_num = state.provider.episode
if not media_item or not current_episode_num:
feedback.error(
"Player state is incomplete. not going to next episode. Returning."
)
ctx.switch.force_dont_play()
return InternalDirective.RELOAD
record = ctx.media_registry.get_media_record(media_item.id)
if not record or not record.media_episodes:
feedback.warning("No downloaded episodes found for this anime.")
ctx.switch.force_dont_play()
return InternalDirective.RELOAD
downloaded_episodes = {
ep.episode_number: ep.file_path
for ep in record.media_episodes
if ep.download_status == DownloadStatus.COMPLETED
and ep.file_path
and ep.file_path.exists()
}
available_episodes = list(sorted(downloaded_episodes.keys(), key=float))
current_index = available_episodes.index(current_episode_num)
if current_index < len(available_episodes) - 1:
next_episode_num = available_episodes[current_index + 1]
return State(
menu_name=MenuName.DOWNLOADS_PLAYER_CONTROLS,
media_api=state.media_api,
provider=state.provider.model_copy(
update={"episode": next_episode_num, "start_time": None}
),
)
feedback.warning("This is the last available episode.")
ctx.switch.force_dont_play()
return InternalDirective.RELOAD
return action
def _previous_episode(ctx: Context, state: State) -> MenuAction:
def action():
feedback = ctx.feedback
config = ctx.config
media_item = state.media_api.media_item
current_episode_num = state.provider.episode
if not media_item or not current_episode_num:
feedback.error(
"Player state is incomplete not going to previous episode. Returning."
)
ctx.switch.force_dont_play()
return InternalDirective.RELOAD
record = ctx.media_registry.get_media_record(media_item.id)
if not record or not record.media_episodes:
feedback.warning("No downloaded episodes found for this anime.")
ctx.switch.force_dont_play()
return InternalDirective.RELOAD
downloaded_episodes = {
ep.episode_number: ep.file_path
for ep in record.media_episodes
if ep.download_status == DownloadStatus.COMPLETED
and ep.file_path
and ep.file_path.exists()
}
available_episodes = list(sorted(downloaded_episodes.keys(), key=float))
current_index = available_episodes.index(current_episode_num)
if current_index:
prev_episode_num = available_episodes[current_index - 1]
return State(
menu_name=MenuName.DOWNLOADS_PLAYER_CONTROLS,
media_api=state.media_api,
provider=state.provider.model_copy(
update={"episode": prev_episode_num, "start_time": None}
),
)
feedback.warning("This is the last available episode.")
ctx.switch.force_dont_play()
return InternalDirective.RELOAD
return action
def _replay(ctx: Context, state: State) -> MenuAction:
def action():
return InternalDirective.RELOAD
return action
def _toggle_config_state(
ctx: Context,
state: State,
config_state: Literal[
"AUTO_ANIME", "AUTO_EPISODE", "CONTINUE_FROM_HISTORY", "TRANSLATION_TYPE"
],
) -> MenuAction:
def action():
match config_state:
case "AUTO_ANIME":
ctx.config.general.auto_select_anime_result = (
not ctx.config.general.auto_select_anime_result
)
case "AUTO_EPISODE":
ctx.config.stream.auto_next = not ctx.config.stream.auto_next
case "CONTINUE_FROM_HISTORY":
ctx.config.stream.continue_from_watch_history = (
not ctx.config.stream.continue_from_watch_history
)
case "TRANSLATION_TYPE":
ctx.config.stream.translation_type = (
"sub" if ctx.config.stream.translation_type == "dub" else "dub"
)
return InternalDirective.RELOAD
return action
def _episodes_list(ctx: Context, state: State) -> MenuAction:
def action():
ctx.switch.force_episodes_menu()
return InternalDirective.BACK
return action

View File

@@ -78,6 +78,8 @@ def player_controls(ctx: Context, state: State) -> Union[State, InternalDirectiv
if choice and choice in options:
return options[choice]()
else:
return InternalDirective.RELOAD
def _next_episode(ctx: Context, state: State) -> MenuAction:

View File

@@ -1,283 +0,0 @@
"""
Interactive authentication menu for AniList OAuth login/logout and user profile management.
Implements Step 5: AniList Authentication Flow
"""
import webbrowser
from typing import Optional
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from ....libs.media_api.types import UserProfile
from ...auth.manager import AuthManager
from ...utils.feedback import create_feedback_manager, execute_with_feedback
from ..session import Context, session
from ..state import InternalDirective, State
@session.menu
def auth(ctx: Context, state: State) -> State | InternalDirective:
"""
Interactive authentication menu for managing AniList login/logout and viewing user profile.
"""
icons = ctx.config.general.icons
feedback = create_feedback_manager(icons)
console = Console()
console.clear()
# Get current authentication status
user_profile = getattr(ctx.media_api, "user_profile", None)
auth_manager = AuthManager()
# Display current authentication status
_display_auth_status(console, user_profile, icons)
# Menu options based on authentication status
if user_profile:
options = [
f"{'👤 ' if icons else ''}View Profile Details",
f"{'🔓 ' if icons else ''}Logout",
f"{'↩️ ' if icons else ''}Back to Main Menu",
]
else:
options = [
f"{'🔐 ' if icons else ''}Login to AniList",
f"{'' if icons else ''}How to Get Token",
f"{'↩️ ' if icons else ''}Back to Main Menu",
]
choice = ctx.selector.choose(
prompt="Select Authentication Action",
choices=options,
header="AniList Authentication Menu",
)
if not choice:
return InternalDirective.BACK
# Handle menu choices
if "Login to AniList" in choice:
return _handle_login(ctx, auth_manager, feedback, icons)
elif "Logout" in choice:
return _handle_logout(ctx, auth_manager, feedback, icons)
elif "View Profile Details" in choice:
_display_user_profile_details(console, user_profile, icons)
feedback.pause_for_user("Press Enter to continue")
return InternalDirective.RELOAD
elif "How to Get Token" in choice:
_display_token_help(console, icons)
feedback.pause_for_user("Press Enter to continue")
return InternalDirective.RELOAD
else: # Back to Main Menu
return InternalDirective.BACK
def _display_auth_status(
console: Console, user_profile: Optional[UserProfile], icons: bool
):
"""Display current authentication status in a nice panel."""
if user_profile:
status_icon = "🟢" if icons else "[green]●[/green]"
status_text = f"{status_icon} Authenticated"
user_info = f"Logged in as: [bold cyan]{user_profile.name}[/bold cyan]\nUser ID: {user_profile.id}"
else:
status_icon = "🔴" if icons else "[red]○[/red]"
status_text = f"{status_icon} Not Authenticated"
user_info = "Log in to access personalized features like:\n• Your anime lists (Watching, Completed, etc.)\n• Progress tracking\n• List management"
panel = Panel(
user_info,
title=f"Authentication Status: {status_text}",
border_style="green" if user_profile else "red",
)
console.print(panel)
console.print()
def _handle_login(
ctx: Context, auth_manager: AuthManager, feedback, icons: bool
) -> State | InternalDirective:
"""Handle the interactive login process."""
def perform_login():
# Open browser to AniList OAuth page
oauth_url = "https://anilist.co/api/v2/oauth/authorize?client_id=20148&response_type=token"
if feedback.confirm(
"Open AniList authorization page in browser?", default=True
):
try:
webbrowser.open(oauth_url)
feedback.info(
"Browser opened",
"Complete the authorization process in your browser",
)
except Exception:
feedback.warning(
"Could not open browser automatically",
f"Please manually visit: {oauth_url}",
)
else:
feedback.info("Manual authorization", f"Please visit: {oauth_url}")
# Get token from user
feedback.info(
"Token Input", "Paste the token from the browser URL after '#access_token='"
)
token = ctx.selector.ask("Enter your AniList Access Token")
if not token or not token.strip():
feedback.error("Login cancelled", "No token provided")
return None
# Authenticate with the API
profile = ctx.media_api.authenticate(token.strip())
if not profile:
feedback.error(
"Authentication failed", "The token may be invalid or expired"
)
return None
# Save credentials using the auth manager
auth_manager.save_user_profile(profile, token.strip())
return profile
success, profile = execute_with_feedback(
perform_login,
feedback,
"authenticate",
loading_msg="Validating token with AniList",
success_msg="Successfully logged in! 🎉"
if icons
else "Successfully logged in!",
error_msg="Login failed",
show_loading=True,
)
if success and profile:
feedback.success(
f"Logged in as {profile.name}" if profile else "Successfully logged in"
)
feedback.pause_for_user("Press Enter to continue")
return InternalDirective.RELOAD
def _handle_logout(
ctx: Context, auth_manager: AuthManager, feedback, icons: bool
) -> State | InternalDirective:
"""Handle the logout process with confirmation."""
if not feedback.confirm(
"Are you sure you want to logout?",
"This will remove your saved AniList token and log you out",
default=False,
):
return InternalDirective.RELOAD
def perform_logout():
# Clear from auth manager
if hasattr(auth_manager, "logout"):
auth_manager.logout()
else:
auth_manager.clear_user_profile()
# Clear from API client
ctx.media_api.token = None
ctx.media_api.user_profile = None
if hasattr(ctx.media_api, "http_client"):
ctx.media_api.http_client.headers.pop("Authorization", None)
return True
success, _ = execute_with_feedback(
perform_logout,
feedback,
"logout",
loading_msg="Logging out",
success_msg="Successfully logged out 👋"
if icons
else "Successfully logged out",
error_msg="Logout failed",
show_loading=False,
)
if success:
feedback.pause_for_user("Press Enter to continue")
return InternalDirective.CONFIG_EDIT
def _display_user_profile_details(
console: Console, user_profile: UserProfile, icons: bool
):
"""Display detailed user profile information."""
if not user_profile:
console.print("[red]No user profile available[/red]")
return
# Create a detailed profile table
table = Table(title=f"{'👤 ' if icons else ''}User Profile: {user_profile.name}")
table.add_column("Property", style="cyan", no_wrap=True)
table.add_column("Value", style="green")
table.add_row("Name", user_profile.name)
table.add_row("User ID", str(user_profile.id))
if user_profile.avatar_url:
table.add_row("Avatar URL", user_profile.avatar_url)
if user_profile.banner_url:
table.add_row("Banner URL", user_profile.banner_url)
console.print()
console.print(table)
console.print()
# Show available features
features_panel = Panel(
"Available Features:\n"
f"{'📺 ' if icons else ''}Access your anime lists (Watching, Completed, etc.)\n"
f"{'✏️ ' if icons else ''}Update watch progress and scores\n"
f"{' ' if icons else ''}Add/remove anime from your lists\n"
f"{'🔄 ' if icons else ''}Sync progress with AniList\n"
f"{'🔔 ' if icons else ''}Access AniList notifications",
title="Available with Authentication",
border_style="green",
)
console.print(features_panel)
def _display_token_help(console: Console, icons: bool):
"""Display help information about getting an AniList token."""
help_text = """
[bold cyan]How to get your AniList Access Token:[/bold cyan]
[bold]Step 1:[/bold] Visit the AniList authorization page
https://anilist.co/api/v2/oauth/authorize?client_id=20148&response_type=token
[bold]Step 2:[/bold] Log in to your AniList account if prompted
[bold]Step 3:[/bold] Click "Authorize" to grant FastAnime access
[bold]Step 4:[/bold] Copy the token from the browser URL
Look for the part after "#access_token=" in the address bar
[bold]Step 5:[/bold] Paste the token when prompted in FastAnime
[yellow]Note:[/yellow] The token will be stored securely and used for all AniList features.
You only need to do this once unless you revoke access or the token expires.
[yellow]Privacy:[/yellow] FastAnime only requests minimal permissions needed for
list management and does not access sensitive account information.
"""
panel = Panel(
help_text,
title=f"{'' if icons else ''}AniList Token Help",
border_style="blue",
)
console.print()
console.print(panel)

View File

@@ -1,254 +0,0 @@
"""
Session management menu for the interactive CLI.
Provides options to save, load, and manage session state.
"""
from datetime import datetime
from pathlib import Path
from typing import Callable, Dict
from rich.console import Console
from rich.table import Table
from ....core.constants import APP_DIR
from ...utils.feedback import create_feedback_manager
from ..session import Context, session
from ..state import ControlFlow, State
MenuAction = Callable[[], str]
@session.menu
def session_management(ctx: Context, state: State) -> State | ControlFlow:
"""
Session management menu for saving, loading, and managing session state.
"""
icons = ctx.config.general.icons
feedback = create_feedback_manager(icons)
console = Console()
console.clear()
# Show current session stats
_display_session_info(console, icons)
options: Dict[str, MenuAction] = {
f"{'💾 ' if icons else ''}Save Current Session": lambda: _save_session(
ctx, feedback
),
f"{'📂 ' if icons else ''}Load Session": lambda: _load_session(ctx, feedback),
f"{'📋 ' if icons else ''}List Saved Sessions": lambda: _list_sessions(
ctx, feedback
),
f"{'🗑️ ' if icons else ''}Cleanup Old Sessions": lambda: _cleanup_sessions(
ctx, feedback
),
f"{'💾 ' if icons else ''}Create Manual Backup": lambda: _create_backup(
ctx, feedback
),
f"{'⚙️ ' if icons else ''}Session Settings": lambda: _session_settings(
ctx, feedback
),
f"{'🔙 ' if icons else ''}Back to Main Menu": lambda: "BACK",
}
choice_str = ctx.selector.choose(
prompt="Select Session Action",
choices=list(options.keys()),
header="Session Management",
)
if not choice_str:
return ControlFlow.BACK
result = options[choice_str]()
if result == "BACK":
return ControlFlow.BACK
else:
return ControlFlow.CONTINUE
def _display_session_info(console: Console, icons: bool):
"""Display current session information."""
session_stats = session.get_session_stats()
table = Table(title=f"{'📊 ' if icons else ''}Current Session Info")
table.add_column("Property", style="cyan")
table.add_column("Value", style="green")
table.add_row("Current States", str(session_stats["current_states"]))
table.add_row("Current Menu", session_stats["current_menu"] or "None")
table.add_row(
"Auto-Save", "Enabled" if session_stats["auto_save_enabled"] else "Disabled"
)
table.add_row("Has Auto-Save", "Yes" if session_stats["has_auto_save"] else "No")
table.add_row(
"Has Crash Backup", "Yes" if session_stats["has_crash_backup"] else "No"
)
console.print(table)
console.print()
def _save_session(ctx: Context, feedback) -> str:
"""Save the current session."""
session_name = ctx.selector.ask("Enter session name (optional):")
description = ctx.selector.ask("Enter session description (optional):")
if not session_name:
session_name = f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
sessions_dir = APP_DIR / "sessions"
file_path = sessions_dir / f"{session_name}.json"
if file_path.exists():
if not feedback.confirm(f"Session '{session_name}' already exists. Overwrite?"):
feedback.info("Save cancelled")
return "CONTINUE"
success = session.save(file_path, session_name, description or "")
if success:
feedback.success(f"Session saved as '{session_name}'")
return "CONTINUE"
def _load_session(ctx: Context, feedback) -> str:
"""Load a saved session."""
sessions = session.list_saved_sessions()
if not sessions:
feedback.warning("No saved sessions found")
return "CONTINUE"
# Create choices with session info
choices = []
session_map = {}
for sess in sessions:
choice_text = f"{sess['name']} - {sess['description'][:50]}{'...' if len(sess['description']) > 50 else ''}"
choices.append(choice_text)
session_map[choice_text] = sess
choices.append("Cancel")
choice = ctx.selector.choose(
"Select session to load:", choices=choices, header="Available Sessions"
)
if not choice or choice == "Cancel":
return "CONTINUE"
selected_session = session_map[choice]
file_path = Path(selected_session["path"])
if feedback.confirm(
f"Load session '{selected_session['name']}'? This will replace your current session."
):
success = session.resume(file_path, feedback)
if success:
feedback.info("Session loaded successfully. Returning to main menu.")
# Return to main menu after loading
return "MAIN"
return "CONTINUE"
def _list_sessions(ctx: Context, feedback) -> str:
"""List all saved sessions."""
sessions = session.list_saved_sessions()
if not sessions:
feedback.info("No saved sessions found")
return "CONTINUE"
console = Console()
table = Table(title="Saved Sessions")
table.add_column("Name", style="cyan")
table.add_column("Description", style="yellow")
table.add_column("States", style="green")
table.add_column("Created", style="blue")
for sess in sessions:
# Format the created date
created = sess["created"]
if "T" in created:
created = created.split("T")[0] # Just show the date part
table.add_row(
sess["name"],
sess["description"][:40] + "..."
if len(sess["description"]) > 40
else sess["description"],
str(sess["state_count"]),
created,
)
console.print(table)
feedback.pause_for_user()
return "CONTINUE"
def _cleanup_sessions(ctx: Context, feedback) -> str:
"""Clean up old sessions."""
sessions = session.list_saved_sessions()
if len(sessions) <= 5:
feedback.info("No cleanup needed. You have 5 or fewer sessions.")
return "CONTINUE"
max_sessions_str = ctx.selector.ask("How many sessions to keep? (default: 10)")
try:
max_sessions = int(max_sessions_str) if max_sessions_str else 10
except ValueError:
feedback.error("Invalid number entered")
return "CONTINUE"
if feedback.confirm(f"Delete sessions older than the {max_sessions} most recent?"):
deleted_count = session.cleanup_old_sessions(max_sessions)
feedback.success(f"Deleted {deleted_count} old sessions")
return "CONTINUE"
def _create_backup(ctx: Context, feedback) -> str:
"""Create a manual backup."""
backup_name = ctx.selector.ask("Enter backup name (optional):")
success = session.create_manual_backup(backup_name or "")
if success:
feedback.success("Manual backup created successfully")
return "CONTINUE"
def _session_settings(ctx: Context, feedback) -> str:
"""Configure session settings."""
current_auto_save = session._auto_save_enabled
choices = [
f"Auto-Save: {'Enabled' if current_auto_save else 'Disabled'}",
"Clear Auto-Save File",
"Clear Crash Backup",
"Back",
]
choice = ctx.selector.choose("Session Settings:", choices=choices)
if choice and choice.startswith("Auto-Save"):
new_setting = not current_auto_save
session.enable_auto_save(new_setting)
feedback.success(f"Auto-save {'enabled' if new_setting else 'disabled'}")
elif choice == "Clear Auto-Save File":
if feedback.confirm("Clear the auto-save file?"):
session._session_manager.clear_auto_save()
feedback.success("Auto-save file cleared")
elif choice == "Clear Crash Backup":
if feedback.confirm("Clear the crash backup file?"):
session._session_manager.clear_crash_backup()
feedback.success("Crash backup cleared")
return "CONTINUE"

View File

@@ -1,827 +0,0 @@
"""
AniList Watch List Operations Menu
Implements Step 8: Remote Watch List Operations
Provides comprehensive AniList list management including:
- Viewing user lists (Watching, Completed, Planning, etc.)
- Interactive list selection and navigation
- Adding/removing anime from lists
- List statistics and overview
"""
import logging
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from ....libs.media_api.params import UpdateUserMediaListEntryParams, UserListParams
from ....libs.media_api.types import MediaItem, MediaSearchResult, UserListItem
from ...utils.feedback import create_feedback_manager, execute_with_feedback
from ..session import Context, session
from ..state import ControlFlow, MediaApiState, State
logger = logging.getLogger(__name__)
@session.menu
def anilist_lists(ctx: Context, state: State) -> State | ControlFlow:
"""
Main AniList lists management menu.
Shows all user lists with statistics and navigation options.
"""
icons = ctx.config.general.icons
feedback = create_feedback_manager(icons)
console = Console()
console.clear()
# Check authentication
if not ctx.media_api.user_profile:
feedback.error(
"Authentication Required",
"You must be logged in to access your AniList lists. Please authenticate first.",
)
feedback.pause_for_user("Press Enter to continue")
return State(menu_name="AUTH")
# Display user profile and lists overview
_display_lists_overview(console, ctx, icons)
# Menu options
options = [
f"{'📺 ' if icons else ''}Currently Watching",
f"{'📋 ' if icons else ''}Planning to Watch",
f"{'' if icons else ''}Completed",
f"{'⏸️ ' if icons else ''}Paused",
f"{'🚮 ' if icons else ''}Dropped",
f"{'🔁 ' if icons else ''}Rewatching",
f"{'📊 ' if icons else ''}View All Lists Statistics",
f"{'🔍 ' if icons else ''}Search Across All Lists",
f"{' ' if icons else ''}Add Anime to List",
f"{'↩️ ' if icons else ''}Back to Main Menu",
]
choice = ctx.selector.choose(
prompt="Select List Action",
choices=options,
header=f"AniList Lists - {ctx.media_api.user_profile.name}",
)
if not choice:
return ControlFlow.BACK
# Handle menu choices
if "Currently Watching" in choice:
return _navigate_to_list(ctx, "CURRENT")
elif "Planning to Watch" in choice:
return _navigate_to_list(ctx, "PLANNING")
elif "Completed" in choice:
return _navigate_to_list(ctx, "COMPLETED")
elif "Paused" in choice:
return _navigate_to_list(ctx, "PAUSED")
elif "Dropped" in choice:
return _navigate_to_list(ctx, "DROPPED")
elif "Rewatching" in choice:
return _navigate_to_list(ctx, "REPEATING")
elif "View All Lists Statistics" in choice:
return _show_all_lists_stats(ctx, feedback, icons)
elif "Search Across All Lists" in choice:
return _search_all_lists(ctx, feedback, icons)
elif "Add Anime to List" in choice:
return _add_anime_to_list(ctx, feedback, icons)
else: # Back to Main Menu
return ControlFlow.BACK
@session.menu
def anilist_list_view(ctx: Context, state: State) -> State | ControlFlow:
"""
View and manage a specific AniList list (e.g., Watching, Completed).
"""
icons = ctx.config.general.icons
feedback = create_feedback_manager(icons)
console = Console()
console.clear()
# Get list status from state data
list_status = state.data.get("list_status") if state.data else "CURRENT"
page = state.data.get("page", 1) if state.data else 1
# Fetch list data
def fetch_list():
return ctx.media_api.search_media_list(
UserListParams(status=list_status, page=page, per_page=20)
)
success, result = execute_with_feedback(
fetch_list,
feedback,
f"fetch {_status_to_display_name(list_status)} list",
loading_msg=f"Loading {_status_to_display_name(list_status)} list...",
success_msg=f"Loaded {_status_to_display_name(list_status)} list",
error_msg=f"Failed to load {_status_to_display_name(list_status)} list",
)
if not success or not result:
feedback.pause_for_user("Press Enter to continue")
return ControlFlow.BACK
# Display list contents
_display_list_contents(console, result, list_status, page, icons)
# Menu options
options = [
f"{'👁️ ' if icons else ''}View/Edit Anime Details",
f"{'🔄 ' if icons else ''}Refresh List",
f"{' ' if icons else ''}Add New Anime",
f"{'🗑️ ' if icons else ''}Remove from List",
]
# Add pagination options
if result.page_info.has_next_page:
options.append(f"{'➡️ ' if icons else ''}Next Page")
if page > 1:
options.append(f"{'⬅️ ' if icons else ''}Previous Page")
options.extend(
[
f"{'📊 ' if icons else ''}List Statistics",
f"{'↩️ ' if icons else ''}Back to Lists Menu",
]
)
choice = ctx.selector.choose(
prompt="Select Action",
choices=options,
header=f"{_status_to_display_name(list_status)} - Page {page}",
)
if not choice:
return ControlFlow.BACK
# Handle menu choices
if "View/Edit Anime Details" in choice:
return _select_anime_for_details(ctx, result, list_status, page)
elif "Refresh List" in choice:
return ControlFlow.CONTINUE
elif "Add New Anime" in choice:
return _add_anime_to_specific_list(ctx, list_status, feedback, icons)
elif "Remove from List" in choice:
return _remove_anime_from_list(ctx, result, list_status, page, feedback, icons)
elif "Next Page" in choice:
return State(
menu_name="ANILIST_LIST_VIEW",
data={"list_status": list_status, "page": page + 1},
)
elif "Previous Page" in choice:
return State(
menu_name="ANILIST_LIST_VIEW",
data={"list_status": list_status, "page": page - 1},
)
elif "List Statistics" in choice:
return _show_list_statistics(ctx, list_status, feedback, icons)
else: # Back to Lists Menu
return State(menu_name="ANILIST_LISTS")
@session.menu
def anilist_anime_details(ctx: Context, state: State) -> State | ControlFlow:
"""
View and edit details for a specific anime in a user's list.
"""
icons = ctx.config.general.icons
feedback = create_feedback_manager(icons)
console = Console()
console.clear()
# Get anime and list info from state
if not state.data:
return ControlFlow.BACK
anime = state.data.get("anime")
list_status = state.data.get("list_status")
return_page = state.data.get("return_page", 1)
from_media_actions = state.data.get("from_media_actions", False)
if not anime:
return ControlFlow.BACK
# Display anime details
_display_anime_list_details(console, anime, icons)
# Menu options
options = [
f"{'✏️ ' if icons else ''}Edit Progress",
f"{'' if icons else ''}Edit Rating",
f"{'📝 ' if icons else ''}Edit Status",
f"{'🎬 ' if icons else ''}Watch/Stream",
f"{'🗑️ ' if icons else ''}Remove from List",
f"{'↩️ ' if icons else ''}Back to List",
]
choice = ctx.selector.choose(
prompt="Select Action",
choices=options,
header=f"{anime.title.english or anime.title.romaji}",
)
if not choice:
# Return to appropriate menu based on how we got here
if from_media_actions:
return ControlFlow.BACK
elif list_status:
return State(
menu_name="ANILIST_LIST_VIEW",
data={"list_status": list_status, "page": return_page},
)
else:
return State(menu_name="ANILIST_LISTS")
# Handle menu choices
if "Edit Progress" in choice:
return _edit_anime_progress(
ctx, anime, list_status, return_page, feedback, from_media_actions
)
elif "Edit Rating" in choice:
return _edit_anime_rating(
ctx, anime, list_status, return_page, feedback, from_media_actions
)
elif "Edit Status" in choice:
return _edit_anime_status(
ctx, anime, list_status, return_page, feedback, from_media_actions
)
elif "Watch/Stream" in choice:
return _stream_anime(ctx, anime)
elif "Remove from List" in choice:
return _confirm_remove_anime(
ctx, anime, list_status, return_page, feedback, icons, from_media_actions
)
else: # Back to List/Media Actions
# Return to appropriate menu based on how we got here
if from_media_actions:
return ControlFlow.BACK
elif list_status:
return State(
menu_name="ANILIST_LIST_VIEW",
data={"list_status": list_status, "page": return_page},
)
else:
return State(menu_name="ANILIST_LISTS")
def _display_lists_overview(console: Console, ctx: Context, icons: bool):
"""Display overview of all user lists with counts."""
user = ctx.media_api.user_profile
# Create overview panel
overview_text = f"[bold cyan]{user.name}[/bold cyan]'s AniList Management\n"
overview_text += f"User ID: {user.id}\n\n"
overview_text += "Manage your anime lists, track progress, and sync with AniList"
panel = Panel(
overview_text,
title=f"{'📚 ' if icons else ''}AniList Lists Overview",
border_style="cyan",
)
console.print(panel)
console.print()
def _display_list_contents(
console: Console,
result: MediaSearchResult,
list_status: str,
page: int,
icons: bool,
):
"""Display the contents of a specific list in a table."""
if not result.media:
console.print(
f"[yellow]No anime found in {_status_to_display_name(list_status)} list[/yellow]"
)
return
table = Table(title=f"{_status_to_display_name(list_status)} - Page {page}")
table.add_column("Title", style="cyan", no_wrap=False, width=40)
table.add_column("Episodes", justify="center", width=10)
table.add_column("Progress", justify="center", width=10)
table.add_column("Score", justify="center", width=8)
table.add_column("Status", justify="center", width=12)
for i, anime in enumerate(result.media, 1):
title = anime.title.english or anime.title.romaji or "Unknown Title"
episodes = str(anime.episodes or "?")
# Get list entry details if available
progress = "?"
score = "?"
status = _status_to_display_name(list_status)
# Note: In a real implementation, you'd get these from the MediaList entry
# For now, we'll show placeholders
if hasattr(anime, "media_list_entry") and anime.media_list_entry:
progress = str(anime.media_list_entry.progress or 0)
score = str(anime.media_list_entry.score or "-")
table.add_row(f"{i}. {title}", episodes, progress, score, status)
console.print(table)
console.print(
f"\nShowing {len(result.media)} anime from {_status_to_display_name(list_status)} list"
)
# Show pagination info
if result.page_info.has_next_page:
console.print("[dim]More results available on next page[/dim]")
def _display_anime_list_details(console: Console, anime: MediaItem, icons: bool):
"""Display detailed information about an anime in the user's list."""
title = anime.title.english or anime.title.romaji or "Unknown Title"
details_text = f"[bold]{title}[/bold]\n\n"
details_text += f"Episodes: {anime.episodes or 'Unknown'}\n"
details_text += f"Status: {anime.status or 'Unknown'}\n"
details_text += (
f"Genres: {', '.join(anime.genres) if anime.genres else 'Unknown'}\n"
)
if anime.description:
# Truncate description for display
desc = (
anime.description[:300] + "..."
if len(anime.description) > 300
else anime.description
)
details_text += f"\nDescription:\n{desc}"
# Add list-specific information if available
if hasattr(anime, "media_list_entry") and anime.media_list_entry:
entry = anime.media_list_entry
details_text += "\n\n[bold cyan]Your List Info:[/bold cyan]\n"
details_text += f"Progress: {entry.progress or 0} episodes\n"
details_text += f"Score: {entry.score or 'Not rated'}\n"
details_text += f"Status: {_status_to_display_name(entry.status) if hasattr(entry, 'status') else 'Unknown'}\n"
panel = Panel(
details_text,
title=f"{'📺 ' if icons else ''}Anime Details",
border_style="blue",
)
console.print(panel)
def _navigate_to_list(ctx: Context, list_status: UserListItem) -> State:
"""Navigate to a specific list view."""
return State(
menu_name="ANILIST_LIST_VIEW", data={"list_status": list_status, "page": 1}
)
def _select_anime_for_details(
ctx: Context, result: MediaSearchResult, list_status: str, page: int
) -> State | ControlFlow:
"""Let user select an anime from the list to view/edit details."""
if not result.media:
return ControlFlow.CONTINUE
# Create choices from anime list
choices = []
for i, anime in enumerate(result.media, 1):
title = anime.title.english or anime.title.romaji or "Unknown Title"
choices.append(f"{i}. {title}")
choice = ctx.selector.choose(
prompt="Select anime to view/edit",
choices=choices,
header="Select Anime",
)
if not choice:
return ControlFlow.CONTINUE
# Extract index and get selected anime
try:
index = int(choice.split(".")[0]) - 1
selected_anime = result.media[index]
return State(
menu_name="ANILIST_ANIME_DETAILS",
data={
"anime": selected_anime,
"list_status": list_status,
"return_page": page,
},
)
except (ValueError, IndexError):
return ControlFlow.CONTINUE
def _edit_anime_progress(
ctx: Context,
anime: MediaItem,
list_status: str,
return_page: int,
feedback,
from_media_actions: bool = False,
) -> State | ControlFlow:
"""Edit the progress (episodes watched) for an anime."""
current_progress = 0
if hasattr(anime, "media_list_entry") and anime.media_list_entry:
current_progress = anime.media_list_entry.progress or 0
max_episodes = anime.episodes or 999
try:
new_progress = click.prompt(
f"Enter new progress (0-{max_episodes}, current: {current_progress})",
type=int,
default=current_progress,
)
if new_progress < 0 or new_progress > max_episodes:
feedback.error(
"Invalid progress", f"Progress must be between 0 and {max_episodes}"
)
feedback.pause_for_user("Press Enter to continue")
return ControlFlow.CONTINUE
# Update via API
def update_progress():
return ctx.media_api.update_list_entry(
UpdateUserMediaListEntryParams(media_id=anime.id, progress=new_progress)
)
success, _ = execute_with_feedback(
update_progress,
feedback,
"update progress",
loading_msg="Updating progress...",
success_msg=f"Progress updated to {new_progress} episodes",
error_msg="Failed to update progress",
)
if success:
feedback.pause_for_user("Press Enter to continue")
except click.Abort:
pass
# Return to appropriate menu based on how we got here
if from_media_actions:
return ControlFlow.BACK
elif list_status:
return State(
menu_name="ANILIST_LIST_VIEW",
data={"list_status": list_status, "page": return_page},
)
else:
return State(menu_name="ANILIST_LISTS")
def _edit_anime_rating(
ctx: Context,
anime: MediaItem,
list_status: str,
return_page: int,
feedback,
from_media_actions: bool = False,
) -> State | ControlFlow:
"""Edit the rating/score for an anime."""
current_score = 0.0
if hasattr(anime, "media_list_entry") and anime.media_list_entry:
current_score = anime.media_list_entry.score or 0.0
try:
new_score = click.prompt(
f"Enter new rating (0.0-10.0, current: {current_score})",
type=float,
default=current_score,
)
if new_score < 0.0 or new_score > 10.0:
feedback.error("Invalid rating", "Rating must be between 0.0 and 10.0")
feedback.pause_for_user("Press Enter to continue")
return ControlFlow.CONTINUE
# Update via API
def update_score():
return ctx.media_api.update_list_entry(
UpdateUserMediaListEntryParams(media_id=anime.id, score=new_score)
)
success, _ = execute_with_feedback(
update_score,
feedback,
"update rating",
loading_msg="Updating rating...",
success_msg=f"Rating updated to {new_score}/10",
error_msg="Failed to update rating",
)
if success:
feedback.pause_for_user("Press Enter to continue")
except click.Abort:
pass
# Return to appropriate menu based on how we got here
if from_media_actions:
return ControlFlow.BACK
elif list_status:
return State(
menu_name="ANILIST_LIST_VIEW",
data={"list_status": list_status, "page": return_page},
)
else:
return State(menu_name="ANILIST_LISTS")
def _edit_anime_status(
ctx: Context,
anime: MediaItem,
list_status: str,
return_page: int,
feedback,
from_media_actions: bool = False,
) -> State | ControlFlow:
"""Edit the list status for an anime."""
status_options = [
"CURRENT (Currently Watching)",
"PLANNING (Plan to Watch)",
"COMPLETED (Completed)",
"PAUSED (Paused)",
"DROPPED (Dropped)",
"REPEATING (Rewatching)",
]
choice = ctx.selector.choose(
prompt="Select new status",
choices=status_options,
header="Change List Status",
)
if not choice:
return ControlFlow.CONTINUE
new_status = choice.split(" ")[0]
# Update via API
def update_status():
return ctx.media_api.update_list_entry(
UpdateUserMediaListEntryParams(media_id=anime.id, status=new_status)
)
success, _ = execute_with_feedback(
update_status,
feedback,
"update status",
loading_msg="Updating status...",
success_msg=f"Status updated to {_status_to_display_name(new_status)}",
error_msg="Failed to update status",
)
if success:
feedback.pause_for_user("Press Enter to continue")
# If status changed, return to main lists menu since the anime
# is no longer in the current list
if new_status != list_status:
if from_media_actions:
return ControlFlow.BACK
else:
return State(menu_name="ANILIST_LISTS")
# Return to appropriate menu based on how we got here
if from_media_actions:
return ControlFlow.BACK
elif list_status:
return State(
menu_name="ANILIST_LIST_VIEW",
data={"list_status": list_status, "page": return_page},
)
else:
return State(menu_name="ANILIST_LISTS")
def _confirm_remove_anime(
ctx: Context,
anime: MediaItem,
list_status: str,
return_page: int,
feedback,
icons: bool,
from_media_actions: bool = False,
) -> State | ControlFlow:
"""Confirm and remove an anime from the user's list."""
title = anime.title.english or anime.title.romaji or "Unknown Title"
if not feedback.confirm(
f"Remove '{title}' from your {_status_to_display_name(list_status)} list?",
default=False,
):
return ControlFlow.CONTINUE
# Remove via API
def remove_anime():
return ctx.media_api.delete_list_entry(anime.id)
success, _ = execute_with_feedback(
remove_anime,
feedback,
"remove anime",
loading_msg="Removing anime from list...",
success_msg=f"'{title}' removed from list",
error_msg="Failed to remove anime from list",
)
if success:
feedback.pause_for_user("Press Enter to continue")
# Return to appropriate menu based on how we got here
if from_media_actions:
return ControlFlow.BACK
elif list_status:
return State(
menu_name="ANILIST_LIST_VIEW",
data={"list_status": list_status, "page": return_page},
)
else:
return State(menu_name="ANILIST_LISTS")
def _stream_anime(ctx: Context, anime: MediaItem) -> State:
"""Navigate to streaming interface for the selected anime."""
return State(
menu_name="RESULTS",
data=MediaApiState(
results=[anime], # Pass as single-item list
query=anime.title.english or anime.title.romaji or "Unknown",
page=1,
api_params=None,
user_list_params=None,
),
)
def _show_all_lists_stats(ctx: Context, feedback, icons: bool) -> State | ControlFlow:
"""Show comprehensive statistics across all user lists."""
console = Console()
console.clear()
# This would require fetching data from all lists
# For now, show a placeholder implementation
stats_text = "[bold cyan]📊 Your AniList Statistics[/bold cyan]\n\n"
stats_text += "[dim]Loading comprehensive list statistics...[/dim]\n"
stats_text += "[dim]This feature requires fetching data from all lists.[/dim]"
panel = Panel(
stats_text,
title=f"{'📊 ' if icons else ''}AniList Statistics",
border_style="green",
)
console.print(panel)
feedback.pause_for_user("Press Enter to continue")
return ControlFlow.CONTINUE
def _search_all_lists(ctx: Context, feedback, icons: bool) -> State | ControlFlow:
"""Search across all user lists."""
try:
query = click.prompt("Enter search query", type=str)
if not query.strip():
return ControlFlow.CONTINUE
# This would require implementing search across all lists
feedback.info(
"Search functionality",
"Cross-list search will be implemented in a future update",
)
feedback.pause_for_user("Press Enter to continue")
except click.Abort:
pass
return ControlFlow.CONTINUE
def _add_anime_to_list(ctx: Context, feedback, icons: bool) -> State | ControlFlow:
"""Add a new anime to one of the user's lists."""
try:
query = click.prompt("Enter anime name to search", type=str)
if not query.strip():
return ControlFlow.CONTINUE
# Navigate to search with intent to add to list
return State(
menu_name="PROVIDER_SEARCH", data={"query": query, "add_to_list_mode": True}
)
except click.Abort:
pass
return ControlFlow.CONTINUE
def _add_anime_to_specific_list(
ctx: Context, list_status: str, feedback, icons: bool
) -> State | ControlFlow:
"""Add a new anime to a specific list."""
try:
query = click.prompt("Enter anime name to search", type=str)
if not query.strip():
return ControlFlow.CONTINUE
# Navigate to search with specific list target
return State(
menu_name="PROVIDER_SEARCH",
data={"query": query, "target_list": list_status},
)
except click.Abort:
pass
return ControlFlow.CONTINUE
def _remove_anime_from_list(
ctx: Context,
result: MediaSearchResult,
list_status: str,
page: int,
feedback,
icons: bool,
) -> State | ControlFlow:
"""Select and remove an anime from the current list."""
if not result.media:
feedback.info("Empty list", "No anime to remove from this list")
feedback.pause_for_user("Press Enter to continue")
return ControlFlow.CONTINUE
# Create choices from anime list
choices = []
for i, anime in enumerate(result.media, 1):
title = anime.title.english or anime.title.romaji or "Unknown Title"
choices.append(f"{i}. {title}")
choice = ctx.selector.choose(
prompt="Select anime to remove",
choices=choices,
header="Remove Anime from List",
)
if not choice:
return ControlFlow.CONTINUE
# Extract index and get selected anime
try:
index = int(choice.split(".")[0]) - 1
selected_anime = result.media[index]
return _confirm_remove_anime(
ctx, selected_anime, list_status, page, feedback, icons
)
except (ValueError, IndexError):
return ControlFlow.CONTINUE
def _show_list_statistics(
ctx: Context, list_status: str, feedback, icons: bool
) -> State | ControlFlow:
"""Show statistics for a specific list."""
console = Console()
console.clear()
list_name = _status_to_display_name(list_status)
stats_text = f"[bold cyan]📊 {list_name} Statistics[/bold cyan]\n\n"
stats_text += "[dim]Loading list statistics...[/dim]\n"
stats_text += "[dim]This feature requires comprehensive list analysis.[/dim]"
panel = Panel(
stats_text,
title=f"{'📊 ' if icons else ''}{list_name} Stats",
border_style="blue",
)
console.print(panel)
feedback.pause_for_user("Press Enter to continue")
return ControlFlow.CONTINUE
def _status_to_display_name(status: str) -> str:
"""Convert API status to human-readable display name."""
status_map = {
"CURRENT": "Currently Watching",
"PLANNING": "Planning to Watch",
"COMPLETED": "Completed",
"PAUSED": "Paused",
"DROPPED": "Dropped",
"REPEATING": "Rewatching",
}
return status_map.get(status, status)
# Import click for user input
import click

View File

@@ -1,572 +0,0 @@
"""
Watch History Management Menu for the interactive CLI.
Provides comprehensive watch history viewing, editing, and management capabilities.
"""
import logging
from typing import Callable, Dict, List
from rich.console import Console
from rich.table import Table
from ....core.constants import APP_DATA_DIR
from ...utils.feedback import create_feedback_manager
from ...utils.watch_history_manager import WatchHistoryManager
from ...utils.watch_history_types import WatchHistoryEntry
from ..session import Context, session
from ..state import InternalDirective, State
logger = logging.getLogger(__name__)
MenuAction = Callable[[], str]
@session.menu
def watch_history(ctx: Context, state: State) -> State | InternalDirective:
"""
Watch history management menu for viewing and managing local watch history.
"""
icons = ctx.config.general.icons
feedback = create_feedback_manager(icons)
console = Console()
console.clear()
# Initialize watch history manager
history_manager = WatchHistoryManager()
# Show watch history stats
_display_history_stats(console, history_manager, icons)
options: Dict[str, MenuAction] = {
f"{'📺 ' if icons else ''}Currently Watching": lambda: _view_watching(
ctx, history_manager, feedback
),
f"{'' if icons else ''}Completed Anime": lambda: _view_completed(
ctx, history_manager, feedback
),
f"{'🕒 ' if icons else ''}Recently Watched": lambda: _view_recent(
ctx, history_manager, feedback
),
f"{'📋 ' if icons else ''}View All History": lambda: _view_all_history(
ctx, history_manager, feedback
),
f"{'🔍 ' if icons else ''}Search History": lambda: _search_history(
ctx, history_manager, feedback
),
f"{'✏️ ' if icons else ''}Edit Entry": lambda: _edit_entry(
ctx, history_manager, feedback
),
f"{'🗑️ ' if icons else ''}Remove Entry": lambda: _remove_entry(
ctx, history_manager, feedback
),
f"{'📊 ' if icons else ''}View Statistics": lambda: _view_stats(
ctx, history_manager, feedback
),
f"{'💾 ' if icons else ''}Export History": lambda: _export_history(
ctx, history_manager, feedback
),
f"{'📥 ' if icons else ''}Import History": lambda: _import_history(
ctx, history_manager, feedback
),
f"{'🧹 ' if icons else ''}Clear All History": lambda: _clear_history(
ctx, history_manager, feedback
),
f"{'🔙 ' if icons else ''}Back to Main Menu": lambda: "BACK",
}
choice_str = ctx.selector.choose(
prompt="Select Watch History Action",
choices=list(options.keys()),
header="Watch History Management",
)
if not choice_str:
return InternalDirective.BACK
result = options[choice_str]()
if result == "BACK":
return InternalDirective.BACK
else:
return InternalDirective.RELOAD
def _display_history_stats(
console: Console, history_manager: WatchHistoryManager, icons: bool
):
"""Display current watch history statistics."""
stats = history_manager.get_stats()
# Create a stats table
table = Table(title=f"{'📊 ' if icons else ''}Watch History Overview")
table.add_column("Metric", style="cyan")
table.add_column("Count", style="green")
table.add_row("Total Anime", str(stats["total_entries"]))
table.add_row("Currently Watching", str(stats["watching"]))
table.add_row("Completed", str(stats["completed"]))
table.add_row("Dropped", str(stats["dropped"]))
table.add_row("Paused", str(stats["paused"]))
table.add_row("Total Episodes", str(stats["total_episodes_watched"]))
table.add_row("Last Updated", stats["last_updated"])
console.print(table)
console.print()
def _view_watching(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""View currently watching anime."""
entries = history_manager.get_watching_entries()
if not entries:
feedback.info("No anime currently being watched")
return "CONTINUE"
return _display_entries_list(ctx, entries, "Currently Watching", feedback)
def _view_completed(
ctx: Context, history_manager: WatchHistoryManager, feedback
) -> str:
"""View completed anime."""
entries = history_manager.get_completed_entries()
if not entries:
feedback.info("No completed anime found")
return "CONTINUE"
return _display_entries_list(ctx, entries, "Completed Anime", feedback)
def _view_recent(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""View recently watched anime."""
entries = history_manager.get_recently_watched(20)
if not entries:
feedback.info("No recent watch history found")
return "CONTINUE"
return _display_entries_list(ctx, entries, "Recently Watched", feedback)
def _view_all_history(
ctx: Context, history_manager: WatchHistoryManager, feedback
) -> str:
"""View all watch history entries."""
entries = history_manager.get_all_entries()
if not entries:
feedback.info("No watch history found")
return "CONTINUE"
# Sort by last watched date
entries.sort(key=lambda x: x.last_watched, reverse=True)
return _display_entries_list(ctx, entries, "All Watch History", feedback)
def _search_history(
ctx: Context, history_manager: WatchHistoryManager, feedback
) -> str:
"""Search watch history by title."""
query = ctx.selector.ask("Enter search query:")
if not query:
return "CONTINUE"
entries = history_manager.search_entries(query)
if not entries:
feedback.info(f"No anime found matching '{query}'")
return "CONTINUE"
return _display_entries_list(
ctx, entries, f"Search Results for '{query}'", feedback
)
def _display_entries_list(
ctx: Context, entries: List[WatchHistoryEntry], title: str, feedback
) -> str:
"""Display a list of watch history entries and allow selection."""
console = Console()
console.clear()
# Create table for entries
table = Table(title=title)
table.add_column("Status", style="yellow", width=6)
table.add_column("Title", style="cyan")
table.add_column("Progress", style="green", width=12)
table.add_column("Last Watched", style="blue", width=12)
choices = []
entry_map = {}
for i, entry in enumerate(entries):
# Format last watched date
last_watched = entry.last_watched.strftime("%Y-%m-%d")
# Add to table
table.add_row(
entry.get_status_emoji(),
entry.get_display_title(),
entry.get_progress_display(),
last_watched,
)
# Create choice for selector
choice_text = f"{entry.get_status_emoji()} {entry.get_display_title()} - {entry.get_progress_display()}"
choices.append(choice_text)
entry_map[choice_text] = entry
console.print(table)
console.print()
if not choices:
feedback.info("No entries to display")
feedback.pause_for_user()
return "CONTINUE"
choices.append("Back")
choice = ctx.selector.choose("Select an anime for details:", choices=choices)
if not choice or choice == "Back":
return "CONTINUE"
selected_entry = entry_map[choice]
return _show_entry_details(ctx, selected_entry, feedback)
def _show_entry_details(ctx: Context, entry: WatchHistoryEntry, feedback) -> str:
"""Show detailed information about a watch history entry."""
console = Console()
console.clear()
# Display detailed entry information
console.print(f"[bold cyan]{entry.get_display_title()}[/bold cyan]")
console.print(f"Status: {entry.get_status_emoji()} {entry.status.title()}")
console.print(f"Progress: {entry.get_progress_display()}")
console.print(f"Times Watched: {entry.times_watched}")
console.print(f"First Watched: {entry.first_watched.strftime('%Y-%m-%d %H:%M')}")
console.print(f"Last Watched: {entry.last_watched.strftime('%Y-%m-%d %H:%M')}")
if entry.notes:
console.print(f"Notes: {entry.notes}")
# Show media details if available
media = entry.media_item
if media.description:
console.print(
f"\nDescription: {media.description[:200]}{'...' if len(media.description) > 200 else ''}"
)
if media.genres:
console.print(f"Genres: {', '.join(media.genres)}")
if media.average_score:
console.print(f"Score: {media.average_score}/100")
console.print()
# Action options
actions = [
"Mark Episode as Watched",
"Change Status",
"Edit Notes",
"Remove from History",
"Back to List",
]
choice = ctx.selector.choose("Select action:", choices=actions)
if choice == "Mark Episode as Watched":
return _mark_episode_watched(ctx, entry, feedback)
elif choice == "Change Status":
return _change_entry_status(ctx, entry, feedback)
elif choice == "Edit Notes":
return _edit_entry_notes(ctx, entry, feedback)
elif choice == "Remove from History":
return _confirm_remove_entry(ctx, entry, feedback)
else:
return "CONTINUE"
def _mark_episode_watched(ctx: Context, entry: WatchHistoryEntry, feedback) -> str:
"""Mark a specific episode as watched."""
current_episode = entry.last_watched_episode
max_episodes = entry.media_item.episodes or 999
episode_str = ctx.selector.ask(
f"Enter episode number (current: {current_episode}, max: {max_episodes}):"
)
try:
episode = int(episode_str)
if episode < 1 or (max_episodes and episode > max_episodes):
feedback.error(
f"Invalid episode number. Must be between 1 and {max_episodes}"
)
return "CONTINUE"
history_manager = WatchHistoryManager()
success = history_manager.mark_episode_watched(entry.media_item.id, episode)
if success:
feedback.success(f"Marked episode {episode} as watched")
else:
feedback.error("Failed to update watch progress")
except ValueError:
feedback.error("Invalid episode number entered")
return "CONTINUE"
def _change_entry_status(ctx: Context, entry: WatchHistoryEntry, feedback) -> str:
"""Change the status of a watch history entry."""
statuses = ["watching", "completed", "paused", "dropped", "planning"]
current_status = entry.status
choices = [
f"{status.title()} {'(current)' if status == current_status else ''}"
for status in statuses
]
choices.append("Cancel")
choice = ctx.selector.choose(
f"Select new status (current: {current_status}):", choices=choices
)
if not choice or choice == "Cancel":
return "CONTINUE"
new_status = choice.split()[0].lower()
history_manager = WatchHistoryManager()
success = history_manager.change_status(entry.media_item.id, new_status)
if success:
feedback.success(f"Changed status to {new_status}")
else:
feedback.error("Failed to update status")
return "CONTINUE"
def _edit_entry_notes(ctx: Context, entry: WatchHistoryEntry, feedback) -> str:
"""Edit notes for a watch history entry."""
current_notes = entry.notes or ""
new_notes = ctx.selector.ask(f"Enter notes (current: '{current_notes}'):")
if new_notes is None: # User cancelled
return "CONTINUE"
history_manager = WatchHistoryManager()
success = history_manager.update_notes(entry.media_item.id, new_notes)
if success:
feedback.success("Notes updated successfully")
else:
feedback.error("Failed to update notes")
return "CONTINUE"
def _confirm_remove_entry(ctx: Context, entry: WatchHistoryEntry, feedback) -> str:
"""Confirm and remove a watch history entry."""
if feedback.confirm(f"Remove '{entry.get_display_title()}' from watch history?"):
history_manager = WatchHistoryManager()
success = history_manager.remove_entry(entry.media_item.id)
if success:
feedback.success("Entry removed from watch history")
else:
feedback.error("Failed to remove entry")
return "CONTINUE"
def _edit_entry(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""Edit a watch history entry (select first)."""
entries = history_manager.get_all_entries()
if not entries:
feedback.info("No watch history entries to edit")
return "CONTINUE"
# Sort by title for easier selection
entries.sort(key=lambda x: x.get_display_title())
choices = [
f"{entry.get_display_title()} - {entry.get_progress_display()}"
for entry in entries
]
choices.append("Cancel")
choice = ctx.selector.choose("Select anime to edit:", choices=choices)
if not choice or choice == "Cancel":
return "CONTINUE"
# Find the selected entry
choice_title = choice.split(" - ")[0]
selected_entry = next(
(entry for entry in entries if entry.get_display_title() == choice_title), None
)
if selected_entry:
return _show_entry_details(ctx, selected_entry, feedback)
return "CONTINUE"
def _remove_entry(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""Remove a watch history entry (select first)."""
entries = history_manager.get_all_entries()
if not entries:
feedback.info("No watch history entries to remove")
return "CONTINUE"
# Sort by title for easier selection
entries.sort(key=lambda x: x.get_display_title())
choices = [
f"{entry.get_display_title()} - {entry.get_progress_display()}"
for entry in entries
]
choices.append("Cancel")
choice = ctx.selector.choose("Select anime to remove:", choices=choices)
if not choice or choice == "Cancel":
return "CONTINUE"
# Find the selected entry
choice_title = choice.split(" - ")[0]
selected_entry = next(
(entry for entry in entries if entry.get_display_title() == choice_title), None
)
if selected_entry:
return _confirm_remove_entry(ctx, selected_entry, feedback)
return "CONTINUE"
def _view_stats(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""View detailed watch history statistics."""
console = Console()
console.clear()
stats = history_manager.get_stats()
# Create detailed stats table
table = Table(title="Detailed Watch History Statistics")
table.add_column("Metric", style="cyan")
table.add_column("Value", style="green")
table.add_row("Total Anime Entries", str(stats["total_entries"]))
table.add_row("Currently Watching", str(stats["watching"]))
table.add_row("Completed", str(stats["completed"]))
table.add_row("Dropped", str(stats["dropped"]))
table.add_row("Paused", str(stats["paused"]))
table.add_row("Total Episodes Watched", str(stats["total_episodes_watched"]))
table.add_row("Last Updated", stats["last_updated"])
# Calculate additional stats
if stats["total_entries"] > 0:
completion_rate = (stats["completed"] / stats["total_entries"]) * 100
table.add_row("Completion Rate", f"{completion_rate:.1f}%")
avg_episodes = stats["total_episodes_watched"] / stats["total_entries"]
table.add_row("Avg Episodes per Anime", f"{avg_episodes:.1f}")
console.print(table)
feedback.pause_for_user()
return "CONTINUE"
def _export_history(
ctx: Context, history_manager: WatchHistoryManager, feedback
) -> str:
"""Export watch history to a file."""
export_name = ctx.selector.ask("Enter export filename (without extension):")
if not export_name:
return "CONTINUE"
export_path = APP_DATA_DIR / f"{export_name}.json"
if export_path.exists():
if not feedback.confirm(
f"File '{export_name}.json' already exists. Overwrite?"
):
return "CONTINUE"
success = history_manager.export_history(export_path)
if success:
feedback.success(f"Watch history exported to {export_path}")
else:
feedback.error("Failed to export watch history")
return "CONTINUE"
def _import_history(
ctx: Context, history_manager: WatchHistoryManager, feedback
) -> str:
"""Import watch history from a file."""
import_name = ctx.selector.ask("Enter import filename (without extension):")
if not import_name:
return "CONTINUE"
import_path = APP_DATA_DIR / f"{import_name}.json"
if not import_path.exists():
feedback.error(f"File '{import_name}.json' not found in {APP_DATA_DIR}")
return "CONTINUE"
merge = feedback.confirm(
"Merge with existing history? (No = Replace existing history)"
)
success = history_manager.import_history(import_path, merge=merge)
if success:
action = "merged with" if merge else "replaced"
feedback.success(f"Watch history imported and {action} existing data")
else:
feedback.error("Failed to import watch history")
return "CONTINUE"
def _clear_history(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""Clear all watch history with confirmation."""
if not feedback.confirm(
"Are you sure you want to clear ALL watch history? This cannot be undone."
):
return "CONTINUE"
if not feedback.confirm("Final confirmation: Clear all watch history?"):
return "CONTINUE"
# Create backup before clearing
backup_success = history_manager.backup_history()
if backup_success:
feedback.info("Backup created before clearing")
success = history_manager.clear_history()
if success:
feedback.success("All watch history cleared")
else:
feedback.error("Failed to clear watch history")
return "CONTINUE"

View File

@@ -34,6 +34,7 @@ class Switch:
_provider_results: bool = False
_episodes: bool = False
_servers: bool = False
_dont_play: bool = False
@property
def show_provider_results_menu(self):
@@ -45,6 +46,16 @@ class Switch:
def force_provider_results_menu(self):
self._provider_results = True
@property
def dont_play(self):
if self._dont_play:
self._dont_play = False
return True
return False
def force_dont_play(self):
self._dont_play = True
@property
def show_episodes_menu(self):
if self._episodes:
@@ -102,18 +113,18 @@ class Context:
if not self._media_api:
from ...libs.media_api.api import create_api_client
self._media_api = create_api_client(
self.config.general.media_api, self.config
)
media_api = create_api_client(self.config.general.media_api, self.config)
if auth_profile := self.auth.get_auth():
p = self._media_api.authenticate(auth_profile.token)
auth = self.auth
if auth_profile := auth.get_auth():
p = media_api.authenticate(auth_profile.token)
if p:
logger.debug(f"Authenticated as {p.name}")
else:
logger.warning(f"Failed to authenticate with {auth_profile.token}")
else:
logger.debug("Not authenticated")
self._media_api = media_api
return self._media_api
@@ -122,7 +133,9 @@ class Context:
if not self._player:
from ..service.player import PlayerService
self._player = PlayerService(self.config, self.provider)
self._player = PlayerService(
self.config, self.provider, self.media_registry
)
return self._player
@property
@@ -149,7 +162,7 @@ class Context:
from ..service.watch_history.service import WatchHistoryService
self._watch_history = WatchHistoryService(
self.config, self.media_registry, self._media_api
self.config, self.media_registry, self.media_api
)
return self._watch_history

View File

@@ -44,6 +44,9 @@ class MenuName(Enum):
MEDIA_REVIEW = "MEDIA_REVIEW"
MEDIA_CHARACTERS = "MEDIA_CHARACTERS"
MEDIA_AIRING_SCHEDULE = "MEDIA_AIRING_SCHEDULE"
PLAY_DOWNLOADS = "PLAY_DOWNLOADS"
DOWNLOADS_PLAYER_CONTROLS = "DOWNLOADS_PLAYER_CONTROLS"
DOWNLOAD_EPISODES = "DOWNLOAD_EPISODES"
class StateModel(BaseModel):

View File

@@ -1,3 +0,0 @@
from .service import DownloadService
__all__ = ["DownloadService"]

View File

@@ -1,530 +1,248 @@
"""Download service that integrates with the media registry."""
import logging
from pathlib import Path
from typing import Optional
from typing import TYPE_CHECKING, List
from ....core.config.model import AppConfig
from ....core.downloader.base import BaseDownloader
from ....core.downloader.downloader import create_downloader
from ....core.downloader.params import DownloadParams
from ....core.exceptions import FastAnimeError
from ....core.downloader import DownloadParams, create_downloader
from ....core.utils.concurrency import ManagedBackgroundWorker, thread_manager
from ....core.utils.fuzzy import fuzz
from ....core.utils.normalizer import normalize_title
from ....libs.media_api.types import MediaItem
from ....libs.provider.anime.base import BaseAnimeProvider
from ....libs.provider.anime.params import EpisodeStreamsParams
from ....libs.provider.anime.types import Server
from ..registry import MediaRegistryService
from ..registry.models import DownloadStatus, MediaEpisode
from ....libs.provider.anime.params import (
AnimeParams,
EpisodeStreamsParams,
SearchParams,
)
from ..registry.models import DownloadStatus
if TYPE_CHECKING:
from ....libs.media_api.api import BaseApiClient
from ....libs.provider.anime.provider import BaseAnimeProvider
from ..registry.service import MediaRegistryService
logger = logging.getLogger(__name__)
class DownloadService:
"""Service for downloading episodes and tracking them in the registry."""
def __init__(
self,
config: AppConfig,
media_registry: MediaRegistryService,
provider: BaseAnimeProvider,
registry_service: "MediaRegistryService",
media_api_service: "BaseApiClient",
provider_service: "BaseAnimeProvider",
):
self.config = config
self.downloads_config = config.downloads
self.media_registry = media_registry
self.provider = provider
self._downloader: Optional[BaseDownloader] = None
self.registry = registry_service
self.media_api = media_api_service
self.provider = provider_service
self.downloader = create_downloader(config.downloads)
@property
def downloader(self) -> BaseDownloader:
"""Lazy initialization of downloader."""
if self._downloader is None:
self._downloader = create_downloader(self.downloads_config)
return self._downloader
# Worker is kept for potential future background commands
self._worker = ManagedBackgroundWorker(
max_workers=config.downloads.max_concurrent_downloads,
name="DownloadWorker",
)
thread_manager.register_worker("download_worker", self._worker)
def download_episode(
self,
media_item: MediaItem,
episode_number: str,
server: Optional[Server] = None,
quality: Optional[str] = None,
force_redownload: bool = False,
) -> bool:
def start(self):
"""Starts the download worker for background tasks."""
if not self._worker.is_running():
self._worker.start()
# We can still resume background tasks on startup if any exist
self.resume_unfinished_downloads()
def stop(self):
"""Stops the download worker."""
self._worker.shutdown(wait=False)
def add_to_queue(self, media_item: MediaItem, episode_number: str) -> bool:
"""Adds a download job to the ASYNCHRONOUS queue."""
logger.info(
f"Queueing background download for '{media_item.title.english}' Episode {episode_number}"
)
self.registry.get_or_create_record(media_item)
updated = self.registry.update_episode_download_status(
media_id=media_item.id,
episode_number=episode_number,
status=DownloadStatus.QUEUED,
)
if not updated:
return False
self._worker.submit_function(
self._execute_download_job, media_item, episode_number
)
return True
def download_episodes_sync(self, media_item: MediaItem, episodes: List[str]):
"""
Download a specific episode and record it in the registry.
Args:
media_item: The media item to download
episode_number: The episode number to download
server: Optional specific server to use for download
quality: Optional quality preference
force_redownload: Whether to redownload if already exists
Returns:
bool: True if download was successful, False otherwise
Performs downloads SYNCHRONOUSLY and blocks until complete.
This is for the direct `download` command.
"""
try:
# Get or create media record
media_record = self.media_registry.get_or_create_record(media_item)
# Check if episode already exists and is completed
existing_episode = self._find_episode_in_record(
media_record, episode_number
for episode_number in episodes:
title = (
media_item.title.english
or media_item.title.romaji
or f"ID: {media_item.id}"
)
if (
existing_episode
and existing_episode.download_status == DownloadStatus.COMPLETED
and not force_redownload
and existing_episode.file_path.exists()
):
logger.info(
f"Episode {episode_number} already downloaded at {existing_episode.file_path}"
logger.info(
f"Starting synchronous download for '{title}' Episode {episode_number}"
)
self._execute_download_job(media_item, episode_number)
def resume_unfinished_downloads(self):
"""Finds and re-queues any downloads that were left in an unfinished state."""
logger.info("Checking for unfinished downloads to resume...")
queued_jobs = self.registry.get_episodes_by_download_status(
DownloadStatus.QUEUED
)
downloading_jobs = self.registry.get_episodes_by_download_status(
DownloadStatus.DOWNLOADING
)
unfinished_jobs = queued_jobs + downloading_jobs
if not unfinished_jobs:
logger.info("No unfinished downloads found.")
return
logger.info(
f"Found {len(unfinished_jobs)} unfinished downloads. Re-queueing..."
)
for media_id, episode_number in unfinished_jobs:
record = self.registry.get_media_record(media_id)
if record and record.media_item:
self.add_to_queue(record.media_item, episode_number)
else:
logger.error(
f"Could not find metadata for media ID {media_id}. Cannot resume. Please run 'fastanime registry sync'."
)
return True
# Generate file path
file_path = self._generate_episode_file_path(media_item, episode_number)
# Update status to QUEUED
self.media_registry.update_episode_download_status(
media_id=media_item.id,
episode_number=episode_number,
status=DownloadStatus.QUEUED,
file_path=file_path,
)
# Get episode stream server if not provided
if server is None:
server = self._get_episode_server(media_item, episode_number, quality)
if not server:
self.media_registry.update_episode_download_status(
media_id=media_item.id,
episode_number=episode_number,
status=DownloadStatus.FAILED,
error_message="Failed to get server for episode",
)
return False
# Update status to DOWNLOADING
self.media_registry.update_episode_download_status(
def _execute_download_job(self, media_item: MediaItem, episode_number: str):
"""The core download logic, can be called by worker or synchronously."""
self.registry.get_or_create_record(media_item)
try:
self.registry.update_episode_download_status(
media_id=media_item.id,
episode_number=episode_number,
status=DownloadStatus.DOWNLOADING,
provider_name=self.provider.__class__.__name__,
server_name=server.name,
quality=quality or self.downloads_config.preferred_quality,
)
# Perform the download
download_result = self._download_from_server(
media_item, episode_number, server, file_path
media_title = (
media_item.title.english or media_item.title.romaji or "Unknown"
)
if download_result.success and download_result.video_path:
# Get file size if available
file_size = None
if download_result.video_path.exists():
file_size = download_result.video_path.stat().st_size
# Update episode record with success
self.media_registry.update_episode_download_status(
media_id=media_item.id,
episode_number=episode_number,
status=DownloadStatus.COMPLETED,
file_path=download_result.video_path,
file_size=file_size,
subtitle_paths=download_result.subtitle_paths,
)
logger.info(
f"Successfully downloaded episode {episode_number} to {download_result.video_path}"
)
else:
# Update episode record with failure
self.media_registry.update_episode_download_status(
media_id=media_item.id,
episode_number=episode_number,
status=DownloadStatus.FAILED,
error_message=download_result.error_message,
)
logger.error(
f"Failed to download episode {episode_number}: {download_result.error_message}"
)
return download_result.success
except Exception as e:
logger.error(f"Error downloading episode {episode_number}: {e}")
# Update status to FAILED
try:
self.media_registry.update_episode_download_status(
media_id=media_item.id,
episode_number=episode_number,
status=DownloadStatus.FAILED,
error_message=str(e),
)
except Exception as cleanup_error:
logger.error(f"Failed to update failed status: {cleanup_error}")
return False
def download_multiple_episodes(
self,
media_item: MediaItem,
episode_numbers: list[str],
quality: Optional[str] = None,
force_redownload: bool = False,
) -> dict[str, bool]:
"""
Download multiple episodes and return success status for each.
Args:
media_item: The media item to download
episode_numbers: List of episode numbers to download
quality: Optional quality preference
force_redownload: Whether to redownload if already exists
Returns:
dict: Mapping of episode_number -> success status
"""
results = {}
for episode_number in episode_numbers:
success = self.download_episode(
media_item=media_item,
episode_number=episode_number,
quality=quality,
force_redownload=force_redownload,
# 1. Search the provider to get the provider-specific ID
provider_search_title = normalize_title(
media_title,
self.config.general.provider.value,
use_provider_mapping=True,
)
results[episode_number] = success
# Log progress
logger.info(
f"Download progress: {episode_number} - {'' if success else ''}"
provider_search_results = self.provider.search(
SearchParams(query=provider_search_title)
)
return results
if not provider_search_results or not provider_search_results.results:
raise ValueError(
f"Could not find '{media_title}' on provider '{self.config.general.provider.value}'"
)
def get_download_status(
self, media_item: MediaItem, episode_number: str
) -> Optional[DownloadStatus]:
"""Get the download status for a specific episode."""
media_record = self.media_registry.get_media_record(media_item.id)
if not media_record:
return None
# 2. Find the best match using fuzzy logic (like auto-select)
provider_results_map = {
result.title: result for result in provider_search_results.results
}
best_match_title = max(
provider_results_map.keys(),
key=lambda p_title: fuzz.ratio(
normalize_title(
p_title, self.config.general.provider.value
).lower(),
media_title.lower(),
),
)
provider_anime_ref = provider_results_map[best_match_title]
episode_record = self._find_episode_in_record(media_record, episode_number)
return episode_record.download_status if episode_record else None
# 3. Get full provider anime details (contains the correct episode list)
provider_anime = self.provider.get(
AnimeParams(id=provider_anime_ref.id, query=media_title)
)
if not provider_anime:
raise ValueError(
f"Failed to get full details for '{best_match_title}' from provider."
)
def get_downloaded_episodes(self, media_item: MediaItem) -> list[str]:
"""Get list of successfully downloaded episode numbers for a media item."""
media_record = self.media_registry.get_media_record(media_item.id)
if not media_record:
return []
return [
episode.episode_number
for episode in media_record.media_episodes
if episode.download_status == DownloadStatus.COMPLETED
and episode.file_path.exists()
]
def remove_downloaded_episode(
self, media_item: MediaItem, episode_number: str
) -> bool:
"""Remove a downloaded episode file and update registry."""
try:
media_record = self.media_registry.get_media_record(media_item.id)
if not media_record:
return False
episode_record = self._find_episode_in_record(media_record, episode_number)
if not episode_record:
return False
# Remove file if it exists
if episode_record.file_path.exists():
episode_record.file_path.unlink()
# Remove episode from record
media_record.media_episodes = [
ep
for ep in media_record.media_episodes
if ep.episode_number != episode_number
]
# Save updated record
self.media_registry.save_media_record(media_record)
logger.info(f"Removed downloaded episode {episode_number}")
return True
except Exception as e:
logger.error(f"Error removing episode {episode_number}: {e}")
return False
def _find_episode_in_record(
self, media_record, episode_number: str
) -> Optional[MediaEpisode]:
"""Find an episode record by episode number."""
for episode in media_record.media_episodes:
if episode.episode_number == episode_number:
return episode
return None
def _get_episode_server(
self, media_item: MediaItem, episode_number: str, quality: Optional[str] = None
) -> Optional[Server]:
"""Get a server for downloading the episode."""
try:
# Use media title for provider search
media_title = media_item.title.english or media_item.title.romaji
if not media_title:
logger.error("Media item has no searchable title")
return None
# Get episode streams from provider
streams = self.provider.episode_streams(
# 4. Get stream links using the now-validated provider_anime ID
streams_iterator = self.provider.episode_streams(
EpisodeStreamsParams(
anime_id=str(media_item.id),
anime_id=provider_anime.id, # Use the ID from the provider, not AniList
query=media_title,
episode=episode_number,
translation_type=self.config.stream.translation_type,
)
)
if not streams_iterator:
raise ValueError("Provider returned no stream iterator.")
if not streams:
logger.error(f"No streams found for episode {episode_number}")
return None
server = next(streams_iterator, None)
if not server or not server.links:
raise ValueError(f"No stream links found for Episode {episode_number}")
# Convert iterator to list and get first available server
stream_list = list(streams)
if not stream_list:
logger.error(f"No servers available for episode {episode_number}")
return None
if server.name != self.config.downloads.server.value:
while True:
try:
_server = next(streams_iterator)
if _server.name == self.config.downloads.server.value:
server = _server
break
except StopIteration:
break
# Return the first server (could be enhanced with quality/preference logic)
return stream_list[0]
except Exception as e:
logger.error(f"Error getting episode server: {e}")
return None
def _download_from_server(
self,
media_item: MediaItem,
episode_number: str,
server: Server,
output_path: Path,
):
"""Download episode from a specific server."""
anime_title = media_item.title.english or media_item.title.romaji or "Unknown"
episode_title = server.episode_title or f"Episode {episode_number}"
try:
# Get the best quality link from server
if not server.links:
raise FastAnimeError("Server has no available links")
# Use the first link (could be enhanced with quality filtering)
stream_link = server.links[0]
# Prepare download parameters
# 5. Perform the download
download_params = DownloadParams(
url=stream_link.link,
anime_title=anime_title,
episode_title=episode_title,
silent=True, # Use True by default since there's no verbose in config
anime_title=media_title,
episode_title=f"{media_title} - Episode {episode_number}",
silent=False,
headers=server.headers,
subtitles=[sub.url for sub in server.subtitles]
if server.subtitles
else [],
vid_format=self.downloads_config.preferred_quality,
force_unknown_ext=True,
subtitles=[sub.url for sub in server.subtitles],
merge=self.config.downloads.merge_subtitles,
clean=self.config.downloads.cleanup_after_merge,
no_check_certificate=self.config.downloads.no_check_certificate,
)
# Ensure output directory exists
output_path.parent.mkdir(parents=True, exist_ok=True)
result = self.downloader.download(download_params)
# Perform download
return self.downloader.download(download_params)
except Exception as e:
logger.error(f"Error during download: {e}")
from ....core.downloader.model import DownloadResult
return DownloadResult(
success=False,
error_message=str(e),
anime_title=anime_title,
episode_title=episode_title,
)
def get_download_statistics(self) -> dict:
"""Get comprehensive download statistics from the registry."""
return self.media_registry.get_download_statistics()
def get_failed_downloads(self) -> list[tuple[int, str]]:
"""Get all episodes that failed to download."""
return self.media_registry.get_episodes_by_download_status(
DownloadStatus.FAILED
)
def get_queued_downloads(self) -> list[tuple[int, str]]:
"""Get all episodes queued for download."""
return self.media_registry.get_episodes_by_download_status(
DownloadStatus.QUEUED
)
def retry_failed_downloads(self, max_retries: int = 3) -> dict[str, bool]:
"""Retry all failed downloads up to max_retries."""
failed_episodes = self.get_failed_downloads()
results = {}
for media_id, episode_number in failed_episodes:
# Get the media record to check retry attempts
media_record = self.media_registry.get_media_record(media_id)
if not media_record:
continue
episode_record = self._find_episode_in_record(media_record, episode_number)
if not episode_record or episode_record.download_attempts >= max_retries:
logger.info(
f"Skipping {media_id}:{episode_number} - max retries exceeded"
# 6. Update registry based on result
if result.success and result.video_path:
file_size = (
result.video_path.stat().st_size
if result.video_path.exists()
else None
)
continue
logger.info(f"Retrying download for {media_id}:{episode_number}")
success = self.download_episode(
media_item=media_record.media_item,
episode_number=episode_number,
force_redownload=True,
)
results[f"{media_id}:{episode_number}"] = success
return results
def cleanup_failed_downloads(self, older_than_days: int = 7) -> int:
"""Clean up failed download records older than specified days."""
from datetime import datetime, timedelta
cleanup_count = 0
cutoff_date = datetime.now() - timedelta(days=older_than_days)
try:
for record in self.media_registry.get_all_media_records():
episodes_to_remove = []
for episode in record.media_episodes:
if (
episode.download_status == DownloadStatus.FAILED
and episode.download_date < cutoff_date
):
episodes_to_remove.append(episode.episode_number)
for episode_number in episodes_to_remove:
record.media_episodes = [
ep
for ep in record.media_episodes
if ep.episode_number != episode_number
]
cleanup_count += 1
if episodes_to_remove:
self.media_registry.save_media_record(record)
logger.info(f"Cleaned up {cleanup_count} failed download records")
return cleanup_count
self.registry.update_episode_download_status(
media_id=media_item.id,
episode_number=episode_number,
status=DownloadStatus.COMPLETED,
file_path=result.merged_path or result.video_path,
file_size=file_size,
quality=stream_link.quality,
provider_name=self.config.general.provider.value,
server_name=server.name,
subtitle_paths=result.subtitle_paths,
)
logger.info(
f"Successfully downloaded Episode {episode_number} of '{media_title}'"
)
else:
raise ValueError(result.error_message or "Unknown download error")
except Exception as e:
logger.error(f"Error during cleanup: {e}")
return 0
def pause_download(self, media_item: MediaItem, episode_number: str) -> bool:
"""Pause a download (change status from DOWNLOADING to PAUSED)."""
try:
return self.media_registry.update_episode_download_status(
logger.error(
f"Download failed for '{media_item.title.english}' Ep {episode_number}: {e}",
exc_info=True,
)
self.registry.update_episode_download_status(
media_id=media_item.id,
episode_number=episode_number,
status=DownloadStatus.PAUSED,
status=DownloadStatus.FAILED,
error_message=str(e),
)
except Exception as e:
logger.error(f"Error pausing download: {e}")
return False
def resume_download(self, media_item: MediaItem, episode_number: str) -> bool:
"""Resume a paused download."""
return self.download_episode(
media_item=media_item,
episode_number=episode_number,
force_redownload=True,
)
def get_media_download_progress(self, media_item: MediaItem) -> dict:
"""Get download progress for a specific media item."""
try:
media_record = self.media_registry.get_media_record(media_item.id)
if not media_record:
return {
"total": 0,
"downloaded": 0,
"failed": 0,
"queued": 0,
"downloading": 0,
}
stats = {
"total": 0,
"downloaded": 0,
"failed": 0,
"queued": 0,
"downloading": 0,
"paused": 0,
}
for episode in media_record.media_episodes:
stats["total"] += 1
status = episode.download_status.value.lower()
if status == "completed":
stats["downloaded"] += 1
elif status == "failed":
stats["failed"] += 1
elif status == "queued":
stats["queued"] += 1
elif status == "downloading":
stats["downloading"] += 1
elif status == "paused":
stats["paused"] += 1
return stats
except Exception as e:
logger.error(f"Error getting download progress: {e}")
return {
"total": 0,
"downloaded": 0,
"failed": 0,
"queued": 0,
"downloading": 0,
}
def _generate_episode_file_path(
self, media_item: MediaItem, episode_number: str
) -> Path:
"""Generate the file path for a downloaded episode."""
# Use the download directory from config
base_dir = self.downloads_config.downloads_dir
# Create anime-specific directory
anime_title = media_item.title.english or media_item.title.romaji or "Unknown"
# Sanitize title for filesystem
safe_title = "".join(
c for c in anime_title if c.isalnum() or c in (" ", "-", "_")
).rstrip()
anime_dir = base_dir / safe_title
# Generate filename (could use template from config in the future)
filename = f"Episode_{episode_number:0>2}.mp4"
return anime_dir / filename

View File

@@ -0,0 +1,80 @@
import json
import logging
from pathlib import Path
from typing import Set
from fastanime.core.constants import APP_CACHE_DIR
from fastanime.libs.media_api.base import BaseApiClient
from fastanime.libs.media_api.types import Notification
try:
import plyer
PLYER_AVAILABLE = True
except ImportError:
PLYER_AVAILABLE = False
logger = logging.getLogger(__name__)
SEEN_NOTIFICATIONS_CACHE = APP_CACHE_DIR / "seen_notifications.json"
class NotificationService:
def __init__(self, media_api: BaseApiClient):
self.media_api = media_api
self._seen_ids: Set[int] = self._load_seen_ids()
def _load_seen_ids(self) -> Set[int]:
if not SEEN_NOTIFICATIONS_CACHE.exists():
return set()
try:
with open(SEEN_NOTIFICATIONS_CACHE, "r") as f:
return set(json.load(f))
except (json.JSONDecodeError, IOError):
return set()
def _save_seen_ids(self):
try:
with open(SEEN_NOTIFICATIONS_CACHE, "w") as f:
json.dump(list(self._seen_ids), f)
except IOError:
logger.error("Failed to save seen notifications cache.")
def check_and_display_notifications(self):
if not PLYER_AVAILABLE:
logger.warning("plyer not installed. Cannot display desktop notifications.")
return
if not self.media_api.is_authenticated():
logger.info("Not authenticated, skipping notification check.")
return
logger.info("Checking for new notifications...")
notifications = self.media_api.get_notifications()
if not notifications:
logger.info("No new notifications found.")
return
new_notifications = [n for n in notifications if n.id not in self._seen_ids]
if not new_notifications:
logger.info("No unseen notifications found.")
return
for notif in new_notifications:
title = notif.media.title.english or notif.media.title.romaji
message = f"Episode {notif.episode} of {title} has aired!"
try:
plyer.notification.notify(
title="FastAnime: New Episode",
message=message,
app_name="FastAnime",
timeout=20,
)
logger.info(f"Displayed notification: {message}")
self._seen_ids.add(notif.id)
except Exception as e:
logger.error(f"Failed to display notification: {e}")
self._save_seen_ids()

View File

@@ -8,6 +8,7 @@ from .....libs.player.params import PlayerParams
from .....libs.player.types import PlayerResult
from .....libs.provider.anime.base import BaseAnimeProvider
from .....libs.provider.anime.types import Anime
from ....service.registry import MediaRegistryService
class BaseIPCPlayer(ABC):
@@ -23,8 +24,9 @@ class BaseIPCPlayer(ABC):
self,
player: BasePlayer,
player_params: PlayerParams,
provider: BaseAnimeProvider,
anime: Anime,
provider: Optional[BaseAnimeProvider] = None,
anime: Optional[Anime] = None,
registry: Optional[MediaRegistryService] = None,
media_item: Optional[MediaItem] = None,
) -> PlayerResult:
"""

View File

@@ -25,6 +25,8 @@ from .....libs.player.types import PlayerResult
from .....libs.provider.anime.base import BaseAnimeProvider
from .....libs.provider.anime.params import EpisodeStreamsParams
from .....libs.provider.anime.types import Anime, ProviderServer, Server
from ....service.registry.models import DownloadStatus
from ...registry import MediaRegistryService
from .base import BaseIPCPlayer
logger = logging.getLogger(__name__)
@@ -260,9 +262,11 @@ class MpvIPCPlayer(BaseIPCPlayer):
property_observers: Dict[str, List[Callable]] = {}
key_bindings: Dict[str, Callable] = {}
message_handlers: Dict[str, Callable] = {}
provider: BaseAnimeProvider
anime: Anime
media_item: Optional[MediaItem]
provider: Optional[BaseAnimeProvider] = None
anime: Optional[Anime] = None
media_item: Optional[MediaItem] = None
registry: Optional[MediaRegistryService] = None
def __init__(self, stream_config: StreamConfig):
super().__init__(stream_config)
@@ -271,11 +275,18 @@ class MpvIPCPlayer(BaseIPCPlayer):
self._fetch_result_queue: Queue = Queue()
def play(
self, player, player_params, provider, anime, media_item=None
self,
player: BasePlayer,
player_params: PlayerParams,
provider: Optional[BaseAnimeProvider] = None,
anime: Optional[Anime] = None,
registry: Optional[MediaRegistryService] = None,
media_item: Optional[MediaItem] = None,
) -> PlayerResult:
self.provider = provider
self.anime = anime
self.media_item = media_item
self.registry = registry
self.player_state = PlayerState(
self.stream_config,
player_params.query,
@@ -430,6 +441,7 @@ class MpvIPCPlayer(BaseIPCPlayer):
elif event == "client-message":
self._handle_client_message(message)
elif event == "file-loaded":
time.sleep(0.1)
self._configure_player()
elif event:
logger.debug(f"MPV event: {event}")
@@ -497,52 +509,101 @@ class MpvIPCPlayer(BaseIPCPlayer):
):
"""This function runs in a background thread to fetch episode streams."""
try:
available_episodes = getattr(
self.anime.episodes, self.stream_config.translation_type
)
if not available_episodes:
raise ValueError(
f"No {self.stream_config.translation_type} episodes available."
if self.anime and self.provider:
available_episodes = getattr(
self.anime.episodes, self.stream_config.translation_type
)
current_index = available_episodes.index(self.player_state.episode)
if episode_type == "next":
if current_index >= len(available_episodes) - 1:
raise ValueError("Already at the last episode.")
target_episode = available_episodes[current_index + 1]
elif episode_type == "previous":
if current_index <= 0:
raise ValueError("Already at first episode")
target_episode = available_episodes[current_index - 1]
elif episode_type == "reload":
target_episode = self.player_state.episode
elif episode_type == "custom":
if not ep_no or ep_no not in available_episodes:
if not available_episodes:
raise ValueError(
f"Invalid episode. Available: {', '.join(available_episodes)}"
f"No {self.stream_config.translation_type} episodes available."
)
target_episode = ep_no
else:
return
stream_params = EpisodeStreamsParams(
anime_id=self.anime.id,
query=self.player_state.query,
episode=target_episode,
translation_type=self.stream_config.translation_type,
)
# This is the blocking network call, now safely in a thread
episode_streams = list(self.provider.episode_streams(stream_params) or [])
if not episode_streams:
raise ValueError(f"No streams found for episode {target_episode}")
current_index = available_episodes.index(self.player_state.episode)
result = {
"type": "success",
"target_episode": target_episode,
"servers": {ProviderServer(s.name): s for s in episode_streams},
}
self._fetch_result_queue.put(result)
if episode_type == "next":
if current_index >= len(available_episodes) - 1:
raise ValueError("Already at the last episode.")
target_episode = available_episodes[current_index + 1]
elif episode_type == "previous":
if current_index <= 0:
raise ValueError("Already at first episode")
target_episode = available_episodes[current_index - 1]
elif episode_type == "reload":
target_episode = self.player_state.episode
elif episode_type == "custom":
if not ep_no or ep_no not in available_episodes:
raise ValueError(
f"Invalid episode. Available: {', '.join(available_episodes)}"
)
target_episode = ep_no
else:
return
stream_params = EpisodeStreamsParams(
anime_id=self.anime.id,
query=self.player_state.query,
episode=target_episode,
translation_type=self.stream_config.translation_type,
)
# This is the blocking network call, now safely in a thread
episode_streams = list(
self.provider.episode_streams(stream_params) or []
)
if not episode_streams:
raise ValueError(f"No streams found for episode {target_episode}")
result = {
"type": "success",
"target_episode": target_episode,
"servers": {ProviderServer(s.name): s for s in episode_streams},
}
self._fetch_result_queue.put(result)
elif self.registry and self.media_item:
record = self.registry.get_media_record(self.media_item.id)
if not record or not record.media_episodes:
logger.warning("No downloaded episodes found for this anime.")
return
downloaded_episodes = {
ep.episode_number: ep.file_path
for ep in record.media_episodes
if ep.download_status == DownloadStatus.COMPLETED
and ep.file_path
and ep.file_path.exists()
}
available_episodes = list(sorted(downloaded_episodes.keys(), key=float))
current_index = available_episodes.index(self.player_state.episode)
if episode_type == "next":
if current_index >= len(available_episodes) - 1:
raise ValueError("Already at the last episode.")
target_episode = available_episodes[current_index + 1]
elif episode_type == "previous":
if current_index <= 0:
raise ValueError("Already at first episode")
target_episode = available_episodes[current_index - 1]
elif episode_type == "reload":
target_episode = self.player_state.episode
elif episode_type == "custom":
if not ep_no or ep_no not in available_episodes:
raise ValueError(
f"Invalid episode. Available: {', '.join(available_episodes)}"
)
target_episode = ep_no
else:
return
file_path = downloaded_episodes[target_episode]
self.player_state.reset()
self.player_state.episode = target_episode
self.ipc_client.send_command(["loadfile", str(file_path)])
# time.sleep(1)
# self.ipc_client.send_command(["seek", 0, "absolute"])
# self.ipc_client.send_command(
# ["set_property", "title", self.player_state.episode_title]
# )
self._show_text(f"Fetched {file_path}")
self.player_fetching = False
except Exception as e:
logger.error(f"Episode fetch task failed: {e}")

View File

@@ -10,6 +10,7 @@ from ....libs.player.player import create_player
from ....libs.player.types import PlayerResult
from ....libs.provider.anime.base import BaseAnimeProvider
from ....libs.provider.anime.types import Anime
from ..registry import MediaRegistryService
logger = logging.getLogger(__name__)
@@ -18,10 +19,18 @@ class PlayerService:
app_config: AppConfig
provider: BaseAnimeProvider
player: BasePlayer
registry: Optional[MediaRegistryService] = None
local: bool = False
def __init__(self, app_config: AppConfig, provider: BaseAnimeProvider):
def __init__(
self,
app_config: AppConfig,
provider: BaseAnimeProvider,
registry: Optional[MediaRegistryService] = None,
):
self.app_config = app_config
self.provider = provider
self.registry = registry
self.player = create_player(app_config)
def play(
@@ -29,9 +38,11 @@ class PlayerService:
params: PlayerParams,
anime: Optional[Anime] = None,
media_item: Optional[MediaItem] = None,
local: bool = False,
) -> PlayerResult:
self.local = local
if self.app_config.stream.use_ipc:
if anime:
if anime or self.registry:
return self._play_with_ipc(params, anime, media_item)
else:
logger.warning(
@@ -40,13 +51,17 @@ class PlayerService:
return self.player.play(params)
def _play_with_ipc(
self, params: PlayerParams, anime: Anime, media_item: Optional[MediaItem] = None
self,
params: PlayerParams,
anime: Optional[Anime] = None,
media_item: Optional[MediaItem] = None,
) -> PlayerResult:
if self.app_config.stream.player == "mpv":
from .ipc.mpv import MpvIPCPlayer
registry = self.registry if self.local else None
return MpvIPCPlayer(self.app_config.stream).play(
self.player, params, self.provider, anime, media_item
self.player, params, self.provider, anime, registry, media_item
)
else:
raise FastAnimeError("Not implemented")

View File

@@ -28,8 +28,8 @@ class MediaEpisode(BaseModel):
episode_number: str
download_status: DownloadStatus = DownloadStatus.NOT_DOWNLOADED
file_path: Path
download_date: datetime = Field(default_factory=datetime.now)
file_path: Optional[Path] = None
download_date: Optional[datetime] = None
# Additional download metadata
file_size: Optional[int] = None # File size in bytes
@@ -40,6 +40,8 @@ class MediaEpisode(BaseModel):
download_attempts: int = 0 # Number of download attempts
last_error: Optional[str] = None # Last error message if failed
model_config = {"arbitrary_types_allowed": True}
class MediaRecord(BaseModel):
media_item: MediaItem

View File

@@ -531,6 +531,7 @@ class MediaRegistryService:
server_name: Optional[str] = None,
subtitle_paths: Optional[list[Path]] = None,
error_message: Optional[str] = None,
download_date: Optional[datetime] = None,
) -> bool:
"""Update the download status and metadata for a specific episode."""
try:

View File

@@ -38,13 +38,23 @@ class WatchHistoryService:
)
if self.media_api and self.media_api.is_authenticated():
self.media_api.update_list_entry(
if not self.media_api.update_list_entry(
UpdateUserMediaListEntryParams(
media_id=media_item.id,
progress=player_result.episode,
status=status,
progress=player_result.episode,
)
)
):
logger.info(
"successfully updated remote progress with {player_result.episode}"
)
else:
logger.warning(
"failed to update remote progress with {player_result.episode}"
)
else:
logger.warning("Not logged in")
def get_episode(self, media_item: MediaItem):
index_entry = self.media_registry.get_media_index_entry(media_item.id)
@@ -116,3 +126,18 @@ class WatchHistoryService:
progress=progress,
)
)
logger.info("updating remote progressd")
else:
logger.warning("Not logged in")
def add_media_to_list_if_not_present(self, media_item: MediaItem):
"""Adds a media item to the user's PLANNING list if it's not already on any list."""
if not self.media_api or not self.media_api.is_authenticated():
return
# If user_status is None, it means the item is not on the user's list.
if media_item.user_status is None:
logger.info(
f"'{media_item.title.english}' not on list. Adding to 'Planning'."
)
self.update(media_item, status=UserMediaListStatus.PLANNING)

View File

@@ -0,0 +1,61 @@
import logging
import time
from fastanime.cli.service.download.service import DownloadService
from fastanime.cli.service.notification.service import NotificationService
from fastanime.core.config.model import WorkerConfig
logger = logging.getLogger(__name__)
class BackgroundWorkerService:
def __init__(
self,
config: WorkerConfig,
notification_service: NotificationService,
download_service: DownloadService,
):
self.config = config
self.notification_service = notification_service
self.download_service = download_service
self.running = True
def run(self):
logger.info("Background worker started.")
last_notification_check = 0
last_download_check = 0
notification_interval_sec = self.config.notification_check_interval * 60
download_interval_sec = self.config.download_check_interval * 60
self.download_service.start()
try:
while self.running:
current_time = time.time()
# Check for notifications
if current_time - last_notification_check > notification_interval_sec:
try:
self.notification_service.check_and_display_notifications()
except Exception as e:
logger.error(f"Error during notification check: {e}")
last_notification_check = current_time
# Process download queue
if current_time - last_download_check > download_interval_sec:
try:
self.download_service.resume_unfinished_downloads()
except Exception as e:
logger.error(f"Error during download queue processing: {e}")
last_download_check = current_time
# Sleep for a short interval to prevent high CPU usage
time.sleep(30) # Sleep for 30 seconds before next check cycle
except KeyboardInterrupt:
logger.info("Background worker stopped by user.")
self.stop()
def stop(self):
self.running = False
logger.info("Background worker shutting down.")

View File

@@ -7,7 +7,6 @@ from .model import (
MediaRegistryConfig,
MpvConfig,
RofiConfig,
ServiceConfig,
StreamConfig,
VlcConfig,
)
@@ -22,6 +21,5 @@ __all__ = [
"StreamConfig",
"GeneralConfig",
"DownloadsConfig",
"ServiceConfig",
"MediaRegistryConfig",
]

View File

@@ -39,14 +39,10 @@ STREAM_USE_IPC = (
lambda: True if PLATFORM != "win32" and not detect.is_running_in_termux() else False
)
# ServiceConfig
SERVICE_ENABLED = False
SERVICE_WATCHLIST_CHECK_INTERVAL = 30
SERVICE_QUEUE_PROCESS_INTERVAL = 1
SERVICE_MAX_CONCURRENT_DOWNLOADS = 3
SERVICE_AUTO_RETRY_COUNT = 3
SERVICE_CLEANUP_COMPLETED_DAYS = 7
SERVICE_NOTIFICATION_ENABLED = True
# WorkerConfig
WORKER_ENABLED = True
WORKER_NOTIFICATION_CHECK_INTERVAL = 15 # minutes
WORKER_DOWNLOAD_CHECK_INTERVAL = 5 # minutes
# FzfConfig
FZF_OPTS = DEFAULTS_DIR / "fzf-opts"
@@ -78,22 +74,13 @@ ANILIST_PREFERRED_LANGUAGE = "english"
DOWNLOADS_DOWNLOADER = "auto"
DOWNLOADS_DOWNLOADS_DIR = USER_VIDEOS_DIR
DOWNLOADS_ENABLE_TRACKING = True
DOWNLOADS_AUTO_ORGANIZE = True
DOWNLOADS_NO_CHECK_CERTIFICATE = True
DOWNLOADS_MAX_CONCURRENT = 3
DOWNLOADS_AUTO_CLEANUP_FAILED = True
DOWNLOADS_RETENTION_DAYS = 30
DOWNLOADS_SYNC_WITH_WATCH_HISTORY = True
DOWNLOADS_AUTO_MARK_OFFLINE = True
DOWNLOADS_NAMING_TEMPLATE = (
"{title}/Season {season:02d}/{episode:02d} - {episode_title}.{ext}"
)
DOWNLOADS_PREFERRED_QUALITY = "1080"
DOWNLOADS_DOWNLOAD_SUBTITLES = True
DOWNLOADS_SUBTITLE_LANGUAGES = ["en"]
DOWNLOADS_QUEUE_MAX_SIZE = 100
DOWNLOADS_AUTO_START_DOWNLOADS = True
DOWNLOADS_RETRY_ATTEMPTS = 3
DOWNLOADS_RETRY_DELAY = 300
DOWNLOADS_RETRY_ATTEMPTS = 2
DOWNLOADS_RETRY_DELAY = 60
DOWNLOADS_MERGE_SUBTITLES = True
DOWNLOADS_CLEANUP_AFTER_MERGE = True
# RegistryConfig
MEDIA_REGISTRY_DIR = USER_VIDEOS_DIR / ".registry"

View File

@@ -58,18 +58,13 @@ STREAM_DEFAULT_MEDIA_LIST_TRACKING = (
STREAM_SUB_LANG = "Preferred language code for subtitles (e.g., 'en', 'es')."
STREAM_USE_IPC = "Use IPC communication with the player for advanced features like episode navigation."
# ServiceConfig
SERVICE_ENABLED = "Whether the background service should be enabled by default."
SERVICE_WATCHLIST_CHECK_INTERVAL = (
"Minutes between checking AniList watchlist for new episodes."
# WorkerConfig
APP_WORKER = "Configuration for the background worker service."
WORKER_ENABLED = "Enable the background worker for notifications and queued downloads."
WORKER_NOTIFICATION_CHECK_INTERVAL = (
"How often to check for new AniList notifications (in minutes)."
)
SERVICE_QUEUE_PROCESS_INTERVAL = "Minutes between processing the download queue."
SERVICE_MAX_CONCURRENT_DOWNLOADS = "Maximum number of concurrent downloads."
SERVICE_AUTO_RETRY_COUNT = "Number of times to retry failed downloads."
SERVICE_CLEANUP_COMPLETED_DAYS = (
"Days to keep completed/failed jobs in queue before cleanup."
)
SERVICE_NOTIFICATION_ENABLED = "Whether to show notifications for new episodes."
WORKER_DOWNLOAD_CHECK_INTERVAL = "How often to process the download queue (in minutes)."
# FzfConfig
FZF_HEADER_COLOR = "RGB color for the main TUI header."
@@ -102,22 +97,17 @@ ANILIST_PREFERRED_LANGUAGE = "Preferred language for anime titles from AniList."
DOWNLOADS_DOWNLOADER = "The downloader to use"
DOWNLOADS_DOWNLOADS_DIR = "The default directory to save downloaded anime."
DOWNLOADS_ENABLE_TRACKING = "Enable download tracking and management"
DOWNLOADS_AUTO_ORGANIZE = "Automatically organize downloads by anime title"
DOWNLOADS_MAX_CONCURRENT = "Maximum concurrent downloads"
DOWNLOADS_AUTO_CLEANUP_FAILED = "Automatically cleanup failed downloads"
DOWNLOADS_RETENTION_DAYS = "Days to keep failed downloads before cleanup"
DOWNLOADS_SYNC_WITH_WATCH_HISTORY = "Sync download status with watch history"
DOWNLOADS_AUTO_MARK_OFFLINE = (
"Automatically mark downloaded episodes as available offline"
)
DOWNLOADS_NAMING_TEMPLATE = "File naming template for downloaded episodes"
DOWNLOADS_PREFERRED_QUALITY = "Preferred download quality"
DOWNLOADS_DOWNLOAD_SUBTITLES = "Download subtitles when available"
DOWNLOADS_SUBTITLE_LANGUAGES = "Preferred subtitle languages"
DOWNLOADS_QUEUE_MAX_SIZE = "Maximum number of items in download queue"
DOWNLOADS_AUTO_START_DOWNLOADS = "Automatically start downloads when items are queued"
DOWNLOADS_MAX_CONCURRENT = "Maximum number of concurrent downloads"
DOWNLOADS_NO_CHECK_CERTIFICATE = "Whether or not to check certificates"
DOWNLOADS_RETRY_ATTEMPTS = "Number of retry attempts for failed downloads"
DOWNLOADS_RETRY_DELAY = "Delay between retry attempts in seconds"
DOWNLOADS_MERGE_SUBTITLES = (
"Automatically merge subtitles into the video file after download."
)
DOWNLOADS_CLEANUP_AFTER_MERGE = (
"Delete the original video and subtitle files after a successful merge."
)
# RegistryConfig
MEDIA_REGISTRY_DIR = "The default directory to save media registry"

View File

@@ -140,53 +140,29 @@ class StreamConfig(BaseModel):
)
class ServiceConfig(BaseModel):
"""Configuration for the background download service."""
enabled: bool = Field(
default=defaults.SERVICE_ENABLED,
description=desc.SERVICE_ENABLED,
)
watchlist_check_interval: int = Field(
default=defaults.SERVICE_WATCHLIST_CHECK_INTERVAL,
ge=5,
le=180,
description=desc.SERVICE_WATCHLIST_CHECK_INTERVAL,
)
queue_process_interval: int = Field(
default=defaults.SERVICE_QUEUE_PROCESS_INTERVAL,
ge=1,
le=60,
description=desc.SERVICE_QUEUE_PROCESS_INTERVAL,
)
max_concurrent_downloads: int = Field(
default=defaults.SERVICE_MAX_CONCURRENT_DOWNLOADS,
ge=1,
le=10,
description=desc.SERVICE_MAX_CONCURRENT_DOWNLOADS,
)
auto_retry_count: int = Field(
default=defaults.SERVICE_AUTO_RETRY_COUNT,
ge=0,
le=10,
description=desc.SERVICE_AUTO_RETRY_COUNT,
)
cleanup_completed_days: int = Field(
default=defaults.SERVICE_CLEANUP_COMPLETED_DAYS,
ge=1,
le=30,
description=desc.SERVICE_CLEANUP_COMPLETED_DAYS,
)
notification_enabled: bool = Field(
default=defaults.SERVICE_NOTIFICATION_ENABLED,
description=desc.SERVICE_NOTIFICATION_ENABLED,
)
class OtherConfig(BaseModel):
pass
class WorkerConfig(OtherConfig):
"""Configuration for the background worker service."""
enabled: bool = Field(
default=True,
description="Enable the background worker for notifications and queued downloads.",
)
notification_check_interval: int = Field(
default=15, # in minutes
ge=1,
description="How often to check for new AniList notifications (in minutes).",
)
download_check_interval: int = Field(
default=5, # in minutes
ge=1,
description="How often to process the download queue (in minutes).",
)
class SessionsConfig(OtherConfig):
dir: Path = Field(
default_factory=lambda: defaults.SESSIONS_DIR,
@@ -327,73 +303,19 @@ class DownloadsConfig(OtherConfig):
downloader: Literal["auto", "default", "yt-dlp"] = Field(
default=defaults.DOWNLOADS_DOWNLOADER, description=desc.DOWNLOADS_DOWNLOADER
)
downloads_dir: Path = Field(
default_factory=lambda: defaults.DOWNLOADS_DOWNLOADS_DIR,
description=desc.DOWNLOADS_DOWNLOADS_DIR,
)
# Download tracking configuration
enable_tracking: bool = Field(
default=defaults.DOWNLOADS_ENABLE_TRACKING,
description=desc.DOWNLOADS_ENABLE_TRACKING,
)
auto_organize: bool = Field(
default=defaults.DOWNLOADS_AUTO_ORGANIZE,
description=desc.DOWNLOADS_AUTO_ORGANIZE,
)
max_concurrent: int = Field(
max_concurrent_downloads: int = Field(
default=defaults.DOWNLOADS_MAX_CONCURRENT,
gt=0,
le=10,
ge=1,
description=desc.DOWNLOADS_MAX_CONCURRENT,
)
auto_cleanup_failed: bool = Field(
default=defaults.DOWNLOADS_AUTO_CLEANUP_FAILED,
description=desc.DOWNLOADS_AUTO_CLEANUP_FAILED,
)
retention_days: int = Field(
default=defaults.DOWNLOADS_RETENTION_DAYS,
gt=0,
description=desc.DOWNLOADS_RETENTION_DAYS,
)
# Integration with watch history
sync_with_watch_history: bool = Field(
default=defaults.DOWNLOADS_SYNC_WITH_WATCH_HISTORY,
description=desc.DOWNLOADS_SYNC_WITH_WATCH_HISTORY,
)
auto_mark_offline: bool = Field(
default=defaults.DOWNLOADS_AUTO_MARK_OFFLINE,
description=desc.DOWNLOADS_AUTO_MARK_OFFLINE,
)
# File organization
naming_template: str = Field(
default=defaults.DOWNLOADS_NAMING_TEMPLATE,
description=desc.DOWNLOADS_NAMING_TEMPLATE,
)
# Quality and subtitles
preferred_quality: Literal["360", "480", "720", "1080", "best"] = Field(
default=defaults.DOWNLOADS_PREFERRED_QUALITY,
description=desc.DOWNLOADS_PREFERRED_QUALITY,
)
download_subtitles: bool = Field(
default=defaults.DOWNLOADS_DOWNLOAD_SUBTITLES,
description=desc.DOWNLOADS_DOWNLOAD_SUBTITLES,
)
# Queue management
queue_max_size: int = Field(
default=defaults.DOWNLOADS_QUEUE_MAX_SIZE,
gt=0,
description=desc.DOWNLOADS_QUEUE_MAX_SIZE,
)
auto_start_downloads: bool = Field(
default=defaults.DOWNLOADS_AUTO_START_DOWNLOADS,
description=desc.DOWNLOADS_AUTO_START_DOWNLOADS,
)
retry_attempts: int = Field(
default=defaults.DOWNLOADS_RETRY_ATTEMPTS,
ge=0,
@@ -404,6 +326,28 @@ class DownloadsConfig(OtherConfig):
ge=0,
description=desc.DOWNLOADS_RETRY_DELAY,
)
merge_subtitles: bool = Field(
default=defaults.DOWNLOADS_MERGE_SUBTITLES,
description=desc.DOWNLOADS_MERGE_SUBTITLES,
)
cleanup_after_merge: bool = Field(
default=defaults.DOWNLOADS_CLEANUP_AFTER_MERGE,
description=desc.DOWNLOADS_CLEANUP_AFTER_MERGE,
)
server: ProviderServer = Field(
default=ProviderServer.TOP,
description=desc.STREAM_SERVER,
)
ytdlp_format: str = Field(
default=defaults.STREAM_YTDLP_FORMAT,
description=desc.STREAM_YTDLP_FORMAT,
)
no_check_certificate: bool = Field(
default=defaults.DOWNLOADS_NO_CHECK_CERTIFICATE,
description=desc.DOWNLOADS_NO_CHECK_CERTIFICATE,
)
class MediaRegistryConfig(OtherConfig):
@@ -442,11 +386,6 @@ class AppConfig(BaseModel):
default_factory=JikanConfig,
description=desc.APP_JIKAN,
)
service: ServiceConfig = Field(
default_factory=ServiceConfig,
description=desc.APP_SERVICE,
)
fzf: FzfConfig = Field(
default_factory=FzfConfig,
description=desc.APP_FZF,
@@ -456,13 +395,13 @@ class AppConfig(BaseModel):
description=desc.APP_ROFI,
)
mpv: MpvConfig = Field(default_factory=MpvConfig, description=desc.APP_MPV)
service: ServiceConfig = Field(
default_factory=ServiceConfig,
description=desc.APP_SERVICE,
)
media_registry: MediaRegistryConfig = Field(
default_factory=MediaRegistryConfig, description=desc.APP_MEDIA_REGISTRY
)
sessions: SessionsConfig = Field(
default_factory=SessionsConfig, description=desc.APP_SESSIONS
)
worker: WorkerConfig = Field(
default_factory=WorkerConfig,
description="Configuration for the background worker service.",
)

View File

@@ -3,8 +3,8 @@ from abc import ABC, abstractmethod
import httpx
from ..config.model import DownloadsConfig
from .params import DownloadParams
from .model import DownloadResult
from .params import DownloadParams
class BaseDownloader(ABC):
@@ -13,7 +13,13 @@ class BaseDownloader(ABC):
def __init__(self, config: DownloadsConfig):
self.config = config
self.client = httpx.Client()
# Increase timeouts and add retries for robustness
transport = httpx.HTTPTransport(retries=3)
self.client = httpx.Client(
transport=transport,
timeout=httpx.Timeout(15.0, connect=60.0),
follow_redirects=True,
)
@abstractmethod
def download(self, params: DownloadParams) -> DownloadResult:

View File

@@ -20,3 +20,4 @@ class DownloadParams:
force_ffmpeg: bool = False
hls_use_mpegts: bool = False
hls_use_h264: bool = False
no_check_certificate: bool = True

View File

@@ -6,17 +6,18 @@ import tempfile
from pathlib import Path
import httpx
import yt_dlp
from rich import print
from rich.prompt import Confirm
import yt_dlp
from yt_dlp.utils import sanitize_filename
from ..exceptions import FastAnimeError
from ..patterns import TORRENT_REGEX
from ..utils.networking import get_remote_filename
from .base import BaseDownloader
from .params import DownloadParams
from .model import DownloadResult
from .params import DownloadParams
logger = logging.getLogger(__name__)
@@ -89,6 +90,7 @@ class YtDLPDownloader(BaseDownloader):
if params.force_unknown_ext
else tuple(),
"progress_hooks": params.progress_hooks,
"nocheckcertificate": params.no_check_certificate,
}
opts = opts
if params.force_ffmpeg:

View File

@@ -25,6 +25,7 @@ from ..types import (
MediaItem,
MediaReview,
MediaSearchResult,
Notification,
UserMediaListStatus,
UserProfile,
)
@@ -276,6 +277,20 @@ class AniListApi(BaseApiClient):
return mapper.to_generic_reviews_list(response.json())
return None
def get_notifications(self) -> Optional[List[Notification]]:
"""Fetches the user's unread notifications from AniList."""
if not self.is_authenticated():
logger.warning("Cannot fetch notifications: user is not authenticated.")
return None
response = execute_graphql(
ANILIST_ENDPOINT, self.http_client, gql.GET_NOTIFICATIONS, {}
)
if response and "errors" not in response.json():
return mapper.to_generic_notifications(response.json())
logger.error(f"Failed to fetch notifications: {response.text}")
return None
def transform_raw_search_data(self, raw_data: Any) -> Optional[MediaSearchResult]:
"""
Transform raw AniList API response data into a MediaSearchResult.

View File

@@ -25,6 +25,8 @@ from ..types import (
MediaTagItem,
MediaTitle,
MediaTrailer,
Notification,
NotificationType,
PageInfo,
Reviewer,
StreamingEpisode,
@@ -45,6 +47,8 @@ from .types import (
AnilistMediaTag,
AnilistMediaTitle,
AnilistMediaTrailer,
AnilistNotification,
AnilistNotifications,
AnilistPageInfo,
AnilistReview,
AnilistReviews,
@@ -520,3 +524,70 @@ def to_generic_airing_schedule_result(data: Dict) -> Optional[AiringScheduleResu
except (KeyError, IndexError, TypeError) as e:
logger.error(f"Error parsing airing schedule data: {e}")
return None
def _to_generic_media_item_from_notification_partial(
data: AnilistBaseMediaDataSchema,
) -> MediaItem:
"""
A specialized mapper for the partial MediaItem object received in notifications.
It provides default values for fields not present in the notification's media payload.
"""
return MediaItem(
id=data["id"],
id_mal=data.get("idMal"),
title=_to_generic_media_title(data["title"]),
cover_image=_to_generic_media_image(data["coverImage"]),
# Provide default/empty values for fields not in notification payload
type="ANIME",
status=MediaStatus.RELEASING, # Assume releasing for airing notifications
format=None,
description=None,
episodes=None,
duration=None,
genres=[],
tags=[],
studios=[],
synonymns=[],
average_score=None,
popularity=None,
favourites=None,
streaming_episodes={},
user_status=None,
)
def _to_generic_notification(anilist_notification: AnilistNotification) -> Notification:
"""Maps a single AniList notification to a generic Notification object."""
return Notification(
id=anilist_notification["id"],
type=NotificationType(anilist_notification["type"]),
episode=anilist_notification.get("episode"),
contexts=anilist_notification.get("contexts", []),
created_at=datetime.fromtimestamp(anilist_notification["createdAt"]),
media=_to_generic_media_item_from_notification_partial(
anilist_notification["media"]
),
)
def to_generic_notifications(
data: AnilistNotifications,
) -> Optional[List[Notification]]:
"""Top-level mapper for a list of notifications."""
if not data or "data" not in data:
return None
page_data = data["data"].get("Page", {})
if not page_data:
return None
raw_notifications = page_data.get("notifications", [])
if not raw_notifications:
return []
return [
_to_generic_notification(notification)
for notification in raw_notifications
if notification
]

View File

@@ -1,4 +1,4 @@
from typing import Literal, Optional, TypedDict
from typing import List, Literal, Optional, TypedDict
class AnilistPageInfo(TypedDict):
@@ -226,28 +226,6 @@ class AnilistDataSchema(TypedDict):
data: AnilistPages
class AnilistNotification(TypedDict):
id: int
type: str
episode: int
context: str
createdAt: str
media: AnilistBaseMediaDataSchema
class AnilistNotificationPage(TypedDict):
pageInfo: AnilistPageInfo
notifications: list[AnilistNotification]
class AnilistNotificationPages(TypedDict):
Page: AnilistNotificationPage
class AnilistNotifications(TypedDict):
data: AnilistNotificationPages
class AnilistMediaList(TypedDict):
media: AnilistBaseMediaDataSchema
status: AnilistMediaListStatus
@@ -271,3 +249,25 @@ class AnilistMediaListPages(TypedDict):
class AnilistMediaLists(TypedDict):
data: AnilistMediaListPages
class AnilistNotification(TypedDict):
id: int
type: str
episode: int
contexts: List[str]
createdAt: int # This is a Unix timestamp
media: AnilistBaseMediaDataSchema # This will be a partial response
class AnilistNotificationPage(TypedDict):
pageInfo: AnilistPageInfo
notifications: list[AnilistNotification]
class AnilistNotificationPages(TypedDict):
Page: AnilistNotificationPage
class AnilistNotifications(TypedDict):
data: AnilistNotificationPages

View File

@@ -18,6 +18,7 @@ from .types import (
MediaItem,
MediaReview,
MediaSearchResult,
Notification,
UserProfile,
)
@@ -95,6 +96,11 @@ class BaseApiClient(abc.ABC):
) -> Optional[List[MediaReview]]:
pass
@abc.abstractmethod
def get_notifications(self) -> Optional[List[Notification]]:
"""Fetches the user's unread notifications."""
pass
@abc.abstractmethod
def transform_raw_search_data(self, raw_data: Dict) -> Optional[MediaSearchResult]:
"""

View File

@@ -1,5 +1,3 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, List, Optional
@@ -20,6 +18,7 @@ from ..types import (
MediaItem,
MediaSearchResult,
MediaTitle,
Notification,
UserProfile,
)
from . import mapper
@@ -183,6 +182,11 @@ class JikanApi(BaseApiClient):
logger.error(f"Failed to fetch related anime for media {params.id}: {e}")
return None
def get_notifications(self) -> Optional[List[Notification]]:
"""Jikan is a public API and does not support user notifications."""
logger.warning("Jikan API does not support fetching user notifications.")
return None
def get_airing_schedule_for(
self, params: MediaAiringScheduleParams
) -> Optional[AiringScheduleResult]:

View File

@@ -65,6 +65,13 @@ class MediaFormat(Enum):
ONE_SHOT = "ONE_SHOT"
class NotificationType(Enum):
AIRING = "AIRING"
RELATED_MEDIA_ADDITION = "RELATED_MEDIA_ADDITION"
MEDIA_DATA_CHANGE = "MEDIA_DATA_CHANGE"
# ... add other types as needed
# MODELS
class BaseMediaApiModel(BaseModel):
model_config = ConfigDict(frozen=True)
@@ -227,6 +234,17 @@ class MediaItem(BaseMediaApiModel):
user_status: Optional[UserListItem] = None
class Notification(BaseMediaApiModel):
"""A generic representation of a user notification."""
id: int
type: NotificationType
episode: Optional[int] = None
contexts: List[str] = Field(default_factory=list)
created_at: datetime
media: MediaItem
class PageInfo(BaseMediaApiModel):
"""Generic pagination information."""

View File

@@ -24,12 +24,18 @@ standard = [
"lxml>=6.0.0",
"pypresence>=4.3.0",
"thefuzz>=0.22.1",
"yt-dlp>=2025.7.21",
"pycryptodomex>=3.23.0",
]
notifications = ["plyer>=2.1.0"]
mpv = ["mpv>=1.0.7"]
torrent = ["libtorrent>=2.0.11"]
lxml = ["lxml>=6.0.0"]
discord = ["pypresence>=4.3.0"]
download = [
"pycryptodomex>=3.23.0",
"yt-dlp>=2025.7.21",
]
[build-system]
requires = ["hatchling"]

56
uv.lock generated
View File

@@ -111,6 +111,10 @@ dependencies = [
discord = [
{ name = "pypresence" },
]
download = [
{ name = "pycryptodomex" },
{ name = "yt-dlp" },
]
lxml = [
{ name = "lxml" },
]
@@ -125,8 +129,10 @@ standard = [
{ name = "lxml" },
{ name = "mpv" },
{ name = "plyer" },
{ name = "pycryptodomex" },
{ name = "pypresence" },
{ name = "thefuzz" },
{ name = "yt-dlp" },
]
torrent = [
{ name = "libtorrent" },
@@ -155,13 +161,17 @@ requires-dist = [
{ name = "mpv", marker = "extra == 'standard'", specifier = ">=1.0.7" },
{ name = "plyer", marker = "extra == 'notifications'", specifier = ">=2.1.0" },
{ name = "plyer", marker = "extra == 'standard'", specifier = ">=2.1.0" },
{ name = "pycryptodomex", marker = "extra == 'download'", specifier = ">=3.23.0" },
{ name = "pycryptodomex", marker = "extra == 'standard'", specifier = ">=3.23.0" },
{ name = "pydantic", specifier = ">=2.11.7" },
{ name = "pypresence", marker = "extra == 'discord'", specifier = ">=4.3.0" },
{ name = "pypresence", marker = "extra == 'standard'", specifier = ">=4.3.0" },
{ name = "rich", specifier = ">=13.9.2" },
{ name = "thefuzz", marker = "extra == 'standard'", specifier = ">=0.22.1" },
{ name = "yt-dlp", marker = "extra == 'download'", specifier = ">=2025.7.21" },
{ name = "yt-dlp", marker = "extra == 'standard'", specifier = ">=2025.7.21" },
]
provides-extras = ["standard", "notifications", "mpv", "torrent", "lxml", "discord"]
provides-extras = ["standard", "notifications", "mpv", "torrent", "lxml", "discord", "download"]
[package.metadata.requires-dev]
dev = [
@@ -509,6 +519,41 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/ce/4f/5249960887b1fbe561d9ff265496d170b55a735b76724f10ef19f9e40716/prompt_toolkit-3.0.51-py3-none-any.whl", hash = "sha256:52742911fde84e2d423e2f9a4cf1de7d7ac4e51958f648d9540e0fb8db077b07", size = 387810, upload-time = "2025-04-15T09:18:44.753Z" },
]
[[package]]
name = "pycryptodomex"
version = "3.23.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/c9/85/e24bf90972a30b0fcd16c73009add1d7d7cd9140c2498a68252028899e41/pycryptodomex-3.23.0.tar.gz", hash = "sha256:71909758f010c82bc99b0abf4ea12012c98962fbf0583c2164f8b84533c2e4da", size = 4922157, upload-time = "2025-05-17T17:23:41.434Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2e/00/10edb04777069a42490a38c137099d4b17ba6e36a4e6e28bdc7470e9e853/pycryptodomex-3.23.0-cp313-cp313t-macosx_10_13_universal2.whl", hash = "sha256:7b37e08e3871efe2187bc1fd9320cc81d87caf19816c648f24443483005ff886", size = 2498764, upload-time = "2025-05-17T17:22:21.453Z" },
{ url = "https://files.pythonhosted.org/packages/6b/3f/2872a9c2d3a27eac094f9ceaa5a8a483b774ae69018040ea3240d5b11154/pycryptodomex-3.23.0-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:91979028227543010d7b2ba2471cf1d1e398b3f183cb105ac584df0c36dac28d", size = 1643012, upload-time = "2025-05-17T17:22:23.702Z" },
{ url = "https://files.pythonhosted.org/packages/70/af/774c2e2b4f6570fbf6a4972161adbb183aeeaa1863bde31e8706f123bf92/pycryptodomex-3.23.0-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6b8962204c47464d5c1c4038abeadd4514a133b28748bcd9fa5b6d62e3cec6fa", size = 2187643, upload-time = "2025-05-17T17:22:26.37Z" },
{ url = "https://files.pythonhosted.org/packages/de/a3/71065b24cb889d537954cedc3ae5466af00a2cabcff8e29b73be047e9a19/pycryptodomex-3.23.0-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a33986a0066860f7fcf7c7bd2bc804fa90e434183645595ae7b33d01f3c91ed8", size = 2273762, upload-time = "2025-05-17T17:22:28.313Z" },
{ url = "https://files.pythonhosted.org/packages/c9/0b/ff6f43b7fbef4d302c8b981fe58467b8871902cdc3eb28896b52421422cc/pycryptodomex-3.23.0-cp313-cp313t-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c7947ab8d589e3178da3d7cdeabe14f841b391e17046954f2fbcd941705762b5", size = 2313012, upload-time = "2025-05-17T17:22:30.57Z" },
{ url = "https://files.pythonhosted.org/packages/02/de/9d4772c0506ab6da10b41159493657105d3f8bb5c53615d19452afc6b315/pycryptodomex-3.23.0-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:c25e30a20e1b426e1f0fa00131c516f16e474204eee1139d1603e132acffc314", size = 2186856, upload-time = "2025-05-17T17:22:32.819Z" },
{ url = "https://files.pythonhosted.org/packages/28/ad/8b30efcd6341707a234e5eba5493700a17852ca1ac7a75daa7945fcf6427/pycryptodomex-3.23.0-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:da4fa650cef02db88c2b98acc5434461e027dce0ae8c22dd5a69013eaf510006", size = 2347523, upload-time = "2025-05-17T17:22:35.386Z" },
{ url = "https://files.pythonhosted.org/packages/0f/02/16868e9f655b7670dbb0ac4f2844145cbc42251f916fc35c414ad2359849/pycryptodomex-3.23.0-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:58b851b9effd0d072d4ca2e4542bf2a4abcf13c82a29fd2c93ce27ee2a2e9462", size = 2272825, upload-time = "2025-05-17T17:22:37.632Z" },
{ url = "https://files.pythonhosted.org/packages/ca/18/4ca89ac737230b52ac8ffaca42f9c6f1fd07c81a6cd821e91af79db60632/pycryptodomex-3.23.0-cp313-cp313t-win32.whl", hash = "sha256:a9d446e844f08299236780f2efa9898c818fe7e02f17263866b8550c7d5fb328", size = 1772078, upload-time = "2025-05-17T17:22:40Z" },
{ url = "https://files.pythonhosted.org/packages/73/34/13e01c322db027682e00986873eca803f11c56ade9ba5bbf3225841ea2d4/pycryptodomex-3.23.0-cp313-cp313t-win_amd64.whl", hash = "sha256:bc65bdd9fc8de7a35a74cab1c898cab391a4add33a8fe740bda00f5976ca4708", size = 1803656, upload-time = "2025-05-17T17:22:42.139Z" },
{ url = "https://files.pythonhosted.org/packages/54/68/9504c8796b1805d58f4425002bcca20f12880e6fa4dc2fc9a668705c7a08/pycryptodomex-3.23.0-cp313-cp313t-win_arm64.whl", hash = "sha256:c885da45e70139464f082018ac527fdaad26f1657a99ee13eecdce0f0ca24ab4", size = 1707172, upload-time = "2025-05-17T17:22:44.704Z" },
{ url = "https://files.pythonhosted.org/packages/dd/9c/1a8f35daa39784ed8adf93a694e7e5dc15c23c741bbda06e1d45f8979e9e/pycryptodomex-3.23.0-cp37-abi3-macosx_10_9_universal2.whl", hash = "sha256:06698f957fe1ab229a99ba2defeeae1c09af185baa909a31a5d1f9d42b1aaed6", size = 2499240, upload-time = "2025-05-17T17:22:46.953Z" },
{ url = "https://files.pythonhosted.org/packages/7a/62/f5221a191a97157d240cf6643747558759126c76ee92f29a3f4aee3197a5/pycryptodomex-3.23.0-cp37-abi3-macosx_10_9_x86_64.whl", hash = "sha256:b2c2537863eccef2d41061e82a881dcabb04944c5c06c5aa7110b577cc487545", size = 1644042, upload-time = "2025-05-17T17:22:49.098Z" },
{ url = "https://files.pythonhosted.org/packages/8c/fd/5a054543c8988d4ed7b612721d7e78a4b9bf36bc3c5ad45ef45c22d0060e/pycryptodomex-3.23.0-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:43c446e2ba8df8889e0e16f02211c25b4934898384c1ec1ec04d7889c0333587", size = 2186227, upload-time = "2025-05-17T17:22:51.139Z" },
{ url = "https://files.pythonhosted.org/packages/c8/a9/8862616a85cf450d2822dbd4fff1fcaba90877907a6ff5bc2672cafe42f8/pycryptodomex-3.23.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f489c4765093fb60e2edafdf223397bc716491b2b69fe74367b70d6999257a5c", size = 2272578, upload-time = "2025-05-17T17:22:53.676Z" },
{ url = "https://files.pythonhosted.org/packages/46/9f/bda9c49a7c1842820de674ab36c79f4fbeeee03f8ff0e4f3546c3889076b/pycryptodomex-3.23.0-cp37-abi3-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:bdc69d0d3d989a1029df0eed67cc5e8e5d968f3724f4519bd03e0ec68df7543c", size = 2312166, upload-time = "2025-05-17T17:22:56.585Z" },
{ url = "https://files.pythonhosted.org/packages/03/cc/870b9bf8ca92866ca0186534801cf8d20554ad2a76ca959538041b7a7cf4/pycryptodomex-3.23.0-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:6bbcb1dd0f646484939e142462d9e532482bc74475cecf9c4903d4e1cd21f003", size = 2185467, upload-time = "2025-05-17T17:22:59.237Z" },
{ url = "https://files.pythonhosted.org/packages/96/e3/ce9348236d8e669fea5dd82a90e86be48b9c341210f44e25443162aba187/pycryptodomex-3.23.0-cp37-abi3-musllinux_1_2_i686.whl", hash = "sha256:8a4fcd42ccb04c31268d1efeecfccfd1249612b4de6374205376b8f280321744", size = 2346104, upload-time = "2025-05-17T17:23:02.112Z" },
{ url = "https://files.pythonhosted.org/packages/a5/e9/e869bcee87beb89040263c416a8a50204f7f7a83ac11897646c9e71e0daf/pycryptodomex-3.23.0-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:55ccbe27f049743a4caf4f4221b166560d3438d0b1e5ab929e07ae1702a4d6fd", size = 2271038, upload-time = "2025-05-17T17:23:04.872Z" },
{ url = "https://files.pythonhosted.org/packages/8d/67/09ee8500dd22614af5fbaa51a4aee6e342b5fa8aecf0a6cb9cbf52fa6d45/pycryptodomex-3.23.0-cp37-abi3-win32.whl", hash = "sha256:189afbc87f0b9f158386bf051f720e20fa6145975f1e76369303d0f31d1a8d7c", size = 1771969, upload-time = "2025-05-17T17:23:07.115Z" },
{ url = "https://files.pythonhosted.org/packages/69/96/11f36f71a865dd6df03716d33bd07a67e9d20f6b8d39820470b766af323c/pycryptodomex-3.23.0-cp37-abi3-win_amd64.whl", hash = "sha256:52e5ca58c3a0b0bd5e100a9fbc8015059b05cffc6c66ce9d98b4b45e023443b9", size = 1803124, upload-time = "2025-05-17T17:23:09.267Z" },
{ url = "https://files.pythonhosted.org/packages/f9/93/45c1cdcbeb182ccd2e144c693eaa097763b08b38cded279f0053ed53c553/pycryptodomex-3.23.0-cp37-abi3-win_arm64.whl", hash = "sha256:02d87b80778c171445d67e23d1caef279bf4b25c3597050ccd2e13970b57fd51", size = 1707161, upload-time = "2025-05-17T17:23:11.414Z" },
{ url = "https://files.pythonhosted.org/packages/f3/b8/3e76d948c3c4ac71335bbe75dac53e154b40b0f8f1f022dfa295257a0c96/pycryptodomex-3.23.0-pp310-pypy310_pp73-macosx_10_15_x86_64.whl", hash = "sha256:ebfff755c360d674306e5891c564a274a47953562b42fb74a5c25b8fc1fb1cb5", size = 1627695, upload-time = "2025-05-17T17:23:17.38Z" },
{ url = "https://files.pythonhosted.org/packages/6a/cf/80f4297a4820dfdfd1c88cf6c4666a200f204b3488103d027b5edd9176ec/pycryptodomex-3.23.0-pp310-pypy310_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:eca54f4bb349d45afc17e3011ed4264ef1cc9e266699874cdd1349c504e64798", size = 1675772, upload-time = "2025-05-17T17:23:19.202Z" },
{ url = "https://files.pythonhosted.org/packages/d1/42/1e969ee0ad19fe3134b0e1b856c39bd0b70d47a4d0e81c2a8b05727394c9/pycryptodomex-3.23.0-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4f2596e643d4365e14d0879dc5aafe6355616c61c2176009270f3048f6d9a61f", size = 1668083, upload-time = "2025-05-17T17:23:21.867Z" },
{ url = "https://files.pythonhosted.org/packages/6e/c3/1de4f7631fea8a992a44ba632aa40e0008764c0fb9bf2854b0acf78c2cf2/pycryptodomex-3.23.0-pp310-pypy310_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:fdfac7cda115bca3a5abb2f9e43bc2fb66c2b65ab074913643803ca7083a79ea", size = 1706056, upload-time = "2025-05-17T17:23:24.031Z" },
{ url = "https://files.pythonhosted.org/packages/f2/5f/af7da8e6f1e42b52f44a24d08b8e4c726207434e2593732d39e7af5e7256/pycryptodomex-3.23.0-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:14c37aaece158d0ace436f76a7bb19093db3b4deade9797abfc39ec6cd6cc2fe", size = 1806478, upload-time = "2025-05-17T17:23:26.066Z" },
]
[[package]]
name = "pydantic"
version = "2.11.7"
@@ -997,3 +1042,12 @@ sdist = { url = "https://files.pythonhosted.org/packages/6c/63/53559446a878410fc
wheels = [
{ url = "https://files.pythonhosted.org/packages/fd/84/fd2ba7aafacbad3c4201d395674fc6348826569da3c0937e75505ead3528/wcwidth-0.2.13-py2.py3-none-any.whl", hash = "sha256:3da69048e4540d84af32131829ff948f1e022c1c6bdb8d6102117aac784f6859", size = 34166, upload-time = "2024-01-06T02:10:55.763Z" },
]
[[package]]
name = "yt-dlp"
version = "2025.7.21"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/7e/3a/343f7a0024ddd4c30f150e8d8f57fd7b924846f97d99fc0dcd75ea8d2773/yt_dlp-2025.7.21.tar.gz", hash = "sha256:46fbb53eab1afbe184c45b4c17e9a6eba614be680e4c09de58b782629d0d7f43", size = 3050219, upload-time = "2025-07-21T23:59:03.826Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e8/2f/abe59a3204c749fed494849ea29176bcefa186ec8898def9e43f649ddbcf/yt_dlp-2025.7.21-py3-none-any.whl", hash = "sha256:d7aa2b53f9b2f35453346360f41811a0dad1e956e70b35a4ae95039d4d815d15", size = 3288681, upload-time = "2025-07-21T23:59:01.788Z" },
]