feat: Initial implementation of AnimeUnity provider

This commit is contained in:
axtrat
2025-08-21 10:19:25 +02:00
parent 3f63198563
commit f1b796d72b
7 changed files with 269 additions and 1 deletions

View File

@@ -1,6 +1,5 @@
"""Update command for Viu CLI."""
import sys
from typing import TYPE_CHECKING
import click

View File

@@ -0,0 +1,9 @@
import re
ANIMEUNITY = "animeunity.so"
ANIMEUNITY_BASE = f"https://www.{ANIMEUNITY}"
MAX_TIMEOUT = 10
TOKEN_REGEX = re.compile(r'<meta.*?name="csrf-token".*?content="([^"]*)".*?>')
DOWNLOAD_URL_REGEX = r"window.downloadUrl\s*=\s*'([^']*)'"

View File

@@ -0,0 +1,111 @@
from httpx import Response
from ..types import (
Anime,
AnimeEpisodeInfo,
AnimeEpisodes,
EpisodeStream,
MediaTranslationType,
PageInfo,
SearchResult,
SearchResults,
Server,
)
translation_type_map = {
"sub": MediaTranslationType.SUB,
"dub": MediaTranslationType.DUB,
"raw": MediaTranslationType.RAW,
}
def map_to_search_results(response: Response) -> SearchResults:
"""
animes = list[Anime]()
for result in results:
title, anilist_id, info = self._parse_info(result)
anime = Anime(title, result['id'])
anime._set_info(anilist_id, info)
animes.append(anime)
return animes
"""
data = response.json().get("records", [])
return SearchResults(
page_info=PageInfo(),
results=[
SearchResult(
id=str(result["id"]),
title=get_real_title(result),
episodes=AnimeEpisodes(
sub=(
list(map(str, range(1, result["episodes_count"] + 1)))
if result["dub"] == 0
else []
),
dub=(
list(map(str, range(1, result["episodes_count"] + 1)))
if result["dub"] == 1
else []
),
),
# other_titles=[title for title in [result["title_eng"], result["title_it"]] if title],
media_type=result["type"],
score=result["score"],
status=result["status"],
season=result["season"],
poster=result["imageurl"],
year=result["date"],
)
for result in data
],
)
def map_to_anime_result(response: Response, search_result: SearchResult) -> Anime:
data = response.json()["episodes"]
return Anime(
id=search_result.id,
title=search_result.title,
episodes=search_result.episodes,
episodes_info=[
AnimeEpisodeInfo(
id=str(episode["id"]),
episode=episode["number"],
# session_id=episode.get("session_id"),
title=f"{search_result.title} - Ep {episode['number']}",
# poster=episode["tg_post"],
# duration=episode.get("duration"),
)
for episode in data
],
type=search_result.media_type,
poster=search_result.poster,
year=search_result.year,
)
def map_to_server(episode: AnimeEpisodeInfo, download_url: str) -> Server:
return Server(
name="vixcloud",
links=[
EpisodeStream(
link=download_url,
# translation_type=translation_type_map[episode.]
)
],
episode_title=episode.title,
)
def get_real_title(record: dict) -> str:
"""
Return the most appropriate title from the record.
"""
if record.get("title_eng"):
return record["title_eng"]
elif record.get("title"):
return record["title"]
else:
return record.get("title_it", "")

View File

