feat: interactive

This commit is contained in:
Benexl
2025-07-14 02:24:44 +03:00
parent 42bd4963b8
commit badd10bf97
18 changed files with 1171 additions and 793 deletions

View File

@@ -1,179 +0,0 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, List, Optional
from ....libs.anime.params import AnimeParams, EpisodeStreamsParams, SearchParams
from ....libs.anime.types import EpisodeStream, SearchResult, Server
from ....libs.players.base import PlayerResult
from ....Utility.utils import anime_title_percentage_match
if TYPE_CHECKING:
from ...interactive.session import Session
logger = logging.getLogger(__name__)
def find_best_provider_match(session: Session) -> Optional[SearchResult]:
"""Searches the provider via session and finds the best match."""
anime = session.state.anilist.selected_anime
if not anime:
return None
title = anime.get("title", {}).get("romaji") or anime.get("title", {}).get(
"english"
)
if not title:
return None
search_params = SearchParams(
query=title, translation_type=session.config.stream.translation_type
)
search_results_data = session.provider.search(search_params)
if not search_results_data or not search_results_data.results:
return None
best_match = max(
search_results_data.results,
key=lambda result: anime_title_percentage_match(result.title, anime),
)
return best_match
def get_stream_links(session: Session) -> List[Server]:
"""Fetches streams using the session's provider and state."""
anime_details = session.state.provider.anime_details
episode = session.state.provider.current_episode
if not anime_details or not episode:
return []
params = EpisodeStreamsParams(
anime_id=anime_details.id,
episode=episode,
translation_type=session.config.stream.translation_type,
)
stream_generator = session.provider.episode_streams(params)
return list(stream_generator) if stream_generator else []
def select_best_stream_quality(
servers: List[Server], quality: str, session: Session
) -> Optional[EpisodeStream]:
"""Selects the best quality stream from a list of servers."""
from ..ui import filter_by_quality
for server in servers:
if server.links:
link_info = filter_by_quality(quality, server.links)
if link_info:
session.state.provider.current_server = server
return link_info
return None
def play_stream(session: Session, stream_info: EpisodeStream) -> PlayerResult:
"""Handles media playback and updates watch history afterwards."""
server = session.state.provider.current_server
if not server:
return PlayerResult()
start_time = "0" # TODO: Implement watch history loading
playback_result = session.player.play(
url=stream_info.link,
title=server.episode_title or "FastAnime",
headers=server.headers,
subtitles=server.subtitles,
start_time=start_time,
)
update_watch_progress(session, playback_result)
return playback_result
def play_trailer(session: Session) -> None:
"""Plays the anime trailer using the session player."""
anime = session.state.anilist.selected_anime
if not anime or not anime.get("trailer"):
from ..ui import display_error
display_error("No trailer available for this anime.")
return
trailer_url = f"https://www.youtube.com/watch?v={anime['trailer']['id']}"
session.player.play(url=trailer_url, title=f"{anime['title']['romaji']} - Trailer")
def view_anime_info(session: Session) -> None:
"""Delegates the display of detailed anime info to the UI layer."""
from ..ui import display_anime_details
anime = session.state.anilist.selected_anime
if anime:
display_anime_details(anime)
def add_to_anilist(session: Session) -> None:
"""Prompts user for a list and adds the anime to it on AniList."""
from ..ui import display_error, prompt_add_to_list
if not session.config.user:
display_error("You must be logged in to modify your AniList.")
return
anime = session.state.anilist.selected_anime
if not anime:
return
list_status = prompt_add_to_list(session)
if not list_status:
return
success, data = session.anilist.update_anime_list(
{"status": list_status, "mediaId": anime["id"]}
)
if not success:
display_error(f"Failed to update AniList. Reason: {data}")
def update_watch_progress(session: Session, playback_result: PlayerResult) -> None:
"""Updates local and remote watch history based on playback result."""
from ....core.utils import time_to_seconds
stop_time_str = playback_result.stop_time
total_time_str = playback_result.total_time
anime = session.state.anilist.selected_anime
episode_num = session.state.provider.current_episode
if not all([stop_time_str, total_time_str, anime, episode_num]):
logger.debug("Insufficient data to update watch progress.")
return
try:
stop_seconds = time_to_seconds(stop_time_str)
total_seconds = time_to_seconds(total_time_str)
# Avoid division by zero
if total_seconds == 0:
return
percentage_watched = (stop_seconds / total_seconds) * 100
# TODO: Implement local watch history file update here
if percentage_watched >= session.config.stream.episode_complete_at:
logger.info(
f"Episode {episode_num} marked as complete ({percentage_watched:.1f}% watched)."
)
if session.config.user and session.state.tracking.progress_mode == "track":
logger.info(
f"Updating AniList progress for mediaId {anime['id']} to episode {episode_num}."
)
session.anilist.update_anime_list(
{"mediaId": anime["id"], "progress": int(episode_num)}
)
except (ValueError, TypeError) as e:
logger.error(f"Could not parse playback times to update progress: {e}")

View File

@@ -1,79 +0,0 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Optional
from .states.base import GoBack, State
if TYPE_CHECKING:
from ..session import Session
logger = logging.getLogger(__name__)
class InteractiveController:
"""
Manages and executes the state-driven interactive session using a state stack
for robust navigation.
"""
def __init__(self, session: Session, history_stack: Optional[list[State]] = None):
"""
Initializes the interactive controller.
Args:
session: The global session object.
history_stack: An optional pre-populated history stack, used for
resuming a previous session.
"""
from .states.menu_states import MainMenuState
self.session = session
self.history_stack: list[State] = history_stack or [MainMenuState()]
@property
def current_state(self) -> State:
"""The current active state is the top of the stack."""
return self.history_stack[-1]
def run(self) -> None:
"""
Starts and runs the state machine loop until an exit condition is met
(e.g., an empty history stack or an explicit stop signal).
"""
logger.info(
f"Starting controller with initial state: {self.current_state.__class__.__name__}"
)
while self.history_stack and self.session.is_running:
try:
result = self.current_state.run(self.session)
if result is None:
logger.info("Exit signal received from state. Stopping controller.")
self.history_stack.clear()
break
if result is GoBack:
if len(self.history_stack) > 1:
self.history_stack.pop()
logger.debug(
f"Navigating back to: {self.current_state.__class__.__name__}"
)
else:
logger.info("Cannot go back from root state. Exiting.")
self.history_stack.clear()
elif isinstance(result, State):
self.history_stack.append(result)
logger.debug(
f"Transitioning forward to: {result.__class__.__name__}"
)
except Exception:
logger.exception(
"An unhandled error occurred in the interactive session."
)
self.session.stop()
self.history_stack.clear()
logger.info("Interactive session finished.")

View File

@@ -1,37 +0,0 @@
from __future__ import annotations
import abc
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from ...session import Session
class State(abc.ABC):
"""Abstract Base Class for a state in the workflow."""
@abc.abstractmethod
def run(self, session: Session) -> Optional[State | type[GoBack]]:
"""
Executes the logic for this state.
This method should contain the primary logic for a given UI screen
or background task. It orchestrates calls to the UI and actions layers
and determines the next step in the application flow.
Args:
session: The global session object containing all context.
Returns:
- A new State instance to transition to for forward navigation.
- The `GoBack` class to signal a backward navigation.
- None to signal an application exit.
"""
pass
# --- Navigation Signals ---
class GoBack:
"""A signal class to indicate a backward navigation request from a state."""
pass

