chore: format

This commit is contained in:
Benexl
2025-07-24 14:57:36 +03:00
parent 574a739cb6
commit 8b52a1ef27
25 changed files with 106 additions and 110 deletions

View File

@@ -1,4 +1,3 @@
from re import A
import click

View File

@@ -1,6 +1,7 @@
from typing import TYPE_CHECKING
import click
from fastanime.cli.utils.completions import anime_titles_shell_complete
from .data import (
@@ -95,11 +96,12 @@ def search(
):
import json
from rich.progress import Progress
from fastanime.cli.utils.feedback import create_feedback_manager
from fastanime.core.exceptions import FastAnimeError
from fastanime.libs.api.factory import create_api_client
from fastanime.libs.api.params import MediaSearchParams
from rich.progress import Progress
feedback = create_feedback_manager(config.general.icons)

View File

@@ -12,13 +12,14 @@ def stats(config: "AppConfig"):
import shutil
import subprocess
from fastanime.cli.utils.feedback import create_feedback_manager
from fastanime.core.exceptions import FastAnimeError
from fastanime.libs.api.factory import create_api_client
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from fastanime.cli.utils.feedback import create_feedback_manager
from fastanime.core.exceptions import FastAnimeError
from fastanime.libs.api.factory import create_api_client
feedback = create_feedback_manager(config.general.icons)
console = Console()

View File

@@ -161,7 +161,7 @@ def download(
from ....anilist import AniList
force_ffmpeg |= (hls_use_mpegts or hls_use_h264)
force_ffmpeg |= hls_use_mpegts or hls_use_h264
success, anilist_search_results = AniList.search(
query=title,

View File

@@ -3,7 +3,7 @@ Common helper functions for anilist subcommands.
"""
import json
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING
import click
from rich.progress import Progress
@@ -11,7 +11,6 @@ from rich.progress import Progress
if TYPE_CHECKING:
from fastanime.core.config import AppConfig
from fastanime.libs.api.base import BaseApiClient
from fastanime.libs.api.types import MediaSearchResult
def get_authenticated_api_client(config: "AppConfig") -> "BaseApiClient":

View File

@@ -62,8 +62,8 @@ def config(
user_config: AppConfig, path, view, view_json, desktop_entry, update, interactive
):
from ...core.constants import USER_CONFIG_PATH
from ..config.generate import generate_config_ini_from_app_model
from ..config.editor import InteractiveConfigEditor
from ..config.generate import generate_config_ini_from_app_model
if path:
print(USER_CONFIG_PATH)

View File

@@ -6,7 +6,6 @@ Implements Step 5: AniList Authentication Flow
import webbrowser
from typing import Optional
import click
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
@@ -115,7 +114,7 @@ def _handle_login(
"Browser opened",
"Complete the authorization process in your browser",
)
except Exception as e:
except Exception:
feedback.warning(
"Could not open browser automatically",
f"Please manually visit: {oauth_url}",
@@ -151,9 +150,9 @@ def _handle_login(
feedback,
"authenticate",
loading_msg="Validating token with AniList",
success_msg=f"Successfully logged in! 🎉"
success_msg="Successfully logged in! 🎉"
if icons
else f"Successfully logged in!",
else "Successfully logged in!",
error_msg="Login failed",
show_loading=True,
)

View File

@@ -85,7 +85,7 @@ def _create_media_list_action(
feedback = ctx.services.feedback
search_params = MediaSearchParams(sort=sort, status=status)
loading_message = f"Fetching media list"
loading_message = "Fetching media list"
result = None
with feedback.progress(loading_message):
result = ctx.media_api.search_media(search_params)
@@ -112,7 +112,7 @@ def _create_random_media_list(ctx: Context, state: State) -> MenuAction:
feedback = ctx.services.feedback
search_params = MediaSearchParams(id_in=random.sample(range(1, 15000), k=50))
loading_message = f"Fetching media list"
loading_message = "Fetching media list"
result = None
with feedback.progress(loading_message):
result = ctx.media_api.search_media(search_params)
@@ -144,7 +144,7 @@ def _create_search_media_list(ctx: Context, state: State) -> MenuAction:
search_params = MediaSearchParams(query=query)
loading_message = f"Fetching media list"
loading_message = "Fetching media list"
result = None
with feedback.progress(loading_message):
result = ctx.media_api.search_media(search_params)
@@ -179,7 +179,7 @@ def _create_user_list_action(
search_params = UserMediaListSearchParams(status=status)
loading_message = f"Fetching media list"
loading_message = "Fetching media list"
result = None
with feedback.progress(loading_message):
result = ctx.media_api.search_media_list(search_params)

View File

@@ -3,7 +3,7 @@ from typing import Callable, Dict
from rich.console import Console
from ....libs.api.params import UpdateUserMediaListEntryParams
from ....libs.api.types import MediaItem, UserMediaListStatus
from ....libs.api.types import UserMediaListStatus
from ....libs.players.params import PlayerParams
from ..session import Context, session
from ..state import InternalDirective, MenuName, State

View File

@@ -148,7 +148,7 @@ def _handle_pagination(
search_params_dict = asdict(search_params)
search_params_dict.pop("page")
loading_message = f"Fetching media list"
loading_message = "Fetching media list"
result = None
new_search_params = UserMediaListSearchParams(
**search_params_dict, page=new_page
@@ -171,7 +171,7 @@ def _handle_pagination(
search_params_dict = asdict(search_params)
search_params_dict.pop("page")
loading_message = f"Fetching media list"
loading_message = "Fetching media list"
result = None
new_search_params = MediaSearchParams(**search_params_dict, page=new_page)
with feedback.progress(loading_message):

View File

@@ -40,13 +40,13 @@ def servers(ctx: Context, state: State) -> State | InternalDirective:
if config.stream.server == ProviderServer.TOP and server_iterator:
try:
all_servers = [next(server_iterator)]
except Exception as e:
except Exception:
all_servers = []
else:
all_servers: List[Server] = list(server_iterator) if server_iterator else []
if not all_servers:
feedback.error(f"o streaming servers found for this episode")
feedback.error("o streaming servers found for this episode")
return InternalDirective.BACK
server_map: Dict[str, Server] = {s.name: s for s in all_servers}

View File

@@ -32,12 +32,22 @@ def session_management(ctx: Context, state: State) -> State | ControlFlow:
_display_session_info(console, icons)
options: Dict[str, MenuAction] = {
f"{'💾 ' if icons else ''}Save Current Session": lambda: _save_session(ctx, feedback),
f"{'💾 ' if icons else ''}Save Current Session": lambda: _save_session(
ctx, feedback
),
f"{'📂 ' if icons else ''}Load Session": lambda: _load_session(ctx, feedback),
f"{'📋 ' if icons else ''}List Saved Sessions": lambda: _list_sessions(ctx, feedback),
f"{'🗑️ ' if icons else ''}Cleanup Old Sessions": lambda: _cleanup_sessions(ctx, feedback),
f"{'💾 ' if icons else ''}Create Manual Backup": lambda: _create_backup(ctx, feedback),
f"{' ' if icons else ''}Session Settings": lambda: _session_settings(ctx, feedback),
f"{'📋 ' if icons else ''}List Saved Sessions": lambda: _list_sessions(
ctx, feedback
),
f"{'🗑 ' if icons else ''}Cleanup Old Sessions": lambda: _cleanup_sessions(
ctx, feedback
),
f"{'💾 ' if icons else ''}Create Manual Backup": lambda: _create_backup(
ctx, feedback
),
f"{'⚙️ ' if icons else ''}Session Settings": lambda: _session_settings(
ctx, feedback
),
f"{'🔙 ' if icons else ''}Back to Main Menu": lambda: "BACK",
}
@@ -51,7 +61,7 @@ def session_management(ctx: Context, state: State) -> State | ControlFlow:
return ControlFlow.BACK
result = options[choice_str]()
if result == "BACK":
return ControlFlow.BACK
else:
@@ -61,17 +71,21 @@ def session_management(ctx: Context, state: State) -> State | ControlFlow:
def _display_session_info(console: Console, icons: bool):
"""Display current session information."""
session_stats = session.get_session_stats()
table = Table(title=f"{'📊 ' if icons else ''}Current Session Info")
table.add_column("Property", style="cyan")
table.add_column("Value", style="green")
table.add_row("Current States", str(session_stats["current_states"]))
table.add_row("Current Menu", session_stats["current_menu"] or "None")
table.add_row("Auto-Save", "Enabled" if session_stats["auto_save_enabled"] else "Disabled")
table.add_row(
"Auto-Save", "Enabled" if session_stats["auto_save_enabled"] else "Disabled"
)
table.add_row("Has Auto-Save", "Yes" if session_stats["has_auto_save"] else "No")
table.add_row("Has Crash Backup", "Yes" if session_stats["has_crash_backup"] else "No")
table.add_row(
"Has Crash Backup", "Yes" if session_stats["has_crash_backup"] else "No"
)
console.print(table)
console.print()
@@ -80,162 +94,161 @@ def _save_session(ctx: Context, feedback) -> str:
"""Save the current session."""
session_name = ctx.selector.ask("Enter session name (optional):")
description = ctx.selector.ask("Enter session description (optional):")
if not session_name:
session_name = f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
sessions_dir = APP_DIR / "sessions"
file_path = sessions_dir / f"{session_name}.json"
if file_path.exists():
if not feedback.confirm(f"Session '{session_name}' already exists. Overwrite?"):
feedback.info("Save cancelled")
return "CONTINUE"
success = session.save(file_path, session_name, description or "")
if success:
feedback.success(f"Session saved as '{session_name}'")
return "CONTINUE"
def _load_session(ctx: Context, feedback) -> str:
"""Load a saved session."""
sessions = session.list_saved_sessions()
if not sessions:
feedback.warning("No saved sessions found")
return "CONTINUE"
# Create choices with session info
choices = []
session_map = {}
for sess in sessions:
choice_text = f"{sess['name']} - {sess['description'][:50]}{'...' if len(sess['description']) > 50 else ''}"
choices.append(choice_text)
session_map[choice_text] = sess
choices.append("Cancel")
choice = ctx.selector.choose(
"Select session to load:",
choices=choices,
header="Available Sessions"
"Select session to load:", choices=choices, header="Available Sessions"
)
if not choice or choice == "Cancel":
return "CONTINUE"
selected_session = session_map[choice]
file_path = Path(selected_session["path"])
if feedback.confirm(f"Load session '{selected_session['name']}'? This will replace your current session."):
if feedback.confirm(
f"Load session '{selected_session['name']}'? This will replace your current session."
):
success = session.resume(file_path, feedback)
if success:
feedback.info("Session loaded successfully. Returning to main menu.")
# Return to main menu after loading
return "MAIN"
return "CONTINUE"
def _list_sessions(ctx: Context, feedback) -> str:
"""List all saved sessions."""
sessions = session.list_saved_sessions()
if not sessions:
feedback.info("No saved sessions found")
return "CONTINUE"
console = Console()
table = Table(title="Saved Sessions")
table.add_column("Name", style="cyan")
table.add_column("Description", style="yellow")
table.add_column("States", style="green")
table.add_column("Created", style="blue")
for sess in sessions:
# Format the created date
created = sess["created"]
if "T" in created:
created = created.split("T")[0] # Just show the date part
table.add_row(
sess["name"],
sess["description"][:40] + "..." if len(sess["description"]) > 40 else sess["description"],
sess["description"][:40] + "..."
if len(sess["description"]) > 40
else sess["description"],
str(sess["state_count"]),
created
created,
)
console.print(table)
feedback.pause_for_user()
return "CONTINUE"
def _cleanup_sessions(ctx: Context, feedback) -> str:
"""Clean up old sessions."""
sessions = session.list_saved_sessions()
if len(sessions) <= 5:
feedback.info("No cleanup needed. You have 5 or fewer sessions.")
return "CONTINUE"
max_sessions_str = ctx.selector.ask("How many sessions to keep? (default: 10)")
try:
max_sessions = int(max_sessions_str) if max_sessions_str else 10
except ValueError:
feedback.error("Invalid number entered")
return "CONTINUE"
if feedback.confirm(f"Delete sessions older than the {max_sessions} most recent?"):
deleted_count = session.cleanup_old_sessions(max_sessions)
feedback.success(f"Deleted {deleted_count} old sessions")
return "CONTINUE"
def _create_backup(ctx: Context, feedback) -> str:
"""Create a manual backup."""
backup_name = ctx.selector.ask("Enter backup name (optional):")
success = session.create_manual_backup(backup_name or "")
if success:
feedback.success("Manual backup created successfully")
return "CONTINUE"
def _session_settings(ctx: Context, feedback) -> str:
"""Configure session settings."""
current_auto_save = session._auto_save_enabled
choices = [
f"Auto-Save: {'Enabled' if current_auto_save else 'Disabled'}",
"Clear Auto-Save File",
"Clear Crash Backup",
"Back"
"Back",
]
choice = ctx.selector.choose(
"Session Settings:",
choices=choices
)
choice = ctx.selector.choose("Session Settings:", choices=choices)
if choice and choice.startswith("Auto-Save"):
new_setting = not current_auto_save
session.enable_auto_save(new_setting)
feedback.success(f"Auto-save {'enabled' if new_setting else 'disabled'}")
elif choice == "Clear Auto-Save File":
if feedback.confirm("Clear the auto-save file?"):
session._session_manager.clear_auto_save()
feedback.success("Auto-save file cleared")
elif choice == "Clear Crash Backup":
if feedback.confirm("Clear the crash backup file?"):
session._session_manager.clear_crash_backup()
feedback.success("Crash backup cleared")
return "CONTINUE"

View File

@@ -10,12 +10,10 @@ Provides comprehensive AniList list management including:
"""
import logging
from typing import Dict, List, Optional, Tuple
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from ....libs.api.params import UpdateUserMediaListEntryParams, UserListParams
from ....libs.api.types import MediaItem, MediaSearchResult, UserListItem
@@ -334,7 +332,7 @@ def _display_list_contents(
# Show pagination info
if result.page_info.has_next_page:
console.print(f"[dim]More results available on next page[/dim]")
console.print("[dim]More results available on next page[/dim]")
def _display_anime_list_details(console: Console, anime: MediaItem, icons: bool):
@@ -360,7 +358,7 @@ def _display_anime_list_details(console: Console, anime: MediaItem, icons: bool)
# Add list-specific information if available
if hasattr(anime, "media_list_entry") and anime.media_list_entry:
entry = anime.media_list_entry
details_text += f"\n\n[bold cyan]Your List Info:[/bold cyan]\n"
details_text += "\n\n[bold cyan]Your List Info:[/bold cyan]\n"
details_text += f"Progress: {entry.progress or 0} episodes\n"
details_text += f"Score: {entry.score or 'Not rated'}\n"
details_text += f"Status: {_status_to_display_name(entry.status) if hasattr(entry, 'status') else 'Unknown'}\n"

View File

@@ -4,12 +4,10 @@ Provides comprehensive watch history viewing, editing, and management capabiliti
"""
import logging
from pathlib import Path
from typing import Callable, Dict, List
from rich.console import Console
from rich.table import Table
from rich.text import Text
from ....core.constants import APP_DATA_DIR
from ...utils.feedback import create_feedback_manager

View File

@@ -101,7 +101,7 @@ class Session:
def _edit_config(self):
click.edit(filename=str(USER_CONFIG_PATH))
logger.debug(f"Config changed; Reloading context")
logger.debug("Config changed; Reloading context")
loader = ConfigLoader()
config = loader.load()
self._load_context(config)
@@ -127,7 +127,7 @@ class Session:
try:
self._run_main_loop()
except Exception as e:
except Exception:
self._context.services.session.create_crash_backup(self._history)
raise
self._context.services.session.save_session(self._history)

View File

@@ -25,7 +25,6 @@ from .models import (
DownloadQueueItem,
EpisodeDownload,
MediaDownloadRecord,
MediaIndexEntry,
)
logger = logging.getLogger(__name__)

View File

@@ -1,10 +1,10 @@
import re
from datetime import datetime
from typing import TYPE_CHECKING, List, Optional
from typing import List, Optional
from yt_dlp.utils import clean_html as ytdlp_clean_html
from ...libs.api.types import AiringSchedule, MediaItem
from ...libs.api.types import AiringSchedule
COMMA_REGEX = re.compile(r"([0-9]{3})(?=\d)")

View File

@@ -6,10 +6,9 @@ import tempfile
from pathlib import Path
import httpx
import yt_dlp
from rich import print
from rich.prompt import Confirm
import yt_dlp
from yt_dlp.utils import sanitize_filename
from ..exceptions import FastAnimeError
@@ -121,7 +120,7 @@ class YtDLPDownloader(BaseDownloader):
response = self.client.get(sub)
try:
response.raise_for_status()
except httpx.HTTPError as e:
except httpx.HTTPError:
raise FastAnimeError("Failed to download sub: {e}")
filename = get_remote_filename(response)

View File

@@ -1,5 +1,5 @@
import abc
from typing import TYPE_CHECKING, Any, Optional, Union
from typing import TYPE_CHECKING, Any, Optional
from ...core.config import AnilistConfig
from .params import (

View File

@@ -1,7 +1,7 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, List, Optional
from typing import TYPE_CHECKING, Optional
from ..base import (
BaseApiClient,
@@ -9,13 +9,12 @@ from ..base import (
UpdateUserMediaListEntryParams,
UserMediaListSearchParams,
)
from ..types import MediaItem, MediaSearchResult, UserProfile
from ..types import MediaSearchResult, UserProfile
from . import mapper
if TYPE_CHECKING:
from httpx import Client
from ....core.config import AppConfig
pass
logger = logging.getLogger(__name__)

View File

@@ -1,21 +1,14 @@
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING, List, Optional
from typing import TYPE_CHECKING, Optional
from ..types import (
AiringSchedule,
MediaImage,
MediaItem,
MediaSearchResult,
MediaStatus,
MediaTagItem,
MediaTitle,
PageInfo,
StreamingEpisode,
Studio,
UserListItem,
UserProfile,
)
if TYPE_CHECKING:

View File

@@ -1,4 +1,3 @@
import logging
from ...types import EpisodeStream, Server
from ..constants import MP4_SERVER_JUICY_STREAM_REGEX

View File

@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Any
from typing import Any
from ..types import (
Anime,

View File

@@ -1,8 +1,6 @@
import logging
import random
import time
from functools import lru_cache
from typing import TYPE_CHECKING, Iterator, Optional, Union
from typing import Iterator, Optional
from yt_dlp.utils import (
extract_attributes,
@@ -23,7 +21,7 @@ from .constants import (
)
from .extractors import process_animepahe_embed_page
from .parser import map_to_anime_result, map_to_search_results, map_to_server
from .types import AnimePaheAnimePage, AnimePaheSearchPage, AnimePaheSearchResult
from .types import AnimePaheAnimePage, AnimePaheSearchPage
logger = logging.getLogger(__name__)

View File

@@ -18,7 +18,7 @@ class BaseAnimeProvider(ABC):
super().__init_subclass__(**kwargs)
if not hasattr(cls, "HEADERS"):
raise TypeError(
f"Subclasses of BaseAnimeProvider must define a 'HEADERS' class attribute."
"Subclasses of BaseAnimeProvider must define a 'HEADERS' class attribute."
)
def __init__(self, client: "Client") -> None: