Merge pull request #137 from axtrat/provider/animeunity

This commit is contained in:
Benexl
2025-09-07 13:57:10 +03:00
committed by GitHub
8 changed files with 332 additions and 1 deletions

View File

@@ -13,5 +13,12 @@
"Azumanga Daiou The Animation": "Azumanga Daioh",
"Mairimashita! Iruma-kun 2nd Season": "Mairimashita! Iruma-kun 2",
"Mairimashita! Iruma-kun 3rd Season": "Mairimashita! Iruma-kun 3"
},
"animeunity": {
"Kaiju No. 8": "Kaiju No.8",
"Naruto Shippuden": "Naruto: Shippuden",
"Psycho-Pass: Sinners of the System Case.1 - Crime and Punishment": "PSYCHO-PASS Sinners of the System: Case.1 Crime and Punishment",
"Psycho-Pass: Sinners of the System Case.2 - First Guardian": "PSYCHO-PASS Sinners of the System: Case.2 First Guardian",
"Psycho-Pass: Sinners of the System Case.3 - On the Other Side of Love and Hate": "PSYCHO-PASS Sinners of the System: Case.3 Beyond the Pale of Vengeance"
}
}

View File

@@ -249,7 +249,8 @@ def _change_quality(ctx: Context, state: State) -> MenuAction:
return InternalDirective.BACK
new_quality = selector.choose(
"Select a different server:", list(["360", "480", "720", "1080"])
"Select a different quality:",
[link.quality for link in state.provider.server.links],
)
if new_quality:
ctx.config.stream.quality = new_quality # type:ignore

View File

@@ -0,0 +1,14 @@
import re

# Host and base URL for all AnimeUnity requests.
ANIMEUNITY = "animeunity.so"
ANIMEUNITY_BASE = f"https://www.{ANIMEUNITY}"

# Per-request timeout (seconds) applied to every HTTP call in this provider.
MAX_TIMEOUT = 10

# Extracts the CSRF token from the <meta name="csrf-token" content="..."> tag
# of the landing page; required by the site's POST endpoints.
TOKEN_REGEX = re.compile(r'<meta.*?name="csrf-token".*?content="([^"]*)".*?>')

# Query substitutions so search titles match the site's naming conventions.
REPLACEMENT_WORDS = {"Season ": "", "Cour": "Part"}

# Server Specific
AVAILABLE_VIDEO_QUALITY = ["1080", "720", "480"]
# `window.video = {...}` / `window.downloadUrl = '...'` assignments embedded in
# the vixcloud embed page. FIX: dots are escaped so the patterns match the
# literal attribute access only — the originals' bare "." matched any character.
VIDEO_INFO_REGEX = re.compile(r"window\.video\s*=\s*(\{[^\}]*\})")
DOWNLOAD_URL_REGEX = re.compile(r"window\.downloadUrl\s*=\s*'([^']*)'")

View File

@@ -0,0 +1,129 @@
from typing import Literal
from ..types import (
Anime,
AnimeEpisodeInfo,
AnimeEpisodes,
EpisodeStream,
MediaTranslationType,
PageInfo,
SearchResult,
SearchResults,
Server,
)
from .constants import AVAILABLE_VIDEO_QUALITY
def map_to_search_results(
    data: dict, translation_type: Literal["sub", "dub"]
) -> SearchResults:
    """Map a raw AnimeUnity record list to SearchResults.

    Records that do not match the requested translation type are dropped
    (map_to_search_result returns None for those).
    """
    mapped = (map_to_search_result(record, translation_type) for record in data)
    return SearchResults(
        page_info=PageInfo(),
        results=[result for result in mapped if result],
    )
def map_to_search_result(
    data: dict, translation_type: Literal["sub", "dub"] | None
) -> SearchResult | None:
    """Map a single raw AnimeUnity record to a SearchResult.

    Returns None when `translation_type` is given and the record's `dub`
    flag (1 = dubbed, 0 = subbed) does not match it.
    """
    if translation_type is not None:
        # BUG FIX: the original `data["dub"] != 1 if translation_type == "dub" else 0`
        # parsed as `(data["dub"] != 1) if ... else 0`, so for "sub" the test was
        # the constant 0 and sub searches were never filtered. Compare against
        # the expected flag explicitly.
        expected_dub = 1 if translation_type == "dub" else 0
        if data["dub"] != expected_dub:
            return None
    # Hoisted: the original called get_titles(data) twice for the title.
    titles = get_titles(data)
    episode_numbers = list(map(str, range(1, get_episodes_count(data) + 1)))
    return SearchResult(
        id=str(data["id"]),
        title=titles[0] if titles else "Unknown",
        episodes=AnimeEpisodes(
            # Only one of sub/dub is populated, driven by the record's flag.
            sub=episode_numbers if data["dub"] == 0 else [],
            dub=episode_numbers if data["dub"] == 1 else [],
        ),
        other_titles=titles,
        score=data["score"],
        poster=data["imageurl"],
        year=data["date"],
    )