View File

@@ -1,115 +0,0 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Callable, Optional, Tuple
from .....libs.api.base import ApiSearchParams
from .base import GoBack, State
from .task_states import AnimeActionsState
if TYPE_CHECKING:
from .....libs.api.types import MediaSearchResult
from ...session import Session
from .. import ui
logger = logging.getLogger(__name__)
class MainMenuState(State):
"""Handles the main menu display and action routing."""
def run(self, session: Session) -> Optional[State | type[GoBack]]:
from .. import ui
# Define actions as tuples: (Display Name, SearchParams, Next State)
# This centralizes the "business logic" of what each menu item means.
menu_actions: List[
Tuple[str, Callable[[], Optional[ApiSearchParams]], Optional[State]]
] = [
(
"🔥 Trending",
lambda: ApiSearchParams(sort="TRENDING_DESC"),
ResultsState(),
),
(
"🌟 Most Popular",
lambda: ApiSearchParams(sort="POPULARITY_DESC"),
ResultsState(),
),
(
"💖 Most Favourite",
lambda: ApiSearchParams(sort="FAVOURITES_DESC"),
ResultsState(),
),
(
"🔎 Search",
lambda: ApiSearchParams(query=ui.prompt_for_search(session)),
ResultsState(),
),
(
"📺 Watching",
lambda: session.api_client.fetch_user_list,
ResultsState(),
), # Direct method call
("❌ Exit", lambda: None, None),
]
display_choices = [action[0] for action in menu_actions]
choice_str = ui.prompt_main_menu(session, display_choices)
if not choice_str:
return None
# Find the chosen action
chosen_action = next(
(action for action in menu_actions if action[0] == choice_str), None
)
if not chosen_action:
return self # Should not happen
_, param_creator, next_state = chosen_action
if not next_state: # Exit case
return None
# Execute the data fetch
with ui.progress_spinner(f"Fetching {choice_str.strip('🔥🔎📺🌟💖❌ ')}..."):
if choice_str == "📺 Watching": # Special case for user list
result_data = param_creator(status="CURRENT")
else:
search_params = param_creator()
if search_params is None: # User cancelled search prompt
return self
result_data = session.api_client.search_media(search_params)
if not result_data:
ui.display_error(f"Failed to fetch data for '{choice_str}'.")
return self
session.state.anilist.results_data = result_data # Store the generic dataclass
return next_state
class ResultsState(State):
"""Displays a list of anime and handles pagination and selection."""
def run(self, session: Session) -> Optional[State | type[GoBack]]:
from .. import ui
search_result = session.state.anilist.results_data
if not search_result or not isinstance(search_result, MediaSearchResult):
ui.display_error("No results to display.")
return GoBack
selection = ui.prompt_anime_selection(session, search_result.media)
if selection == "Back":
return GoBack
if selection is None:
return None
# TODO: Implement pagination logic here by checking selection for "Next Page" etc.
# and re-calling the search_media method with an updated page number.
session.state.anilist.selected_anime = selection
return AnimeActionsState()

View File

@@ -1,145 +0,0 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Optional
from ....libs.anime.params import AnimeParams
from .base import GoBack, State
if TYPE_CHECKING:
from ....libs.anime.types import Anime
from ...session import Session
from .. import actions, ui
logger = logging.getLogger(__name__)
class AnimeActionsState(State):
"""Displays actions for a single selected anime."""
def run(self, session: Session) -> Optional[State | type[GoBack]]:
from .. import actions, ui
anime = session.state.anilist.selected_anime
if not anime:
ui.display_error("No anime selected.")
return GoBack
action = ui.prompt_anime_actions(session, anime)
if not action:
return GoBack
if action == "Stream":
return ProviderSearchState()
elif action == "Watch Trailer":
actions.play_trailer(session)
return self
elif action == "Add to List":
actions.add_to_anilist(session)
return self
elif action == "Back":
return GoBack
return self
class ProviderSearchState(State):
"""Searches the provider for the selected AniList anime."""
def run(self, session: Session) -> Optional[State | type[GoBack]]:
from .. import actions, ui
anime = session.state.anilist.selected_anime
if not anime:
return GoBack
with ui.progress_spinner("Searching provider..."):
best_match = actions.find_best_provider_match(session)
if best_match:
session.state.provider.selected_search_result = best_match
return EpisodeSelectionState()
else:
title = anime.get("title", {}).get("romaji")
ui.display_error(
f"Could not find '{title}' on provider '{session.provider.__class__.__name__}'."
)
return GoBack
class EpisodeSelectionState(State):
"""Fetches the full episode list from the provider and lets the user choose."""
def run(self, session: Session) -> Optional[State | type[GoBack]]:
from .. import ui
search_result = session.state.provider.selected_search_result
if not search_result:
return GoBack
with ui.progress_spinner("Fetching episode list..."):
params = AnimeParams(anime_id=search_result.id)
anime_details: Optional[Anime] = session.provider.get(params)
if not anime_details:
ui.display_error("Failed to fetch episode details from provider.")
return GoBack
session.state.provider.anime_details = anime_details
episode_list = (
anime_details.episodes.sub
if session.config.stream.translation_type == "sub"
else anime_details.episodes.dub
)
if not episode_list:
ui.display_error(
f"No episodes of type '{session.config.stream.translation_type}' found."
)
return GoBack
selected_episode = ui.prompt_episode_selection(
session, sorted(episode_list, key=float), anime_details
)
if selected_episode is None:
return GoBack
session.state.provider.current_episode = selected_episode
return StreamPlaybackState()
class StreamPlaybackState(State):
"""Fetches stream links for the chosen episode and initiates playback."""
def run(self, session: Session) -> Optional[State | type[GoBack]]:
from .. import actions, ui
if (
not session.state.provider.anime_details
or not session.state.provider.current_episode
):
return GoBack
with ui.progress_spinner(
f"Fetching streams for episode {session.state.provider.current_episode}..."
):
stream_servers = actions.get_stream_links(session)
if not stream_servers:
ui.display_error("No streams found for this episode.")
return GoBack
best_link_info = actions.select_best_stream_quality(
stream_servers, session.config.stream.quality, session
)
if not best_link_info:
ui.display_error(
f"Could not find quality '{session.config.stream.quality}p'."
)
return GoBack
playback_result = actions.play_stream(session, best_link_info)
return GoBack

View File

