feat: mass refactor

This commit is contained in:
Benexl
2025-07-06 12:31:40 +03:00
parent f042e5042b
commit ec78c81381
129 changed files with 1089 additions and 990 deletions

View File

@@ -1,4 +0,0 @@
"""This package exists as a way to expose functions and classes that may be useful to a developer using the fastanime library.
[TODO:description]
"""

View File

@@ -1,93 +1 @@
from typing import Literal
from fastapi import FastAPI
from requests import post
from thefuzz import fuzz
from ..AnimeProvider import AnimeProvider
from ..Utility.data import anime_normalizer
app = FastAPI()
anime_provider = AnimeProvider("allanime", "true", "true")
ANILIST_ENDPOINT = "https://graphql.anilist.co"
@app.get("/search")
def search_for_anime(title: str, translation_type: Literal["dub", "sub"] = "sub"):
return anime_provider.search_for_anime(title, translation_type)
@app.get("/anime/{anime_id}")
def get_anime(anime_id: str):
return anime_provider.get_anime(anime_id)
@app.get("/anime/{anime_id}/watch")
def get_episode_streams(
anime_id: str, episode: str, translation_type: Literal["sub", "dub"]
):
return anime_provider.get_episode_streams(anime_id, episode, translation_type)
def get_anime_by_anilist_id(anilist_id: int):
query = f"""
query {{
Media(id: {anilist_id}) {{
id
title {{
romaji
english
native
}}
synonyms
episodes
duration
}}
}}
"""
response = post(ANILIST_ENDPOINT, json={"query": query}).json()
return response["data"]["Media"]
@app.get("/watch/{anilist_id}")
def get_episode_streams_by_anilist_id(
anilist_id: int, episode: str, translation_type: Literal["sub", "dub"]
):
anime = get_anime_by_anilist_id(anilist_id)
if not anime:
return
if search_results := anime_provider.search_for_anime(
str(anime["title"]["romaji"] or anime["title"]["english"]), translation_type
):
if not search_results["results"]:
return
def match_title(possible_user_requested_anime_title):
possible_user_requested_anime_title = anime_normalizer.get(
possible_user_requested_anime_title, possible_user_requested_anime_title
)
title_a = str(anime["title"]["romaji"])
title_b = str(anime["title"]["english"])
percentage_ratio = max(
*[
fuzz.ratio(
title.lower(), possible_user_requested_anime_title.lower()
)
for title in anime["synonyms"]
],
fuzz.ratio(
title_a.lower(), possible_user_requested_anime_title.lower()
),
fuzz.ratio(
title_b.lower(), possible_user_requested_anime_title.lower()
),
)
return percentage_ratio
provider_anime = max(
search_results["results"], key=lambda x: match_title(x["title"])
)
anime_provider.get_anime(provider_anime["id"])
return anime_provider.get_episode_streams(
provider_anime["id"], episode, translation_type
)

93
fastanime/api/api.py Normal file
View File

@@ -0,0 +1,93 @@
from typing import Literal
from fastapi import FastAPI
from requests import post
from thefuzz import fuzz
from ..AnimeProvider import AnimeProvider
from ..Utility.data import anime_normalizer
app = FastAPI()
anime_provider = AnimeProvider("allanime", "true", "true")
ANILIST_ENDPOINT = "https://graphql.anilist.co"
@app.get("/search")
def search_for_anime(title: str, translation_type: Literal["dub", "sub"] = "sub"):
return anime_provider.search_for_anime(title, translation_type)
@app.get("/anime/{anime_id}")
def get_anime(anime_id: str):
return anime_provider.get_anime(anime_id)
@app.get("/anime/{anime_id}/watch")
def get_episode_streams(
anime_id: str, episode: str, translation_type: Literal["sub", "dub"]
):
return anime_provider.get_episode_streams(anime_id, episode, translation_type)
def get_anime_by_anilist_id(anilist_id: int):
query = f"""
query {{
Media(id: {anilist_id}) {{
id
title {{
romaji
english
native
}}
synonyms
episodes
duration
}}
}}
"""
response = post(ANILIST_ENDPOINT, json={"query": query}).json()
return response["data"]["Media"]
@app.get("/watch/{anilist_id}")
def get_episode_streams_by_anilist_id(
anilist_id: int, episode: str, translation_type: Literal["sub", "dub"]
):
anime = get_anime_by_anilist_id(anilist_id)
if not anime:
return
if search_results := anime_provider.search_for_anime(
str(anime["title"]["romaji"] or anime["title"]["english"]), translation_type
):
if not search_results["results"]:
return
def match_title(possible_user_requested_anime_title):
possible_user_requested_anime_title = anime_normalizer.get(
possible_user_requested_anime_title, possible_user_requested_anime_title
)
title_a = str(anime["title"]["romaji"])
title_b = str(anime["title"]["english"])
percentage_ratio = max(
*[
fuzz.ratio(
title.lower(), possible_user_requested_anime_title.lower()
)
for title in anime["synonyms"]
],
fuzz.ratio(
title_a.lower(), possible_user_requested_anime_title.lower()
),
fuzz.ratio(
title_b.lower(), possible_user_requested_anime_title.lower()
),
)
return percentage_ratio
provider_anime = max(
search_results["results"], key=lambda x: match_title(x["title"])
)
anime_provider.get_anime(provider_anime["id"])
return anime_provider.get_episode_streams(
provider_anime["id"], episode, translation_type
)

View File

@@ -1,128 +1 @@
import click
from ...utils.lazyloader import LazyGroup
from ...utils.tools import FastAnimeRuntimeState
commands = {
"trending": "trending.trending",
"recent": "recent.recent",
"search": "search.search",
"upcoming": "upcoming.upcoming",
"scores": "scores.scores",
"popular": "popular.popular",
"favourites": "favourites.favourites",
"random": "random_anime.random_anime",
"login": "login.login",
"watching": "watching.watching",
"paused": "paused.paused",
"rewatching": "rewatching.rewatching",
"dropped": "dropped.dropped",
"completed": "completed.completed",
"planning": "planning.planning",
"notifier": "notifier.notifier",
"stats": "stats.stats",
"download": "download.download",
"downloads": "downloads.downloads",
}
@click.group(
lazy_subcommands=commands,
cls=LazyGroup,
invoke_without_command=True,
help="A beautiful interface that gives you access to a commplete streaming experience",
short_help="Access all streaming options",
epilog="""
\b
\b\bExamples:
# ---- search ----
\b
# get anime with the tag of isekai
fastanime anilist search -T isekai
\b
# get anime of 2024 and sort by popularity
# that has already finished airing or is releasing
# and is not in your anime lists
fastanime anilist search -y 2024 -s POPULARITY_DESC --status RELEASING --status FINISHED --not-on-list
\b
# get anime of 2024 season WINTER
fastanime anilist search -y 2024 --season WINTER
\b
# get anime genre action and tag isekai,magic
fastanime anilist search -g Action -T Isekai -T Magic
\b
# get anime of 2024 thats finished airing
fastanime anilist search -y 2024 -S FINISHED
\b
# get the most favourite anime movies
fastanime anilist search -f MOVIE -s FAVOURITES_DESC
\b
# ---- login ----
\b
# To sign in just run
fastanime anilist login
\b
# To view your login status
fastanime anilist login --status
\b
# To erase login data
fastanime anilist login --erase
\b
# ---- notifier ----
\b
# basic form
fastanime anilist notifier
\b
# with logging to stdout
fastanime --log anilist notifier
\b
# with logging to a file. stored in the same place as your config
fastanime --log-file anilist notifier
""",
)
@click.option("--resume", is_flag=True, help="Resume from the last session")
@click.pass_context
def anilist(ctx: click.Context, resume: bool):
from typing import TYPE_CHECKING
from ....anilist import AniList
from ....AnimeProvider import AnimeProvider
if TYPE_CHECKING:
from ...config import Config
config: Config = ctx.obj
config.anime_provider = AnimeProvider(config.provider)
if user := ctx.obj.user:
AniList.update_login_info(user, user["token"])
if ctx.invoked_subcommand is None:
fastanime_runtime_state = FastAnimeRuntimeState()
if resume:
from ...interfaces.anilist_interfaces import (
anime_provider_search_results_menu,
)
if not config.user_data["recent_anime"]:
click.echo("No recent anime found", err=True, color=True)
return
fastanime_runtime_state.anilist_results_data = {
"data": {"Page": {"media": config.user_data["recent_anime"]}}
}
fastanime_runtime_state.selected_anime_anilist = config.user_data[
"recent_anime"
][0]
fastanime_runtime_state.selected_anime_id_anilist = config.user_data[
"recent_anime"
][0]["id"]
fastanime_runtime_state.selected_anime_title_anilist = (
config.user_data["recent_anime"][0]["title"]["romaji"]
or config.user_data["recent_anime"][0]["title"]["english"]
)
anime_provider_search_results_menu(config, fastanime_runtime_state)
else:
from ...interfaces.anilist_interfaces import (
fastanime_main_menu as anilist_interface,
)
anilist_interface(ctx.obj, fastanime_runtime_state)
from .cmd import anilist