def map_to_anime_result(data: list, search_result: SearchResult) -> Anime:
    """Combine the fetched episode records with a prior SearchResult into an Anime.

    `data` is the list of raw episode dicts fetched from the info API;
    `search_result` supplies the metadata (title, poster, year, sub/dub split).
    """
    # Hoisted: the original re-evaluated the loop-invariant availability test
    # (`len(search_result.episodes.sub) > 0`) on every comprehension iteration.
    episode_numbers = [episode["number"] for episode in data]
    return Anime(
        id=search_result.id,
        title=search_result.title,
        episodes=AnimeEpisodes(
            # Copies so sub/dub never alias the same list object.
            sub=list(episode_numbers) if search_result.episodes.sub else [],
            dub=list(episode_numbers) if search_result.episodes.dub else [],
        ),
        episodes_info=[
            AnimeEpisodeInfo(
                id=str(episode["id"]),
                episode=episode["number"],
                title=f"{search_result.title} - Ep {episode['number']}",
            )
            for episode in data
        ],
        type=search_result.media_type,
        poster=search_result.poster,
        year=search_result.year,
    )
def map_to_server(
    episode: AnimeEpisodeInfo, info: dict, translation_type: Literal["sub", "dub"]
) -> Server:
    """Build the vixcloud Server for an episode.

    One stream link is emitted per known quality at or below the source
    quality, derived by rewriting the quality segment of the base link.
    """
    source_quality = info["quality"]
    translation = MediaTranslationType(translation_type)
    streams = []
    for quality in AVAILABLE_VIDEO_QUALITY:
        if int(quality) > source_quality:
            continue
        streams.append(
            EpisodeStream(
                link=info["link"].replace(str(source_quality), quality),
                title=info["name"],
                quality=quality,  # type: ignore
                translation_type=translation,
                mp4=True,
            )
        )
    return Server(
        name="vixcloud",
        links=streams,
        episode_title=episode.title,
    )
def get_titles(data: dict) -> list[str]:
    """
    Collect the record's titles in preference order (English, romaji, Italian),
    skipping keys that are missing or empty.
    """
    keys = ("title_eng", "title", "title_it")
    return [data[key] for key in keys if data.get(key)]
def get_episodes_count(record: dict) -> int:
    """
    Return the episode count: the record's `real_episodes_count` when
    positive, otherwise the announced `episodes_count` (0 when absent).
    """
    real_count = record.get("real_episodes_count", 0)
    return real_count if real_count > 0 else record.get("episodes_count", 0)

View File

