mirror of
https://github.com/Benexl/FastAnime.git
synced 2025-12-12 15:50:01 -08:00
feat: add character and airing schedule views, enhance media info display
This commit is contained in:
@@ -3,6 +3,8 @@ from typing import Callable, Dict
|
||||
from rich.console import Console
|
||||
|
||||
from .....libs.media_api.params import (
|
||||
MediaAiringScheduleParams,
|
||||
MediaCharactersParams,
|
||||
MediaRecommendationParams,
|
||||
MediaRelationsParams,
|
||||
UpdateUserMediaListEntryParams,
|
||||
@@ -34,6 +36,8 @@ def media_actions(ctx: Context, state: State) -> State | InternalDirective:
|
||||
f"{'📼 ' if icons else ''}Watch Trailer": _watch_trailer(ctx, state),
|
||||
f"{'🔗 ' if icons else ''}Recommendations": _view_recommendations(ctx, state),
|
||||
f"{'🔄 ' if icons else ''}Related Anime": _view_relations(ctx, state),
|
||||
f"{'👥 ' if icons else ''}Characters": _view_characters(ctx, state),
|
||||
f"{'📅 ' if icons else ''}Airing Schedule": _view_airing_schedule(ctx, state),
|
||||
f"{'➕ ' if icons else ''}Add/Update List": _manage_user_media_list(ctx, state),
|
||||
f"{'⭐ ' if icons else ''}Score Anime": _score_anime(ctx, state),
|
||||
f"{'ℹ️ ' if icons else ''}View Info": _view_info(ctx, state),
|
||||
@@ -157,26 +161,209 @@ def _view_info(ctx: Context, state: State) -> MenuAction:
|
||||
return InternalDirective.RELOAD
|
||||
|
||||
from rich import box
|
||||
from rich.columns import Columns
|
||||
from rich.panel import Panel
|
||||
from rich.table import Table
|
||||
from rich.text import Text
|
||||
import re
|
||||
|
||||
from ....utils import image
|
||||
|
||||
# TODO: make this look nicer plus add other fields
|
||||
console = Console()
|
||||
title = Text(
|
||||
media_item.title.english or media_item.title.romaji or "", style="bold cyan"
|
||||
)
|
||||
description = Text(media_item.description or "NO description")
|
||||
genres = Text(f"Genres: {', '.join([v.value for v in media_item.genres])}")
|
||||
|
||||
panel_content = f"{genres}\n\n{description}"
|
||||
|
||||
console.clear()
|
||||
|
||||
# Display cover image if available
|
||||
if cover_image := media_item.cover_image:
|
||||
image.render_image(cover_image.large)
|
||||
|
||||
console.print(Panel(panel_content, title=title, box=box.ROUNDED, expand=True))
|
||||
# Create main title
|
||||
main_title = media_item.title.english or media_item.title.romaji or "Unknown Title"
|
||||
title_text = Text(main_title, style="bold cyan")
|
||||
|
||||
# Create info table
|
||||
info_table = Table(show_header=False, box=box.SIMPLE, pad_edge=False)
|
||||
info_table.add_column("Field", style="bold yellow", min_width=15)
|
||||
info_table.add_column("Value", style="white")
|
||||
|
||||
# Add basic information
|
||||
info_table.add_row("English Title", media_item.title.english or "N/A")
|
||||
info_table.add_row("Romaji Title", media_item.title.romaji or "N/A")
|
||||
info_table.add_row("Native Title", media_item.title.native or "N/A")
|
||||
|
||||
if media_item.synonymns:
|
||||
synonyms = ", ".join(media_item.synonymns[:3]) # Show first 3 synonyms
|
||||
if len(media_item.synonymns) > 3:
|
||||
synonyms += f" (+{len(media_item.synonymns) - 3} more)"
|
||||
info_table.add_row("Synonyms", synonyms)
|
||||
|
||||
info_table.add_row("Type", media_item.type.value if media_item.type else "N/A")
|
||||
info_table.add_row("Format", media_item.format.value if media_item.format else "N/A")
|
||||
info_table.add_row("Status", media_item.status.value if media_item.status else "N/A")
|
||||
info_table.add_row("Episodes", str(media_item.episodes) if media_item.episodes else "Unknown")
|
||||
info_table.add_row("Duration", f"{media_item.duration} min" if media_item.duration else "Unknown")
|
||||
|
||||
# Add dates
|
||||
if media_item.start_date:
|
||||
start_date = media_item.start_date.strftime("%Y-%m-%d")
|
||||
info_table.add_row("Start Date", start_date)
|
||||
if media_item.end_date:
|
||||
end_date = media_item.end_date.strftime("%Y-%m-%d")
|
||||
info_table.add_row("End Date", end_date)
|
||||
|
||||
# Add scores and popularity
|
||||
if media_item.average_score:
|
||||
info_table.add_row("Average Score", f"{media_item.average_score}/100")
|
||||
if media_item.popularity:
|
||||
info_table.add_row("Popularity", f"#{media_item.popularity:,}")
|
||||
if media_item.favourites:
|
||||
info_table.add_row("Favorites", f"{media_item.favourites:,}")
|
||||
|
||||
# Add MAL ID if available
|
||||
if media_item.id_mal:
|
||||
info_table.add_row("MyAnimeList ID", str(media_item.id_mal))
|
||||
|
||||
# Create genres panel
|
||||
if media_item.genres:
|
||||
genres_text = ", ".join([genre.value for genre in media_item.genres])
|
||||
genres_panel = Panel(
|
||||
Text(genres_text, style="green"),
|
||||
title="[bold]Genres[/bold]",
|
||||
border_style="green",
|
||||
box=box.ROUNDED
|
||||
)
|
||||
else:
|
||||
genres_panel = Panel(
|
||||
Text("No genres available", style="dim"),
|
||||
title="[bold]Genres[/bold]",
|
||||
border_style="green",
|
||||
box=box.ROUNDED
|
||||
)
|
||||
|
||||
# Create tags panel (show top tags)
|
||||
if media_item.tags:
|
||||
top_tags = sorted(media_item.tags, key=lambda x: x.rank or 0, reverse=True)[:10]
|
||||
tags_text = ", ".join([tag.name.value for tag in top_tags])
|
||||
tags_panel = Panel(
|
||||
Text(tags_text, style="yellow"),
|
||||
title="[bold]Tags[/bold]",
|
||||
border_style="yellow",
|
||||
box=box.ROUNDED
|
||||
)
|
||||
else:
|
||||
tags_panel = Panel(
|
||||
Text("No tags available", style="dim"),
|
||||
title="[bold]Tags[/bold]",
|
||||
border_style="yellow",
|
||||
box=box.ROUNDED
|
||||
)
|
||||
|
||||
# Create studios panel
|
||||
if media_item.studios:
|
||||
studios_text = ", ".join([studio.name for studio in media_item.studios if studio.name])
|
||||
studios_panel = Panel(
|
||||
Text(studios_text, style="blue"),
|
||||
title="[bold]Studios[/bold]",
|
||||
border_style="blue",
|
||||
box=box.ROUNDED
|
||||
)
|
||||
else:
|
||||
studios_panel = Panel(
|
||||
Text("No studio information", style="dim"),
|
||||
title="[bold]Studios[/bold]",
|
||||
border_style="blue",
|
||||
box=box.ROUNDED
|
||||
)
|
||||
|
||||
# Create description panel
|
||||
description = media_item.description or "No description available"
|
||||
# Clean HTML tags from description
|
||||
clean_description = re.sub(r'<[^>]+>', '', description)
|
||||
# Replace common HTML entities
|
||||
clean_description = clean_description.replace('"', '"').replace('&', '&').replace('<', '<').replace('>', '>')
|
||||
|
||||
description_panel = Panel(
|
||||
Text(clean_description, style="white"),
|
||||
title="[bold]Description[/bold]",
|
||||
border_style="cyan",
|
||||
box=box.ROUNDED
|
||||
)
|
||||
|
||||
# Create user status panel if available
|
||||
if media_item.user_status:
|
||||
user_info_table = Table(show_header=False, box=box.SIMPLE)
|
||||
user_info_table.add_column("Field", style="bold magenta")
|
||||
user_info_table.add_column("Value", style="white")
|
||||
|
||||
if media_item.user_status.status:
|
||||
user_info_table.add_row("Status", media_item.user_status.status.value.title())
|
||||
if media_item.user_status.progress is not None:
|
||||
progress = f"{media_item.user_status.progress}/{media_item.episodes or '?'}"
|
||||
user_info_table.add_row("Progress", progress)
|
||||
if media_item.user_status.score:
|
||||
user_info_table.add_row("Your Score", f"{media_item.user_status.score}/10")
|
||||
if media_item.user_status.repeat:
|
||||
user_info_table.add_row("Rewatched", f"{media_item.user_status.repeat} times")
|
||||
|
||||
user_panel = Panel(
|
||||
user_info_table,
|
||||
title="[bold]Your List Status[/bold]",
|
||||
border_style="magenta",
|
||||
box=box.ROUNDED
|
||||
)
|
||||
else:
|
||||
user_panel = None
|
||||
|
||||
# Create next airing panel if available
|
||||
if media_item.next_airing:
|
||||
from datetime import datetime
|
||||
airing_info_table = Table(show_header=False, box=box.SIMPLE)
|
||||
airing_info_table.add_column("Field", style="bold red")
|
||||
airing_info_table.add_column("Value", style="white")
|
||||
|
||||
airing_info_table.add_row("Next Episode", str(media_item.next_airing.episode))
|
||||
|
||||
if media_item.next_airing.airing_at:
|
||||
air_date = media_item.next_airing.airing_at.strftime("%Y-%m-%d %H:%M")
|
||||
airing_info_table.add_row("Air Date", air_date)
|
||||
|
||||
airing_panel = Panel(
|
||||
airing_info_table,
|
||||
title="[bold]Next Airing[/bold]",
|
||||
border_style="red",
|
||||
box=box.ROUNDED
|
||||
)
|
||||
else:
|
||||
airing_panel = None
|
||||
|
||||
# Create main info panel
|
||||
info_panel = Panel(
|
||||
info_table,
|
||||
title="[bold]Basic Information[/bold]",
|
||||
border_style="cyan",
|
||||
box=box.ROUNDED
|
||||
)
|
||||
|
||||
# Display everything
|
||||
console.print(Panel(title_text, box=box.DOUBLE, border_style="bright_cyan"))
|
||||
console.print()
|
||||
|
||||
# Create columns for better layout
|
||||
panels_row1 = [info_panel, genres_panel]
|
||||
if user_panel:
|
||||
panels_row1.append(user_panel)
|
||||
|
||||
console.print(Columns(panels_row1, equal=True, expand=True))
|
||||
console.print()
|
||||
|
||||
panels_row2 = [tags_panel, studios_panel]
|
||||
if airing_panel:
|
||||
panels_row2.append(airing_panel)
|
||||
|
||||
console.print(Columns(panels_row2, equal=True, expand=True))
|
||||
console.print()
|
||||
|
||||
console.print(description_panel)
|
||||
|
||||
ctx.selector.ask("Press Enter to continue...")
|
||||
return InternalDirective.RELOAD
|
||||
|
||||
@@ -194,7 +381,7 @@ def _view_recommendations(ctx: Context, state: State) -> MenuAction:
|
||||
|
||||
loading_message = "Fetching recommendations..."
|
||||
recommendations = None
|
||||
|
||||
|
||||
with feedback.progress(loading_message):
|
||||
recommendations = ctx.media_api.get_recommendation_for(
|
||||
MediaRecommendationParams(id=media_item.id, page=1)
|
||||
@@ -202,21 +389,22 @@ def _view_recommendations(ctx: Context, state: State) -> MenuAction:
|
||||
|
||||
if not recommendations:
|
||||
feedback.warning(
|
||||
"No recommendations found",
|
||||
"This anime doesn't have any recommendations available"
|
||||
"No recommendations found",
|
||||
"This anime doesn't have any recommendations available",
|
||||
)
|
||||
return InternalDirective.RELOAD
|
||||
|
||||
# Convert list of MediaItem to search result format
|
||||
search_result = {item.id: item for item in recommendations}
|
||||
|
||||
|
||||
# Create a fake page info since recommendations don't have pagination
|
||||
from .....libs.media_api.types import PageInfo
|
||||
|
||||
page_info = PageInfo(
|
||||
total=len(recommendations),
|
||||
current_page=1,
|
||||
has_next_page=False,
|
||||
per_page=len(recommendations)
|
||||
per_page=len(recommendations),
|
||||
)
|
||||
|
||||
return State(
|
||||
@@ -242,7 +430,7 @@ def _view_relations(ctx: Context, state: State) -> MenuAction:
|
||||
|
||||
loading_message = "Fetching related anime..."
|
||||
relations = None
|
||||
|
||||
|
||||
with feedback.progress(loading_message):
|
||||
relations = ctx.media_api.get_related_anime_for(
|
||||
MediaRelationsParams(id=media_item.id)
|
||||
@@ -250,21 +438,22 @@ def _view_relations(ctx: Context, state: State) -> MenuAction:
|
||||
|
||||
if not relations:
|
||||
feedback.warning(
|
||||
"No related anime found",
|
||||
"This anime doesn't have any related anime available"
|
||||
"No related anime found",
|
||||
"This anime doesn't have any related anime available",
|
||||
)
|
||||
return InternalDirective.RELOAD
|
||||
|
||||
# Convert list of MediaItem to search result format
|
||||
search_result = {item.id: item for item in relations}
|
||||
|
||||
|
||||
# Create a fake page info since relations don't have pagination
|
||||
from .....libs.media_api.types import PageInfo
|
||||
|
||||
page_info = PageInfo(
|
||||
total=len(relations),
|
||||
current_page=1,
|
||||
has_next_page=False,
|
||||
per_page=len(relations)
|
||||
per_page=len(relations),
|
||||
)
|
||||
|
||||
return State(
|
||||
@@ -277,3 +466,187 @@ def _view_relations(ctx: Context, state: State) -> MenuAction:
|
||||
)
|
||||
|
||||
return action
|
||||
|
||||
|
||||
def _view_characters(ctx: Context, state: State) -> MenuAction:
|
||||
def action():
|
||||
feedback = ctx.service.feedback
|
||||
media_item = state.media_api.media_item
|
||||
|
||||
if not media_item:
|
||||
feedback.error("Media item is not in state")
|
||||
return InternalDirective.RELOAD
|
||||
|
||||
loading_message = "Fetching characters..."
|
||||
characters_data = None
|
||||
|
||||
with feedback.progress(loading_message):
|
||||
characters_data = ctx.media_api.get_characters_of(
|
||||
MediaCharactersParams(id=media_item.id)
|
||||
)
|
||||
|
||||
if not characters_data or not characters_data.get("data"):
|
||||
feedback.warning(
|
||||
"No character information found",
|
||||
"This anime doesn't have character data available"
|
||||
)
|
||||
return InternalDirective.RELOAD
|
||||
|
||||
try:
|
||||
# Extract characters from the nested response structure
|
||||
page_data = characters_data["data"]["Page"]["media"][0]
|
||||
characters = page_data["characters"]["nodes"]
|
||||
|
||||
if not characters:
|
||||
feedback.warning("No characters found for this anime")
|
||||
return InternalDirective.RELOAD
|
||||
|
||||
# Display characters using rich
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
from rich.panel import Panel
|
||||
from rich.text import Text
|
||||
from datetime import datetime
|
||||
|
||||
console = Console()
|
||||
console.clear()
|
||||
|
||||
# Create title
|
||||
anime_title = media_item.title.english or media_item.title.romaji
|
||||
title = Text(f"Characters in {anime_title}", style="bold cyan")
|
||||
|
||||
# Create table for characters
|
||||
table = Table(show_header=True, header_style="bold magenta")
|
||||
table.add_column("Name", style="cyan", no_wrap=True)
|
||||
table.add_column("Gender", style="green")
|
||||
table.add_column("Age", style="yellow")
|
||||
table.add_column("Favorites", style="red")
|
||||
table.add_column("Description", style="dim", max_width=50)
|
||||
|
||||
for char in characters[:20]: # Show first 20 characters
|
||||
name = char["name"]["full"] or char["name"]["first"] or "Unknown"
|
||||
gender = char.get("gender") or "Unknown"
|
||||
age = str(char.get("age") or "Unknown")
|
||||
favorites = str(char.get("favourites") or "0")
|
||||
|
||||
# Clean up description (remove HTML tags and truncate)
|
||||
description = char.get("description") or "No description"
|
||||
if description:
|
||||
import re
|
||||
description = re.sub(r'<[^>]+>', '', description) # Remove HTML tags
|
||||
if len(description) > 100:
|
||||
description = description[:97] + "..."
|
||||
|
||||
table.add_row(name, gender, age, favorites, description)
|
||||
|
||||
# Display in a panel
|
||||
panel = Panel(table, title=title, border_style="blue")
|
||||
console.print(panel)
|
||||
|
||||
ctx.selector.ask("Press Enter to continue...")
|
||||
|
||||
except (KeyError, IndexError, TypeError) as e:
|
||||
feedback.error(f"Error displaying characters: {e}")
|
||||
|
||||
return InternalDirective.RELOAD
|
||||
|
||||
return action
|
||||
|
||||
|
||||
def _view_airing_schedule(ctx: Context, state: State) -> MenuAction:
|
||||
def action():
|
||||
feedback = ctx.service.feedback
|
||||
media_item = state.media_api.media_item
|
||||
|
||||
if not media_item:
|
||||
feedback.error("Media item is not in state")
|
||||
return InternalDirective.RELOAD
|
||||
|
||||
loading_message = "Fetching airing schedule..."
|
||||
schedule_data = None
|
||||
|
||||
with feedback.progress(loading_message):
|
||||
schedule_data = ctx.media_api.get_airing_schedule_for(
|
||||
MediaAiringScheduleParams(id=media_item.id)
|
||||
)
|
||||
|
||||
if not schedule_data or not schedule_data.get("data"):
|
||||
feedback.warning(
|
||||
"No airing schedule found",
|
||||
"This anime doesn't have upcoming episodes or airing data"
|
||||
)
|
||||
return InternalDirective.RELOAD
|
||||
|
||||
try:
|
||||
# Extract schedule from the nested response structure
|
||||
page_data = schedule_data["data"]["Page"]["media"][0]
|
||||
schedule_nodes = page_data["airingSchedule"]["nodes"]
|
||||
|
||||
if not schedule_nodes:
|
||||
feedback.info(
|
||||
"No upcoming episodes",
|
||||
"This anime has no scheduled upcoming episodes"
|
||||
)
|
||||
return InternalDirective.RELOAD
|
||||
|
||||
# Display schedule using rich
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
from rich.panel import Panel
|
||||
from rich.text import Text
|
||||
from datetime import datetime
|
||||
|
||||
console = Console()
|
||||
console.clear()
|
||||
|
||||
# Create title
|
||||
anime_title = media_item.title.english or media_item.title.romaji
|
||||
title = Text(f"Airing Schedule for {anime_title}", style="bold cyan")
|
||||
|
||||
# Create table for episodes
|
||||
table = Table(show_header=True, header_style="bold magenta")
|
||||
table.add_column("Episode", style="cyan", justify="center")
|
||||
table.add_column("Air Date", style="green")
|
||||
table.add_column("Time Until Airing", style="yellow")
|
||||
|
||||
for episode in schedule_nodes[:10]: # Show next 10 episodes
|
||||
ep_num = str(episode.get("episode", "?"))
|
||||
|
||||
# Format air date
|
||||
airing_at = episode.get("airingAt")
|
||||
if airing_at:
|
||||
air_date = datetime.fromtimestamp(airing_at)
|
||||
formatted_date = air_date.strftime("%Y-%m-%d %H:%M")
|
||||
else:
|
||||
formatted_date = "Unknown"
|
||||
|
||||
# Format time until airing
|
||||
time_until = episode.get("timeUntilAiring")
|
||||
if time_until:
|
||||
days = time_until // 86400
|
||||
hours = (time_until % 86400) // 3600
|
||||
minutes = (time_until % 3600) // 60
|
||||
|
||||
if days > 0:
|
||||
time_str = f"{days}d {hours}h {minutes}m"
|
||||
elif hours > 0:
|
||||
time_str = f"{hours}h {minutes}m"
|
||||
else:
|
||||
time_str = f"{minutes}m"
|
||||
else:
|
||||
time_str = "Unknown"
|
||||
|
||||
table.add_row(ep_num, formatted_date, time_str)
|
||||
|
||||
# Display in a panel
|
||||
panel = Panel(table, title=title, border_style="blue")
|
||||
console.print(panel)
|
||||
|
||||
ctx.selector.ask("Press Enter to continue...")
|
||||
|
||||
except (KeyError, IndexError, TypeError) as e:
|
||||
feedback.error(f"Error displaying airing schedule: {e}")
|
||||
|
||||
return InternalDirective.RELOAD
|
||||
|
||||
return action
|
||||
|
||||
@@ -56,7 +56,12 @@ def results(ctx: Context, state: State) -> State | InternalDirective:
|
||||
}
|
||||
)
|
||||
choices.update(
|
||||
{"Back": lambda: InternalDirective.MAIN, "Exit": lambda: InternalDirective.EXIT}
|
||||
{
|
||||
"Back": lambda: InternalDirective.BACK
|
||||
if page_info and page_info.current_page == 1
|
||||
else InternalDirective.MAIN,
|
||||
"Exit": lambda: InternalDirective.EXIT,
|
||||
}
|
||||
)
|
||||
|
||||
choice = ctx.selector.choose(
|
||||
|
||||
@@ -180,7 +180,7 @@ class Session:
|
||||
|
||||
return decorator
|
||||
|
||||
def load_menus_from_folder(self, package:str):
|
||||
def load_menus_from_folder(self, package: str):
|
||||
package_path = MENUS_DIR / package
|
||||
package_name = package_path.name
|
||||
logger.debug(f"Loading menus from '{package_path}'...")
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import re
|
||||
from datetime import datetime
|
||||
from typing import List, Optional,Dict,Union
|
||||
from typing import List, Optional, Dict, Union
|
||||
|
||||
from ...libs.media_api.types import AiringSchedule
|
||||
|
||||
@@ -182,6 +182,7 @@ def shell_safe(text: Optional[str]) -> str:
|
||||
return ""
|
||||
return text.replace("`", "\\`").replace('"', '\\"').replace("$", "\\$")
|
||||
|
||||
|
||||
def extract_episode_number(title: str) -> Optional[float]:
|
||||
"""
|
||||
Extracts the episode number (supports floats) from a title like:
|
||||
|
||||
@@ -208,11 +208,13 @@ class AniListApi(BaseApiClient):
|
||||
else False
|
||||
)
|
||||
|
||||
def get_recommendation_for(self, params: MediaRecommendationParams) -> Optional[List[MediaItem]]:
|
||||
def get_recommendation_for(
|
||||
self, params: MediaRecommendationParams
|
||||
) -> Optional[List[MediaItem]]:
|
||||
variables = {
|
||||
"id": params.id,
|
||||
"id": params.id,
|
||||
"page": params.page,
|
||||
"per_page": params.per_page or self.config.per_page
|
||||
"per_page": params.per_page or self.config.per_page,
|
||||
}
|
||||
response = execute_graphql(
|
||||
ANILIST_ENDPOINT, self.http_client, gql.GET_MEDIA_RECOMMENDATIONS, variables
|
||||
@@ -221,10 +223,11 @@ class AniListApi(BaseApiClient):
|
||||
try:
|
||||
return mapper.to_generic_recommendations(response.json())
|
||||
except Exception as e:
|
||||
logger.error(f"Error mapping recommendations for media {params.id}: {e}")
|
||||
logger.error(
|
||||
f"Error mapping recommendations for media {params.id}: {e}"
|
||||
)
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
def get_characters_of(self, params: MediaCharactersParams) -> Optional[Dict]:
|
||||
variables = {"id": params.id, "type": "ANIME"}
|
||||
@@ -233,7 +236,9 @@ class AniListApi(BaseApiClient):
|
||||
)
|
||||
return response.json() if response else None
|
||||
|
||||
def get_related_anime_for(self, params: MediaRelationsParams) -> Optional[List[MediaItem]]:
|
||||
def get_related_anime_for(
|
||||
self, params: MediaRelationsParams
|
||||
) -> Optional[List[MediaItem]]:
|
||||
variables = {"id": params.id}
|
||||
response = execute_graphql(
|
||||
ANILIST_ENDPOINT, self.http_client, gql.GET_MEDIA_RELATIONS, variables
|
||||
@@ -246,7 +251,9 @@ class AniListApi(BaseApiClient):
|
||||
return None
|
||||
return None
|
||||
|
||||
def get_airing_schedule_for(self, params: MediaAiringScheduleParams) -> Optional[Dict]:
|
||||
def get_airing_schedule_for(
|
||||
self, params: MediaAiringScheduleParams
|
||||
) -> Optional[Dict]:
|
||||
variables = {"id": params.id, "type": "ANIME"}
|
||||
response = execute_graphql(
|
||||
ANILIST_ENDPOINT, self.http_client, gql.GET_AIRING_SCHEDULE, variables
|
||||
|
||||
@@ -321,15 +321,15 @@ def to_generic_recommendations(data: dict) -> Optional[List[MediaItem]]:
|
||||
"""Maps the 'recommendations' part of an API response."""
|
||||
if not data or not data.get("data"):
|
||||
return None
|
||||
|
||||
|
||||
page_data = data.get("data", {}).get("Page", {})
|
||||
if not page_data:
|
||||
return None
|
||||
|
||||
|
||||
recommendations = page_data.get("recommendations", [])
|
||||
if not recommendations:
|
||||
return None
|
||||
|
||||
|
||||
result = []
|
||||
for rec in recommendations:
|
||||
if rec and rec.get("media"):
|
||||
@@ -339,5 +339,5 @@ def to_generic_recommendations(data: dict) -> Optional[List[MediaItem]]:
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to map recommendation media item: {e}")
|
||||
continue
|
||||
|
||||
|
||||
return result if result else None
|
||||
|
||||
@@ -58,7 +58,9 @@ class BaseApiClient(abc.ABC):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_recommendation_for(self, params: MediaRecommendationParams) -> Optional[List[MediaItem]]:
|
||||
def get_recommendation_for(
|
||||
self, params: MediaRecommendationParams
|
||||
) -> Optional[List[MediaItem]]:
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
@@ -66,9 +68,13 @@ class BaseApiClient(abc.ABC):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_related_anime_for(self, params: MediaRelationsParams) -> Optional[List[MediaItem]]:
|
||||
def get_related_anime_for(
|
||||
self, params: MediaRelationsParams
|
||||
) -> Optional[List[MediaItem]]:
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_airing_schedule_for(self, params: MediaAiringScheduleParams) -> Optional[Dict]:
|
||||
def get_airing_schedule_for(
|
||||
self, params: MediaAiringScheduleParams
|
||||
) -> Optional[Dict]:
|
||||
pass
|
||||
|
||||
@@ -13,7 +13,6 @@ from ..types import MediaSearchResult, UserProfile
|
||||
from . import mapper
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
||||
pass
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -1,14 +1,16 @@
|
||||
from ..base import BaseApiClient
|
||||
import logging
|
||||
|
||||
logger=logging.getLogger(__name__)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def test_media_api(api_client: BaseApiClient):
|
||||
"""
|
||||
Test all abstract methods of the media API with user feedback.
|
||||
|
||||
|
||||
This function provides an interactive test suite that validates all the core
|
||||
functionality of the media API, similar to test_anime_provider for anime providers.
|
||||
|
||||
|
||||
Tests performed:
|
||||
1. Authentication status and user profile retrieval
|
||||
2. Media search functionality
|
||||
@@ -18,14 +20,14 @@ def test_media_api(api_client: BaseApiClient):
|
||||
6. Airing schedule information
|
||||
7. User media list operations (if authenticated)
|
||||
8. List entry management (add/remove from user list)
|
||||
|
||||
|
||||
Args:
|
||||
api_client: An instance of AniListApi to test
|
||||
|
||||
|
||||
Usage:
|
||||
Run this module directly: python -m fastanime.libs.media_api.anilist.api
|
||||
Or import and call: test_media_api(AniListApi(config, client))
|
||||
"""
|
||||
"""
|
||||
from ....core.constants import APP_ASCII_ART
|
||||
from ..params import (
|
||||
MediaAiringScheduleParams,
|
||||
@@ -58,24 +60,31 @@ def test_media_api(api_client: BaseApiClient):
|
||||
print("2. Testing Media Search...")
|
||||
query = input("What anime would you like to search for: ")
|
||||
search_results = api_client.search_media(MediaSearchParams(query=query, per_page=5))
|
||||
|
||||
|
||||
if not search_results or not search_results.media:
|
||||
print(" No search results found")
|
||||
return
|
||||
|
||||
|
||||
print(f" Found {len(search_results.media)} results:")
|
||||
for i, result in enumerate(search_results.media):
|
||||
title = result.title.english or result.title.romaji
|
||||
print(f" {i + 1}: {title} ({result.episodes or '?'} episodes)")
|
||||
|
||||
|
||||
# Select an anime for further testing
|
||||
try:
|
||||
choice = int(input(f"\nSelect anime for detailed testing (1-{len(search_results.media)}): ")) - 1
|
||||
choice = (
|
||||
int(
|
||||
input(
|
||||
f"\nSelect anime for detailed testing (1-{len(search_results.media)}): "
|
||||
)
|
||||
)
|
||||
- 1
|
||||
)
|
||||
selected_anime = search_results.media[choice]
|
||||
except (ValueError, IndexError):
|
||||
print("Invalid selection")
|
||||
return
|
||||
|
||||
|
||||
print(f"\nSelected: {selected_anime.title.english or selected_anime.title.romaji}")
|
||||
print()
|
||||
|
||||
@@ -143,7 +152,9 @@ def test_media_api(api_client: BaseApiClient):
|
||||
MediaAiringScheduleParams(id=selected_anime.id)
|
||||
)
|
||||
if schedule and schedule.get("data"):
|
||||
schedule_data = schedule["data"]["Page"]["media"][0]["airingSchedule"]["nodes"]
|
||||
schedule_data = schedule["data"]["Page"]["media"][0]["airingSchedule"][
|
||||
"nodes"
|
||||
]
|
||||
if schedule_data:
|
||||
print(f" Found {len(schedule_data)} upcoming episodes:")
|
||||
for ep in schedule_data[:3]: # Show first 3
|
||||
@@ -162,9 +173,7 @@ def test_media_api(api_client: BaseApiClient):
|
||||
try:
|
||||
user_list = api_client.search_media_list(
|
||||
UserMediaListSearchParams(
|
||||
status=UserMediaListStatus.WATCHING,
|
||||
page=1,
|
||||
per_page=3
|
||||
status=UserMediaListStatus.WATCHING, page=1, per_page=3
|
||||
)
|
||||
)
|
||||
if user_list and user_list.media:
|
||||
@@ -172,7 +181,9 @@ def test_media_api(api_client: BaseApiClient):
|
||||
for anime in user_list.media:
|
||||
title = anime.title.english or anime.title.romaji
|
||||
progress = anime.user_status.progress if anime.user_status else 0
|
||||
print(f" - {title} (Progress: {progress}/{anime.episodes or '?'})")
|
||||
print(
|
||||
f" - {title} (Progress: {progress}/{anime.episodes or '?'})"
|
||||
)
|
||||
else:
|
||||
print(" No anime in watching list")
|
||||
except Exception as e:
|
||||
@@ -181,21 +192,24 @@ def test_media_api(api_client: BaseApiClient):
|
||||
|
||||
# Test 8: Update List Entry
|
||||
print("8. Testing List Entry Management...")
|
||||
update_test = input("Would you like to test adding the selected anime to your list? (y/n): ")
|
||||
if update_test.lower() == 'y':
|
||||
update_test = input(
|
||||
"Would you like to test adding the selected anime to your list? (y/n): "
|
||||
)
|
||||
if update_test.lower() == "y":
|
||||
try:
|
||||
success = api_client.update_list_entry(
|
||||
UpdateUserMediaListEntryParams(
|
||||
media_id=selected_anime.id,
|
||||
status=UserMediaListStatus.PLANNING
|
||||
media_id=selected_anime.id, status=UserMediaListStatus.PLANNING
|
||||
)
|
||||
)
|
||||
if success:
|
||||
print(" ✓ Successfully added to planning list")
|
||||
|
||||
|
||||
# Test delete
|
||||
delete_test = input(" Would you like to remove it from your list? (y/n): ")
|
||||
if delete_test.lower() == 'y':
|
||||
delete_test = input(
|
||||
" Would you like to remove it from your list? (y/n): "
|
||||
)
|
||||
if delete_test.lower() == "y":
|
||||
delete_success = api_client.delete_list_entry(selected_anime.id)
|
||||
if delete_success:
|
||||
print(" ✓ Successfully removed from list")
|
||||
@@ -211,4 +225,3 @@ def test_media_api(api_client: BaseApiClient):
|
||||
|
||||
print("=== Test Suite Complete ===")
|
||||
print("All basic API methods have been tested!")
|
||||
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
|
||||
from ...types import EpisodeStream, Server
|
||||
from ..constants import MP4_SERVER_JUICY_STREAM_REGEX
|
||||
from ..utils import logger
|
||||
|
||||
Reference in New Issue
Block a user