View File

@@ -1,42 +0,0 @@
# in lazy_group.py
import importlib
import click
class LazyGroup(click.Group):
def __init__(self, *args, lazy_subcommands=None, **kwargs):
super().__init__(*args, **kwargs)
# lazy_subcommands is a map of the form:
#
# {command-name} -> {module-name}.{command-object-name}
#
self.lazy_subcommands = lazy_subcommands or {}
def list_commands(self, ctx):
base = super().list_commands(ctx)
lazy = sorted(self.lazy_subcommands.keys())
return base + lazy
def get_command(self, ctx, cmd_name): # pyright:ignore
if cmd_name in self.lazy_subcommands:
return self._lazy_load(cmd_name)
return super().get_command(ctx, cmd_name)
def _lazy_load(self, cmd_name: str):
# lazily loading a command, first get the module name and attribute name
import_path: str = self.lazy_subcommands[cmd_name]
modname, cmd_object_name = import_path.rsplit(".", 1)
# do the import
mod = importlib.import_module(
f".{modname}", package="fastanime.cli.commands.anilist"
)
# get the Command object from that module
cmd_object = getattr(mod, cmd_object_name)
# check the result to make debugging easier
if not isinstance(cmd_object, click.Command):
raise ValueError(
f"Lazy loading of {import_path} failed by returning "
"a non-command object"
)
return cmd_object

View File

@@ -0,0 +1,128 @@
import click
from ...utils.lazyloader import LazyGroup
from ...utils.tools import FastAnimeRuntimeState
commands = {
"trending": "trending.trending",
"recent": "recent.recent",
"search": "search.search",
"upcoming": "upcoming.upcoming",
"scores": "scores.scores",
"popular": "popular.popular",
"favourites": "favourites.favourites",
"random": "random_anime.random_anime",
"login": "login.login",
"watching": "watching.watching",
"paused": "paused.paused",
"rewatching": "rewatching.rewatching",
"dropped": "dropped.dropped",
"completed": "completed.completed",
"planning": "planning.planning",
"notifier": "notifier.notifier",
"stats": "stats.stats",
"download": "download.download",
"downloads": "downloads.downloads",
}
@click.group(
lazy_subcommands=commands,
cls=LazyGroup,
invoke_without_command=True,
help="A beautiful interface that gives you access to a commplete streaming experience",
short_help="Access all streaming options",
epilog="""
\b
\b\bExamples:
# ---- search ----
\b
# get anime with the tag of isekai
fastanime anilist search -T isekai
\b
# get anime of 2024 and sort by popularity
# that has already finished airing or is releasing
# and is not in your anime lists
fastanime anilist search -y 2024 -s POPULARITY_DESC --status RELEASING --status FINISHED --not-on-list
\b
# get anime of 2024 season WINTER
fastanime anilist search -y 2024 --season WINTER
\b
# get anime genre action and tag isekai,magic
fastanime anilist search -g Action -T Isekai -T Magic
\b
# get anime of 2024 thats finished airing
fastanime anilist search -y 2024 -S FINISHED
\b
# get the most favourite anime movies
fastanime anilist search -f MOVIE -s FAVOURITES_DESC
\b
# ---- login ----
\b
# To sign in just run
fastanime anilist login
\b
# To view your login status
fastanime anilist login --status
\b
# To erase login data
fastanime anilist login --erase
\b
# ---- notifier ----
\b
# basic form
fastanime anilist notifier
\b
# with logging to stdout
fastanime --log anilist notifier
\b
# with logging to a file. stored in the same place as your config
fastanime --log-file anilist notifier
""",
)
@click.option("--resume", is_flag=True, help="Resume from the last session")
@click.pass_context
def anilist(ctx: click.Context, resume: bool):
from typing import TYPE_CHECKING
from ....anilist import AniList
from ....AnimeProvider import AnimeProvider
if TYPE_CHECKING:
from ...config import Config
config: Config = ctx.obj
config.anime_provider = AnimeProvider(config.provider)
if user := ctx.obj.user:
AniList.update_login_info(user, user["token"])
if ctx.invoked_subcommand is None:
fastanime_runtime_state = FastAnimeRuntimeState()
if resume:
from ...interfaces.anilist_interfaces import (
anime_provider_search_results_menu,
)
if not config.user_data["recent_anime"]:
click.echo("No recent anime found", err=True, color=True)
return
fastanime_runtime_state.anilist_results_data = {
"data": {"Page": {"media": config.user_data["recent_anime"]}}
}
fastanime_runtime_state.selected_anime_anilist = config.user_data[
"recent_anime"
][0]
fastanime_runtime_state.selected_anime_id_anilist = config.user_data[
"recent_anime"
][0]["id"]
fastanime_runtime_state.selected_anime_title_anilist = (
config.user_data["recent_anime"][0]["title"]["romaji"]
or config.user_data["recent_anime"][0]["title"]["english"]
)
anime_provider_search_results_menu(config, fastanime_runtime_state)
else:
from ...interfaces.anilist_interfaces import (
fastanime_main_menu as anilist_interface,
)
anilist_interface(ctx.obj, fastanime_runtime_state)

View File

@@ -0,0 +1,26 @@
import json
from pathlib import Path
from httpx import AsyncClient, Client, Response
from typing_extensions import Counter
from .networking import TIMEOUT
def execute_graphql_query(
url: str, httpx_client: Client, graphql_file: Path, variables: dict
):
response = httpx_client.get(
url,
params={
"variables": json.dumps(variables),
"query": load_graphql_from_file(graphql_file),
},
timeout=TIMEOUT,
)
return response
def load_graphql_from_file(file: Path) -> str:
query = file.read_text(encoding="utf-8")
return query

