feat: enhance error handling in media recommendations and relations mapping

This commit is contained in:
Benexl
2025-07-24 19:49:58 +03:00
parent 4f401aa91c
commit 8e9aeb660f
3 changed files with 258 additions and 258 deletions

View File

@@ -217,7 +217,14 @@ class AniListApi(BaseApiClient):
response = execute_graphql(
ANILIST_ENDPOINT, self.http_client, gql.GET_MEDIA_RECOMMENDATIONS, variables
)
return mapper.to_generic_recommendations(response.json()) if response else None
if response and response.json():
try:
return mapper.to_generic_recommendations(response.json())
except Exception as e:
logger.error(f"Error mapping recommendations for media {params.id}: {e}")
return None
return None
def get_characters_of(self, params: MediaCharactersParams) -> Optional[Dict]:
variables = {"id": params.id, "type": "ANIME"}
@@ -231,7 +238,13 @@ class AniListApi(BaseApiClient):
response = execute_graphql(
ANILIST_ENDPOINT, self.http_client, gql.GET_MEDIA_RELATIONS, variables
)
return mapper.to_generic_relations(response.json()) if response else None
if response and response.json():
try:
return mapper.to_generic_relations(response.json())
except Exception as e:
logger.error(f"Error mapping relations: {e}")
return None
return None
def get_airing_schedule_for(self, params: MediaAiringScheduleParams) -> Optional[Dict]:
variables = {"id": params.id, "type": "ANIME"}
@@ -241,222 +254,11 @@ class AniListApi(BaseApiClient):
return response.json() if response else None
def test_media_api(api_client: "AniListApi"):
"""
Test all abstract methods of the media API with user feedback.
This function provides an interactive test suite that validates all the core
functionality of the media API, similar to test_anime_provider for anime providers.
Tests performed:
1. Authentication status and user profile retrieval
2. Media search functionality
3. Anime recommendations fetching
4. Related anime retrieval
5. Character information fetching
6. Airing schedule information
7. User media list operations (if authenticated)
8. List entry management (add/remove from user list)
Args:
api_client: An instance of AniListApi to test
Usage:
Run this module directly: python -m fastanime.libs.media_api.anilist.api
Or import and call: test_media_api(AniListApi(config, client))
"""
from httpx import Client
from ....core.config import AnilistConfig
from ....core.constants import APP_ASCII_ART
from ..params import (
MediaAiringScheduleParams,
MediaCharactersParams,
MediaRecommendationParams,
MediaRelationsParams,
MediaSearchParams,
UpdateUserMediaListEntryParams,
UserMediaListSearchParams,
)
from ..types import UserMediaListStatus
print(APP_ASCII_ART)
print("=== Media API Test Suite ===\n")
# Test 1: Authentication
print("1. Testing Authentication...")
print(f"Authenticated: {api_client.is_authenticated()}")
if api_client.is_authenticated():
profile = api_client.get_viewer_profile()
if profile:
print(f" User: {profile.name} (ID: {profile.id})")
else:
print(" Failed to get user profile")
else:
print(" Not authenticated - some features will be limited")
print()
# Test 2: Media Search
print("2. Testing Media Search...")
query = input("What anime would you like to search for: ")
search_results = api_client.search_media(MediaSearchParams(query=query, per_page=5))
if not search_results or not search_results.media:
print(" No search results found")
return
print(f" Found {len(search_results.media)} results:")
for i, result in enumerate(search_results.media):
title = result.title.english or result.title.romaji
print(f" {i + 1}: {title} ({result.episodes or '?'} episodes)")
# Select an anime for further testing
try:
choice = int(input(f"\nSelect anime for detailed testing (1-{len(search_results.media)}): ")) - 1
selected_anime = search_results.media[choice]
except (ValueError, IndexError):
print("Invalid selection")
return
print(f"\nSelected: {selected_anime.title.english or selected_anime.title.romaji}")
print()
# Test 3: Get Recommendations
print("3. Testing Recommendations...")
try:
recommendations = api_client.get_recommendation_for(
MediaRecommendationParams(id=selected_anime.id, page=1, per_page=3)
)
if recommendations:
print(f" Found {len(recommendations)} recommendations:")
for rec in recommendations[:3]: # Show first 3
title = rec.title.english or rec.title.romaji
print(f" - {title}")
else:
print(" No recommendations found")
except Exception as e:
print(f" Error: {e}")
print()
# Test 4: Get Related Anime
print("4. Testing Related Anime...")
try:
relations = api_client.get_related_anime_for(
MediaRelationsParams(id=selected_anime.id)
)
if relations:
print(f" Found {len(relations)} related anime:")
for rel in relations[:3]: # Show first 3
title = rel.title.english or rel.title.romaji
print(f" - {title}")
else:
print(" No related anime found")
except Exception as e:
print(f" Error: {e}")
print()
# Test 5: Get Characters
print("5. Testing Character Information...")
try:
characters = api_client.get_characters_of(
MediaCharactersParams(id=selected_anime.id)
)
if characters and characters.get("data"):
char_data = characters["data"]["Page"]["media"][0]["characters"]["nodes"]
if char_data:
print(f" Found {len(char_data)} characters:")
for char in char_data[:3]: # Show first 3
name = char["name"]["full"] or char["name"]["first"]
print(f" - {name}")
else:
print(" No character data found")
else:
print(" No characters found")
except Exception as e:
print(f" Error: {e}")
print()
# Test 6: Get Airing Schedule
print("6. Testing Airing Schedule...")
try:
schedule = api_client.get_airing_schedule_for(
MediaAiringScheduleParams(id=selected_anime.id)
)
if schedule and schedule.get("data"):
schedule_data = schedule["data"]["Page"]["media"][0]["airingSchedule"]["nodes"]
if schedule_data:
print(f" Found {len(schedule_data)} upcoming episodes:")
for ep in schedule_data[:3]: # Show first 3
print(f" - Episode {ep['episode']}")
else:
print(" No upcoming episodes")
else:
print(" No airing schedule found")
except Exception as e:
print(f" Error: {e}")
print()
# Test 7: User Media List (if authenticated)
if api_client.is_authenticated():
print("7. Testing User Media List...")
try:
user_list = api_client.search_media_list(
UserMediaListSearchParams(
status=UserMediaListStatus.WATCHING,
page=1,
per_page=3
)
)
if user_list and user_list.media:
print(f" Found {len(user_list.media)} watching anime:")
for anime in user_list.media:
title = anime.title.english or anime.title.romaji
progress = anime.user_status.progress if anime.user_status else 0
print(f" - {title} (Progress: {progress}/{anime.episodes or '?'})")
else:
print(" No anime in watching list")
except Exception as e:
print(f" Error: {e}")
print()
# Test 8: Update List Entry
print("8. Testing List Entry Management...")
update_test = input("Would you like to test adding the selected anime to your list? (y/n): ")
if update_test.lower() == 'y':
try:
success = api_client.update_list_entry(
UpdateUserMediaListEntryParams(
media_id=selected_anime.id,
status=UserMediaListStatus.PLANNING
)
)
if success:
print(" ✓ Successfully added to planning list")
# Test delete
delete_test = input(" Would you like to remove it from your list? (y/n): ")
if delete_test.lower() == 'y':
delete_success = api_client.delete_list_entry(selected_anime.id)
if delete_success:
print(" ✓ Successfully removed from list")
else:
print(" ✗ Failed to remove from list")
else:
print(" ✗ Failed to add to list")
except Exception as e:
print(f" Error: {e}")
print()
else:
print("7-8. Skipping user list tests (not authenticated)\n")
print("=== Test Suite Complete ===")
print("All basic API methods have been tested!")
if __name__ == "__main__":
from httpx import Client
from ....core.config import AnilistConfig
from ..utils.debug import test_media_api
anilist = AniListApi(AnilistConfig(), Client())
test_media_api(anilist)

