style: ruff check + format

This commit is contained in:
Benexl
2025-07-28 15:37:40 +03:00
parent 9f0cf5f8dc
commit 2717d0b012
9 changed files with 130 additions and 92 deletions

View File

@@ -1,15 +1,12 @@
from typing import Callable, Dict, List, Literal, Optional, Union
from typing import Callable, Dict, Literal, Optional
from .....libs.media_api.params import (
MediaAiringScheduleParams,
MediaCharactersParams,
MediaRecommendationParams,
MediaRelationsParams,
UpdateUserMediaListEntryParams,
)
from .....libs.media_api.types import (
MediaItem,
MediaReview,
MediaStatus,
UserMediaListStatus,
)
@@ -610,7 +607,9 @@ def _view_airing_schedule(ctx: Context, state: State) -> MenuAction:
"""Action to transition to the airing schedule menu."""
def action() -> State | InternalDirective:
return State(menu_name=MenuName.MEDIA_AIRING_SCHEDULE, media_api=state.media_api)
return State(
menu_name=MenuName.MEDIA_AIRING_SCHEDULE, media_api=state.media_api
)
return action

View File

@@ -6,16 +6,15 @@ from ...state import InternalDirective, State
@session.menu
def media_airing_schedule(ctx: Context, state: State) -> Union[State, InternalDirective]:
def media_airing_schedule(
ctx: Context, state: State
) -> Union[State, InternalDirective]:
"""
Fetches and displays the airing schedule for an anime.
Shows upcoming episodes with air dates and countdown timers.
"""
from datetime import datetime
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
feedback = ctx.feedback
selector = ctx.selector
@@ -28,9 +27,7 @@ def media_airing_schedule(ctx: Context, state: State) -> Union[State, InternalDi
from .....libs.media_api.params import MediaAiringScheduleParams
loading_message = (
f"Fetching airing schedule for {media_item.title.english or media_item.title.romaji}..."
)
loading_message = f"Fetching airing schedule for {media_item.title.english or media_item.title.romaji}..."
schedule_result: Optional[AiringScheduleResult] = None
with feedback.progress(loading_message):
@@ -41,7 +38,7 @@ def media_airing_schedule(ctx: Context, state: State) -> Union[State, InternalDi
if not schedule_result or not schedule_result.schedule_items:
feedback.warning(
"No airing schedule found",
"This anime doesn't have upcoming episodes or airing data"
"This anime doesn't have upcoming episodes or airing data",
)
return InternalDirective.BACK
@@ -54,7 +51,7 @@ def media_airing_schedule(ctx: Context, state: State) -> Union[State, InternalDi
display_name += f" - {airing_time.strftime('%Y-%m-%d %H:%M')}"
if item.time_until_airing:
display_name += f" (in {item.time_until_airing})"
choice_map[display_name] = item
choices = list(choice_map.keys()) + ["View Full Schedule", "Back"]
@@ -65,7 +62,9 @@ def media_airing_schedule(ctx: Context, state: State) -> Union[State, InternalDi
anime_title = media_item.title.english or media_item.title.romaji or "Unknown"
with create_preview_context() as preview_ctx:
preview_command = preview_ctx.get_airing_schedule_preview(schedule_result, ctx.config, anime_title)
preview_command = preview_ctx.get_airing_schedule_preview(
schedule_result, ctx.config, anime_title
)
while True:
chosen_title = selector.choose(
@@ -76,11 +75,13 @@ def media_airing_schedule(ctx: Context, state: State) -> Union[State, InternalDi
if not chosen_title or chosen_title == "Back":
return InternalDirective.BACK
if chosen_title == "View Full Schedule":
console.clear()
# Display airing schedule
anime_title = media_item.title.english or media_item.title.romaji or "Unknown"
anime_title = (
media_item.title.english or media_item.title.romaji or "Unknown"
)
_display_airing_schedule(console, schedule_result, anime_title)
selector.ask("\nPress Enter to return...")
continue
@@ -88,37 +89,40 @@ def media_airing_schedule(ctx: Context, state: State) -> Union[State, InternalDi
# Show individual episode details
selected_item = choice_map[chosen_title]
console.clear()
from rich.panel import Panel
from datetime import datetime
episode_info = []
episode_info.append(f"[bold cyan]Episode {selected_item.episode}[/bold cyan]")
if selected_item.airing_at:
airing_time = selected_item.airing_at
episode_info.append(f"[green]Airs at:[/green] {airing_time.strftime('%Y-%m-%d %H:%M:%S')}")
episode_info.append(
f"[green]Airs at:[/green] {airing_time.strftime('%Y-%m-%d %H:%M:%S')}"
)
if selected_item.time_until_airing:
episode_info.append(f"[yellow]Time until airing:[/yellow] {selected_item.time_until_airing}")
episode_info.append(
f"[yellow]Time until airing:[/yellow] {selected_item.time_until_airing}"
)
episode_content = "\n".join(episode_info)
console.print(
Panel(
episode_content,
title=f"Episode Details - {media_item.title.english or media_item.title.romaji}",
border_style="blue",
expand=True
expand=True,
)
)
selector.ask("\nPress Enter to return to the schedule list...")
return InternalDirective.BACK
def _display_airing_schedule(console, schedule_result: AiringScheduleResult, anime_title: str):
def _display_airing_schedule(
console, schedule_result: AiringScheduleResult, anime_title: str
):
"""Display the airing schedule in a formatted table."""
from datetime import datetime
from rich.panel import Panel
@@ -144,7 +148,7 @@ def _display_airing_schedule(console, schedule_result: AiringScheduleResult, ani
# Format air date
if episode.airing_at:
formatted_date = episode.airing_at.strftime("%Y-%m-%d %H:%M")
# Check if episode has already aired
now = datetime.now()
if episode.airing_at < now:
@@ -181,15 +185,18 @@ def _display_airing_schedule(console, schedule_result: AiringScheduleResult, ani
# Add summary information
total_episodes = len(schedule_result.schedule_items)
upcoming_episodes = sum(1 for ep in schedule_result.schedule_items
if ep.airing_at and ep.airing_at > datetime.now())
upcoming_episodes = sum(
1
for ep in schedule_result.schedule_items
if ep.airing_at and ep.airing_at > datetime.now()
)
summary_text = Text()
summary_text.append(f"Total episodes in schedule: ", style="bold")
summary_text.append("Total episodes in schedule: ", style="bold")
summary_text.append(f"{total_episodes}", style="cyan")
summary_text.append(f"\nUpcoming episodes: ", style="bold")
summary_text.append("\nUpcoming episodes: ", style="bold")
summary_text.append(f"{upcoming_episodes}", style="green")
summary_panel = Panel(
summary_text,
title="[bold]Summary[/bold]",

View File

@@ -1,5 +1,5 @@
import re
from typing import Dict, List, Optional, Union
from typing import Dict, Optional, Union
from .....libs.media_api.types import Character, CharacterSearchResult
from ...session import Context, session
@@ -13,9 +13,6 @@ def media_characters(ctx: Context, state: State) -> Union[State, InternalDirecti
Shows character details upon selection or in the preview pane.
"""
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
feedback = ctx.feedback
selector = ctx.selector
@@ -29,9 +26,7 @@ def media_characters(ctx: Context, state: State) -> Union[State, InternalDirecti
from .....libs.media_api.params import MediaCharactersParams
loading_message = (
f"Fetching characters for {media_item.title.english or media_item.title.romaji}..."
)
loading_message = f"Fetching characters for {media_item.title.english or media_item.title.romaji}..."
characters_result: Optional[CharacterSearchResult] = None
with feedback.progress(loading_message):
@@ -45,7 +40,7 @@ def media_characters(ctx: Context, state: State) -> Union[State, InternalDirecti
characters = characters_result.characters
choice_map: Dict[str, Character] = {}
# Create display names for characters
for character in characters:
display_name = character.name.full or character.name.first or "Unknown"
@@ -53,7 +48,7 @@ def media_characters(ctx: Context, state: State) -> Union[State, InternalDirecti
display_name += f" ({character.gender})"
if character.age:
display_name += f" - Age {character.age}"
choice_map[display_name] = character
choices = list(choice_map.keys()) + ["Back"]
@@ -81,18 +76,16 @@ def media_characters(ctx: Context, state: State) -> Union[State, InternalDirecti
# Display character details
anime_title = media_item.title.english or media_item.title.romaji or "Unknown"
_display_character_details(console, selected_character, anime_title)
selector.ask("\nPress Enter to return to the character list...")
def _display_character_details(console, character: Character, anime_title: str):
"""Display detailed character information in a formatted panel."""
from rich.columns import Columns
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
import re
# Character name panel
name_text = Text()
@@ -165,7 +158,7 @@ def _display_character_details(console, character: Character, anime_title: str):
# Display everything
console.print(name_panel)
console.print()
# Show panels side by side if there's basic info
if info_table.rows:
console.print(Columns([info_panel, description_panel], equal=True, expand=True))

View File

@@ -1,4 +1,3 @@
import time
from contextlib import contextmanager
from typing import Optional

View File

@@ -39,12 +39,12 @@ TEMPLATE_EPISODE_INFO_SCRIPT = (FZF_SCRIPTS_DIR / "episode-info.template.sh").re
TEMPLATE_REVIEW_INFO_SCRIPT = (FZF_SCRIPTS_DIR / "review-info.template.sh").read_text(
encoding="utf-8"
)
TEMPLATE_CHARACTER_INFO_SCRIPT = (FZF_SCRIPTS_DIR / "character-info.template.sh").read_text(
encoding="utf-8"
)
TEMPLATE_AIRING_SCHEDULE_INFO_SCRIPT = (FZF_SCRIPTS_DIR / "airing-schedule-info.template.sh").read_text(
encoding="utf-8"
)
TEMPLATE_CHARACTER_INFO_SCRIPT = (
FZF_SCRIPTS_DIR / "character-info.template.sh"
).read_text(encoding="utf-8")
TEMPLATE_AIRING_SCHEDULE_INFO_SCRIPT = (
FZF_SCRIPTS_DIR / "airing-schedule-info.template.sh"
).read_text(encoding="utf-8")
class PreviewCacheWorker(ManagedBackgroundWorker):
@@ -510,7 +510,9 @@ class CharacterCacheWorker(ManagedBackgroundWorker):
hash_id = self._get_cache_hash(choice_str)
info_path = self.characters_cache_dir / hash_id
preview_content = self._generate_character_preview_content(character, config)
preview_content = self._generate_character_preview_content(
character, config
)
self.submit_function(self._save_preview_content, preview_content, hash_id)
def _generate_character_preview_content(
@@ -519,18 +521,25 @@ class CharacterCacheWorker(ManagedBackgroundWorker):
"""
Generates the final, formatted preview content by injecting character data into the template.
"""
character_name = character.name.full or character.name.first or "Unknown Character"
character_name = (
character.name.full or character.name.first or "Unknown Character"
)
native_name = character.name.native or "N/A"
gender = character.gender or "Unknown"
age = str(character.age) if character.age else "Unknown"
blood_type = character.blood_type or "N/A"
favourites = f"{character.favourites:,}" if character.favourites else "0"
birthday = character.date_of_birth.strftime("%B %d, %Y") if character.date_of_birth else "N/A"
birthday = (
character.date_of_birth.strftime("%B %d, %Y")
if character.date_of_birth
else "N/A"
)
# Clean and format description
description = character.description or "No description available"
if description:
import re
description = re.sub(r"<[^>]+>", "", description)
description = (
description.replace("&quot;", '"')
@@ -571,6 +580,7 @@ class CharacterCacheWorker(ManagedBackgroundWorker):
def _get_cache_hash(self, text: str) -> str:
from hashlib import sha256
return sha256(text.encode("utf-8")).hexdigest()
def _on_task_completed(self, task: WorkerTask, future) -> None:
@@ -619,16 +629,21 @@ class AiringScheduleCacheWorker(ManagedBackgroundWorker):
from datetime import datetime
total_episodes = len(schedule_result.schedule_items)
upcoming_episodes = sum(1 for ep in schedule_result.schedule_items
if ep.airing_at and ep.airing_at > datetime.now())
upcoming_episodes = sum(
1
for ep in schedule_result.schedule_items
if ep.airing_at and ep.airing_at > datetime.now()
)
# Generate schedule table text
schedule_lines = []
sorted_episodes = sorted(schedule_result.schedule_items, key=lambda x: x.episode)
sorted_episodes = sorted(
schedule_result.schedule_items, key=lambda x: x.episode
)
for episode in sorted_episodes[:10]: # Show next 10 episodes
ep_num = str(episode.episode)
if episode.airing_at:
formatted_date = episode.airing_at.strftime("%Y-%m-%d %H:%M")
now = datetime.now()
@@ -650,13 +665,15 @@ class AiringScheduleCacheWorker(ManagedBackgroundWorker):
elif hours > 0:
time_str = f"{hours}h"
else:
time_str = f"<1h"
time_str = "<1h"
elif episode.airing_at and episode.airing_at < datetime.now():
time_str = "Aired"
else:
time_str = "Unknown"
schedule_lines.append(f"Episode {ep_num:>3}: {formatted_date} ({time_str}) - {status}")
schedule_lines.append(
f"Episode {ep_num:>3}: {formatted_date} ({time_str}) - {status}"
)
schedule_table = "\n".join(schedule_lines)
@@ -681,11 +698,14 @@ class AiringScheduleCacheWorker(ManagedBackgroundWorker):
f.write(content)
logger.debug(f"Successfully cached airing schedule preview: {hash_id}")
except IOError as e:
logger.error(f"Failed to write airing schedule preview cache for {hash_id}: {e}")
logger.error(
f"Failed to write airing schedule preview cache for {hash_id}: {e}"
)
raise
def _get_cache_hash(self, text: str) -> str:
from hashlib import sha256
return sha256(text.encode("utf-8")).hexdigest()
def _on_task_completed(self, task: WorkerTask, future) -> None:
@@ -772,20 +792,29 @@ class PreviewWorkerManager:
self._character_worker = CharacterCacheWorker(self.info_cache_dir)
self._character_worker.start()
thread_manager.register_worker("character_cache_worker", self._character_worker)
thread_manager.register_worker(
"character_cache_worker", self._character_worker
)
return self._character_worker
def get_airing_schedule_worker(self) -> AiringScheduleCacheWorker:
"""Get or create the airing schedule cache worker."""
if self._airing_schedule_worker is None or not self._airing_schedule_worker.is_running():
if (
self._airing_schedule_worker is None
or not self._airing_schedule_worker.is_running()
):
if self._airing_schedule_worker:
# Clean up old worker
thread_manager.shutdown_worker("airing_schedule_cache_worker")
self._airing_schedule_worker = AiringScheduleCacheWorker(self.info_cache_dir)
self._airing_schedule_worker = AiringScheduleCacheWorker(
self.info_cache_dir
)
self._airing_schedule_worker.start()
thread_manager.register_worker("airing_schedule_cache_worker", self._airing_schedule_worker)
thread_manager.register_worker(
"airing_schedule_cache_worker", self._airing_schedule_worker
)
return self._airing_schedule_worker

View File

@@ -1,6 +1,6 @@
import logging
from enum import Enum
from typing import Any, Dict, List, Optional
from typing import Any, List, Optional
from httpx import Client
@@ -230,7 +230,9 @@ class AniListApi(BaseApiClient):
)
return mapper.to_generic_recommendations(response.json())
def get_characters_of(self, params: MediaCharactersParams) -> Optional[CharacterSearchResult]:
def get_characters_of(
self, params: MediaCharactersParams
) -> Optional[CharacterSearchResult]:
variables = {"id": params.id, "type": "ANIME"}
response = execute_graphql(
ANILIST_ENDPOINT, self.http_client, gql.GET_MEDIA_CHARACTERS, variables

View File

@@ -400,7 +400,7 @@ def _to_generic_character_name(anilist_name: Optional[Dict]) -> CharacterName:
"""Maps an AniList character name object to a generic CharacterName."""
if not anilist_name:
return CharacterName()
return CharacterName(
first=anilist_name.get("first"),
middle=anilist_name.get("middle"),
@@ -410,11 +410,13 @@ def _to_generic_character_name(anilist_name: Optional[Dict]) -> CharacterName:
)
def _to_generic_character_image(anilist_image: Optional[Dict]) -> Optional[CharacterImage]:
def _to_generic_character_image(
anilist_image: Optional[Dict],
) -> Optional[CharacterImage]:
"""Maps an AniList character image object to a generic CharacterImage."""
if not anilist_image:
return None
return CharacterImage(
medium=anilist_image.get("medium"),
large=anilist_image.get("large"),
@@ -425,7 +427,7 @@ def _to_generic_character(anilist_character: Dict) -> Optional[Character]:
"""Maps an AniList character object to a generic Character."""
if not anilist_character:
return None
# Parse date of birth if available
date_of_birth = None
if dob := anilist_character.get("dateOfBirth"):
@@ -437,7 +439,7 @@ def _to_generic_character(anilist_character: Dict) -> Optional[Character]:
date_of_birth = datetime(year, month, day)
except (ValueError, TypeError):
pass
return Character(
id=anilist_character.get("id"),
name=_to_generic_character_name(anilist_character.get("name")),
@@ -460,12 +462,12 @@ def to_generic_characters_result(data: Dict) -> Optional[CharacterSearchResult]:
try:
page_data = data["data"]["Page"]["media"][0]
characters_data = page_data["characters"]["nodes"]
characters = []
for char_data in characters_data:
if character := _to_generic_character(char_data):
characters.append(character)
return CharacterSearchResult(
characters=characters,
page_info=None, # Characters don't typically have pagination
@@ -475,18 +477,20 @@ def to_generic_characters_result(data: Dict) -> Optional[CharacterSearchResult]:
return None
def _to_generic_airing_schedule_item(anilist_episode: Dict) -> Optional[AiringScheduleItem]:
def _to_generic_airing_schedule_item(
anilist_episode: Dict,
) -> Optional[AiringScheduleItem]:
"""Maps an AniList airing schedule episode to a generic AiringScheduleItem."""
if not anilist_episode:
return None
airing_at = None
if airing_timestamp := anilist_episode.get("airingAt"):
try:
airing_at = datetime.fromtimestamp(airing_timestamp)
except (ValueError, TypeError):
pass
return AiringScheduleItem(
episode=anilist_episode.get("episode", 0),
airing_at=airing_at,
@@ -503,12 +507,12 @@ def to_generic_airing_schedule_result(data: Dict) -> Optional[AiringScheduleResu
try:
page_data = data["data"]["Page"]["media"][0]
schedule_data = page_data["airingSchedule"]["nodes"]
schedule_items = []
for episode_data in schedule_data:
if item := _to_generic_airing_schedule_item(episode_data):
schedule_items.append(item)
return AiringScheduleResult(
schedule_items=schedule_items,
page_info=None, # Schedule doesn't typically have pagination

View File

@@ -14,7 +14,6 @@ from .params import (
)
from .types import (
AiringScheduleResult,
Character,
CharacterSearchResult,
MediaItem,
MediaReview,
@@ -73,7 +72,9 @@ class BaseApiClient(abc.ABC):
pass
@abc.abstractmethod
def get_characters_of(self, params: MediaCharactersParams) -> Optional[CharacterSearchResult]:
def get_characters_of(
self, params: MediaCharactersParams
) -> Optional[CharacterSearchResult]:
pass
@abc.abstractmethod

View File

@@ -1,7 +1,7 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Dict, List, Optional
from typing import TYPE_CHECKING, List, Optional
from ..base import BaseApiClient
from ..params import (
@@ -138,9 +138,13 @@ class JikanApi(BaseApiClient):
logger.error(f"Failed to fetch recommendations for media {params.id}: {e}")
return None
def get_characters_of(self, params: MediaCharactersParams) -> Optional[CharacterSearchResult]:
def get_characters_of(
self, params: MediaCharactersParams
) -> Optional[CharacterSearchResult]:
"""Fetches characters for a given anime."""
logger.warning("Jikan API does not support fetching character data in the standardized format.")
logger.warning(
"Jikan API does not support fetching character data in the standardized format."
)
return None
def get_related_anime_for(