@@ -0,0 +1,175 @@
import logging
from functools import lru_cache
from ...scraping.user_agents import UserAgentGenerator
from ..base import BaseAnimeProvider
from ..params import AnimeParams, EpisodeStreamsParams, SearchParams
from ..types import Anime, AnimeEpisodeInfo, SearchResult, SearchResults
from ..utils.debug import debug_provider
from .constants import (
ANIMEUNITY_BASE,
DOWNLOAD_URL_REGEX,
MAX_TIMEOUT,
REPLACEMENT_WORDS,
TOKEN_REGEX,
VIDEO_INFO_REGEX,
)
from .mappers import (
map_to_anime_result,
map_to_search_result,
map_to_search_results,
map_to_server,
)
logger = logging.getLogger(__name__)
class AnimeUnity(BaseAnimeProvider):
    """Anime provider backed by animeunity.so.

    The site's `dub` flag selects translation type (1 = dub, 0 = sub);
    streams are served through the "vixcloud" embed server.
    """

    # Base request headers; _get_token() adds the CSRF token to this dict.
    HEADERS = {
        "User-Agent": UserAgentGenerator().random(),
    }
    # Class-level cache of result id -> SearchResult, filled by search()
    # so get() can skip a round-trip to the info API.
    _cache = dict[str, SearchResult]()

    # NOTE(review): lru_cache on instance methods keys on `self` and keeps
    # each instance alive for the cache's lifetime (ruff B019) — presumably
    # providers are long-lived singletons here; verify.
    @lru_cache
    def _get_token(self) -> None:
        """Fetch the landing page once to capture the CSRF token and the
        session cookie required by the POST endpoints (e.g. /livesearch)."""
        response = self.client.get(
            ANIMEUNITY_BASE,
            headers=self.HEADERS,
            timeout=MAX_TIMEOUT,
            follow_redirects=True,
        )
        response.raise_for_status()
        token_match = TOKEN_REGEX.search(response.text)
        if token_match:
            self.HEADERS["x-csrf-token"] = token_match.group(1)
        # Pin the session cookie and headers on the shared client so every
        # subsequent request is authenticated the same way.
        self.client.cookies = {
            "animeunity_session": response.cookies.get("animeunity_session") or ""
        }
        self.client.headers = self.HEADERS

    @debug_provider
    def search(self, params: SearchParams) -> SearchResults | None:
        """Search the site and memoize each result by id for later get() calls."""
        if not (res := self._search(params)):
            return None
        for result in res.results:
            self._cache[result.id] = result
        return res

    @lru_cache
    def _search(self, params: SearchParams) -> SearchResults | None:
        """POST the (normalized) query to /livesearch and map the records."""
        self._get_token()
        # Normalize the query with REPLACEMENT_WORDS so season/cour naming
        # matches the site's title conventions.
        query = params.query
        for old, new in REPLACEMENT_WORDS.items():
            query = query.replace(old, new)
        response = self.client.post(
            url=f"{ANIMEUNITY_BASE}/livesearch",
            data={"title": query},
            timeout=MAX_TIMEOUT,
        )
        response.raise_for_status()
        return map_to_search_results(
            response.json().get("records", []), params.translation_type
        )

    @debug_provider
    def get(self, params: AnimeParams) -> Anime | None:
        """Return full anime details (episode list included) for an id."""
        return self._get_anime(params)

    @lru_cache()
    def _get_search_result(self, params: AnimeParams) -> SearchResult | None:
        """Return the SearchResult for an id, from cache or the info API."""
        if cached := self._cache.get(params.id):
            return cached
        response = self.client.get(
            url=f"{ANIMEUNITY_BASE}/info_api/{params.id}/",
            timeout=MAX_TIMEOUT,
        )
        response.raise_for_status()
        data = response.json()
        if res := map_to_search_result(data, None):
            self._cache[params.id] = res
        return res

    @lru_cache
    def _get_anime(self, params: AnimeParams) -> Anime | None:
        """Fetch all episode records for an anime and map them to an Anime."""
        if (search_result := self._get_search_result(params)) is None:
            logger.error(f"No search result found for ID {params.id}")
            return None
        # Fetch episodes in chunks: the info API pages by start/end range,
        # 120 episodes at a time.
        data = []
        start_range = 1
        episode_count = max(
            len(search_result.episodes.sub), len(search_result.episodes.dub)
        )
        while start_range <= episode_count:
            end_range = min(start_range + 119, episode_count)
            response = self.client.get(
                url=f"{ANIMEUNITY_BASE}/info_api/{params.id}/1",
                params={
                    "start_range": start_range,
                    "end_range": end_range,
                },
                timeout=MAX_TIMEOUT,
            )
            response.raise_for_status()
            data.extend(response.json().get("episodes", []))
            start_range = end_range + 1
        return map_to_anime_result(data, search_result)

    @lru_cache()
    def _get_episode_info(
        self, params: EpisodeStreamsParams
    ) -> AnimeEpisodeInfo | None:
        """Locate the AnimeEpisodeInfo matching params.episode, or None."""
        anime_info = self._get_anime(
            AnimeParams(id=params.anime_id, query=params.query)
        )
        if not anime_info:
            logger.error(f"No anime info for {params.anime_id}")
            return
        if not anime_info.episodes_info:
            logger.error(f"No episodes info for {params.anime_id}")
            return
        for episode in anime_info.episodes_info:
            if episode.episode == params.episode:
                return episode

    @debug_provider
    def episode_streams(self, params: EpisodeStreamsParams):
        """Yield the vixcloud Server for an episode (generator; yields once)."""
        if not (episode := self._get_episode_info(params)):
            logger.error(
                f"Episode {params.episode} doesn't exist for anime {params.anime_id}"
            )
            return
        # Get the Server url (the endpoint returns the embed URL as plain text).
        response = self.client.get(
            url=f"{ANIMEUNITY_BASE}/embed-url/{episode.id}", timeout=MAX_TIMEOUT
        )
        response.raise_for_status()
        # Fetch the Server page
        video_response = self.client.get(url=response.text.strip(), timeout=MAX_TIMEOUT)
        video_response.raise_for_status()
        video_info = VIDEO_INFO_REGEX.search(video_response.text)
        download_url_match = DOWNLOAD_URL_REGEX.search(video_response.text)
        if not (download_url_match and video_info):
            logger.error(f"Failed to extract video info for episode {episode.id}")
            return None
        # SECURITY(review): eval() on content scraped from a remote page can
        # execute arbitrary code if the site is compromised or changes its
        # markup — consider json.loads/ast.literal_eval if the payload allows.
        info = eval(video_info.group(1).replace("null", "None"))
        info["link"] = download_url_match.group(1)
        yield map_to_server(episode, info, params.translation_type)
if __name__ == "__main__":
    # Manual smoke test: run this module directly to exercise the provider
    # end-to-end via the shared debug harness.
    from ..utils.debug import test_anime_provider

    test_anime_provider(AnimeUnity)

View File

@@ -14,6 +14,7 @@ PROVIDERS_AVAILABLE = {
"hianime": "provider.HiAnime",
"nyaa": "provider.Nyaa",
"yugen": "provider.Yugen",
"animeunity": "provider.AnimeUnity",
}

View File

@@ -11,6 +11,7 @@ from pydantic import BaseModel, ConfigDict
class ProviderName(Enum):
ALLANIME = "allanime"
ANIMEPAHE = "animepahe"
ANIMEUNITY = "animeunity"
class ProviderServer(Enum):
@@ -28,6 +29,9 @@ class ProviderServer(Enum):
# AnimePaheServer values
KWIK = "kwik"
# AnimeUnityServer values
VIXCLOUD = "vixcloud"
class MediaTranslationType(Enum):
SUB = "sub"