View File

@@ -126,6 +126,8 @@ def _to_generic_airing_schedule(
def _to_generic_studios(anilist_studios: AnilistStudioNodes) -> List[Studio]:
"""Maps AniList studio nodes to a list of generic Studio objects."""
if not anilist_studios or not anilist_studios.get("nodes"):
return []
return [
Studio(
name=s["name"],
@@ -133,51 +135,21 @@ def _to_generic_studios(anilist_studios: AnilistStudioNodes) -> List[Studio]:
is_animation_studio=s["isAnimationStudio"],
)
for s in anilist_studios["nodes"]
if s # Also check if individual studio object is not None
]
def _to_generic_tags(anilist_tags: list[AnilistMediaTag]) -> List[MediaTagItem]:
"""Maps a list of AniList tags to generic MediaTag objects."""
if not anilist_tags:
return []
return [
MediaTagItem(name=MediaTag(t["name"]), rank=t.get("rank"))
for t in anilist_tags
if t.get("name")
if t and t.get("name")
]
# def _to_generic_streaming_episodes(
# anilist_episodes: list[AnilistStreamingEpisode],
# ) -> List[StreamingEpisode]:
# """Maps a list of AniList streaming episodes to generic StreamingEpisode objects."""
# return [
# StreamingEpisode(title=episode["title"], thumbnail=episode.get("thumbnail"))
# for episode in anilist_episodes
# if episode.get("title")
# ]
# def _to_generic_streaming_episodes(
# anilist_episodes: list[dict],
# ) -> List[StreamingEpisode]:
# """Maps a list of AniList streaming episodes to generic StreamingEpisode objects with renumbered episode titles."""
# # Extract titles
# titles = [ep["title"] for ep in anilist_episodes if "title" in ep]
# # Generate mapping: title -> renumbered_ep
# renumbered_map = renumber_titles(titles)
# # Apply renumbering
# return [
# StreamingEpisode(
# title=f"{renumbered_map[ep['title']]} - {ep['title']}",
# thumbnail=ep.get("thumbnail"),
# )
# for ep in anilist_episodes
# if ep.get("title")
# ]
def _to_generic_streaming_episodes(
anilist_episodes: list[AnilistStreamingEpisode],
) -> Dict[str, StreamingEpisode]:
@@ -347,13 +319,25 @@ def to_generic_relations(data: dict) -> Optional[List[MediaItem]]:
def to_generic_recommendations(data: dict) -> Optional[List[MediaItem]]:
"""Maps the 'recommendations' part of an API response."""
recommendations = (
data.get("data", {})
.get("Page", {})
.get("recommendations", [])
)
return [
_to_generic_media_item(rec.get("media"))
for rec in recommendations
if rec.get("media")
]
if not data or not data.get("data"):
return None
page_data = data.get("data", {}).get("Page", {})
if not page_data:
return None
recommendations = page_data.get("recommendations", [])
if not recommendations:
return None
result = []
for rec in recommendations:
if rec and rec.get("media"):
try:
media_item = _to_generic_media_item(rec["media"])
result.append(media_item)
except Exception as e:
logger.warning(f"Failed to map recommendation media item: {e}")
continue
return result if result else None

View File

@@ -0,0 +1,214 @@
from ..base import BaseApiClient
import logging
logger=logging.getLogger(__name__)
def test_media_api(api_client: BaseApiClient):
"""
Test all abstract methods of the media API with user feedback.
This function provides an interactive test suite that validates all the core
functionality of the media API, similar to test_anime_provider for anime providers.
Tests performed:
1. Authentication status and user profile retrieval
2. Media search functionality
3. Anime recommendations fetching
4. Related anime retrieval
5. Character information fetching
6. Airing schedule information
7. User media list operations (if authenticated)
8. List entry management (add/remove from user list)
Args:
api_client: An instance of AniListApi to test
Usage:
Run this module directly: python -m fastanime.libs.media_api.anilist.api
Or import and call: test_media_api(AniListApi(config, client))
"""
from ....core.constants import APP_ASCII_ART
from ..params import (
MediaAiringScheduleParams,
MediaCharactersParams,
MediaRecommendationParams,
MediaRelationsParams,
MediaSearchParams,
UpdateUserMediaListEntryParams,
UserMediaListSearchParams,
)
from ..types import UserMediaListStatus
print(APP_ASCII_ART)
print("=== Media API Test Suite ===\n")
# Test 1: Authentication
print("1. Testing Authentication...")
print(f"Authenticated: {api_client.is_authenticated()}")
if api_client.is_authenticated():
profile = api_client.get_viewer_profile()
if profile:
print(f" User: {profile.name} (ID: {profile.id})")
else:
print(" Failed to get user profile")
else:
print(" Not authenticated - some features will be limited")
print()
# Test 2: Media Search
print("2. Testing Media Search...")
query = input("What anime would you like to search for: ")
search_results = api_client.search_media(MediaSearchParams(query=query, per_page=5))
if not search_results or not search_results.media:
print(" No search results found")
return
print(f" Found {len(search_results.media)} results:")
for i, result in enumerate(search_results.media):
title = result.title.english or result.title.romaji
print(f" {i + 1}: {title} ({result.episodes or '?'} episodes)")
# Select an anime for further testing
try:
choice = int(input(f"\nSelect anime for detailed testing (1-{len(search_results.media)}): ")) - 1
selected_anime = search_results.media[choice]
except (ValueError, IndexError):
print("Invalid selection")
return
print(f"\nSelected: {selected_anime.title.english or selected_anime.title.romaji}")
print()
# Test 3: Get Recommendations
print("3. Testing Recommendations...")
try:
recommendations = api_client.get_recommendation_for(
MediaRecommendationParams(id=selected_anime.id, page=1, per_page=3)
)
if recommendations:
print(f" Found {len(recommendations)} recommendations:")
for rec in recommendations[:3]: # Show first 3
title = rec.title.english or rec.title.romaji
print(f" - {title}")
else:
print(" No recommendations found")
except Exception as e:
print(f" Error getting recommendations: {e}")
logger.error(f"Recommendations error for anime {selected_anime.id}: {e}")
print()
# Test 4: Get Related Anime
print("4. Testing Related Anime...")
try:
relations = api_client.get_related_anime_for(
MediaRelationsParams(id=selected_anime.id)
)
if relations:
print(f" Found {len(relations)} related anime:")
for rel in relations[:3]: # Show first 3
title = rel.title.english or rel.title.romaji
print(f" - {title}")
else:
print(" No related anime found")
except Exception as e:
print(f" Error getting related anime: {e}")
logger.error(f"Relations error for anime {selected_anime.id}: {e}")
print()
# Test 5: Get Characters
print("5. Testing Character Information...")
try:
characters = api_client.get_characters_of(
MediaCharactersParams(id=selected_anime.id)
)
if characters and characters.get("data"):
char_data = characters["data"]["Page"]["media"][0]["characters"]["nodes"]
if char_data:
print(f" Found {len(char_data)} characters:")
for char in char_data[:3]: # Show first 3
name = char["name"]["full"] or char["name"]["first"]
print(f" - {name}")
else:
print(" No character data found")
else:
print(" No characters found")
except Exception as e:
print(f" Error: {e}")
print()
# Test 6: Get Airing Schedule
print("6. Testing Airing Schedule...")
try:
schedule = api_client.get_airing_schedule_for(
MediaAiringScheduleParams(id=selected_anime.id)
)
if schedule and schedule.get("data"):
schedule_data = schedule["data"]["Page"]["media"][0]["airingSchedule"]["nodes"]
if schedule_data:
print(f" Found {len(schedule_data)} upcoming episodes:")
for ep in schedule_data[:3]: # Show first 3
print(f" - Episode {ep['episode']}")
else:
print(" No upcoming episodes")
else:
print(" No airing schedule found")
except Exception as e:
print(f" Error: {e}")
print()
# Test 7: User Media List (if authenticated)
if api_client.is_authenticated():
print("7. Testing User Media List...")
try:
user_list = api_client.search_media_list(
UserMediaListSearchParams(
status=UserMediaListStatus.WATCHING,
page=1,
per_page=3
)
)
if user_list and user_list.media:
print(f" Found {len(user_list.media)} watching anime:")
for anime in user_list.media:
title = anime.title.english or anime.title.romaji
progress = anime.user_status.progress if anime.user_status else 0
print(f" - {title} (Progress: {progress}/{anime.episodes or '?'})")
else:
print(" No anime in watching list")
except Exception as e:
print(f" Error: {e}")
print()
# Test 8: Update List Entry
print("8. Testing List Entry Management...")
update_test = input("Would you like to test adding the selected anime to your list? (y/n): ")
if update_test.lower() == 'y':
try:
success = api_client.update_list_entry(
UpdateUserMediaListEntryParams(
media_id=selected_anime.id,
status=UserMediaListStatus.PLANNING
)
)
if success:
print(" ✓ Successfully added to planning list")
# Test delete
delete_test = input(" Would you like to remove it from your list? (y/n): ")
if delete_test.lower() == 'y':
delete_success = api_client.delete_list_entry(selected_anime.id)
if delete_success:
print(" ✓ Successfully removed from list")
else:
print(" ✗ Failed to remove from list")
else:
print(" ✗ Failed to add to list")
except Exception as e:
print(f" Error: {e}")
print()
else:
print("7-8. Skipping user list tests (not authenticated)\n")
print("=== Test Suite Complete ===")
print("All basic API methods have been tested!")