@@ -0,0 +1,89 @@
from typing import TYPE_CHECKING
import click
from ..session import Context, session
from ..state import ControlFlow, ProviderState, State
if TYPE_CHECKING:
pass
@session.menu
def episodes(ctx: Context, state: State) -> State | ControlFlow:
"""
Displays available episodes for a selected provider anime and handles
the logic for continuing from watch history or manual selection.
"""
provider_anime = state.provider.anime
anilist_anime = state.media_api.anime
config = ctx.config
if not provider_anime or not anilist_anime:
click.echo("[bold red]Error: Anime details are missing.[/bold red]")
return ControlFlow.BACK
# Get the list of episode strings based on the configured translation type
available_episodes = getattr(
provider_anime.episodes, config.stream.translation_type, []
)
if not available_episodes:
click.echo(
f"[bold yellow]No '{config.stream.translation_type}' episodes found for this anime.[/bold yellow]"
)
return ControlFlow.BACK
chosen_episode: str | None = None
# --- "Continue from History" Logic ---
if config.stream.continue_from_watch_history:
progress = (
anilist_anime.user_status.progress
if anilist_anime.user_status and anilist_anime.user_status.progress
else 0
)
# Calculate the next episode based on progress
next_episode_num = str(progress + 1)
if next_episode_num in available_episodes:
click.echo(
f"[cyan]Continuing from history. Auto-selecting episode {next_episode_num}.[/cyan]"
)
chosen_episode = next_episode_num
else:
# If the next episode isn't available, fall back to the last watched one
last_watched_num = str(progress)
if last_watched_num in available_episodes:
click.echo(
f"[cyan]Next episode ({next_episode_num}) not found. Falling back to last watched episode {last_watched_num}.[/cyan]"
)
chosen_episode = last_watched_num
else:
click.echo(
f"[yellow]Could not find episode based on your watch history. Please select manually.[/yellow]"
)
# --- Manual Selection Logic ---
if not chosen_episode:
choices = [*sorted(available_episodes, key=float), "Back"]
# TODO: Implement FZF/Rofi preview for episode thumbnails if available
# preview_command = get_episode_preview(...)
chosen_episode_str = ctx.selector.choose(
prompt="Select Episode", choices=choices, header=provider_anime.title
)
if not chosen_episode_str or chosen_episode_str == "Back":
return ControlFlow.BACK
chosen_episode = chosen_episode_str
# --- Transition to Servers Menu ---
# Create a new state, updating the provider state with the chosen episode.
return State(
menu_name="SERVERS",
media_api=state.media_api,
provider=state.provider.model_copy(update={"episode_number": chosen_episode}),
)

View File

@@ -0,0 +1,157 @@
# fastanime/cli/interactive/menus/main.py
from __future__ import annotations
import random
from typing import TYPE_CHECKING, Callable, Dict, Tuple
import click
from rich.progress import Progress
from ....libs.api.params import ApiSearchParams, UserListParams
from ..session import Context, session
from ..state import ControlFlow, MediaApiState, State
if TYPE_CHECKING:
from ....libs.api.types import MediaSearchResult
# A type alias for the actions this menu can perform.
# It returns a tuple: (NextMenuNameOrControlFlow, Optional[DataPayload])
MenuAction = Callable[[], Tuple[str, MediaSearchResult | None]]
@session.menu
def main(ctx: Context, state: State) -> State | ControlFlow:
"""
The main entry point menu for the interactive session.
Displays top-level categories for the user to browse and select.
"""
icons = ctx.config.general.icons
api_client = ctx.media_api
per_page = ctx.config.anilist.per_page
# The lambdas now correctly use the versatile search_media for most actions.
options: Dict[str, MenuAction] = {
# --- Search-based Actions ---
f"{'🔥 ' if icons else ''}Trending": lambda: (
"RESULTS",
api_client.search_media(
ApiSearchParams(sort="TRENDING_DESC", per_page=per_page)
),
),
f"{'' if icons else ''}Popular": lambda: (
"RESULTS",
api_client.search_media(
ApiSearchParams(sort="POPULARITY_DESC", per_page=per_page)
),
),
f"{'💖 ' if icons else ''}Favourites": lambda: (
"RESULTS",
api_client.search_media(
ApiSearchParams(sort="FAVOURITES_DESC", per_page=per_page)
),
),
f"{'💯 ' if icons else ''}Top Scored": lambda: (
"RESULTS",
api_client.search_media(
ApiSearchParams(sort="SCORE_DESC", per_page=per_page)
),
),
f"{'🎬 ' if icons else ''}Upcoming": lambda: (
"RESULTS",
api_client.search_media(
ApiSearchParams(
status="NOT_YET_RELEASED", sort="POPULARITY_DESC", per_page=per_page
)
),
),
f"{'🔔 ' if icons else ''}Recently Updated": lambda: (
"RESULTS",
api_client.search_media(
ApiSearchParams(
status="RELEASING", sort="UPDATED_AT_DESC", per_page=per_page
)
),
),
f"{'🎲 ' if icons else ''}Random": lambda: (
"RESULTS",
api_client.search_media(
ApiSearchParams(
id_in=random.sample(range(1, 160000), k=50), per_page=per_page
)
),
),
f"{'🔎 ' if icons else ''}Search": lambda: (
"RESULTS",
api_client.search_media(
ApiSearchParams(query=ctx.selector.ask("Search for Anime"))
),
),
# --- Authenticated User List Actions ---
f"{'📺 ' if icons else ''}Watching": _create_user_list_action(ctx, "CURRENT"),
f"{'📑 ' if icons else ''}Planned": _create_user_list_action(ctx, "PLANNING"),
f"{'' if icons else ''}Completed": _create_user_list_action(
ctx, "COMPLETED"
),
f"{'⏸️ ' if icons else ''}Paused": _create_user_list_action(ctx, "PAUSED"),
f"{'🚮 ' if icons else ''}Dropped": _create_user_list_action(ctx, "DROPPED"),
f"{'🔁 ' if icons else ''}Rewatching": _create_user_list_action(
ctx, "REPEATING"
),
# --- Control Flow and Utility Options ---
f"{'📝 ' if icons else ''}Edit Config": lambda: ("RELOAD_CONFIG", None),
f"{'' if icons else ''}Exit": lambda: ("EXIT", None),
}
choice_str = ctx.selector.choose(
prompt="Select Category",
choices=list(options.keys()),
header="FastAnime Main Menu",
)
if not choice_str:
return ControlFlow.EXIT
# --- Action Handling ---
selected_action = options[choice_str]
with Progress(transient=True) as progress:
task = progress.add_task(f"[cyan]Fetching {choice_str.strip()}...", total=None)
next_menu_name, result_data = selected_action()
progress.update(task, completed=True)
if next_menu_name == "EXIT":
return ControlFlow.EXIT
if next_menu_name == "RELOAD_CONFIG":
return ControlFlow.RELOAD_CONFIG
if next_menu_name == "CONTINUE":
return ControlFlow.CONTINUE
if not result_data:
click.echo(
f"[bold red]Error:[/bold red] Failed to fetch data for '{choice_str.strip()}'."
)
return ControlFlow.CONTINUE
# On success, transition to the RESULTS menu state.
return State(
menu_name="RESULTS",
media_api=MediaApiState(search_results=result_data),
)
def _create_user_list_action(ctx: Context, status: str) -> MenuAction:
"""A factory to create menu actions for fetching user lists, handling authentication."""
def action() -> Tuple[str, MediaSearchResult | None]:
if not ctx.media_api.user_profile:
click.echo(
f"[bold yellow]Please log in to view your '{status.title()}' list.[/]"
)
return "CONTINUE", None
return "RESULTS", ctx.media_api.fetch_user_list(
UserListParams(status=status, per_page=ctx.config.anilist.per_page)
)
return action

View File

@@ -0,0 +1,137 @@
from typing import TYPE_CHECKING, Callable, Dict, Tuple
import click
from InquirerPy.validator import EmptyInputValidator, NumberValidator
from ....libs.api.params import UpdateListEntryParams
from ....libs.api.types import UserListStatusType
from ...utils.anilist import anilist_data_helper
from ..session import Context, session
from ..state import ControlFlow, MediaApiState, ProviderState, State
if TYPE_CHECKING:
from ....libs.api.types import MediaItem
@session.menu
def media_actions(ctx: Context, state: State) -> State | ControlFlow:
"""
Displays actions for a single, selected anime, such as streaming,
viewing details, or managing its status on the user's list.
"""
anime = state.media_api.anime
if not anime:
click.echo("[bold red]Error: No anime selected.[/bold red]")
return ControlFlow.BACK
icons = ctx.config.general.icons
selector = ctx.selector
player = ctx.player
# --- Action Implementations ---
def stream() -> State | ControlFlow:
# This is the key transition to the provider-focused part of the app.
# We create a new state for the next menu, carrying over the selected
# anime's details for the provider to use.
return State(
menu_name="PROVIDER_SEARCH",
media_api=state.media_api, # Carry over the existing api state
provider=ProviderState(), # Initialize a fresh provider state
)
def watch_trailer() -> State | ControlFlow:
if not anime.trailer or not anime.trailer.id:
click.echo(
"[bold yellow]No trailer available for this anime.[/bold yellow]"
)
else:
trailer_url = f"https://www.youtube.com/watch?v={anime.trailer.id}"
click.echo(
f"Playing trailer for '{anime.title.english or anime.title.romaji}'..."
)
player.play(url=trailer_url, title=f"Trailer: {anime.title.english}")
return ControlFlow.CONTINUE
def add_to_list() -> State | ControlFlow:
choices = ["CURRENT", "PLANNING", "COMPLETED", "DROPPED", "PAUSED", "REPEATING"]
status = selector.choose("Select list status:", choices=choices)
if status:
_update_user_list(
ctx,
anime,
UpdateListEntryParams(media_id=anime.id, status=status),
)
return ControlFlow.CONTINUE
def score_anime() -> State | ControlFlow:
score_str = selector.ask(
"Enter score (0.0 - 10.0):",
)
try:
score = float(score_str) if score_str else 0.0
if not 0.0 <= score <= 10.0:
raise ValueError("Score out of range.")
_update_user_list(
ctx, anime, UpdateListEntryParams(media_id=anime.id, score=score)
)
except (ValueError, TypeError):
click.echo(
"[bold red]Invalid score. Please enter a number between 0 and 10.[/bold red]"
)
return ControlFlow.CONTINUE
def view_info() -> State | ControlFlow:
# Placeholder for a more detailed info screen if needed.
# For now, we'll just print key details.
from rich import box
from rich.panel import Panel
from rich.text import Text
title = Text(anime.title.english or anime.title.romaji, style="bold cyan")
description = anilist_data_helper.clean_html(
anime.description or "No description."
)
genres = f"[bold]Genres:[/bold] {', '.join(anime.genres)}"
panel_content = f"{genres}\n\n{description}"
click.echo(Panel(panel_content, title=title, box=box.ROUNDED, expand=False))
selector.ask("Press Enter to continue...") # Pause to allow reading
return ControlFlow.CONTINUE
# --- Build Menu Options ---
options: Dict[str, Callable[[], State | ControlFlow]] = {
f"{'▶️ ' if icons else ''}Stream": stream,
f"{'📼 ' if icons else ''}Watch Trailer": watch_trailer,
f"{' ' if icons else ''}Add/Update List": add_to_list,
f"{'' if icons else ''}Score Anime": score_anime,
f"{' ' if icons else ''}View Info": view_info,
# TODO: Add 'Recommendations' and 'Relations' here later.
f"{'🔙 ' if icons else ''}Back to Results": lambda: ControlFlow.BACK,
}
# --- Prompt and Execute ---
header = f"Actions for: {anime.title.english or anime.title.romaji}"
choice_str = ctx.selector.choose(
prompt="Select Action", choices=list(options.keys()), header=header
)
if choice_str and choice_str in options:
return options[choice_str]()
return ControlFlow.BACK
def _update_user_list(ctx: Context, anime: MediaItem, params: UpdateListEntryParams):
"""Helper to call the API to update a user's list and show feedback."""
if not ctx.media_api.user_profile:
click.echo("[bold yellow]You must be logged in to modify your list.[/]")
return
success = ctx.media_api.update_list_entry(params)
if success:
click.echo(
f"[bold green]Successfully updated '{anime.title.english or anime.title.romaji}' on your list![/]"
)
else:
click.echo("[bold red]Failed to update list entry.[/bold red]")

View File

@@ -0,0 +1,164 @@
import threading
from typing import TYPE_CHECKING, Callable, Dict
import click
from ....libs.api.params import UpdateListEntryParams
from ..session import Context, session
from ..state import ControlFlow, ProviderState, State
if TYPE_CHECKING:
from ....libs.providers.anime.types import Server
def _calculate_completion(start_time: str, end_time: str) -> float:
"""Calculates the percentage completion from two time strings (HH:MM:SS)."""
try:
start_parts = list(map(int, start_time.split(":")))
end_parts = list(map(int, end_time.split(":")))
start_secs = start_parts[0] * 3600 + start_parts[1] * 60 + start_parts[2]
end_secs = end_parts[0] * 3600 + end_parts[1] * 60 + end_parts[2]
return (start_secs / end_secs) * 100 if end_secs > 0 else 0
except (ValueError, IndexError, ZeroDivisionError):
return 0
def _update_progress_in_background(ctx: Context, anime_id: int, progress: int):
"""Fires off a non-blocking request to update AniList progress."""
def task():
if not ctx.media_api.user_profile:
return
params = UpdateListEntryParams(media_id=anime_id, progress=progress)
ctx.media_api.update_list_entry(params)
# We don't need to show feedback here, it's a background task.
threading.Thread(target=task).start()
@session.menu
def player_controls(ctx: Context, state: State) -> State | ControlFlow:
"""
Handles post-playback options like playing the next episode,
replaying, or changing streaming options.
"""
# --- State and Context Extraction ---
config = ctx.config
player = ctx.player
selector = ctx.selector
provider_anime = state.provider.anime
anilist_anime = state.media_api.anime
current_episode_num = state.provider.episode_number
selected_server = state.provider.selected_server
all_servers = state.provider.servers
player_result = state.provider.last_player_result
if not all(
(
provider_anime,
anilist_anime,
current_episode_num,
selected_server,
all_servers,
)
):
click.echo("[bold red]Error: Player state is incomplete. Returning.[/bold red]")
return ControlFlow.BACK
# --- Post-Playback Logic ---
if player_result and player_result.stop_time and player_result.total_time:
completion_pct = _calculate_completion(
player_result.stop_time, player_result.total_time
)
if completion_pct >= config.stream.episode_complete_at:
click.echo(
f"[green]Episode {current_episode_num} marked as complete. Updating progress...[/green]"
)
_update_progress_in_background(
ctx, anilist_anime.id, int(current_episode_num)
)
# --- Auto-Next Logic ---
available_episodes = getattr(
provider_anime.episodes, config.stream.translation_type, []
)
current_index = available_episodes.index(current_episode_num)
if config.stream.auto_next and current_index < len(available_episodes) - 1:
click.echo("[cyan]Auto-playing next episode...[/cyan]")
next_episode_num = available_episodes[current_index + 1]
return State(
menu_name="SERVERS",
media_api=state.media_api,
provider=state.provider.model_copy(
update={"episode_number": next_episode_num}
),
)
# --- Action Definitions ---
def next_episode() -> State | ControlFlow:
if current_index < len(available_episodes) - 1:
next_episode_num = available_episodes[current_index + 1]
# Transition back to the SERVERS menu with the new episode number.
return State(
menu_name="SERVERS",
media_api=state.media_api,
provider=state.provider.model_copy(
update={"episode_number": next_episode_num}
),
)
click.echo("[bold yellow]This is the last available episode.[/bold yellow]")
return ControlFlow.CONTINUE
def replay() -> State | ControlFlow:
# We don't need to change state, just re-trigger the SERVERS menu's logic.
return State(
menu_name="SERVERS", media_api=state.media_api, provider=state.provider
)
def change_server() -> State | ControlFlow:
server_map: Dict[str, Server] = {s.name: s for s in all_servers}
new_server_name = selector.choose(
"Select a different server:", list(server_map.keys())
)
if new_server_name:
# Update the selected server and re-run the SERVERS logic.
return State(
menu_name="SERVERS",
media_api=state.media_api,
provider=state.provider.model_copy(
update={"selected_server": server_map[new_server_name]}
),
)
return ControlFlow.CONTINUE
# --- Menu Options ---
icons = config.general.icons
options: Dict[str, Callable[[], State | ControlFlow]] = {}
if current_index < len(available_episodes) - 1:
options[f"{'⏭️ ' if icons else ''}Next Episode"] = next_episode
options.update(
{
f"{'🔄 ' if icons else ''}Replay Episode": replay,
f"{'💻 ' if icons else ''}Change Server": change_server,
f"{'🎞️ ' if icons else ''}Back to Episode List": lambda: State(
menu_name="EPISODES", media_api=state.media_api, provider=state.provider
),
f"{'🏠 ' if icons else ''}Main Menu": lambda: State(menu_name="MAIN"),
f"{'' if icons else ''}Exit": lambda: ControlFlow.EXIT,
}
)
# --- Prompt and Execute ---
header = f"Finished Episode {current_episode_num} of {provider_anime.title}"
choice_str = selector.choose(
prompt="What's next?", choices=list(options.keys()), header=header
)
if choice_str and choice_str in options:
return options[choice_str]()
return ControlFlow.BACK