@@ -0,0 +1,147 @@
import logging
import re
import time
from functools import lru_cache
import httpx
from ...scraping.user_agents import UserAgentGenerator
from ..base import BaseAnimeProvider
from ..params import AnimeParams, EpisodeStreamsParams, SearchParams
from ..types import Anime, AnimeEpisodeInfo, SearchResult, SearchResults
from ..utils.debug import debug_provider
from .constants import ANIMEUNITY_BASE, DOWNLOAD_URL_REGEX, MAX_TIMEOUT
from .mappers import map_to_anime_result, map_to_search_results, map_to_server
logger = logging.getLogger(__name__)
class AnimeUnity(BaseAnimeProvider):
HEADERS = {
"user-agent": UserAgentGenerator().random(),
}
@lru_cache
def _get_token(self) -> dict[str, str]:
response = self.client.get(ANIMEUNITY_BASE, headers=self.HEADERS)
data = response.cookies
cookies = {
"animeunity_session": data["animeunity_session"],
}
self.HEADERS["x-xsrf-token"] = data["XSRF-TOKEN"]
return cookies
@debug_provider
def search(self, params: SearchParams) -> SearchResults | None:
return self._search(params)
@lru_cache
def _search(self, params: SearchParams) -> SearchResults | None:
cookies = self._get_token()
try:
response = self.client.post(
url=f"{ANIMEUNITY_BASE}/livesearch",
data={"title": params.query},
headers=self.HEADERS,
cookies=cookies,
timeout=MAX_TIMEOUT,
)
response.raise_for_status()
except httpx.HTTPStatusError as e:
logger.error(f"AnimeUnity 500 error for query '{params.query}'")
# Opzionale: retry dopo un breve delay
logger.info("Retrying after 2 seconds...")
time.sleep(2)
return self._search(params)
return map_to_search_results(response)
@debug_provider
def get(self, params: AnimeParams) -> Anime | None:
return self._get_anime(params)
@lru_cache()
def _get_search_result(self, params: AnimeParams) -> SearchResult | None:
search_results = self._search(SearchParams(query=params.query))
if not search_results or not search_results.results:
logger.error(f"No search results found for ID {params.id}")
return None
for search_result in search_results.results:
if search_result.id == params.id:
return search_result
@lru_cache
def _get_anime(self, params: AnimeParams) -> Anime | None:
search_result = self._get_search_result(params)
if not search_result:
logger.error(f"No search result found for ID {params.id}")
return None
cookies = self._get_token()
response = self.client.get(
url=f"{ANIMEUNITY_BASE}/info_api/{params.id}/1",
params={
"start_range": 0,
"end_range": max(
len(search_result.episodes.sub), len(search_result.episodes.dub)
),
},
headers=self.HEADERS,
cookies=cookies,
timeout=MAX_TIMEOUT,
)
response.raise_for_status()
return map_to_anime_result(response, search_result)
@lru_cache()
def _get_episode_info(
self, params: EpisodeStreamsParams
) -> AnimeEpisodeInfo | None:
anime_info = self._get_anime(
AnimeParams(id=params.anime_id, query=params.query)
)
if not anime_info:
logger.error(f"No anime info for {params.anime_id}")
return
if not anime_info.episodes_info:
logger.error(f"No episodes info for {params.anime_id}")
return
for episode in anime_info.episodes_info:
if episode.episode == params.episode:
return episode
@debug_provider
def episode_streams(self, params):
episode = self._get_episode_info(params)
if not episode:
logger.error(
f"Episode {params.episode} doesn't exist for anime {params.anime_id}"
)
return
cookies = self._get_token()
response = self.client.get(
url=f"{ANIMEUNITY_BASE}/embed-url/{episode.id}",
headers=self.HEADERS,
cookies=cookies,
timeout=MAX_TIMEOUT,
)
response.raise_for_status()
# The embed URL is returned as plain text
iframe_src = response.text.strip()
# Fetch the video page
video_response = self.client.get(
iframe_src, headers=self.HEADERS, cookies=cookies, timeout=MAX_TIMEOUT
)
video_response.raise_for_status()
download_url_match = re.search(DOWNLOAD_URL_REGEX, video_response.text)
if download_url_match:
yield map_to_server(episode, download_url_match.group(1))
return None
if __name__ == "__main__":
from ..utils.debug import test_anime_provider
test_anime_provider(AnimeUnity)

View File

@@ -14,6 +14,7 @@ PROVIDERS_AVAILABLE = {
"hianime": "provider.HiAnime",
"nyaa": "provider.Nyaa",
"yugen": "provider.Yugen",
"animeunity": "provider.AnimeUnity",
}

View File

@@ -11,6 +11,7 @@ from pydantic import BaseModel, ConfigDict
class ProviderName(Enum):
ALLANIME = "allanime"
ANIMEPAHE = "animepahe"
ANIMEUNITY = "animeunity"
class ProviderServer(Enum):