mirror of
https://github.com/Benexl/FastAnime.git
synced 2025-12-12 15:50:01 -08:00
feat: mass refactor
79
fastanime/cli/auth/__init__.py
Normal file
@@ -0,0 +1,79 @@
# In fastanime/cli/auth/manager.py
from __future__ import annotations

import json
import logging
from typing import TYPE_CHECKING, Optional

from ...core.exceptions import ConfigError
from ..constants import USER_DATA_PATH

if TYPE_CHECKING:
    from ...libs.api.types import UserProfile

logger = logging.getLogger(__name__)


class CredentialsManager:
    """
    Handles loading and saving of user credentials and profile data.

    This class abstracts the storage mechanism (currently a JSON file),
    allowing for future changes (e.g., to a system keyring) without
    affecting the rest of the application.
    """

    def __init__(self):
        """Initializes the manager with the path to the user data file."""
        self.path = USER_DATA_PATH

    def load_user_profile(self) -> Optional[dict]:
        """
        Loads the user profile data from the JSON file.

        Returns:
            A dictionary containing user data, or None if the file doesn't exist
            or is invalid.
        """
        if not self.path.exists():
            return None
        try:
            with self.path.open("r", encoding="utf-8") as f:
                return json.load(f)
        except (json.JSONDecodeError, IOError) as e:
            logger.error(f"Failed to load user credentials from {self.path}: {e}")
            return None

    def save_user_profile(self, profile: UserProfile, token: str) -> None:
        """
        Saves the user profile and token to the JSON file.

        Args:
            profile: The generic UserProfile dataclass.
            token: The authentication token string.
        """
        user_data = {
            "id": profile.id,
            "name": profile.name,
            "bannerImage": profile.banner_url,
            "avatar": {"large": profile.avatar_url},
            "token": token,
        }
        try:
            self.path.parent.mkdir(parents=True, exist_ok=True)
            with self.path.open("w", encoding="utf-8") as f:
                json.dump(user_data, f, indent=2)
            logger.info(f"Successfully saved user credentials to {self.path}")
        except IOError as e:
            raise ConfigError(f"Could not save user credentials to {self.path}: {e}")

    def clear_user_profile(self) -> None:
        """Deletes the user credentials file."""
        if self.path.exists():
            try:
                self.path.unlink()
                logger.info("Cleared user credentials.")
            except IOError as e:
                raise ConfigError(
                    f"Could not clear user credentials at {self.path}: {e}"
                )
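Usage sketch (not part of the commit): a round trip through the new credentials store. The import path is taken from the comment at the top of the file and may actually be the package __init__; the token value is illustrative.

from fastanime.cli.auth.manager import CredentialsManager  # path assumed from the comment above
from fastanime.libs.api.types import UserProfile

manager = CredentialsManager()
profile = UserProfile(id=1, name="demo")

manager.save_user_profile(profile, token="example-token")  # writes USER_DATA_PATH as JSON
data = manager.load_user_profile()                          # -> dict or None
if data:
    print(data["name"], "token" in data)
manager.clear_user_profile()                                # removes the file again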
@@ -1,76 +1,57 @@
from typing import TYPE_CHECKING
from __future__ import annotations

import click
from rich import print
from rich.prompt import Confirm, Prompt

if TYPE_CHECKING:
    from ...config import Config
    from .....cli.auth.manager import CredentialsManager


@click.command(help="Login to your anilist account")
@click.option("--status", "-s", help="Whether you are logged in or not", is_flag=True)
@click.option("--erase", "-e", help="Erase your login details", is_flag=True)
@click.pass_obj
def login(config: "Config", status, erase):
    from os import path
    from sys import exit

    from rich import print
    from rich.prompt import Confirm, Prompt

    from ....constants import S_PLATFORM
@click.command(help="Login to your AniList account to enable progress tracking.")
@click.option("--status", "-s", is_flag=True, help="Check current login status.")
@click.option("--logout", "-l", is_flag=True, help="Log out and erase credentials.")
@click.pass_context
def login(ctx: click.Context, status: bool, logout: bool):
    """Handles user authentication and credential management."""
    manager = CredentialsManager()

    if status:
        is_logged_in = True if config.user else False
        message = (
            "You are logged in :smile:"
            if is_logged_in
            else "You aren't logged in :cry:"
        )
        print(message)
        print(config.user)
        exit(0)
    elif erase:
        user_data = manager.load_user_profile()
        if user_data:
            print(f"[bold green]Logged in as:[/] {user_data.get('name')}")
            print(f"User ID: {user_data.get('id')}")
        else:
            print("[bold yellow]Not logged in.[/]")
        return

    if logout:
        if Confirm.ask(
            "Are you sure you want to erase your login status", default=False
            "[bold red]Are you sure you want to log out and erase your token?[/]"
        ):
            config.update_user({})
            print("Success")
            exit(0)
        else:
            exit(1)
            manager.clear_user_profile()
            print("You have been logged out.")
        return

    # --- Start Login Flow ---
    from ....libs.api.factory import create_api_client

    api_client = create_api_client("anilist", ctx.obj)

    click.launch(
        "https://anilist.co/api/v2/oauth/authorize?client_id=20148&response_type=token"
    )
    print("Your browser has been opened to obtain an AniList token.")
    print("After authorizing, copy the token from the address bar and paste it below.")

    token = Prompt.ask("Enter your AniList Access Token")
    if not token.strip():
        print("[bold red]Login cancelled.[/]")
        return

    profile = api_client.authenticate(token.strip())

    if profile:
        manager.save_user_profile(profile, token)
        print(f"[bold green]Successfully logged in as {profile.name}! ✨[/]")
    else:
        from click import launch

        from ....anilist import AniList

        if config.user:
            print("Already logged in :confused:")
            if not Confirm.ask("or would you like to reloggin", default=True):
                exit(0)
        # ---- new loggin -----
        print(
            f"A browser session will be opened ( [link]{config.fastanime_anilist_app_login_url}[/link] )",
        )
        token = ""
        if S_PLATFORM.startswith("darwin"):
            anilist_key_file_path = path.expanduser("~") + "/Downloads/anilist_key.txt"
            launch(config.fastanime_anilist_app_login_url, wait=False)
            Prompt.ask(
                "MacOS detected.\nPress any key once the token provided has been pasted into "
                + anilist_key_file_path
            )
            with open(anilist_key_file_path) as key_file:
                token = key_file.read().strip()
        else:
            launch(config.fastanime_anilist_app_login_url, wait=False)
            token = Prompt.ask("Enter token")
        user = AniList.login_user(token)
        if not user:
            print("Sth went wrong", user)
            exit(1)
            return
        user["token"] = token
        config.update_user(user)
        print("Successfully saved credentials")
        print(user)
        exit(0)
        print("[bold red]Login failed. The token may be invalid or expired.[/]")
@@ -1,17 +1,14 @@
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Callable, List, Optional, Tuple

from .....libs.api.base import ApiSearchParams
from .base import GoBack, State
from .task_states import (
    AnimeActionsState,
    EpisodeSelectionState,
    ProviderSearchState,
    StreamPlaybackState,
)
from .task_states import AnimeActionsState

if TYPE_CHECKING:
    from .....libs.api.types import MediaSearchResult
    from ...session import Session
    from .. import ui
@@ -24,46 +21,72 @@ class MainMenuState(State):
    def run(self, session: Session) -> Optional[State | type[GoBack]]:
        from .. import ui

        menu_actions = {
            "🔥 Trending": (session.anilist.get_trending, ResultsState()),
            "🔎 Search": (
                lambda: session.anilist.search(query=ui.prompt_for_search(session)),
        # Define actions as tuples: (Display Name, SearchParams, Next State)
        # This centralizes the "business logic" of what each menu item means.
        menu_actions: List[
            Tuple[str, Callable[[], Optional[ApiSearchParams]], Optional[State]]
        ] = [
            (
                "🔥 Trending",
                lambda: ApiSearchParams(sort="TRENDING_DESC"),
                ResultsState(),
            ),
            "📺 Watching": (
                lambda: session.anilist.get_anime_list("CURRENT"),
            (
                "🌟 Most Popular",
                lambda: ApiSearchParams(sort="POPULARITY_DESC"),
                ResultsState(),
            ),
            "🌟 Most Popular": (session.anilist.get_most_popular, ResultsState()),
            "💖 Most Favourite": (session.anilist.get_most_favourite, ResultsState()),
            "❌ Exit": (lambda: (True, None), None),
        }
            (
                "💖 Most Favourite",
                lambda: ApiSearchParams(sort="FAVOURITES_DESC"),
                ResultsState(),
            ),
            (
                "🔎 Search",
                lambda: ApiSearchParams(query=ui.prompt_for_search(session)),
                ResultsState(),
            ),
            (
                "📺 Watching",
                lambda: session.api_client.fetch_user_list,
                ResultsState(),
            ),  # Direct method call
            ("❌ Exit", lambda: None, None),
        ]

        choice = ui.prompt_main_menu(session, list(menu_actions.keys()))
        display_choices = [action[0] for action in menu_actions]
        choice_str = ui.prompt_main_menu(session, display_choices)

        if not choice:
        if not choice_str:
            return None

        data_loader, next_state = menu_actions[choice]
        if not next_state:
        # Find the chosen action
        chosen_action = next(
            (action for action in menu_actions if action[0] == choice_str), None
        )
        if not chosen_action:
            return self  # Should not happen

        _, param_creator, next_state = chosen_action

        if not next_state:  # Exit case
            return None

        with ui.progress_spinner(f"Fetching {choice.strip('🔥🔎📺🌟💖❌ ')}..."):
            success, data = data_loader()
        # Execute the data fetch
        with ui.progress_spinner(f"Fetching {choice_str.strip('🔥🔎📺🌟💖❌ ')}..."):
            if choice_str == "📺 Watching":  # Special case for user list
                result_data = param_creator(status="CURRENT")
            else:
                search_params = param_creator()
                if search_params is None:  # User cancelled search prompt
                    return self
                result_data = session.api_client.search_media(search_params)

        if not success or not data:
            ui.display_error(f"Failed to fetch data. Reason: {data}")
        if not result_data:
            ui.display_error(f"Failed to fetch data for '{choice_str}'.")
            return self

        if "mediaList" in data.get("data", {}).get("Page", {}):
            data["data"]["Page"]["media"] = [
                item["media"] for item in data["data"]["Page"]["mediaList"]
            ]

        session.state.anilist.results_data = data
        session.state.navigation.current_page = 1
        # Store the data loader for pagination
        session.current_data_loader = data_loader
        session.state.anilist.results_data = result_data  # Store the generic dataclass
        return next_state
@@ -73,59 +96,20 @@ class ResultsState(State):
    def run(self, session: Session) -> Optional[State | type[GoBack]]:
        from .. import ui

        if not session.state.anilist.results_data:
        search_result = session.state.anilist.results_data
        if not search_result or not isinstance(search_result, MediaSearchResult):
            ui.display_error("No results to display.")
            return GoBack

        media_list = (
            session.state.anilist.results_data.get("data", {})
            .get("Page", {})
            .get("media", [])
        )
        selection = ui.prompt_anime_selection(session, media_list)
        selection = ui.prompt_anime_selection(session, search_result.media)

        if selection == "Back":
            return GoBack
        if selection is None:
            return None  # User cancelled prompt
            return None

        if selection == "Next Page":
            page_info = (
                session.state.anilist.results_data.get("data", {})
                .get("Page", {})
                .get("pageInfo", {})
            )
            if page_info.get("hasNextPage"):
                session.state.navigation.current_page += 1
                with ui.progress_spinner("Fetching next page..."):
                    success, data = session.current_data_loader(
                        page=session.state.navigation.current_page
                    )
                if success:
                    session.state.anilist.results_data = data
                else:
                    ui.display_error("Failed to fetch next page.")
                    session.state.navigation.current_page -= 1
            else:
                ui.display_error("Already on the last page.")
            return self  # Return to the same results state
        # TODO: Implement pagination logic here by checking selection for "Next Page" etc.
        # and re-calling the search_media method with an updated page number.

        if selection == "Previous Page":
            if session.state.navigation.current_page > 1:
                session.state.navigation.current_page -= 1
                with ui.progress_spinner("Fetching previous page..."):
                    success, data = session.current_data_loader(
                        page=session.state.navigation.current_page
                    )
                if success:
                    session.state.anilist.results_data = data
                else:
                    ui.display_error("Failed to fetch previous page.")
                    session.state.navigation.current_page += 1
            else:
                ui.display_error("Already on the first page.")
            return self

        # If it's a valid anime object
        session.state.anilist.selected_anime = selection
        return AnimeActionsState()
@@ -7,53 +7,42 @@ from pydantic import BaseModel, Field
if TYPE_CHECKING:
    from ...core.config import AppConfig
    from ...libs.anilist.api import AniListApi
    from ...libs.anilist.types import AnilistBaseMediaDataSchema
    from ...libs.anime.provider import AnimeProvider

    # Import the dataclasses for type hinting
    from ...libs.anime.types import Anime, SearchResult, SearchResults, Server
    from ...libs.api.base import BaseApiClient
    from ...libs.api.types import Anime, SearchResult, Server, UserProfile
    from ...libs.players.base import BasePlayer
    from ...libs.selector.base import BaseSelector

logger = logging.getLogger(__name__)


# --- Nested State Models ---
# --- Nested State Models (Unchanged) ---
class AnilistState(BaseModel):
    """Holds state related to AniList data and selections."""

    results_data: dict | None = None
    selected_anime: Optional[AnilistBaseMediaDataSchema] = None
    results_data: Optional[dict] = None
    selected_anime: Optional[dict] = (
        None  # Using dict for AnilistBaseMediaDataSchema for now
    )


class ProviderState(BaseModel):
    """Holds state related to the current anime provider, using specific dataclasses."""

    search_results: Optional[SearchResults] = None
    selected_search_result: Optional[SearchResult] = None
    anime_details: Optional[Anime] = None
    current_episode: Optional[str] = None
    current_server: Optional[Server] = None

    class Config:
        arbitrary_types_allowed = True


class NavigationState(BaseModel):
    """Holds state related to the UI navigation stack."""

    current_page: int = 1
    history_stack_class_names: list[str] = Field(default_factory=list)


class TrackingState(BaseModel):
    """Holds state for user progress tracking preferences."""

    progress_mode: str = "prompt"


# --- Top-Level SessionState ---
class SessionState(BaseModel):
    """The root model for all serializable runtime state."""

    anilist: AnilistState = Field(default_factory=AnilistState)
    provider: ProviderState = Field(default_factory=ProviderState)
    navigation: NavigationState = Field(default_factory=NavigationState)
@@ -64,41 +53,48 @@ class SessionState(BaseModel):
class Session:
    """
    Manages the entire runtime session for the interactive anilist command.
    """

    def __init__(self, config: AppConfig, anilist_client: AniListApi) -> None:
    def __init__(self, config: AppConfig) -> None:
        self.config: AppConfig = config
        self.state: SessionState = SessionState()
        self.is_running: bool = True
        self.anilist: AniListApi = anilist_client
        self.user_profile: Optional[UserProfile] = None
        self._initialize_components()

    def _initialize_components(self) -> None:
        """Creates instances of core components based on the current config."""
        from ...libs.anime.provider import create_provider
        from ...cli.auth.manager import CredentialsManager
        from ...libs.api.factory import create_api_client
        from ...libs.players import create_player
        from ...libs.selector import create_selector

        logger.debug("Initializing session components from configuration...")
        logger.debug("Initializing session components...")
        self.selector: BaseSelector = create_selector(self.config)
        self.provider: AnimeProvider = create_provider(self.config.general.provider)
        self.player: BasePlayer = create_player(self.config.stream.player, self.config)

        # Instantiate and use the API factory
        self.api_client: BaseApiClient = create_api_client("anilist", self.config)

        # Load credentials and authenticate the API client
        manager = CredentialsManager()
        user_data = manager.load_user_profile()
        if user_data and (token := user_data.get("token")):
            self.user_profile = self.api_client.authenticate(token)
            if not self.user_profile:
                logger.warning(
                    "Loaded token is invalid or expired. User is not logged in."
                )

    def change_provider(self, provider_name: str) -> None:
        from ...libs.anime.provider import create_provider

        self.config.general.provider = provider_name
        self.provider = create_provider(provider_name)
        logger.info(f"Provider changed to: {self.provider.__class__.__name__}")

    def change_player(self, player_name: str) -> None:
        from ...libs.players import create_player

        self.config.stream.player = player_name
        self.player = create_player(player_name, self.config)
        logger.info(f"Player changed to: {self.player.__class__.__name__}")

    def stop(self) -> None:
        self.is_running = False
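Bootstrap sketch (not part of the commit) showing how the refactored Session wires credentials and the API client together. The Session module path and a default-constructible AppConfig are assumptions.

from fastanime.core.config import AppConfig
from fastanime.cli.interactive.session import Session  # module path assumed

config = AppConfig()        # assumes defaults are constructible
session = Session(config)   # loads any saved token and authenticates api_client

if session.user_profile:
    print(f"Resumed session for {session.user_profile.name}")
else:
    print("Running anonymously; run the login command to enable list tracking.")
session.stop()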
@@ -1,26 +1,84 @@
import json
from pathlib import Path
from __future__ import annotations

from httpx import AsyncClient, Client, Response
from typing_extensions import Counter
import json
import logging
from pathlib import Path
from typing import TYPE_CHECKING

from .networking import TIMEOUT

if TYPE_CHECKING:
    from httpx import Client

logger = logging.getLogger(__name__)


def load_graphql_from_file(file: Path) -> str:
    """
    Reads and returns the content of a .gql file.

    Args:
        file: The Path object pointing to the .gql file.

    Returns:
        The string content of the file.
    """
    try:
        return file.read_text(encoding="utf-8")
    except FileNotFoundError:
        logger.error(f"GraphQL file not found at: {file}")
        raise


def execute_graphql_query(
    url: str, httpx_client: Client, graphql_file: Path, variables: dict
):
    response = httpx_client.get(
        url,
        params={
            "variables": json.dumps(variables),
            "query": load_graphql_from_file(graphql_file),
        },
        timeout=TIMEOUT,
    )
    return response
) -> dict | None:
    """
    Executes a GraphQL query using a GET request with query parameters.
    Suitable for read-only operations.

    Args:
        url: The base GraphQL endpoint URL.
        httpx_client: The httpx.Client instance to use.
        graphql_file: Path to the .gql file containing the query.
        variables: A dictionary of variables for the query.

    Returns:
        The JSON response as a dictionary, or None on failure.
    """
    query = load_graphql_from_file(graphql_file)
    params = {"query": query, "variables": json.dumps(variables)}
    try:
        response = httpx_client.get(url, params=params, timeout=TIMEOUT)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        logger.error(f"GraphQL GET request failed for {graphql_file.name}: {e}")
        return None


def load_graphql_from_file(file: Path) -> str:
    query = file.read_text(encoding="utf-8")
    return query
def execute_graphql_mutation(
    url: str, httpx_client: Client, graphql_file: Path, variables: dict
) -> dict | None:
    """
    Executes a GraphQL mutation using a POST request with a JSON body.
    Suitable for write/update operations.

    Args:
        url: The GraphQL endpoint URL.
        httpx_client: The httpx.Client instance to use.
        graphql_file: Path to the .gql file containing the mutation.
        variables: A dictionary of variables for the mutation.

    Returns:
        The JSON response as a dictionary, or None on failure.
    """
    query = load_graphql_from_file(graphql_file)
    json_body = {"query": query, "variables": variables}
    try:
        response = httpx_client.post(url, json=json_body, timeout=TIMEOUT)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        logger.error(f"GraphQL POST request failed for {graphql_file.name}: {e}")
        return None
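Caller sketch (not part of the commit) showing the read/write split the two helpers encode: GET with URL parameters for queries, POST with a JSON body for mutations. The .gql file paths and token are illustrative.

from pathlib import Path
from httpx import Client

from fastanime.core.utils.graphql import (
    execute_graphql_mutation,
    execute_graphql_query,
)

client = Client(headers={"Authorization": "Bearer <token>"})
endpoint = "https://graphql.anilist.co"

# Read path: query and JSON-encoded variables as URL parameters.
data = execute_graphql_query(
    endpoint, client, Path("queries/trending.gql"), {"page": 1, "perPage": 15}
)

# Write path: query and variables in a JSON body.
result = execute_graphql_mutation(
    endpoint, client, Path("mutations/media-list.gql"), {"mediaId": 1, "progress": 3}
)
print(data is not None, result is not None)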
@@ -1,472 +0,0 @@
|
||||
"""
|
||||
This is the core module availing all the abstractions of the anilist api
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import requests
|
||||
|
||||
from .queries_graphql import (
|
||||
airing_schedule_query,
|
||||
anime_characters_query,
|
||||
anime_query,
|
||||
anime_relations_query,
|
||||
delete_list_entry_query,
|
||||
get_logged_in_user_query,
|
||||
get_medialist_item_query,
|
||||
get_user_info,
|
||||
media_list_mutation,
|
||||
media_list_query,
|
||||
most_favourite_query,
|
||||
most_popular_query,
|
||||
most_recently_updated_query,
|
||||
most_scored_query,
|
||||
notification_query,
|
||||
recommended_query,
|
||||
search_query,
|
||||
trending_query,
|
||||
upcoming_anime_query,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .types import (
|
||||
AnilistDataSchema,
|
||||
AnilistMediaLists,
|
||||
AnilistMediaListStatus,
|
||||
AnilistNotifications,
|
||||
AnilistUser_,
|
||||
AnilistUserData,
|
||||
AnilistViewerData,
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
ANILIST_ENDPOINT = "https://graphql.anilist.co"
|
||||
|
||||
|
||||
class AniListApi:
|
||||
"""An abstraction over the anilist api offering an easy and simple interface
|
||||
|
||||
Attributes:
|
||||
session: [TODO:attribute]
|
||||
session: [TODO:attribute]
|
||||
token: [TODO:attribute]
|
||||
headers: [TODO:attribute]
|
||||
user_id: [TODO:attribute]
|
||||
token: [TODO:attribute]
|
||||
headers: [TODO:attribute]
|
||||
user_id: [TODO:attribute]
|
||||
"""
|
||||
|
||||
session: requests.Session
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.session = requests.session()
|
||||
|
||||
def login_user(self, token: str):
|
||||
"""method used to login a new user enabling authenticated requests
|
||||
|
||||
Args:
|
||||
token: anilist app token
|
||||
|
||||
Returns:
|
||||
the logged in user
|
||||
"""
|
||||
self.token = token
|
||||
self.headers = {"Authorization": f"Bearer {self.token}"}
|
||||
self.session.headers.update(self.headers)
|
||||
success, user = self.get_logged_in_user()
|
||||
if not user:
|
||||
return
|
||||
if not success or not user:
|
||||
return
|
||||
user_info: AnilistUser_ = user["data"]["Viewer"]
|
||||
self.user_id = user_info["id"]
|
||||
return user_info
|
||||
|
||||
def get_notification(
|
||||
self,
|
||||
) -> tuple[bool, "AnilistNotifications"] | tuple[bool, None]:
|
||||
"""get the top five latest notifications for anime thats airing
|
||||
|
||||
Returns:
|
||||
airing notifications
|
||||
"""
|
||||
return self._make_authenticated_request(notification_query)
|
||||
|
||||
def update_login_info(self, user: "AnilistUser_", token: str):
|
||||
"""method used to login a user enabling authenticated requests
|
||||
|
||||
Args:
|
||||
user: an anilist user object
|
||||
token: the login token
|
||||
"""
|
||||
self.token = token
|
||||
self.headers = {"Authorization": f"Bearer {self.token}"}
|
||||
self.session.headers.update(self.headers)
|
||||
self.user_id = user["id"]
|
||||
|
||||
def get_user_info(self) -> tuple[bool, "AnilistUserData"] | tuple[bool, None]:
|
||||
"""get the details of the user who is currently logged in
|
||||
|
||||
Returns:
|
||||
an anilist user
|
||||
"""
|
||||
|
||||
return self._make_authenticated_request(get_user_info, {"userId": self.user_id})
|
||||
|
||||
def get_logged_in_user(
|
||||
self,
|
||||
) -> tuple[bool, "AnilistViewerData"] | tuple[bool, None]:
|
||||
"""get the details of the user who is currently logged in
|
||||
|
||||
Returns:
|
||||
an anilist user
|
||||
"""
|
||||
if not self.headers:
|
||||
return (False, None)
|
||||
return self._make_authenticated_request(get_logged_in_user_query)
|
||||
|
||||
def update_anime_list(self, values_to_update: dict):
|
||||
"""a powerful method for managing mediaLists giving full power to the user
|
||||
|
||||
Args:
|
||||
values_to_update: a dict containing valid media list options
|
||||
|
||||
Returns:
|
||||
an anilist object indicating success
|
||||
"""
|
||||
variables = {"userId": self.user_id, **values_to_update}
|
||||
return self._make_authenticated_request(media_list_mutation, variables)
|
||||
|
||||
def get_anime_list(
|
||||
self,
|
||||
status: "AnilistMediaListStatus",
|
||||
type="ANIME",
|
||||
page=1,
|
||||
perPage=os.environ.get("FASTANIME_PER_PAGE", 15),
|
||||
**kwargs,
|
||||
) -> tuple[bool, "AnilistMediaLists"] | tuple[bool, None]:
|
||||
"""gets an anime list from your media list given the list status
|
||||
|
||||
Args:
|
||||
status: the mediaListStatus of the anime list
|
||||
|
||||
Returns:
|
||||
a media list
|
||||
"""
|
||||
variables = {
|
||||
"status": status,
|
||||
"userId": self.user_id,
|
||||
"type": type,
|
||||
"page": page,
|
||||
"perPage": int(perPage),
|
||||
}
|
||||
return self._make_authenticated_request(media_list_query, variables)
|
||||
|
||||
def get_medialist_entry(
|
||||
self, mediaId: int
|
||||
) -> tuple[bool, dict] | tuple[bool, None]:
|
||||
"""Get the id entry of the items in an Anilist MediaList
|
||||
|
||||
Args:
|
||||
mediaId: The mediaList item entry mediaId
|
||||
|
||||
Returns:
|
||||
a boolean indicating whether the request succeeded and either a dict object containing the id of the media list entry
|
||||
"""
|
||||
variables = {"mediaId": mediaId}
|
||||
return self._make_authenticated_request(get_medialist_item_query, variables)
|
||||
|
||||
def delete_medialist_entry(self, mediaId: int):
|
||||
"""Deletes a mediaList item given its mediaId
|
||||
|
||||
Args:
|
||||
mediaId: the media id of the anime
|
||||
|
||||
Returns:
|
||||
a tuple containing a boolean whether the operation was successful and either an anilist object or none depending on success
|
||||
"""
|
||||
result = self.get_medialist_entry(mediaId)
|
||||
data = result[1]
|
||||
if not result[0] or not data:
|
||||
return result
|
||||
id = data["data"]["MediaList"]["id"]
|
||||
variables = {"id": id}
|
||||
return self._make_authenticated_request(delete_list_entry_query, variables)
|
||||
|
||||
# TODO: unify the _make_authenticated_request with original since sessions are now in use
|
||||
def _make_authenticated_request(self, query: str, variables: dict = {}):
|
||||
"""the abstraction over all authenticated requests
|
||||
|
||||
Args:
|
||||
query: the anilist query to make
|
||||
variables: the anilist variables to use
|
||||
|
||||
Returns:
|
||||
an anilist object containing the queried data or none and a boolean indicating whether the request was successful
|
||||
"""
|
||||
try:
|
||||
response = self.session.post(
|
||||
ANILIST_ENDPOINT,
|
||||
json={"query": query, "variables": variables},
|
||||
timeout=10,
|
||||
headers=self.headers,
|
||||
)
|
||||
anilist_data = response.json()
|
||||
|
||||
# ensuring you dont get blocked
|
||||
if (
|
||||
int(response.headers.get("X-RateLimit-Remaining", 0)) < 30
|
||||
and not response.status_code == 500
|
||||
):
|
||||
print(
|
||||
"Warning you are exceeding the allowed number of calls per minute"
|
||||
)
|
||||
logger.warning(
|
||||
"You are exceeding the allowed number of calls per minute for the AniList api enforcing timeout"
|
||||
)
|
||||
print("Forced timeout will now be initiated")
|
||||
import time
|
||||
|
||||
print("sleeping...")
|
||||
time.sleep(1 * 60)
|
||||
if response.status_code == 200:
|
||||
return (True, anilist_data)
|
||||
else:
|
||||
return (False, anilist_data)
|
||||
except requests.exceptions.Timeout:
|
||||
logger.warning(
|
||||
"Timeout has been exceeded this could mean anilist is down or you have lost internet connection"
|
||||
)
|
||||
return (False, None)
|
||||
except requests.exceptions.ConnectionError:
|
||||
logger.warning(
|
||||
"ConnectionError this could mean anilist is down or you have lost internet connection"
|
||||
)
|
||||
return (False, None)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Something unexpected occurred {e}")
|
||||
return (False, None) # type: ignore
|
||||
|
||||
def get_data(
|
||||
self, query: str, variables: dict = {}
|
||||
) -> tuple[bool, "AnilistDataSchema"]:
|
||||
"""the abstraction over all none authenticated requests and that returns data of a similar type
|
||||
|
||||
Args:
|
||||
query: the anilist query
|
||||
variables: the anilist api variables
|
||||
|
||||
Returns:
|
||||
a boolean indicating success and none or an anilist object depending on success
|
||||
"""
|
||||
try:
|
||||
response = self.session.post(
|
||||
ANILIST_ENDPOINT,
|
||||
json={"query": query, "variables": variables},
|
||||
timeout=10,
|
||||
)
|
||||
anilist_data: AnilistDataSchema = response.json()
|
||||
|
||||
# ensuring you dont get blocked
|
||||
if (
|
||||
int(response.headers.get("X-RateLimit-Remaining", 0)) < 30
|
||||
and not response.status_code == 500
|
||||
):
|
||||
print(
|
||||
"Warning you are exceeding the allowed number of calls per minute"
|
||||
)
|
||||
logger.warning(
|
||||
"You are exceeding the allowed number of calls per minute for the AniList api enforcing timeout"
|
||||
)
|
||||
print("Forced timeout will now be initiated")
|
||||
import time
|
||||
|
||||
print("sleeping...")
|
||||
time.sleep(1 * 60)
|
||||
if response.status_code == 200:
|
||||
return (True, anilist_data)
|
||||
else:
|
||||
return (False, anilist_data)
|
||||
except requests.exceptions.Timeout:
|
||||
logger.warning(
|
||||
"Timeout has been exceeded this could mean anilist is down or you have lost internet connection"
|
||||
)
|
||||
return (
|
||||
False,
|
||||
{
|
||||
"Error": "Timeout Exceeded for connection there might be a problem with your internet or anilist is down."
|
||||
},
|
||||
) # type: ignore
|
||||
except requests.exceptions.ConnectionError:
|
||||
logger.warning(
|
||||
"ConnectionError this could mean anilist is down or you have lost internet connection"
|
||||
)
|
||||
return (
|
||||
False,
|
||||
{
|
||||
"Error": "There might be a problem with your internet or anilist is down."
|
||||
},
|
||||
) # type: ignore
|
||||
except Exception as e:
|
||||
logger.error(f"Something unexpected occurred {e}")
|
||||
return (False, {"Error": f"{e}"}) # type: ignore
|
||||
|
||||
def search(
|
||||
self,
|
||||
max_results=50,
|
||||
query: str | None = None,
|
||||
sort: str | None = None,
|
||||
genre_in: list[str] | None = None,
|
||||
id_in: list[int] | None = None,
|
||||
genre_not_in: list[str] = ["hentai"],
|
||||
popularity_greater: int | None = None,
|
||||
popularity_lesser: int | None = None,
|
||||
averageScore_greater: int | None = None,
|
||||
averageScore_lesser: int | None = None,
|
||||
tag_in: list[str] | None = None,
|
||||
tag_not_in: list[str] | None = None,
|
||||
status: str | None = None,
|
||||
status_in: list[str] | None = None,
|
||||
status_not_in: list[str] | None = None,
|
||||
endDate_greater: int | None = None,
|
||||
endDate_lesser: int | None = None,
|
||||
startDate_greater: int | None = None,
|
||||
startDate_lesser: int | None = None,
|
||||
startDate: str | None = None,
|
||||
seasonYear: str | None = None,
|
||||
page: int | None = None,
|
||||
season: str | None = None,
|
||||
format_in: list[str] | None = None,
|
||||
on_list: bool | None = None,
|
||||
type="ANIME",
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
A powerful method abstracting all of anilist media queries
|
||||
"""
|
||||
variables = {}
|
||||
for key, val in list(locals().items())[1:]:
|
||||
if (val or val is False) and key not in ["variables"]:
|
||||
variables[key] = val
|
||||
search_results = self.get_data(search_query, variables=variables)
|
||||
return search_results
|
||||
|
||||
def get_anime(self, id: int):
|
||||
"""
|
||||
Gets a single anime by a valid anilist anime id
|
||||
"""
|
||||
variables = {"id": id}
|
||||
return self.get_data(anime_query, variables)
|
||||
|
||||
def get_trending(
|
||||
self,
|
||||
type="ANIME",
|
||||
page=1,
|
||||
perPage=os.environ.get("FASTANIME_PER_PAGE", 15),
|
||||
*_,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
Gets the currently trending anime
|
||||
"""
|
||||
variables = {"type": type, "page": page, "perPage": int(perPage)}
|
||||
trending = self.get_data(trending_query, variables)
|
||||
return trending
|
||||
|
||||
def get_most_favourite(
|
||||
self,
|
||||
type="ANIME",
|
||||
page=1,
|
||||
perPage=os.environ.get("FASTANIME_PER_PAGE", 15),
|
||||
*_,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
Gets the most favoured anime on anilist
|
||||
"""
|
||||
variables = {"type": type, "page": page, "perPage": int(perPage)}
|
||||
most_favourite = self.get_data(most_favourite_query, variables)
|
||||
return most_favourite
|
||||
|
||||
def get_most_scored(
|
||||
self,
|
||||
type="ANIME",
|
||||
page=1,
|
||||
perPage=os.environ.get("FASTANIME_PER_PAGE", 15),
|
||||
*_,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
Gets most scored anime on anilist
|
||||
"""
|
||||
variables = {"type": type, "page": page, "perPage": int(perPage)}
|
||||
most_scored = self.get_data(most_scored_query, variables)
|
||||
return most_scored
|
||||
|
||||
def get_most_recently_updated(
|
||||
self,
|
||||
type="ANIME",
|
||||
page=1,
|
||||
perPage=os.environ.get("FASTANIME_PER_PAGE", 15),
|
||||
*_,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
Gets most recently updated anime from anilist
|
||||
"""
|
||||
variables = {"type": type, "page": page, "perPage": int(perPage)}
|
||||
most_recently_updated = self.get_data(most_recently_updated_query, variables)
|
||||
return most_recently_updated
|
||||
|
||||
def get_most_popular(
|
||||
self,
|
||||
type="ANIME",
|
||||
page=1,
|
||||
perPage=os.environ.get("FASTANIME_PER_PAGE", 15),
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
Gets most popular anime on anilist
|
||||
"""
|
||||
variables = {"type": type, "page": page, "perPage": int(perPage)}
|
||||
most_popular = self.get_data(most_popular_query, variables)
|
||||
return most_popular
|
||||
|
||||
def get_upcoming_anime(
|
||||
self,
|
||||
type="ANIME",
|
||||
page: int = 1,
|
||||
perPage=os.environ.get("FASTANIME_PER_PAGE", 15),
|
||||
*_,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
Gets upcoming anime from anilist
|
||||
"""
|
||||
variables = {"page": page, "type": type, "perPage": int(perPage)}
|
||||
upcoming_anime = self.get_data(upcoming_anime_query, variables)
|
||||
return upcoming_anime
|
||||
|
||||
# NOTE: THe following methods will probably be scraped soon
|
||||
def get_recommended_anime_for(self, mediaRecommendationId, page=1, *_, **kwargs):
|
||||
variables = {"mediaRecommendationId": mediaRecommendationId, "page": page}
|
||||
recommended_anime = self.get_data(recommended_query, variables)
|
||||
return recommended_anime
|
||||
|
||||
def get_characters_of(self, id: int, type="ANIME", *_, **kwargs):
|
||||
variables = {"id": id}
|
||||
characters = self.get_data(anime_characters_query, variables)
|
||||
return characters
|
||||
|
||||
def get_related_anime_for(self, id: int, *_, **kwargs):
|
||||
variables = {"id": id}
|
||||
related_anime = self.get_data(anime_relations_query, variables)
|
||||
return related_anime
|
||||
|
||||
def get_airing_schedule_for(self, id: int, type="ANIME", *_, **kwargs):
|
||||
variables = {"id": id}
|
||||
airing_schedule = self.get_data(airing_schedule_query, variables)
|
||||
return airing_schedule
|
||||
File diff suppressed because it is too large
112
fastanime/libs/api/anilist/api.py
Normal file
@@ -0,0 +1,112 @@
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any, List, Optional

from ....core.utils.graphql import execute_graphql_mutation, execute_graphql_query
from ..base import ApiSearchParams, BaseApiClient, UpdateListEntryParams, UserListParams
from ..types import MediaSearchResult, UserProfile
from . import gql, mapper

if TYPE_CHECKING:
    from httpx import Client

    from ....core.config import AnilistConfig

logger = logging.getLogger(__name__)
ANILIST_ENDPOINT = "https://graphql.anilist.co"


class AniListApi(BaseApiClient):
    """AniList API implementation of the BaseApiClient contract."""

    def __init__(self, config: AnilistConfig, client: Client):
        super().__init__(config, client)
        self.token: Optional[str] = None
        self.user_profile: Optional[UserProfile] = None

    def authenticate(self, token: str) -> Optional[UserProfile]:
        self.token = token
        self.http_client.headers["Authorization"] = f"Bearer {token}"
        self.user_profile = self.get_viewer_profile()
        if not self.user_profile:
            self.token = None
            self.http_client.headers.pop("Authorization", None)
        return self.user_profile

    def get_viewer_profile(self) -> Optional[UserProfile]:
        if not self.token:
            return None
        raw_data = execute_graphql_query(
            ANILIST_ENDPOINT, self.http_client, gql.GET_LOGGED_IN_USER, {}
        )
        return mapper.to_generic_user_profile(raw_data) if raw_data else None

    def search_media(self, params: ApiSearchParams) -> Optional[MediaSearchResult]:
        variables = {k: v for k, v in params.__dict__.items() if v is not None}
        variables["perPage"] = params.per_page
        raw_data = execute_graphql_query(
            ANILIST_ENDPOINT, self.http_client, gql.SEARCH_MEDIA, variables
        )
        return mapper.to_generic_search_result(raw_data) if raw_data else None

    def fetch_user_list(self, params: UserListParams) -> Optional[MediaSearchResult]:
        if not self.user_profile:
            logger.error("Cannot fetch user list: user is not authenticated.")
            return None
        variables = {
            "userId": self.user_profile.id,
            "status": params.status,
            "page": params.page,
            "perPage": params.per_page,
        }
        raw_data = execute_graphql_query(
            ANILIST_ENDPOINT, self.http_client, gql.GET_USER_LIST, variables
        )
        return mapper.to_generic_user_list_result(raw_data) if raw_data else None

    def update_list_entry(self, params: UpdateListEntryParams) -> bool:
        if not self.token:
            return False
        score_raw = int(params.score * 10) if params.score is not None else None
        variables = {
            "mediaId": params.media_id,
            "status": params.status,
            "progress": params.progress,
            "scoreRaw": score_raw,
        }
        variables = {k: v for k, v in variables.items() if v is not None}
        response = execute_graphql_mutation(
            ANILIST_ENDPOINT, self.http_client, gql.SAVE_MEDIA_LIST_ENTRY, variables
        )
        return response is not None and "errors" not in response

    def delete_list_entry(self, media_id: int) -> bool:
        if not self.token:
            return False
        entry_data = execute_graphql_query(
            ANILIST_ENDPOINT,
            self.http_client,
            gql.GET_MEDIA_LIST_ITEM,
            {"mediaId": media_id},
        )
        list_id = (
            entry_data.get("data", {}).get("MediaList", {}).get("id")
            if entry_data
            else None
        )
        if not list_id:
            return False
        response = execute_graphql_mutation(
            ANILIST_ENDPOINT,
            self.http_client,
            gql.DELETE_MEDIA_LIST_ENTRY,
            {"id": list_id},
        )
        return (
            response.get("data", {})
            .get("DeleteMediaListEntry", {})
            .get("deleted", False)
            if response
            else False
        )
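Call-site sketch (not part of the commit) for the new client. A default-constructible AnilistConfig and the token value are assumptions; in the application the factory below normally builds the client.

from httpx import Client

from fastanime.core.config import AnilistConfig  # assumed constructible with defaults
from fastanime.libs.api.anilist.api import AniListApi
from fastanime.libs.api.base import ApiSearchParams, UserListParams

api = AniListApi(AnilistConfig(), Client())

profile = api.authenticate("anilist-token")  # returns None if the token is rejected
if profile:
    watching = api.fetch_user_list(UserListParams(status="CURRENT"))

trending = api.search_media(ApiSearchParams(sort="TRENDING_DESC", per_page=10))
if trending:
    for item in trending.media:
        print(item.title.english or item.title.romaji)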
51
fastanime/libs/api/anilist/gql.py
Normal file
@@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
"""
GraphQL Path Registry for the AniList API Client.

This module uses `importlib.resources` to create robust, cross-platform
`pathlib.Path` objects for every .gql file in the `queries` and `mutations`
directories. This provides a single, type-safe source of truth for all
GraphQL operations, making the codebase easier to maintain and validate.

Constants are named to reflect the action they perform, e.g.,
`SEARCH_MEDIA` points to the `search.gql` file.
"""

from __future__ import annotations

from importlib import resources
from pathlib import Path

# --- Base Paths ---
# Safely access package data directories using the standard library.
_QUERIES_PATH = resources.files("fastanime.libs.api.anilist") / "queries"
_MUTATIONS_PATH = resources.files("fastanime.libs.api.anilist") / "mutations"


# --- Queries ---
# Each constant is a Path object pointing to a specific .gql query file.
GET_AIRING_SCHEDULE: Path = _QUERIES_PATH / "airing.gql"
GET_ANIME_DETAILS: Path = _QUERIES_PATH / "anime.gql"
GET_CHARACTERS: Path = _QUERIES_PATH / "character.gql"
GET_FAVOURITES: Path = _QUERIES_PATH / "favourite.gql"
GET_MEDIA_LIST_ITEM: Path = _QUERIES_PATH / "get-medialist-item.gql"
GET_LOGGED_IN_USER: Path = _QUERIES_PATH / "logged-in-user.gql"
GET_MEDIA_LIST: Path = _QUERIES_PATH / "media-list.gql"
GET_MEDIA_RELATIONS: Path = _QUERIES_PATH / "media-relations.gql"
GET_NOTIFICATIONS: Path = _QUERIES_PATH / "notifications.gql"
GET_POPULAR: Path = _QUERIES_PATH / "popular.gql"
GET_RECENTLY_UPDATED: Path = _QUERIES_PATH / "recently-updated.gql"
GET_RECOMMENDATIONS: Path = _QUERIES_PATH / "recommended.gql"
GET_REVIEWS: Path = _QUERIES_PATH / "reviews.gql"
GET_SCORES: Path = _QUERIES_PATH / "score.gql"
SEARCH_MEDIA: Path = _QUERIES_PATH / "search.gql"
GET_TRENDING: Path = _QUERIES_PATH / "trending.gql"
GET_UPCOMING: Path = _QUERIES_PATH / "upcoming.gql"
GET_USER_INFO: Path = _QUERIES_PATH / "user-info.gql"


# --- Mutations ---
# Each constant is a Path object pointing to a specific .gql mutation file.
DELETE_MEDIA_LIST_ENTRY: Path = _MUTATIONS_PATH / "delete-list-entry.gql"
MARK_NOTIFICATIONS_AS_READ: Path = _MUTATIONS_PATH / "mark-read.gql"
SAVE_MEDIA_LIST_ENTRY: Path = _MUTATIONS_PATH / "media-list.gql"
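Because every constant is a path-like object, the registry plugs straight into the GraphQL helpers from core.utils; a small sketch (not part of the commit, token header omitted for a public query):

from httpx import Client

from fastanime.core.utils.graphql import execute_graphql_query
from fastanime.libs.api.anilist import gql

raw = execute_graphql_query(
    "https://graphql.anilist.co", Client(), gql.GET_TRENDING, {"page": 1, "perPage": 5}
)
print(raw.keys() if raw else "request failed")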
239
fastanime/libs/api/anilist/mapper.py
Normal file
@@ -0,0 +1,239 @@
from __future__ import annotations

import logging
from datetime import datetime
from typing import TYPE_CHECKING, List, Optional

from ..types import (
    AiringSchedule,
    MediaImage,
    MediaItem,
    MediaSearchResult,
    MediaTag,
    MediaTitle,
    MediaTrailer,
    PageInfo,
    Studio,
    UserListStatus,
    UserProfile,
)

if TYPE_CHECKING:
    from .types import AnilistBaseMediaDataSchema, AnilistPageInfo, AnilistUser_

logger = logging.getLogger(__name__)


def _to_generic_media_title(anilist_title: Optional[dict]) -> MediaTitle:
    """Maps an AniList title object to a generic MediaTitle."""
    if not anilist_title:
        return MediaTitle()
    return MediaTitle(
        romaji=anilist_title.get("romaji"),
        english=anilist_title.get("english"),
        native=anilist_title.get("native"),
    )


def _to_generic_media_image(anilist_image: Optional[dict]) -> MediaImage:
    """Maps an AniList image object to a generic MediaImage."""
    if not anilist_image:
        return MediaImage()
    return MediaImage(
        medium=anilist_image.get("medium"),
        large=anilist_image.get("large"),
        extra_large=anilist_image.get("extraLarge"),
    )


def _to_generic_media_trailer(
    anilist_trailer: Optional[dict],
) -> Optional[MediaTrailer]:
    """Maps an AniList trailer object to a generic MediaTrailer."""
    if not anilist_trailer or not anilist_trailer.get("id"):
        return None
    return MediaTrailer(
        id=anilist_trailer["id"],
        site=anilist_trailer.get("site"),
        thumbnail_url=anilist_trailer.get("thumbnail"),
    )


def _to_generic_airing_schedule(
    anilist_schedule: Optional[dict],
) -> Optional[AiringSchedule]:
    """Maps an AniList nextAiringEpisode object to a generic AiringSchedule."""
    if not anilist_schedule or not anilist_schedule.get("airingAt"):
        return None
    return AiringSchedule(
        airing_at=datetime.fromtimestamp(anilist_schedule["airingAt"]),
        episode=anilist_schedule.get("episode", 0),
    )


def _to_generic_studios(anilist_studios: Optional[dict]) -> List[Studio]:
    """Maps AniList studio nodes to a list of generic Studio objects."""
    if not anilist_studios or not anilist_studios.get("nodes"):
        return []
    return [
        Studio(id=s["id"], name=s["name"])
        for s in anilist_studios["nodes"]
        if s.get("id") and s.get("name")
    ]


def _to_generic_tags(anilist_tags: Optional[list[dict]]) -> List[MediaTag]:
    """Maps a list of AniList tags to generic MediaTag objects."""
    if not anilist_tags:
        return []
    return [
        MediaTag(name=t["name"], rank=t.get("rank"))
        for t in anilist_tags
        if t.get("name")
    ]


def _to_generic_user_status(
    anilist_list_entry: Optional[dict],
) -> Optional[UserListStatus]:
    """Maps an AniList mediaListEntry to a generic UserListStatus."""
    if not anilist_list_entry:
        return None

    score = anilist_list_entry.get("score")

    return UserListStatus(
        status=anilist_list_entry.get("status"),
        progress=anilist_list_entry.get("progress"),
        score=score
        if score is not None
        else None,  # AniList score is 0-10, matches our generic model
    )


def _to_generic_media_item(data: AnilistBaseMediaDataSchema) -> MediaItem:
    """Maps a single AniList media schema to a generic MediaItem."""
    return MediaItem(
        id=data["id"],
        id_mal=data.get("idMal"),
        type=data.get("type", "ANIME"),
        title=_to_generic_media_title(data.get("title")),
        status=data.get("status"),
        format=data.get("format"),
        cover_image=_to_generic_media_image(data.get("coverImage")),
        banner_image=data.get("bannerImage"),
        trailer=_to_generic_media_trailer(data.get("trailer")),
        description=data.get("description"),
        episodes=data.get("episodes"),
        duration=data.get("duration"),
        genres=data.get("genres", []),
        tags=_to_generic_tags(data.get("tags")),
        studios=_to_generic_studios(data.get("studios")),
        synonyms=data.get("synonyms", []),
        average_score=data.get("averageScore"),
        popularity=data.get("popularity"),
        favourites=data.get("favourites"),
        next_airing=_to_generic_airing_schedule(data.get("nextAiringEpisode")),
        user_list_status=_to_generic_user_status(data.get("mediaListEntry")),
    )


def _to_generic_page_info(data: AnilistPageInfo) -> PageInfo:
    """Maps an AniList page info object to a generic PageInfo."""
    return PageInfo(
        total=data.get("total", 0),
        current_page=data.get("currentPage", 1),
        has_next_page=data.get("hasNextPage", False),
        per_page=data.get("perPage", 0),
    )


def to_generic_search_result(api_response: dict) -> Optional[MediaSearchResult]:
    """
    Top-level mapper to convert a raw AniList search/list API response
    into a generic MediaSearchResult object.
    """
    if not api_response or "data" not in api_response:
        logger.warning("Mapping failed: API response is missing 'data' key.")
        return None

    page_data = api_response["data"].get("Page")
    if not page_data:
        logger.warning("Mapping failed: API response 'data' is missing 'Page' key.")
        return None

    raw_media_list = page_data.get("media", [])
    media_items: List[MediaItem] = [
        _to_generic_media_item(item) for item in raw_media_list if item
    ]
    page_info = _to_generic_page_info(page_data.get("pageInfo", {}))

    return MediaSearchResult(page_info=page_info, media=media_items)


def to_generic_user_list_result(api_response: dict) -> Optional[MediaSearchResult]:
    """
    Mapper for user list queries where media data is nested inside a 'mediaList' key.
    """
    if not api_response or "data" not in api_response:
        return None
    page_data = api_response["data"].get("Page")
    if not page_data:
        return None

    # Extract media objects from the 'mediaList' array
    media_list_items = page_data.get("mediaList", [])
    raw_media_list = [
        item.get("media") for item in media_list_items if item.get("media")
    ]

    # Now that we have a standard list of media, we can reuse the main search result mapper
    page_data["media"] = raw_media_list
    return to_generic_search_result({"data": {"Page": page_data}})


def to_generic_user_profile(api_response: dict) -> Optional[UserProfile]:
    """Maps a raw AniList viewer response to a generic UserProfile."""
    if not api_response or "data" not in api_response:
        return None

    viewer_data: Optional[AnilistUser_] = api_response["data"].get("Viewer")
    if not viewer_data:
        return None

    return UserProfile(
        id=viewer_data["id"],
        name=viewer_data["name"],
        avatar_url=viewer_data.get("avatar", {}).get("large"),
        banner_url=viewer_data.get("bannerImage"),
    )


def to_generic_relations(api_response: dict) -> Optional[List[MediaItem]]:
    """Maps the 'relations' part of an API response."""
    if not api_response or "data" not in api_response:
        return None
    nodes = (
        api_response.get("data", {})
        .get("Media", {})
        .get("relations", {})
        .get("nodes", [])
    )
    return [_to_generic_media_item(node) for node in nodes if node]


def to_generic_recommendations(api_response: dict) -> Optional[List[MediaItem]]:
    """Maps the 'recommendations' part of an API response."""
    if not api_response or "data" not in api_response:
        return None
    recs = (
        api_response.get("data", {})
        .get("Media", {})
        .get("recommendations", {})
        .get("nodes", [])
    )
    return [
        _to_generic_media_item(rec.get("mediaRecommendation"))
        for rec in recs
        if rec.get("mediaRecommendation")
    ]
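Mapper sketch (not part of the commit). The raw payload below is trimmed to the fields the mapper reads; real AniList responses carry more keys following the same GraphQL schema.

from fastanime.libs.api.anilist import mapper

raw = {
    "data": {
        "Page": {
            "pageInfo": {"total": 1, "currentPage": 1, "hasNextPage": False, "perPage": 15},
            "media": [
                {
                    "id": 21,
                    "title": {"romaji": "One Piece", "english": "One Piece"},
                    "coverImage": {"large": "https://example.org/cover.png"},
                }
            ],
        }
    }
}

result = mapper.to_generic_search_result(raw)
assert result is not None
print(result.page_info.has_next_page, result.media[0].title.romaji)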
0
fastanime/libs/api/anilist/queries/user-info.gql
Normal file
86
fastanime/libs/api/base.py
Normal file
@@ -0,0 +1,86 @@
from __future__ import annotations

import abc
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Literal, Optional

from .types import MediaSearchResult, UserProfile

if TYPE_CHECKING:
    from httpx import Client

    from ...core.config import AnilistConfig  # Import the specific config part


# --- Parameter Dataclasses (Unchanged) ---


@dataclass(frozen=True)
class ApiSearchParams:
    query: Optional[str] = None
    page: int = 1
    per_page: int = 20
    sort: Optional[str] = None


@dataclass(frozen=True)
class UserListParams:
    status: Literal[
        "CURRENT", "PLANNING", "COMPLETED", "DROPPED", "PAUSED", "REPEATING"
    ]
    page: int = 1
    per_page: int = 20


@dataclass(frozen=True)
class UpdateListEntryParams:
    media_id: int
    status: Optional[
        Literal["CURRENT", "PLANNING", "COMPLETED", "DROPPED", "PAUSED", "REPEATING"]
    ] = None
    progress: Optional[int] = None
    score: Optional[float] = None


# --- Abstract Base Class (Simplified) ---


class BaseApiClient(abc.ABC):
    """
    Abstract Base Class defining a generic contract for media database APIs.
    """

    # The constructor now expects a specific config model, not the whole AppConfig.
    def __init__(self, config: AnilistConfig | Any, client: Client):
        self.config = config
        self.http_client = client

    # --- Authentication & User ---
    @abc.abstractmethod
    def authenticate(self, token: str) -> Optional[UserProfile]:
        pass

    @abc.abstractmethod
    def get_viewer_profile(self) -> Optional[UserProfile]:
        pass

    # --- Media Browsing & Search ---
    @abc.abstractmethod
    def search_media(self, params: ApiSearchParams) -> Optional[MediaSearchResult]:
        """Searches for media based on a query and other filters."""
        pass

    # Redundant fetch methods are REMOVED.

    # --- User List Management ---
    @abc.abstractmethod
    def fetch_user_list(self, params: UserListParams) -> Optional[MediaSearchResult]:
        pass

    @abc.abstractmethod
    def update_list_entry(self, params: UpdateListEntryParams) -> bool:
        pass

    @abc.abstractmethod
    def delete_list_entry(self, media_id: int) -> bool:
        pass
47
fastanime/libs/api/factory.py
Normal file
@@ -0,0 +1,47 @@
from __future__ import annotations

import importlib
import logging
from typing import TYPE_CHECKING

from httpx import Client
from yt_dlp.utils.networking import random_user_agent

if TYPE_CHECKING:
    from ...core.config import AppConfig
    from .base import BaseApiClient

logger = logging.getLogger(__name__)

# Map the client name to its import path AND the config section it needs.
API_CLIENTS = {
    "anilist": ("fastanime.libs.api.anilist.api.AniListApi", "anilist"),
    # "jikan": ("fastanime.libs.jikan.api.JikanApi", "jikan"),  # For the future
}


def create_api_client(client_name: str, config: AppConfig) -> BaseApiClient:
    """
    Factory to create an instance of a specific API client, injecting only
    the relevant section of the application configuration.
    """
    if client_name not in API_CLIENTS:
        raise ValueError(f"Unsupported API client: '{client_name}'")

    import_path, config_section_name = API_CLIENTS[client_name]
    module_name, class_name = import_path.rsplit(".", 1)

    try:
        module = importlib.import_module(module_name)
        client_class = getattr(module, class_name)
    except (ImportError, AttributeError) as e:
        raise ImportError(f"Could not load API client '{client_name}': {e}") from e

    # Create a shared httpx client for the API
    http_client = Client(headers={"User-Agent": random_user_agent()})

    # Retrieve the specific config section from the main AppConfig
    scoped_config = getattr(config, config_section_name)

    # Inject the scoped config into the client's constructor
    return client_class(scoped_config, http_client)
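Factory call-site sketch (not part of the commit); a default-constructible AppConfig with an `anilist` section is assumed.

from fastanime.core.config import AppConfig  # assumed constructible with defaults
from fastanime.libs.api.factory import create_api_client

client = create_api_client("anilist", AppConfig())
print(type(client).__name__)                 # -> AniListApi

try:
    create_api_client("jikan", AppConfig())  # not registered yet
except ValueError as exc:
    print(exc)                               # unsupported clients fail fast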
139
fastanime/libs/api/types.py
Normal file
@@ -0,0 +1,139 @@
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Literal, Optional

# --- Generic Enums and Type Aliases ---

MediaType = Literal["ANIME", "MANGA"]
MediaStatus = Literal[
    "FINISHED", "RELEASING", "NOT_YET_RELEASED", "CANCELLED", "HIATUS"
]
UserListStatusType = Literal[
    "CURRENT", "PLANNING", "COMPLETED", "DROPPED", "PAUSED", "REPEATING"
]

# --- Generic Data Models ---


@dataclass(frozen=True)
class MediaImage:
    """A generic representation of media imagery URLs."""

    medium: Optional[str] = None
    large: Optional[str] = None
    extra_large: Optional[str] = None


@dataclass(frozen=True)
class MediaTitle:
    """A generic representation of media titles."""

    romaji: Optional[str] = None
    english: Optional[str] = None
    native: Optional[str] = None


@dataclass(frozen=True)
class MediaTrailer:
    """A generic representation of a media trailer."""

    id: str
    site: str  # e.g., "youtube"
    thumbnail_url: Optional[str] = None


@dataclass(frozen=True)
class AiringSchedule:
    """A generic representation of the next airing episode."""

    airing_at: datetime
    episode: int


@dataclass(frozen=True)
class Studio:
    """A generic representation of an animation studio."""

    id: int
    name: str


@dataclass(frozen=True)
class MediaTag:
    """A generic representation of a descriptive tag."""

    name: str
    rank: Optional[int] = None  # Percentage relevance from 0-100


@dataclass(frozen=True)
class UserListStatus:
    """Generic representation of a user's list status for a media item."""

    status: Optional[UserListStatusType] = None
    progress: Optional[int] = None
    score: Optional[float] = None  # Standardized to a 0-10 scale


@dataclass(frozen=True)
class MediaItem:
    """
    The definitive, backend-agnostic representation of a single media item.
    This is the primary data model the application will interact with.
    """

    id: int
    id_mal: Optional[int] = None
    type: MediaType = "ANIME"
    title: MediaTitle = field(default_factory=MediaTitle)
    status: Optional[MediaStatus] = None
    format: Optional[str] = None  # e.g., TV, MOVIE, OVA

    cover_image: MediaImage = field(default_factory=MediaImage)
    banner_image: Optional[str] = None
    trailer: Optional[MediaTrailer] = None

    description: Optional[str] = None
    episodes: Optional[int] = None
    duration: Optional[int] = None  # In minutes
    genres: List[str] = field(default_factory=list)
    tags: List[MediaTag] = field(default_factory=list)
    studios: List[Studio] = field(default_factory=list)
    synonyms: List[str] = field(default_factory=list)

    average_score: Optional[float] = None  # Standardized to a 0-10 scale
    popularity: Optional[int] = None
    favourites: Optional[int] = None

    next_airing: Optional[AiringSchedule] = None
    user_list_status: Optional[UserListStatus] = None


@dataclass(frozen=True)
class PageInfo:
    """Generic pagination information."""

    total: int
    current_page: int
    has_next_page: bool
    per_page: int


@dataclass(frozen=True)
class MediaSearchResult:
    """A generic representation of a page of media search results."""

    page_info: PageInfo
    media: List[MediaItem] = field(default_factory=list)


@dataclass(frozen=True)
class UserProfile:
    """A generic representation of a user's profile."""

    id: int
    name: str
    avatar_url: Optional[str] = None
    banner_url: Optional[str] = None
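The dataclasses are frozen, so application code reads them but never mutates them; a short sketch with illustrative values (not part of the commit):

from fastanime.libs.api.types import MediaItem, MediaSearchResult, MediaTitle, PageInfo

item = MediaItem(id=1, title=MediaTitle(romaji="Cowboy Bebop"), episodes=26)
page = MediaSearchResult(
    page_info=PageInfo(total=1, current_page=1, has_next_page=False, per_page=20),
    media=[item],
)

print(page.media[0].title.romaji, page.media[0].episodes)
# item.episodes = 13  # would raise dataclasses.FrozenInstanceError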