View File

@@ -0,0 +1,117 @@
from typing import TYPE_CHECKING
import click
from rich.progress import Progress
from thefuzz import fuzz
from ....libs.providers.anime.params import SearchParams
from ..session import Context, session
from ..state import ControlFlow, ProviderState, State
if TYPE_CHECKING:
from ....libs.providers.anime.types import SearchResult
@session.menu
def provider_search(ctx: Context, state: State) -> State | ControlFlow:
"""
Searches for the selected AniList anime on the configured provider.
This state allows the user to confirm the correct provider entry before
proceeding to list episodes.
"""
anilist_anime = state.media_api.anime
if not anilist_anime:
click.echo("[bold red]Error: No AniList anime to search for.[/bold red]")
return ControlFlow.BACK
provider = ctx.provider
selector = ctx.selector
config = ctx.config
anilist_title = anilist_anime.title.english or anilist_anime.title.romaji
if not anilist_title:
click.echo(
"[bold red]Error: Selected anime has no searchable title.[/bold red]"
)
return ControlFlow.BACK
# --- Perform Search on Provider ---
with Progress(transient=True) as progress:
progress.add_task(
f"[cyan]Searching for '{anilist_title}' on {provider.__class__.__name__}...",
total=None,
)
provider_search_results = provider.search(
SearchParams(
query=anilist_title, translation_type=config.stream.translation_type
)
)
if not provider_search_results or not provider_search_results.results:
click.echo(
f"[bold yellow]Could not find '{anilist_title}' on {provider.__class__.__name__}.[/bold yellow]"
)
click.echo("Try another provider from the config or go back.")
return ControlFlow.BACK
# --- Map results for selection ---
provider_results_map: dict[str, SearchResult] = {
result.title: result for result in provider_search_results.results
}
selected_provider_anime: SearchResult | None = None
# --- Auto-Select or Prompt ---
if config.general.auto_select_anime_result:
# Use fuzzy matching to find the best title
best_match_title = max(
provider_results_map.keys(),
key=lambda p_title: fuzz.ratio(p_title.lower(), anilist_title.lower()),
)
click.echo(f"[cyan]Auto-selecting best match:[/] {best_match_title}")
selected_provider_anime = provider_results_map[best_match_title]
else:
choices = list(provider_results_map.keys())
choices.append("Back")
chosen_title = selector.choose(
prompt=f"Confirm match for '{anilist_title}'",
choices=choices,
header="Provider Search Results",
)
if not chosen_title or chosen_title == "Back":
return ControlFlow.BACK
selected_provider_anime = provider_results_map[chosen_title]
if not selected_provider_anime:
return ControlFlow.BACK
# --- Fetch Full Anime Details from Provider ---
with Progress(transient=True) as progress:
progress.add_task(
f"[cyan]Fetching full details for '{selected_provider_anime.title}'...",
total=None,
)
from ....libs.providers.anime.params import AnimeParams
full_provider_anime = provider.get(AnimeParams(id=selected_provider_anime.id))
if not full_provider_anime:
click.echo(
f"[bold red]Failed to fetch details for '{selected_provider_anime.title}'.[/bold red]"
)
return ControlFlow.BACK
# --- Transition to Episodes Menu ---
# Create the next state, populating the 'provider' field for the first time
# while carrying over the 'media_api' state.
return State(
menu_name="EPISODES",
media_api=state.media_api,
provider=ProviderState(
search_results=provider_search_results,
anime=full_provider_anime,
),
)

View File