View File

@@ -0,0 +1 @@
TIMEOUT = 10

View File

View File

@@ -1,12 +0,0 @@
from .allanime.constants import SERVERS_AVAILABLE as ALLANIME_SERVERS
from .animepahe.constants import SERVERS_AVAILABLE as ANIMEPAHE_SERVERS
from .hianime.constants import SERVERS_AVAILABLE as HIANIME_SERVERS
PROVIDERS_AVAILABLE = {
"allanime": "api.AllAnime",
"animepahe": "api.AnimePahe",
"hianime": "api.HiAnime",
"nyaa": "api.Nyaa",
"yugen": "api.Yugen",
}
SERVERS_AVAILABLE = ["top", *ALLANIME_SERVERS, *ANIMEPAHE_SERVERS, *HIANIME_SERVERS]

View File

@@ -1,500 +0,0 @@
import json
import logging
from typing import TYPE_CHECKING
from ...anime_provider.base_provider import AnimeProvider
from ..decorators import debug_provider
from ..utils import give_random_quality, one_digit_symmetric_xor
from .constants import (
API_BASE_URL,
API_ENDPOINT,
API_REFERER,
DEFAULT_COUNTRY_OF_ORIGIN,
DEFAULT_NSFW,
DEFAULT_PAGE,
DEFAULT_PER_PAGE,
DEFAULT_UNKNOWN,
MP4_SERVER_JUICY_STREAM_REGEX,
)
from .gql_queries import EPISODES_GQL, SEARCH_GQL, SHOW_GQL
if TYPE_CHECKING:
from .types import AllAnimeEpisode
logger = logging.getLogger(__name__)
class AllAnime(AnimeProvider):
"""
AllAnime is a provider class for fetching anime data from the AllAnime API.
Attributes:
HEADERS (dict): Default headers for API requests.
Methods:
_execute_graphql_query(query: str, variables: dict) -> dict:
Executes a GraphQL query and returns the response data.
search_for_anime(
**kwargs
) -> dict:
Searches for anime based on the provided keywords and other parameters.
get_anime(show_id: str) -> dict:
Retrieves detailed information about a specific anime by its ID.
_get_anime_episode(
show_id: str, episode, translation_type: str = "sub"
Retrieves information about a specific episode of an anime.
get_episode_streams(
) -> generator:
Retrieves streaming links for a specific episode of an anime.
"""
HEADERS = {
"Referer": API_REFERER,
}
def _execute_graphql_query(self, query: str, variables: dict):
"""
Executes a GraphQL query using the provided query string and variables.
Args:
query (str): The GraphQL query string to be executed.
variables (dict): A dictionary of variables to be used in the query.
Returns:
dict: The JSON response data from the GraphQL API.
Raises:
requests.exceptions.HTTPError: If the HTTP request returned an unsuccessful status code.
"""
response = self.session.get(
API_ENDPOINT,
params={
"variables": json.dumps(variables),
"query": query,
},
timeout=10,
)
response.raise_for_status()
return response.json()["data"]
@debug_provider
def search_for_anime(
self,
search_keywords: str,
translation_type: str,
*,
nsfw=DEFAULT_NSFW,
unknown=DEFAULT_UNKNOWN,
limit=DEFAULT_PER_PAGE,
page=DEFAULT_PAGE,
country_of_origin=DEFAULT_COUNTRY_OF_ORIGIN,
**kwargs,
):
"""
Search for anime based on given keywords and filters.
Args:
search_keywords (str): The keywords to search for.
translation_type (str, optional): The type of translation to search for (e.g., "sub" or "dub"). Defaults to "sub".
limit (int, optional): The maximum number of results to return. Defaults to 40.
page (int, optional): The page number to return. Defaults to 1.
country_of_origin (str, optional): The country of origin filter. Defaults to "all".
nsfw (bool, optional): Whether to include adult content in the search results. Defaults to True.
unknown (bool, optional): Whether to include unknown content in the search results. Defaults to True.
**kwargs: Additional keyword arguments.
Returns:
dict: A dictionary containing the page information and a list of search results. Each result includes:
- id (str): The ID of the anime.
- title (str): The title of the anime.
- type (str): The type of the anime.
- availableEpisodes (int): The number of available episodes.
"""
search_results = self._execute_graphql_query(
SEARCH_GQL,
variables={
"search": {
"allowAdult": nsfw,
"allowUnknown": unknown,
"query": search_keywords,
},
"limit": limit,
"page": page,
"translationtype": translation_type,
"countryorigin": country_of_origin,
},
)
return {
"pageInfo": search_results["shows"]["pageInfo"],
"results": [
{
"id": result["_id"],
"title": result["name"],
"type": result["__typename"],
"availableEpisodes": result["availableEpisodes"],
}
for result in search_results["shows"]["edges"]
],
}
@debug_provider
def get_anime(self, id: str, **kwargs):
"""
Fetches anime details using the provided show ID.
Args:
id (str): The ID of the anime show to fetch details for.
Returns:
dict: A dictionary containing the anime details, including:
- id (str): The unique identifier of the anime show.
- title (str): The title of the anime show.
- availableEpisodesDetail (list): A list of available episodes details.
- type (str, optional): The type of the anime show.
"""
anime = self._execute_graphql_query(SHOW_GQL, variables={"showId": id})
self.store.set(id, "anime_info", {"title": anime["show"]["name"]})
return {
"id": anime["show"]["_id"],
"title": anime["show"]["name"],
"availableEpisodesDetail": anime["show"]["availableEpisodesDetail"],
"type": anime.get("__typename"),
}
@debug_provider
def _get_anime_episode(
self, anime_id: str, episode, translation_type: str = "sub"
) -> "AllAnimeEpisode":
"""
Fetches a specific episode of an anime by its ID and episode number.
Args:
anime_id (str): The unique identifier of the anime.
episode (str): The episode number or string identifier.
translation_type (str, optional): The type of translation for the episode. Defaults to "sub".
Returns:
AllAnimeEpisode: The episode details retrieved from the GraphQL query.
"""
return self._execute_graphql_query(
EPISODES_GQL,
variables={
"showId": anime_id,
"translationType": translation_type,
"episodeString": episode,
},
)["episode"]
@debug_provider
def _get_server(
self,
embed,
anime_title: str,
allanime_episode: "AllAnimeEpisode",
episode_number,
):
"""
Retrieves the streaming server information for a given anime episode based on the provided embed data.
Args:
embed (dict): A dictionary containing the embed data, including the source URL and source name.
anime_title (str): The title of the anime.
allanime_episode (AllAnimeEpisode): An object representing the episode details.
Returns:
dict: A dictionary containing server information, headers, subtitles, episode title, and links to the stream.
Returns None if no valid URL or stream is found.
Raises:
requests.exceptions.RequestException: If there is an issue with the HTTP request.
"""
url = embed.get("sourceUrl")
if not url:
return
if url.startswith("--"):
url = one_digit_symmetric_xor(56, url[2:])
# FIRST CASE
match embed["sourceName"]:
case "Yt-mp4":
logger.debug("Found streams from Yt")
return {
"server": "Yt",
"episode_title": f"{anime_title}; Episode {episode_number}",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"links": [
{
"link": url,
"quality": "1080",
}
],
}
case "Mp4":
logger.debug("Found streams from Mp4")
response = self.session.get(
url,
fresh=1, # pyright: ignore
timeout=10,
)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html)
if not vid:
return
return {
"server": "mp4-upload",
"headers": {"Referer": "https://www.mp4upload.com/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": [{"link": vid.group(1), "quality": "1080"}],
}
case "Fm-Hls":
# TODO: requires decoding obsfucated js (filemoon)
logger.debug("Found streams from Fm-Hls")
response = self.session.get(
url,
timeout=10,
)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html)
if not vid:
return
return {
"server": "filemoon",
"headers": {"Referer": "https://www.mp4upload.com/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": [{"link": vid.group(1), "quality": "1080"}],
}
case "Ok":
# TODO: requires decoding the obsfucated js (filemoon)
response = self.session.get(
url,
timeout=10,
)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html)
logger.debug("Found streams from Ok")
return {
"server": "filemoon",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "Vid-mp4":
# TODO: requires some serious work i think : )
response = self.session.get(
url,
timeout=10,
)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
logger.debug("Found streams from vid-mp4")
return {
"server": "Vid-mp4",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "Ss-Hls":
# TODO: requires some serious work i think : )
response = self.session.get(
url,
timeout=10,
)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
logger.debug("Found streams from Ss-Hls")
return {
"server": "StreamSb",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
# get the stream url for an episode of the defined source names
response = self.session.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
# SECOND CASE
match embed["sourceName"]:
case "Luf-mp4":
logger.debug("Found streams from gogoanime")
return {
"server": "gogoanime",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "Kir":
logger.debug("Found streams from wetransfer")
return {
"server": "weTransfer",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "S-mp4":
logger.debug("Found streams from sharepoint")
return {
"server": "sharepoint",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "Sak":
logger.debug("Found streams from dropbox")
return {
"server": "dropbox",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "Default":
logger.debug("Found streams from wixmp")
return {
"server": "wixmp",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "Ak":
# TODO: works but needs further probing
logger.debug("Found streams from Ak")
return {
"server": "Ak",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
@debug_provider
def get_episode_streams(
self, anime_id, episode_number: str, translation_type="sub", **kwargs
):
"""
Retrieve streaming information for a specific episode of an anime.
Args:
anime_id (str): The unique identifier for the anime.
episode_number (str): The episode number to retrieve streams for.
translation_type (str, optional): The type of translation for the episode (e.g., "sub" for subtitles). Defaults to "sub".
Yields:
dict: A dictionary containing streaming information for the episode, including:
- server (str): The name of the streaming server.
- episode_title (str): The title of the episode.
- headers (dict): HTTP headers required for accessing the stream.
- subtitles (list): A list of subtitles available for the episode.
- links (list): A list of dictionaries containing streaming links and their quality.
"""
anime_title = (self.store.get(anime_id, "anime_info", "") or {"title": ""})[
"title"
]
allanime_episode = self._get_anime_episode(
anime_id, episode_number, translation_type
)
for embed in allanime_episode["sourceUrls"]:
if embed.get("sourceName", "") not in (
# priorities based on death note
"Sak", # 7
"S-mp4", # 7.9
"Luf-mp4", # 7.7
"Default", # 8.5
"Yt-mp4", # 7.9
"Kir", # NA
"Mp4", # 4
# "Ak",#
# "Vid-mp4", # 4
# "Ok", # 3.5
# "Ss-Hls", # 5.5
# "Fm-Hls",#
):
logger.debug(f"Found {embed['sourceName']} but ignoring")
continue
if server := self._get_server(
embed, anime_title, allanime_episode, episode_number
):
yield server
if __name__ == "__main__":
import subprocess
allanime = AllAnime(cache_requests="True", use_persistent_provider_store="False")
search_term = input("Enter the search term for the anime: ")
translation_type = input("Enter the translation type (sub/dub): ")
search_results = allanime.search_for_anime(
search_keywords=search_term, translation_type=translation_type
)
if not search_results["results"]:
print("No results found.")
exit()
print("Search Results:")
for idx, result in enumerate(search_results["results"], start=1):
print(f"{idx}. {result['title']} (ID: {result['id']})")
anime_choice = int(input("Enter the number of the anime you want to watch: ")) - 1
anime_id = search_results["results"][anime_choice]["id"]
anime_details = allanime.get_anime(anime_id)
print(f"Selected Anime: {anime_details['title']}")
print("Available Episodes:")
for idx, episode in enumerate(
sorted(anime_details["availableEpisodesDetail"][translation_type], key=float),
start=1,
):
print(f"{idx}. Episode {episode}")
episode_choice = (
int(input("Enter the number of the episode you want to watch: ")) - 1
)
episode_number = anime_details["availableEpisodesDetail"][translation_type][
episode_choice
]
streams = list(
allanime.get_episode_streams(anime_id, episode_number, translation_type)
)
if not streams:
print("No streams available.")
exit()
print("Available Streams:")
for idx, stream in enumerate(streams, start=1):
print(f"{idx}. Server: {stream['server']}")
server_choice = int(input("Enter the number of the server you want to use: ")) - 1
selected_stream = streams[server_choice]
stream_link = selected_stream["links"][0]["link"]
mpv_args = ["mpv", stream_link]
headers = selected_stream["headers"]
if headers:
mpv_headers = "--http-header-fields="
for header_name, header_value in headers.items():
mpv_headers += f"{header_name}:{header_value},"
mpv_args.append(mpv_headers)
subprocess.run(mpv_args, check=False)

View File

@@ -1,56 +0,0 @@
SEARCH_GQL = """
query (
$search: SearchInput
$limit: Int
$page: Int
$translationType: VaildTranslationTypeEnumType
$countryOrigin: VaildCountryOriginEnumType
) {
shows(
search: $search
limit: $limit
page: $page
translationType: $translationType
countryOrigin: $countryOrigin
) {
pageInfo {
total
}
edges {
_id
name
availableEpisodes
__typename
}
}
}
"""
EPISODES_GQL = """\
query (
$showId: String!
$translationType: VaildTranslationTypeEnumType!
$episodeString: String!
) {
episode(
showId: $showId
translationType: $translationType
episodeString: $episodeString
) {
episodeString
sourceUrls
notes
}
}
"""
SHOW_GQL = """
query ($showId: String!) {
show(_id: $showId) {
_id
name
availableEpisodesDetail
}
}
"""

View File

@@ -1,36 +0,0 @@
import os
import requests
from yt_dlp.utils.networking import random_user_agent
from ...constants import APP_CACHE_DIR
from .providers_store import ProviderStore
class AnimeProvider:
session: requests.Session
USER_AGENT = random_user_agent()
HEADERS = {}
def __init__(self, cache_requests, use_persistent_provider_store) -> None:
if cache_requests.lower() == "true":
from ..common.requests_cacher import CachedRequestsSession
self.session = CachedRequestsSession(
os.path.join(APP_CACHE_DIR, "cached_requests.db"),
max_lifetime=int(
os.environ.get("FASTANIME_MAX_CACHE_LIFETIME", 259200)
),
)
else:
self.session = requests.session()
self.session.headers.update({"User-Agent": self.USER_AGENT, **self.HEADERS})
if use_persistent_provider_store.lower() == "true":
self.store = ProviderStore(
"persistent",
self.__class__.__name__,
os.path.join(APP_CACHE_DIR, "anime_providers_store.db"),
)
else:
self.store = ProviderStore("memory")

View File

@@ -1,90 +0,0 @@
from typing import Literal, TypedDict
class PageInfo(TypedDict):
total: int
perPage: int
currentPage: int
#
# class EpisodesDetail(TypedDict):
# dub: int
# sub: int
# raw: int
#
# search data
class SearchResult(TypedDict):
id: str
title: str
otherTitles: list[str]
availableEpisodes: list[str]
type: str
score: int
status: str
season: str
poster: str
class SearchResults(TypedDict):
pageInfo: PageInfo
results: list[SearchResult]
# anime data
class AnimeEpisodeDetails(TypedDict):
dub: list[str]
sub: list[str]
raw: list[str]
#
# class AnimeEpisode(TypedDict):
# id: str
# title: str
#
class AnimeEpisodeInfo(TypedDict):
id: str
title: str
episode: str
poster: str | None
duration: str | None
translation_type: str | None
class Anime(TypedDict):
id: str
title: str
availableEpisodesDetail: AnimeEpisodeDetails
type: str | None
episodesInfo: list[AnimeEpisodeInfo] | None
poster: str
year: str
class EpisodeStream(TypedDict):
resolution: str | None
link: str
hls: bool | None
mp4: bool | None
priority: int | None
quality: Literal["360", "720", "1080", "unknown"]
translation_type: Literal["dub", "sub"]
class Subtitle(TypedDict):
url: str
language: str
class Server(TypedDict):
headers: dict
subtitles: list[Subtitle]
audio: list
server: str
episode_title: str
links: list[EpisodeStream]

View File

@@ -0,0 +1,3 @@
from .api import connect
__all__ = ["connect"]

View File

@@ -3,7 +3,7 @@ import time
from pypresence import Presence
def discord_connect(show, episode, switch):
def connect(show, episode, switch):
presence = Presence(client_id="1292070065583165512")
presence.connect()
if not switch.is_set():

View File

@@ -0,0 +1,3 @@
from .anime import AnimeProvider
__all__ = ["AnimeProvider"]

View File

@@ -0,0 +1,3 @@
from .provider import PROVIDERS_AVAILABLE, SERVERS_AVAILABLE, AnimeProvider
__all__ = ["SERVERS_AVAILABLE", "PROVIDERS_AVAILABLE", "AnimeProvider"]

View File

@@ -0,0 +1,75 @@
import logging
from typing import TYPE_CHECKING
from fastanime.libs.anime_provider.allanime.parser import (
map_to_anime_result,
map_to_search_results,
)
from ....core.utils.graphql import execute_graphql_query
from ..base import AnimeProvider
from ..utils.decorators import debug_provider
from .constants import (
ANIME_GQL,
API_BASE_URL,
API_GRAPHQL_ENDPOINT,
API_GRAPHQL_REFERER,
EPISODE_GQL,
SEARCH_GQL,
)
from .extractors import extract_server
if TYPE_CHECKING:
from .types import AllAnimeEpisode
logger = logging.getLogger(__name__)
class AllAnime(AnimeProvider):
    """AllAnime provider implemented on top of the site's GraphQL API."""

    DEFAULT_HEADERS = {"Referer": API_GRAPHQL_REFERER}

    @debug_provider
    def search_for_anime(self, params):
        """Search AllAnime for ``params.query`` and return mapped SearchResults."""
        response = execute_graphql_query(
            API_GRAPHQL_ENDPOINT,
            self.client,
            SEARCH_GQL,
            variables={
                "search": {
                    "allowAdult": params.allow_nsfw,
                    "allowUnknown": params.allow_unknown,
                    "query": params.query,
                },
                "limit": params.page_limit,
                "page": params.current_page,
                # BUGFIX: GraphQL variable names are case sensitive and must
                # match the declarations in search.gql ($translationType /
                # $countryOrigin); the previous all-lowercase keys
                # ("translationtype", "countryorigin") never bound.
                "translationType": params.translation_type,
                "countryOrigin": params.country_of_origin,
            },
        )
        return map_to_search_results(response)

    @debug_provider
    def get_anime(self, params):
        """Fetch full details for the show identified by ``params.anime_id``."""
        response = execute_graphql_query(
            API_GRAPHQL_ENDPOINT,
            self.client,
            ANIME_GQL,
            variables={"showId": params.anime_id},
        )
        return map_to_anime_result(response)

    @debug_provider
    def get_episode_streams(self, params):
        """Yield a Server for every extractable source of the requested episode."""
        episode_response = execute_graphql_query(
            # BUGFIX: was API_BASE_URL, which is the bare domain
            # ("allanime.day"), not a URL; every other query in this class
            # goes through API_GRAPHQL_ENDPOINT.
            API_GRAPHQL_ENDPOINT,
            self.client,
            EPISODE_GQL,
            variables={
                "showId": params.anime_id,
                "translationType": params.translation_type,
                "episodeString": params.episode,
            },
        )
        episode: AllAnimeEpisode = episode_response.json()["data"]["episode"]
        for source in episode["sourceUrls"]:
            if server := extract_server(self.client, params.episode, episode, source):
                yield server

View File

@@ -1,4 +1,6 @@
import re
from importlib import resources
from pathlib import Path
SERVERS_AVAILABLE = [
"sharepoint",
@@ -10,8 +12,8 @@ SERVERS_AVAILABLE = [
"mp4-upload",
]
API_BASE_URL = "allanime.day"
API_REFERER = "https://allanime.to/"
API_ENDPOINT = f"https://api.{API_BASE_URL}/api/"
API_GRAPHQL_REFERER = "https://allanime.to/"
API_GRAPHQL_ENDPOINT = f"https://api.{API_BASE_URL}/api/"
# search constants
DEFAULT_COUNTRY_OF_ORIGIN = "all"
@@ -21,7 +23,12 @@ DEFAULT_PER_PAGE = 40
DEFAULT_PAGE = 1
# regex stuff
MP4_SERVER_JUICY_STREAM_REGEX = re.compile(
r"video/mp4\",src:\"(https?://.*/video\.mp4)\""
)
# graphql files
GQLS = resources.files("fastanime.libs.anime_provider.allanime")
SEARCH_GQL = Path(str(GQLS / "search.gql"))
ANIME_GQL = Path(str(GQLS / "anime.gql"))
EPISODE_GQL = Path(str(GQLS / "episode.gql"))

View File

@@ -0,0 +1,3 @@
from .extractor import extract_server
__all__ = ["extract_server"]

View File

@@ -0,0 +1,31 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .extractor import BaseExtractor
class AkExtractor(BaseExtractor):
    """Extractor for the 'Ak' server of the AllAnime provider."""

    @classmethod
    def extract(
        cls,
        url,
        client,
        episode_number: str,
        episode: AllAnimeEpisode,
        source: AllAnimeSource,
    ) -> Server:
        """Resolve the clock endpoint behind *url* and wrap its links in a Server."""
        # The stream metadata lives at the same path with 'clock' swapped
        # for 'clock.json'.
        endpoint = f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}"
        resp = client.get(endpoint, timeout=10)
        resp.raise_for_status()
        payload = resp.json()
        stream_links = [
            EpisodeStream(link=item, quality="1080") for item in payload["links"]
        ]
        return Server(
            name="Ak",
            links=stream_links,
            episode_title=episode["notes"],
            headers={"Referer": f"https://{API_BASE_URL}/"},
        )

View File

@@ -0,0 +1,31 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .extractor import BaseExtractor
class SakExtractor(BaseExtractor):
    """Extractor for the 'Sak' (dropbox-backed) server of AllAnime."""

    @classmethod
    def extract(
        cls,
        url,
        client,
        episode_number: str,
        episode: AllAnimeEpisode,
        source: AllAnimeSource,
    ) -> Server:
        """Fetch the clock.json endpoint for *url* and map it to a Server."""
        res = client.get(
            f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
            timeout=10,
        )
        res.raise_for_status()
        data = res.json()
        return Server(
            name="dropbox",
            links=[EpisodeStream(link=entry, quality="1080") for entry in data["links"]],
            episode_title=episode["notes"],
            headers={"Referer": f"https://{API_BASE_URL}/"},
        )

View File

@@ -0,0 +1,55 @@
from abc import ABC, abstractmethod
from logging import getLogger

from ...types import Server
from ..types import AllAnimeEpisode, AllAnimeSource
from ..utils import one_digit_symmetric_xor

logger = getLogger(__name__)


class BaseExtractor(ABC):
    """Common interface every AllAnime server extractor implements."""

    # BUGFIX: @classmethod must be the *outer* decorator -- abstractmethod
    # has to be the innermost decorator for the abstract flag to propagate
    # (the original had the order reversed).
    @classmethod
    @abstractmethod
    def extract(cls, url, client, episode_number, episode, source) -> Server:
        """Turn a raw source url into a fully resolved Server."""


# BUGFIX: imported *after* BaseExtractor is defined.  ak.py imports
# BaseExtractor from this module, so importing .ak at the top of the file
# created a circular import that raised ImportError before BaseExtractor
# existed.  (Import this module / the package before importing .ak directly.)
from .ak import AkExtractor  # noqa: E402

# NOTE(review): every supported source currently routes to AkExtractor even
# though dedicated extractors (SakExtractor, Smp4Extractor, Lufmp4Extractor,
# Mp4Extractor, ...) exist in this package -- confirm whether they should be
# wired in here instead.
AVAILABLE_SOURCES = {
    "Sak": AkExtractor,
    "S-mp4": AkExtractor,
    "Luf-mp4": AkExtractor,
    "Default": AkExtractor,
    "Yt-mp4": AkExtractor,
    "Kir": AkExtractor,
    "Mp4": AkExtractor,
}

# Sources we recognise but deliberately skip (extractors not implemented).
OTHER_SOURCES = {"Ak": AkExtractor, "Vid-mp4": "", "Ok": "", "Ss-Hls": "", "Fm-Hls": ""}


def extract_server(
    client, episode_number: str, episode: AllAnimeEpisode, source: AllAnimeSource
) -> Server | None:
    """Dispatch *source* to the matching extractor.

    Returns the resolved Server, or None when the source has no url, is
    deliberately ignored, or is unknown.
    """
    url = source.get("sourceUrl")
    if not url:
        logger.debug(f"Url not found in source: {source}")
        return
    # Urls prefixed with "--" are obfuscated with a one-byte XOR (key 56).
    if url.startswith("--"):
        url = one_digit_symmetric_xor(56, url[2:])
    if source["sourceName"] in OTHER_SOURCES:
        logger.debug(f"Found {source['sourceName']} but ignoring")
        return
    if source["sourceName"] not in AVAILABLE_SOURCES:
        logger.debug(
            f"Found {source['sourceName']} but did not expect it, its time to scrape lol"
        )
        return
    logger.debug(f"Found {source['sourceName']}")
    return AVAILABLE_SOURCES[source["sourceName"]].extract(
        url, client, episode_number, episode, source
    )

View File

@@ -0,0 +1,64 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL, MP4_SERVER_JUICY_STREAM_REGEX
from ..types import AllAnimeEpisode, AllAnimeSource
from .extractor import BaseExtractor
# TODO: requires decoding obfuscated js (filemoon)
class FmHlsExtractor(BaseExtractor):
    """Extractor stub for the 'Fm-Hls' (filemoon) server.

    NOTE(review): this looks copy-pasted from Mp4Extractor -- the server
    name ("dropbox") and the mp4upload Referer are almost certainly wrong
    for filemoon; confirm before enabling this source.
    """

    @classmethod
    def extract(
        cls,
        url,
        client,
        episode_number: str,
        episode: AllAnimeEpisode,
        source: AllAnimeSource,
    ) -> Server:
        response = client.get(
            f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
            timeout=10,
        )
        response.raise_for_status()
        # BUGFIX: removed the unused `streams = response.json()` -- the
        # endpoint returns an HTML embed page (parsed below via regex), so
        # json() raised before the page could be scraped.
        embed_html = response.text.replace(" ", "").replace("\n", "")
        vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html)
        if not vid:
            raise Exception("Failed to locate mp4 stream url in embed page")
        return Server(
            name="dropbox",
            links=[EpisodeStream(link=vid.group(1), quality="1080")],
            episode_title=episode["notes"],
            headers={"Referer": "https://www.mp4upload.com/"},
        )
# TODO: requires decoding obfuscated js
class OkExtractor(BaseExtractor):
    """Extractor stub for the 'Ok' server.

    NOTE(review): copy-pasted from Mp4Extractor -- the server name
    ("dropbox") and the mp4upload Referer look wrong for this host;
    confirm before enabling.
    """

    @classmethod
    def extract(
        cls,
        url,
        client,
        episode_number: str,
        episode: AllAnimeEpisode,
        source: AllAnimeSource,
    ) -> Server:
        response = client.get(
            f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
            timeout=10,
        )
        response.raise_for_status()
        # BUGFIX: removed the unused `streams = response.json()` -- the
        # endpoint returns an HTML embed page (parsed below via regex), so
        # json() raised before the page could be scraped.
        embed_html = response.text.replace(" ", "").replace("\n", "")
        vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html)
        if not vid:
            raise Exception("Failed to locate mp4 stream url in embed page")
        return Server(
            name="dropbox",
            links=[EpisodeStream(link=vid.group(1), quality="1080")],
            episode_title=episode["notes"],
            headers={"Referer": "https://www.mp4upload.com/"},
        )

View File

@@ -0,0 +1,31 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .extractor import BaseExtractor
class Lufmp4Extractor(BaseExtractor):
    """Extractor for the 'Luf-mp4' (gogoanime-backed) server of AllAnime."""

    @classmethod
    def extract(
        cls,
        url,
        client,
        episode_number: str,
        episode: AllAnimeEpisode,
        source: AllAnimeSource,
    ) -> Server:
        """Fetch the clock.json endpoint for *url* and map it to a Server."""
        clock_url = f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}"
        reply = client.get(clock_url, timeout=10)
        reply.raise_for_status()
        body = reply.json()
        return Server(
            name="gogoanime",
            links=[EpisodeStream(link=raw, quality="1080") for raw in body["links"]],
            episode_title=episode["notes"],
            headers={"Referer": f"https://{API_BASE_URL}/"},
        )

View File

@@ -0,0 +1,33 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL, MP4_SERVER_JUICY_STREAM_REGEX
from ..types import AllAnimeEpisode, AllAnimeSource
from .extractor import BaseExtractor
class Mp4Extractor(BaseExtractor):
    """Extractor for the 'Mp4' (mp4upload) server of AllAnime."""

    @classmethod
    def extract(
        cls,
        url,
        client,
        episode_number: str,
        episode: AllAnimeEpisode,
        source: AllAnimeSource,
    ) -> Server:
        """Scrape the mp4upload embed page for the direct mp4 stream url."""
        response = client.get(
            f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
            timeout=10,
        )
        response.raise_for_status()
        # BUGFIX: removed the unused `streams = response.json()` -- the
        # endpoint returns an HTML embed page (parsed below via regex), so
        # json() raised before the page could be scraped.
        embed_html = response.text.replace(" ", "").replace("\n", "")
        vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html)
        if not vid:
            raise Exception("Failed to locate mp4 stream url in embed page")
        return Server(
            name="mp4-upload",
            links=[EpisodeStream(link=vid.group(1), quality="1080")],
            episode_title=episode["notes"],
            headers={"Referer": "https://www.mp4upload.com/"},
        )

View File

@@ -0,0 +1,31 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .extractor import BaseExtractor
class Smp4Extractor(BaseExtractor):
    """Extractor for the 'S-mp4' (sharepoint-backed) server of AllAnime."""

    @classmethod
    def extract(
        cls,
        url,
        client,
        episode_number: str,
        episode: AllAnimeEpisode,
        source: AllAnimeSource,
    ) -> Server:
        """Fetch the clock.json endpoint for *url* and map it to a Server."""
        result = client.get(
            f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}", timeout=10
        )
        result.raise_for_status()
        stream_data = result.json()
        links = [
            EpisodeStream(link=stream_link, quality="1080")
            for stream_link in stream_data["links"]
        ]
        return Server(
            name="sharepoint",
            links=links,
            episode_title=episode["notes"],
            headers={"Referer": f"https://{API_BASE_URL}/"},
        )

View File

@@ -0,0 +1,21 @@
from .extractor import BaseExtractor

# TODO: requires some serious work i think : )
#
# BUGFIX: the legacy reference snippet below was left at module level, where
# it references names that do not exist in this scope (self, url, logger,
# API_BASE_URL, allanime_episode, anime_title, episode_number,
# give_random_quality) and contains a bare `return` -- importing the module
# crashed.  Kept commented out for the eventual port:
#
# response = self.session.get(
#     url,
#     timeout=10,
# )
# response.raise_for_status()
# embed_html = response.text.replace(" ", "").replace("\n", "")
# logger.debug("Found streams from Ss-Hls")
# return {
#     "server": "StreamSb",
#     "headers": {"Referer": f"https://{API_BASE_URL}/"},
#     "subtitles": [],
#     "episode_title": (allanime_episode["notes"] or f"{anime_title}")
#     + f"; Episode {episode_number}",
#     "links": give_random_quality(response.json()["links"]),
# }


class SsHlsExtractor(BaseExtractor):
    """Placeholder for the 'Ss-Hls' server; extract() is not implemented yet."""

    pass

View File

@@ -0,0 +1,21 @@
from .extractor import BaseExtractor

# TODO: requires some serious work i think : )
#
# BUGFIX: the legacy reference snippet below was left at module level, where
# it references undefined names (self, url, logger, API_BASE_URL,
# allanime_episode, anime_title, episode_number, give_random_quality) and
# contains a bare `return` -- importing the module crashed.  Kept commented
# out for the eventual port:
#
# response = self.session.get(
#     url,
#     timeout=10,
# )
# response.raise_for_status()
# embed_html = response.text.replace(" ", "").replace("\n", "")
# logger.debug("Found streams from vid-mp4")
# return {
#     "server": "Vid-mp4",
#     "headers": {"Referer": f"https://{API_BASE_URL}/"},
#     "subtitles": [],
#     "episode_title": (allanime_episode["notes"] or f"{anime_title}")
#     + f"; Episode {episode_number}",
#     "links": give_random_quality(response.json()["links"]),
# }


class VidMp4Extractor(BaseExtractor):
    """Placeholder for the 'Vid-mp4' server; extract() is not implemented yet."""

    pass

View File

@@ -0,0 +1,22 @@
from .extractor import BaseExtractor

# BUGFIX: the legacy snippet below (a fragment of an old match statement,
# note the stray `case "Kir":`) was left live at module level -- that is a
# SyntaxError, so the module could not even be imported.  Kept commented out
# as a reference for the eventual port:
#
# # get the stream url for an episode of the defined source names
# response = self.session.get(
#     f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
#     timeout=10,
# )
# response.raise_for_status()
# case "Kir":
#     logger.debug("Found streams from wetransfer")
#     return {
#         "server": "weTransfer",
#         "headers": {"Referer": f"https://{API_BASE_URL}/"},
#         "subtitles": [],
#         "episode_title": (allanime_episode["notes"] or f"{anime_title}")
#         + f"; Episode {episode_number}",
#         "links": give_random_quality(response.json()["links"]),
#     }


class KirExtractor(BaseExtractor):
    """Placeholder for the 'Kir' (wetransfer) server; not implemented yet."""

    pass

View File

@@ -0,0 +1,22 @@
from .extractor import BaseExtractor

# BUGFIX: the legacy snippet below (a fragment of an old match statement,
# note the stray `case "Sak":`) was left live at module level -- that is a
# SyntaxError, so the module could not even be imported.  Kept commented out
# as a reference for the eventual port:
#
# # get the stream url for an episode of the defined source names
# response = self.session.get(
#     f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
#     timeout=10,
# )
# response.raise_for_status()
# case "Sak":
#     logger.debug("Found streams from dropbox")
#     return {
#         "server": "dropbox",
#         "headers": {"Referer": f"https://{API_BASE_URL}/"},
#         "subtitles": [],
#         "episode_title": (allanime_episode["notes"] or f"{anime_title}")
#         + f"; Episode {episode_number}",
#         "links": give_random_quality(response.json()["links"]),
#     }


class DefaultExtractor(BaseExtractor):
    """Placeholder for the 'Default' server; extract() is not implemented yet."""

    pass

View File

@@ -0,0 +1,17 @@
from .extractor import BaseExtractor

# BUGFIX: the legacy snippet below contained a bare `return` at module level
# -- a SyntaxError ('return' outside function), so the module could not be
# imported.  Kept commented out as a reference for the eventual port:
#
# return {
#     "server": "Yt",
#     "episode_title": f"{anime_title}; Episode {episode_number}",
#     "headers": {"Referer": f"https://{API_BASE_URL}/"},
#     "subtitles": [],
#     "links": [
#         {
#             "link": url,
#             "quality": "1080",
#         }
#     ],
# }


class YtExtractor(BaseExtractor):
    """Placeholder for the 'Yt-mp4' server; extract() is not implemented yet."""

    pass

View File

@@ -0,0 +1,38 @@
from httpx import Response
from ..types import Anime, AnimeEpisodes, PageInfo, SearchResult, SearchResults
from .types import AllAnimeSearchResults, AllAnimeShow
def generate_list(count: int) -> list[str]:
    """Return the strings "0".."count-1" as stand-in episode identifiers.

    NOTE(review): numbering starts at 0 -- confirm the provider's episodes
    are not 1-based before relying on these ids.
    """
    return [str(index) for index in range(count)]
def map_to_search_results(response: Response) -> SearchResults:
    """Convert a raw AllAnime search response into provider SearchResults."""
    payload: AllAnimeSearchResults = response.json()["data"]
    shows = payload["shows"]
    mapped = [
        SearchResult(
            id=edge["_id"],
            title=edge["name"],
            media_type=edge["__typename"],
            # NOTE(review): the raw availableEpisodes value is forwarded
            # wholesale as the 'sub' entry -- confirm its shape against the
            # API response.
            available_episodes=AnimeEpisodes(sub=edge["availableEpisodes"]),
        )
        for edge in shows["edges"]
    ]
    return SearchResults(
        page_info=PageInfo(total=shows["pageInfo"]["total"]),
        results=mapped,
    )
def map_to_anime_result(response: Response) -> Anime:
    """Convert a raw AllAnime show response into a provider Anime."""
    show: AllAnimeShow = response.json()["data"]["show"]
    detail = show["availableEpisodesDetail"]
    # The API only exposes per-translation episode *counts*; synthesize
    # identifier lists from them.
    episodes = AnimeEpisodes(
        sub=generate_list(detail["sub"]),
        dub=generate_list(detail["dub"]),
        raw=generate_list(detail["raw"]),
    )
    return Anime(
        id=show["_id"],
        title=show["name"],
        episodes=episodes,
        type=show.get("__typename"),
    )

View File

@@ -0,0 +1,7 @@
# Fetch one show by id together with its per-translation episode counts.
query ($showId: String!) {
  show(_id: $showId) {
    _id
    name
    availableEpisodesDetail
  }
}

View File

@@ -0,0 +1,15 @@
# Fetch a single episode's source urls for one translation type.
# NOTE: "VaildTranslationTypeEnumType" presumably mirrors the remote
# schema's own (misspelled) type name -- do not "correct" it locally
# without checking the upstream schema.
query (
  $showId: String!
  $translationType: VaildTranslationTypeEnumType!
  $episodeString: String!
) {
  episode(
    showId: $showId
    translationType: $translationType
    episodeString: $episodeString
  ) {
    episodeString
    sourceUrls
    notes
  }
}

View File

@@ -0,0 +1,25 @@
# Paginated show search.
# NOTE: GraphQL variable names are case sensitive -- callers must pass
# variables under these exact names ($search, $limit, $page,
# $translationType, $countryOrigin).
# "Vaild..." enum names presumably mirror the remote schema's own spelling.
query (
  $search: SearchInput
  $limit: Int
  $page: Int
  $translationType: VaildTranslationTypeEnumType
  $countryOrigin: VaildCountryOriginEnumType
) {
  shows(
    search: $search
    limit: $limit
    page: $page
    translationType: $translationType
    countryOrigin: $countryOrigin
  ) {
    pageInfo {
      total
    }
    edges {
      _id
      name
      availableEpisodes
      __typename
    }
  }
}

View File

@@ -1,7 +1,7 @@
from typing import Literal, TypedDict
class AllAnimeEpisodesInfo(TypedDict):
class AllAnimeEpisodesDetail(TypedDict):
dub: int
sub: int
raw: int
@@ -14,7 +14,7 @@ class AllAnimePageInfo(TypedDict):
class AllAnimeShow(TypedDict):
_id: str
name: str
availableEpisodesDetail: AllAnimeEpisodesInfo
availableEpisodesDetail: AllAnimeEpisodesDetail
__typename: str
@@ -34,20 +34,33 @@ class AllAnimeSearchResults(TypedDict):
shows: AllAnimeShows
class AllAnimeSourcesDownloads(TypedDict):
class AllAnimeSourceDownload(TypedDict):
sourceName: str
dowloadUrl: str
class AllAnimeSources(TypedDict):
class AllAnimeSource(TypedDict):
sourceName: Literal[
"Sak",
"S-mp4",
"Luf-mp4",
"Default",
"Yt-mp4",
"Kir",
"Mp4",
"Ak",
"Vid-mp4",
"Ok",
"Ss-Hls",
"Fm-Hls",
]
sourceUrl: str
priority: float
sandbox: str
sourceName: str
type: str
className: str
streamerId: str
downloads: AllAnimeSourcesDownloads
downloads: AllAnimeSourceDownload
Server = Literal["gogoanime", "dropbox", "wetransfer", "sharepoint"]
@@ -55,7 +68,7 @@ Server = Literal["gogoanime", "dropbox", "wetransfer", "sharepoint"]
class AllAnimeEpisode(TypedDict):
episodeString: str
sourceUrls: list[AllAnimeSources]
sourceUrls: list[AllAnimeSource]
notes: str | None

View File

@@ -9,7 +9,7 @@ from yt_dlp.utils import (
get_elements_html_by_class,
)
from ..base_provider import AnimeProvider
from ..base import AnimeProvider
from ..decorators import debug_provider
from .constants import (
ANIMEPAHE_BASE,

View File

@@ -0,0 +1,70 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TYPE_CHECKING, Literal
from httpx import AsyncClient, Client
if TYPE_CHECKING:
from collections.abc import Iterator
from .types import Anime, SearchResults, Server
@dataclass
class SearchParams:
    """Parameters for searching anime.

    Only ``query`` is required; every other field carries a default so a
    provider can ignore filters it does not support.
    """

    query: str  # raw search term

    # pagination and sorting
    current_page: int = 1
    page_limit: int = 20
    sort_by: str = "relevance"
    order: Literal["asc", "desc"] = "desc"

    # filters
    translation_type: Literal["sub", "dub"] = "sub"
    genre: str | None = None
    year: int | None = None
    status: str | None = None
    allow_nsfw: bool = True
    allow_unknown: bool = True
    country_of_origin: str | None = None  # presumably None means "no restriction"
@dataclass
class EpisodeStreamsParams:
    """Parameters for fetching episode streams."""

    anime_id: str  # provider-scoped anime identifier
    episode: str  # episode identifier as a string (e.g. "1")
    translation_type: Literal["sub", "dub"] = "sub"
    server: str | None = None  # preferred server name; None lets the provider pick
    quality: Literal["1080", "720", "480", "360"] = "720"
    subtitles: bool = True  # whether subtitle tracks should be requested
@dataclass
class AnimeParams:
    """Parameters for fetching anime details."""

    anime_id: str  # provider-scoped anime identifier
class AnimeProvider(ABC):
    """Abstract interface every concrete anime provider implements."""

    def __init__(self, client: Client) -> None:
        # Shared httpx client used for every request the provider makes.
        self.client = client

    @abstractmethod
    def search_for_anime(self, params: SearchParams) -> "SearchResults | None":
        """Search the provider for anime matching ``params``."""
        pass

    @abstractmethod
    def get_anime(self, params: AnimeParams) -> "Anime | None":
        """Fetch full details for a single anime."""
        pass

    @abstractmethod
    def get_episode_streams(
        self, params: EpisodeStreamsParams
    ) -> "Iterator[Server] | None":
        """Yield resolved streaming servers for one episode."""
        pass

View File

@@ -13,9 +13,9 @@ from yt_dlp.utils import (
get_elements_html_by_class,
)
from ..base_provider import AnimeProvider
from ..base import AnimeProvider
from ..decorators import debug_provider
from ..utils import give_random_quality
from ..utils.utils import give_random_quality
from .constants import SERVERS_AVAILABLE
from .extractors import MegaCloud
from .types import HiAnimeStream

Some files were not shown because too many files have changed in this diff Show More