@@ -0,0 +1,123 @@
from typing import TYPE_CHECKING, List
import click
from rich.progress import Progress
from yt_dlp.utils import sanitize_filename
from ...utils.anilist import (
anilist_data_helper, # Assuming this is the new location
)
from ...utils.previews import get_anime_preview
from ..session import Context, session
from ..state import ControlFlow, MediaApiState, State
if TYPE_CHECKING:
from ....libs.api.types import MediaItem
@session.menu
def results(ctx: Context, state: State) -> State | ControlFlow:
"""
Displays a paginated list of anime from a search or category query.
Allows the user to select an anime to view its actions or navigate pages.
"""
search_results = state.media_api.search_results
if not search_results or not search_results.media:
click.echo("[bold yellow]No anime found for the given criteria.[/bold yellow]")
return ControlFlow.BACK
# --- Prepare choices and previews ---
anime_items = search_results.media
formatted_titles = [
_format_anime_choice(anime, ctx.config) for anime in anime_items
]
# Map formatted titles back to the original MediaItem objects
anime_map = dict(zip(formatted_titles, anime_items))
preview_command = None
if ctx.config.general.preview != "none":
# This function will start background jobs to cache preview data
preview_command = get_anime_preview(anime_items, formatted_titles, ctx.config)
# --- Build Navigation and Final Choice List ---
choices = formatted_titles
page_info = search_results.page_info
# Add pagination controls if available
if page_info.has_next_page:
choices.append("Next Page")
if page_info.current_page > 1:
choices.append("Previous Page")
choices.append("Back")
# --- Prompt User ---
choice_str = ctx.selector.choose(
prompt="Select Anime",
choices=choices,
header="AniList Results",
preview=preview_command,
)
if not choice_str:
return ControlFlow.EXIT
# --- Handle User Selection ---
if choice_str == "Back":
return ControlFlow.BACK
if choice_str == "Next Page" or choice_str == "Previous Page":
page_delta = 1 if choice_str == "Next Page" else -1
# We need to re-run the previous state's data loader with a new page.
# This is a bit tricky. We'll need to store the loader function in the session.
# For now, let's assume a simplified re-search. A better way will be to store the
# search params in the State. Let's add that.
# Let's placeholder this for now, as it requires modifying the state object
# to carry over the original search parameters.
click.echo(f"Pagination logic needs to be fully implemented.")
return ControlFlow.CONTINUE
# If an anime was selected, transition to the MEDIA_ACTIONS state
selected_anime = anime_map.get(choice_str)
if selected_anime:
return State(
menu_name="MEDIA_ACTIONS",
media_api=MediaApiState(
search_results=state.media_api.search_results, # Carry over the list
anime=selected_anime, # Set the newly selected item
),
# Persist provider state if it exists
provider=state.provider,
)
# Fallback
return ControlFlow.CONTINUE
def _format_anime_choice(anime: MediaItem, config) -> str:
"""Creates a display string for a single anime item for the selector."""
title = anime.title.english or anime.title.romaji
progress = "0"
if anime.user_status:
progress = str(anime.user_status.progress or 0)
episodes_total = str(anime.episodes or "??")
display_title = f"{title} ({progress} of {episodes_total})"
# Add a visual indicator for new episodes if applicable
if (
anime.status == "RELEASING"
and anime.next_airing
and anime.user_status
and anime.user_status.status == "CURRENT"
):
last_aired = anime.next_airing.episode - 1
unwatched = last_aired - (anime.user_status.progress or 0)
if unwatched > 0:
icon = "🔹" if config.general.icons else "!"
display_title += f" {icon}{unwatched} new{icon}"
# Sanitize for use as a potential filename/cache key
return sanitize_filename(display_title, restricted=True)

View File

@@ -0,0 +1,118 @@
from typing import TYPE_CHECKING, Dict, List
import click
from rich.progress import Progress
from ....libs.players.params import PlayerParams
from ....libs.providers.anime.params import EpisodeStreamsParams
from ..session import Context, session
from ..state import ControlFlow, ProviderState, State
if TYPE_CHECKING:
from ....cli.utils.utils import (
filter_by_quality, # You may need to create this helper
)
from ....libs.providers.anime.types import Server
def _filter_by_quality(links, quality):
# Simplified version of your filter_by_quality for brevity
for link in links:
if str(link.quality) == quality:
return link
return links[0] if links else None
@session.menu
def servers(ctx: Context, state: State) -> State | ControlFlow:
"""
Fetches and displays available streaming servers for a chosen episode,
then launches the media player and transitions to post-playback controls.
"""
provider_anime = state.provider.anime
episode_number = state.provider.episode_number
config = ctx.config
provider = ctx.provider
selector = ctx.selector
if not provider_anime or not episode_number:
click.echo("[bold red]Error: Anime or episode details are missing.[/bold red]")
return ControlFlow.BACK
# --- Fetch Server Streams ---
with Progress(transient=True) as progress:
progress.add_task(
f"[cyan]Fetching servers for episode {episode_number}...", total=None
)
server_iterator = provider.episode_streams(
EpisodeStreamsParams(
anime_id=provider_anime.id,
episode=episode_number,
translation_type=config.stream.translation_type,
)
)
# Consume the iterator to get a list of all servers
all_servers: List[Server] = list(server_iterator) if server_iterator else []
if not all_servers:
click.echo(
f"[bold yellow]No streaming servers found for this episode.[/bold yellow]"
)
return ControlFlow.BACK
# --- Auto-Select or Prompt for Server ---
server_map: Dict[str, Server] = {s.name: s for s in all_servers}
selected_server: Server | None = None
preferred_server = config.stream.server.lower()
if preferred_server == "top":
selected_server = all_servers[0]
click.echo(f"[cyan]Auto-selecting top server:[/] {selected_server.name}")
elif preferred_server in server_map:
selected_server = server_map[preferred_server]
click.echo(f"[cyan]Auto-selecting preferred server:[/] {selected_server.name}")
else:
choices = [*server_map.keys(), "Back"]
chosen_name = selector.choose("Select Server", choices)
if not chosen_name or chosen_name == "Back":
return ControlFlow.BACK
selected_server = server_map[chosen_name]
if not selected_server:
return ControlFlow.BACK
# --- Select Stream Quality ---
stream_link_obj = _filter_by_quality(selected_server.links, config.stream.quality)
if not stream_link_obj:
click.echo(
f"[bold red]No stream of quality '{config.stream.quality}' found on server '{selected_server.name}'.[/bold red]"
)
return ControlFlow.CONTINUE
# --- Launch Player ---
final_title = f"{provider_anime.title} - Ep {episode_number}"
click.echo(f"[bold green]Launching player for:[/] {final_title}")
player_result = ctx.player.play(
PlayerParams(
url=stream_link_obj.link,
title=final_title,
subtitles=[sub.url for sub in selected_server.subtitles],
headers=selected_server.headers,
# start_time logic will be added in player_controls
)
)
# --- Transition to Player Controls ---
# We now have all the data for post-playback actions.
return State(
menu_name="PLAYER_CONTROLS",
media_api=state.media_api,
provider=state.provider.model_copy(
update={
"servers": all_servers,
"selected_server": selected_server,
"last_player_result": player_result, # We should add this to ProviderState
}
),
)

View File

@@ -1,100 +1,205 @@
from __future__ import annotations
import importlib.util
import logging
from typing import TYPE_CHECKING, Optional
import os
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Callable, List
from pydantic import BaseModel, Field
import click
from ...core.config import AppConfig
from ...core.constants import USER_CONFIG_PATH
from ..config import ConfigLoader
from .state import ControlFlow, State
if TYPE_CHECKING:
from ...core.config import AppConfig
from ...libs.api.base import BaseApiClient
from ...libs.api.types import Anime, SearchResult, Server, UserProfile
from ...libs.players.base import BasePlayer
from ...libs.selector.base import BaseSelector
from ...libs.providers.anime.base import BaseAnimeProvider
from ...libs.selectors.base import BaseSelector
logger = logging.getLogger(__name__)
# --- Nested State Models (Unchanged) ---
class AnilistState(BaseModel):
results_data: Optional[dict] = None
selected_anime: Optional[dict] = (
None # Using dict for AnilistBaseMediaDataSchema for now
)
# A type alias for the signature all menu functions must follow.
MenuFunction = Callable[["Context", State], "State | ControlFlow"]
class ProviderState(BaseModel):
selected_search_result: Optional[SearchResult] = None
anime_details: Optional[Anime] = None
current_episode: Optional[str] = None
current_server: Optional[Server] = None
@dataclass(frozen=True)
class Context:
"""
A mutable container for long-lived, shared services and configurations.
This object is passed to every menu state, providing access to essential
application components like API clients and UI selectors.
"""
class Config:
arbitrary_types_allowed = True
config: AppConfig
provider: BaseAnimeProvider
selector: BaseSelector
player: BasePlayer
media_api: BaseApiClient
class NavigationState(BaseModel):
current_page: int = 1
history_stack_class_names: list[str] = Field(default_factory=list)
@dataclass(frozen=True)
class Menu:
"""Represents a registered menu, linking a name to an executable function."""
class TrackingState(BaseModel):
progress_mode: str = "prompt"
class SessionState(BaseModel):
anilist: AnilistState = Field(default_factory=AnilistState)
provider: ProviderState = Field(default_factory=ProviderState)
navigation: NavigationState = Field(default_factory=NavigationState)
tracking: TrackingState = Field(default_factory=TrackingState)
class Config:
arbitrary_types_allowed = True
name: str
execute: MenuFunction
class Session:
def __init__(self, config: AppConfig) -> None:
self.config: AppConfig = config
self.state: SessionState = SessionState()
self.is_running: bool = True
self.user_profile: Optional[UserProfile] = None
self._initialize_components()
"""
The orchestrator for the interactive UI state machine.
def _initialize_components(self) -> None:
from ...cli.auth.manager import CredentialsManager
This class manages the state history, holds the application context,
runs the main event loop, and provides the decorator for registering menus.
"""
def __init__(self):
self._context: Context | None = None
self._history: List[State] = []
self._menus: dict[str, Menu] = {}
def _load_context(self, config: AppConfig):
"""Initializes all shared services based on the provided configuration."""
from ...libs.api.factory import create_api_client
from ...libs.players import create_player
from ...libs.selector import create_selector
from ...libs.providers.anime.provider import create_provider
from ...libs.selectors import create_selector
logger.debug("Initializing session components...")
self.selector: BaseSelector = create_selector(self.config)
self.provider: BaseAnimeProvider = create_provider(self.config.general.provider)
self.player: BasePlayer = create_player(self.config.stream.player, self.config)
self._context = Context(
config=config,
provider=create_provider(config.general.provider),
selector=create_selector(config),
player=create_player(config),
media_api=create_api_client(config.general.api_client, config),
)
logger.info("Application context reloaded.")
# Instantiate and use the API factory
self.api_client: BaseApiClient = create_api_client("anilist", self.config)
def _edit_config(self):
"""Handles the logic for editing the config file and reloading the context."""
click.edit(filename=str(USER_CONFIG_PATH))
loader = ConfigLoader()
new_config = loader.load()
self._load_context(new_config)
click.echo("[bold green]Configuration reloaded.[/bold green]")
# Load credentials and authenticate the API client
manager = CredentialsManager()
user_data = manager.load_user_profile()
if user_data and (token := user_data.get("token")):
self.user_profile = self.api_client.authenticate(token)
if not self.user_profile:
logger.warning(
"Loaded token is invalid or expired. User is not logged in."
def run(self, config: AppConfig, resume_path: Path | None = None):
"""
Starts and manages the main interactive session loop.
Args:
config: The initial application configuration.
resume_path: Optional path to a saved session file to resume from.
"""
self._load_context(config)
if resume_path:
self.resume(resume_path)
elif not self._history:
# Start with the main menu if history is empty
self._history.append(State(menu_name="MAIN"))
while self._history:
current_state = self._history[-1]
menu_to_run = self._menus.get(current_state.menu_name)
if not menu_to_run or not self._context:
logger.error(
f"Menu '{current_state.menu_name}' not found or context not loaded."
)
break
def change_provider(self, provider_name: str) -> None:
from ...libs.anime.provider import create_provider
# Execute the menu function, which returns the next step.
next_step = menu_to_run.execute(self._context, current_state)
self.config.general.provider = provider_name
self.provider = create_provider(provider_name)
if isinstance(next_step, State):
# A new state was returned, push it to history for the next loop.
self._history.append(next_step)
elif isinstance(next_step, ControlFlow):
# A control command was issued.
if next_step == ControlFlow.EXIT:
break # Exit the loop
elif next_step == ControlFlow.BACK:
if len(self._history) > 1:
self._history.pop() # Go back one state
elif next_step == ControlFlow.RELOAD_CONFIG:
self._edit_config()
# For CONTINUE, we do nothing, allowing the loop to re-run the current state.
else:
logger.error(
f"Menu '{current_state.menu_name}' returned invalid type: {type(next_step)}"
)
break
def change_player(self, player_name: str) -> None:
from ...libs.players import create_player
click.echo("Exiting interactive session.")
self.config.stream.player = player_name
self.player = create_player(player_name, self.config)
def save(self, file_path: Path):
"""Serializes the session history to a JSON file."""
history_dicts = [state.model_dump(mode="json") for state in self._history]
try:
file_path.write_text(str(history_dicts))
logger.info(f"Session saved to {file_path}")
except IOError as e:
logger.error(f"Failed to save session: {e}")
def stop(self) -> None:
self.is_running = False
def resume(self, file_path: Path):
"""Loads a session history from a JSON file."""
if not file_path.exists():
logger.warning(f"Resume file not found: {file_path}")
return
try:
history_dicts = file_path.read_text()
self._history = [State.model_validate(d) for d in history_dicts]
logger.info(f"Session resumed from {file_path}")
except Exception as e:
logger.error(f"Failed to resume session: {e}")
self._history = [] # Reset history on failure
@property
def menu(self) -> Callable[[MenuFunction], MenuFunction]:
"""A decorator to register a function as a menu."""
def decorator(func: MenuFunction) -> MenuFunction:
menu_name = func.__name__.upper()
if menu_name in self._menus:
logger.warning(f"Menu '{menu_name}' is being redefined.")
self._menus[menu_name] = Menu(name=menu_name, execute=func)
return func
return decorator
def load_menus_from_folder(self, package_path: Path):
"""
Dynamically imports all Python modules from a folder to register their menus.
Args:
package_path: The filesystem path to the 'menus' package directory.
"""
package_name = package_path.name
logger.debug(f"Loading menus from '{package_path}'...")
for filename in os.listdir(package_path):
if filename.endswith(".py") and not filename.startswith("__"):
module_name = filename[:-3]
full_module_name = (
f"fastanime.cli.interactive.{package_name}.{module_name}"
)
file_path = package_path / filename
try:
spec = importlib.util.spec_from_file_location(
full_module_name, file_path
)
if spec and spec.loader:
module = importlib.util.module_from_spec(spec)
# The act of executing the module runs the @session.menu decorators
spec.loader.exec_module(module)
except Exception as e:
logger.error(
f"Failed to load menu module '{full_module_name}': {e}"
)
# Create a single, global instance of the Session to be imported by menu modules.
session = Session()

View File

@@ -0,0 +1,91 @@
from enum import Enum, auto
from typing import Iterator, Optional
from pydantic import BaseModel, ConfigDict
# Import the actual data models from your libs.
# These will be the data types held within our state models.
from ....libs.api.types import MediaItem, MediaSearchResult
from ....libs.providers.anime.types import Anime, SearchResults, Server
class ControlFlow(Enum):
"""
Represents special commands to control the session loop instead of
transitioning to a new state. This provides a clear, type-safe alternative
to using magic strings.
"""
BACK = auto()
"""Pop the current state from history and return to the previous one."""
EXIT = auto()
"""Terminate the interactive session gracefully."""
RELOAD_CONFIG = auto()
"""Reload the application configuration and re-initialize the context."""
CONTINUE = auto()
"""
Stay in the current menu. This is useful for actions that don't
change the state but should not exit the menu (e.g., displaying an error).
"""
# ==============================================================================
# Nested State Models
# ==============================================================================
class ProviderState(BaseModel):
"""
An immutable snapshot of data related to the anime provider.
This includes search results, the selected anime's full details,
and the latest fetched episode streams.
"""
search_results: Optional[SearchResults] = None
anime: Optional[Anime] = None
episode_streams: Optional[Iterator[Server]] = None
model_config = ConfigDict(
frozen=True,
# Required to allow complex types like iterators in the model.
arbitrary_types_allowed=True,
)
class MediaApiState(BaseModel):
"""
An immutable snapshot of data related to the metadata API (e.g., AniList).
This includes search results and the full details of a selected media item.
"""
search_results: Optional[MediaSearchResult] = None
anime: Optional[MediaItem] = None
model_config = ConfigDict(frozen=True, arbitrary_types_allowed=True)
# ==============================================================================
# Root State Model
# ==============================================================================
class State(BaseModel):
"""
Represents the complete, immutable state of the interactive UI at a single
point in time. A new State object is created for each transition.
Attributes:
menu_name: The name of the menu function (e.g., 'MAIN', 'MEDIA_RESULTS')
that should be rendered for this state.
provider: Nested state for data from the anime provider.
media_api: Nested state for data from the metadata API (AniList).
"""
menu_name: str
provider: ProviderState = ProviderState()
media_api: MediaApiState = MediaApiState()
model_config = ConfigDict(frozen=True, arbitrary_types_allowed=True)

View File

@@ -1,168 +0,0 @@
from __future__ import annotations
import contextlib
from typing import TYPE_CHECKING, Any, Iterator, List, Optional
from rich import print as rprint
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.prompt import Confirm
from yt_dlp.utils import clean_html
from ...libs.anime.types import Anime
if TYPE_CHECKING:
from ...core.config import AppConfig
from ...libs.anilist.types import AnilistBaseMediaDataSchema
from .session import Session
@contextlib.contextmanager
def progress_spinner(description: str = "Working...") -> Iterator[None]:
"""A context manager for showing a rich spinner for long operations."""
progress = Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
transient=True,
)
task = progress.add_task(description=description, total=None)
with progress:
yield
progress.remove_task(task)
def display_error(message: str) -> None:
"""Displays a formatted error message and waits for user confirmation."""
rprint(f"[bold red]Error:[/] {message}")
Confirm.ask("Press Enter to continue...", default=True, show_default=False)
def prompt_main_menu(session: Session, choices: list[str]) -> Optional[str]:
"""Displays the main menu using the session's selector."""
header = (
"🚀 FastAnime Interactive Menu"
if session.config.general.icons
else "FastAnime Interactive Menu"
)
return session.selector.choose("Select Action", choices, header=header)
def prompt_for_search(session: Session) -> Optional[str]:
"""Prompts the user for a search query using the session's selector."""
search_term = session.selector.ask("Enter search term")
return search_term if search_term and search_term.strip() else None
def prompt_anime_selection(
session: Session, media_list: list[AnilistBaseMediaDataSchema]
) -> Optional[AnilistBaseMediaDataSchema]:
"""Displays anime results using the session's selector."""
from yt_dlp.utils import sanitize_filename
choice_map = {}
for anime in media_list:
title = anime.get("title", {}).get("romaji") or anime.get("title", {}).get(
"english", "Unknown Title"
)
progress = anime.get("mediaListEntry", {}).get("progress", 0)
episodes_total = anime.get("episodes") or ""
display_title = sanitize_filename(f"{title} ({progress}/{episodes_total})")
choice_map[display_title] = anime
choices = list(choice_map.keys()) + ["Next Page", "Previous Page", "Back"]
selection = session.selector.choose(
"Select Anime", choices, header="Search Results"
)
if selection in ["Back", "Next Page", "Previous Page"] or selection is None:
return selection # Let the state handle these special strings
return choice_map.get(selection)
def prompt_anime_actions(
session: Session, anime: AnilistBaseMediaDataSchema
) -> Optional[str]:
"""Displays the actions menu for a selected anime."""
choices = ["Stream", "View Info", "Back"]
if anime.get("trailer"):
choices.insert(0, "Watch Trailer")
if session.config.user:
choices.insert(1, "Add to List")
choices.insert(2, "Score Anime")
header = anime.get("title", {}).get("romaji", "Anime Actions")
return session.selector.choose("Select Action", choices, header=header)
def prompt_episode_selection(
session: Session, episode_list: list[str], anime_details: Anime
) -> Optional[str]:
"""Displays the list of available episodes."""
choices = episode_list + ["Back"]
header = f"Episodes for {anime_details.title}"
return session.selector.choose("Select Episode", choices, header=header)
def prompt_add_to_list(session: Session) -> Optional[str]:
"""Prompts user to select an AniList media list status."""
statuses = {
"Watching": "CURRENT",
"Planning": "PLANNING",
"Completed": "COMPLETED",
"Rewatching": "REPEATING",
"Paused": "PAUSED",
"Dropped": "DROPPED",
"Back": None,
}
choice = session.selector.choose("Add to which list?", list(statuses.keys()))
return statuses.get(choice) if choice else None
def display_anime_details(anime: AnilistBaseMediaDataSchema) -> None:
"""Renders a detailed view of an anime's information."""
from click import clear
from ...cli.utils.anilist import (
extract_next_airing_episode,
format_anilist_date_object,
format_list_data_with_comma,
format_number_with_commas,
)
clear()
title_eng = anime.get("title", {}).get("english", "N/A")
title_romaji = anime.get("title", {}).get("romaji", "N/A")
content = (
f"[bold cyan]English:[/] {title_eng}\n"
f"[bold cyan]Romaji:[/] {title_romaji}\n\n"
f"[bold]Status:[/] {anime.get('status', 'N/A')} "
f"[bold]Episodes:[/] {anime.get('episodes') or 'N/A'}\n"
f"[bold]Score:[/] {anime.get('averageScore', 0) / 10.0} / 10\n"
f"[bold]Popularity:[/] {format_number_with_commas(anime.get('popularity'))}\n\n"
f"[bold]Genres:[/] {format_list_data_with_comma([g for g in anime.get('genres', [])])}\n"
f"[bold]Tags:[/] {format_list_data_with_comma([t['name'] for t in anime.get('tags', [])[:5]])}\n\n"
f"[bold]Airing:[/] {extract_next_airing_episode(anime.get('nextAiringEpisode'))}\n"
f"[bold]Period:[/] {format_anilist_date_object(anime.get('startDate'))} to {format_anilist_date_object(anime.get('endDate'))}\n\n"
f"[bold underline]Description[/]\n{clean_html(anime.get('description', 'No description available.'))}"
)
rprint(Panel(content, title="Anime Details", border_style="magenta"))
Confirm.ask("Press Enter to return...", default=True, show_default=False)
def filter_by_quality(quality: str, stream_links: list, default=True):
"""(Moved from utils) Filters a list of streams by quality."""
for stream_link in stream_links:
q = float(quality)
try:
stream_q = float(stream_link.quality)
except (ValueError, TypeError):
continue
if q - 80 <= stream_q <= q + 80:
return stream_link
if stream_links and default:
return stream_links[0]
return None