Compare commits

...

4 Commits

14 changed files with 665 additions and 2 deletions

.gitignore vendored

@@ -210,3 +210,4 @@ repomix-output.xml
.project/
result
.direnv
fastanime/libs/provider/anime/hianime/extractors/js/node_modules


@@ -1,7 +1,10 @@
import os
import sys
from rich.traceback import install as rich_install
from ...core.constants import PROJECT_NAME
def custom_exception_hook(exc_type, exc_value, exc_traceback):
print(f"{exc_type.__name__}: {exc_value}")
@@ -16,6 +19,9 @@ def setup_exceptions_handler(
rich_traceback: bool | None,
rich_traceback_theme: str,
):
if dev:
# auto set env
os.environ[f"{PROJECT_NAME}_DEBUG"] = "1"
if trace or dev:
sys.excepthook = default_exception_hook
if rich_traceback:


@@ -0,0 +1,12 @@
# The base domain for HiAnime.
HIANIME_DOMAIN = "hianime.to"
HIANIME_BASE_URL = f"https://{HIANIME_DOMAIN}"
# The endpoint for making AJAX requests (fetching episodes, servers, etc.).
HIANIME_AJAX_URL = f"{HIANIME_BASE_URL}/ajax"
# The base URL for search queries.
SEARCH_URL = f"{HIANIME_BASE_URL}/search"
# The Referer header is crucial for making successful requests to the AJAX endpoints.
AJAX_REFERER_HEADER = f"{HIANIME_BASE_URL}/"


@@ -0,0 +1,30 @@
import logging
from typing import Optional
from ....anime.types import Server
from .megacloud import MegaCloudExtractor
logger = logging.getLogger(__name__)
def extract_server(embed_url: str) -> Optional[Server]:
"""
Acts as a router to select the correct extractor based on the embed URL.
Args:
embed_url: The URL of the video host's embed page.
Returns:
A Server object containing the stream links, or None if extraction fails.
"""
hostname = embed_url.split("/")[2]
if "megacloud" in hostname or "megaplay" in hostname:
return MegaCloudExtractor().extract(embed_url)
# In the future, you could add other extractors here:
# if "streamsb" in hostname:
# return StreamSbExtractor().extract(embed_url)
logger.warning(f"No extractor found for hostname: {hostname}")
return None
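
A short usage sketch: the provider below hands this router the embed link returned by the sources endpoint, and any hostname without a matching extractor logs a warning and yields None (the URL here is hypothetical):

embed_url = "https://megacloud.tv/embed-2/e-1/abc123?k=1"  # hypothetical embed link
server = extract_server(embed_url)
if server is not None:
    print(server.name, [stream.link for stream in server.links])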


@@ -0,0 +1,55 @@
const CryptoJS = require("crypto-js");
/**
* Extracts a secret key from an encrypted string based on an array of index pairs,
* then uses that key to decrypt the rest of the string.
* @param {string} encryptedString - The full encrypted sources string.
* @param {string} varsJson - A JSON string representing an array of [start, length] pairs.
* @returns {string} The decrypted JSON string of video sources.
*/
function getSecretAndDecrypt(encryptedString, varsJson) {
const values = JSON.parse(varsJson);
let secret = "";
let encryptedSource = "";
let encryptedSourceArray = encryptedString.split("");
let currentIndex = 0;
for (const index of values) {
const start = index[0] + currentIndex;
const end = start + index[1];
for (let i = start; i < end; i++) {
secret += encryptedString[i];
encryptedSourceArray[i] = "";
}
currentIndex += index[1];
}
encryptedSource = encryptedSourceArray.join("");
const decrypted = CryptoJS.AES.decrypt(encryptedSource, secret).toString(
CryptoJS.enc.Utf8,
);
return decrypted;
}
// Main execution logic
const args = process.argv.slice(2);
if (args.length < 2) {
console.error(
"Usage: node megacloud_decrypt.js <encryptedString> '<varsJson>'",
);
process.exit(1);
}
const encryptedString = args[0];
const varsJson = args[1];
try {
const result = getSecretAndDecrypt(encryptedString, varsJson);
// The result is already a JSON string of the sources, just print it to stdout.
console.log(result);
} catch (e) {
console.error(e.message);
process.exit(1);
}
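
To make the index-pair scheme concrete, here is the same secret-extraction step as a Python sketch with a toy input (real inputs are long base64 blobs, and the AES step is omitted). Each pair's offset is counted relative to the characters already consumed:

def split_secret(encrypted_string: str, pairs: list[list[int]]) -> tuple[str, str]:
    chars = list(encrypted_string)
    secret = ""
    consumed = 0
    for offset, length in pairs:
        start = offset + consumed
        for i in range(start, start + length):
            secret += encrypted_string[i]  # collect a key character
            chars[i] = ""                  # and blank it out of the ciphertext
        consumed += length
    return secret, "".join(chars)

# Toy input: key characters "K", "E", "Y" are hidden at positions 2, 5, and 8.
secret, ciphertext = split_secret("abKcdEefYgh", [[2, 1], [4, 1], [6, 1]])
assert (secret, ciphertext) == ("KEY", "abcdefgh")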


@@ -0,0 +1,21 @@
{
"name": "hianime-extractor-helper",
"version": "1.0.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "hianime-extractor-helper",
"version": "1.0.0",
"dependencies": {
"crypto-js": "^4.2.0"
}
},
"node_modules/crypto-js": {
"version": "4.2.0",
"resolved": "https://registry.npmjs.org/crypto-js/-/crypto-js-4.2.0.tgz",
"integrity": "sha512-KALDyEYgpY+Rlob/iriUtjV6d5Eq+Y191A5g4UqLAi8CyGP9N1+FdVbkc1SxKc2r4YAYqG8JzO2KGL+AizD70Q==",
"license": "MIT"
}
}
}


@@ -0,0 +1,9 @@
{
"name": "hianime-extractor-helper",
"version": "1.0.0",
"description": "Helper script to decrypt MegaCloud sources for FastAnime.",
"main": "megacloud_decrypt.js",
"dependencies": {
"crypto-js": "^4.2.0"
}
}
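
Running `npm install` in this directory pulls in crypto-js and creates the node_modules tree that the new .gitignore entry above excludes. A Node.js runtime is assumed to be on PATH, since the Python extractor below shells out to `node`.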


@@ -0,0 +1,180 @@
import json
import logging
import re
import subprocess
from pathlib import Path
from typing import Optional
import httpx
from .. import constants
from ...types import EpisodeStream, Server, Subtitle
logger = logging.getLogger(__name__)
# The path to our Node.js decryption script, relative to this file.
DECRYPT_SCRIPT_PATH = Path(__file__).parent / "js" / "megacloud_decrypt.js"
class MegaCloudExtractor:
"""
Extractor for MegaCloud streams.
It works by:
1. Fetching the embed page.
2. Finding the encrypted sources data and the URL to a JavaScript file.
3. Fetching the JavaScript file and using regex to find decryption keys.
4. Calling an external Node.js script to perform the decryption.
5. Parsing the decrypted result to get the final stream URLs.
"""
def _run_node_script(self, encrypted_string: str, vars_json: str) -> Optional[dict]:
"""
Executes the Node.js decryption script as a subprocess.
Args:
encrypted_string: The large encrypted sources string.
vars_json: A JSON string of the array of indexes for key extraction.
Returns:
The decrypted data as a dictionary, or None on failure.
"""
if not DECRYPT_SCRIPT_PATH.exists():
logger.error(
f"Node.js decryption script not found at: {DECRYPT_SCRIPT_PATH}"
)
return None
command = ["node", str(DECRYPT_SCRIPT_PATH), encrypted_string, vars_json]
try:
process = subprocess.run(
command,
capture_output=True,
text=True,
check=True,
cwd=DECRYPT_SCRIPT_PATH.parent, # Run from the 'js' directory
)
return json.loads(process.stdout)
except subprocess.CalledProcessError as e:
logger.error(f"Node.js script failed with error: {e.stderr}")
except json.JSONDecodeError:
logger.error("Failed to parse JSON output from Node.js script.")
except Exception as e:
logger.error(
f"An unexpected error occurred while running Node.js script: {e}"
)
return None
def extract_vars_from_script(self, script_content: str) -> Optional[str]:
"""
Uses regex to find the variable array needed for decryption from the script content.
This pattern is based on the logic from the TypeScript project.
"""
# This regex is a Python adaptation of the one in the TypeScript source.
# It looks for the specific pattern that initializes the decryption keys.
regex = r"case\s*0x[0-9a-f]+:(?![^;]*=partKey)\s*\w+\s*=\s*(\w+)\s*,\s*\w+\s*=\s*(\w+);"
matches = re.findall(regex, script_content)
if not matches:
logger.error("Could not find decryption variables in the script.")
return None
def matching_key(value: str, script: str) -> Optional[str]:
# This nested function replicates the `matchingKey` logic from the TS file.
key_regex = re.compile(f",{value}=((?:0x)?([0-9a-fA-F]+))")
match = key_regex.search(script)
return match.group(1) if match else None
vars_array = []
for match in matches:
try:
key1_hex = matching_key(match[0], script_content)
key2_hex = matching_key(match[1], script_content)
if key1_hex and key2_hex:
vars_array.append([int(key1_hex, 16), int(key2_hex, 16)])
except (ValueError, TypeError):
logger.warning(
f"Could not parse hex values from script for match: {match}"
)
continue
return json.dumps(vars_array) if vars_array else None
def extract(self, embed_url: str) -> Optional[Server]:
"""
Main extraction method.
Args:
embed_url: The URL of the MegaCloud embed page.
Returns:
A Server object containing stream links and subtitles.
"""
try:
with httpx.Client() as client:
# 1. Get the embed page content
embed_response = client.get(
embed_url, headers={"Referer": constants.HIANIME_BASE_URL}
)
embed_response.raise_for_status()
embed_html = embed_response.text
# 2. Find the encrypted sources and the script URL
# The data is usually stored in a script tag as `var sources = [...]`.
sources_match = re.search(r"var sources = ([^;]+);", embed_html)
script_url_match = re.search(
r'src="(/js/player/a/prod/e1-player.min.js\?[^"]+)"', embed_html
)
if not sources_match or not script_url_match:
logger.error("Could not find sources or script URL in embed page.")
return None
encrypted_sources_data = json.loads(sources_match.group(1))
script_url = "https:" + script_url_match.group(1)
encrypted_string = encrypted_sources_data.get("sources")
if not isinstance(encrypted_string, str) or not encrypted_string:
logger.error("Encrypted sources string is missing or invalid.")
return None
# 3. Fetch the script and extract decryption variables
script_response = client.get(script_url)
script_response.raise_for_status()
vars_json = self.extract_vars_from_script(script_response.text)
if not vars_json:
return None
# 4. Decrypt using the Node.js script
decrypted_data = self._run_node_script(encrypted_string, vars_json)
if not decrypted_data or not isinstance(decrypted_data, list):
logger.error("Decryption failed or returned invalid data.")
return None
# 5. Map to generic models
streams = [
EpisodeStream(
link=source["file"], quality="auto", format=source["type"]
)
for source in decrypted_data
]
subtitles = [
Subtitle(url=track["file"], language=track.get("label", "en"))
for track in encrypted_sources_data.get("tracks", [])
if track.get("kind") == "captions"
]
return Server(
name="MegaCloud",
links=streams,
subtitles=subtitles,
headers={"Referer": "https://megacloud.tv/"},
)
except Exception as e:
logger.error(f"MegaCloud extraction failed: {e}", exc_info=True)
return None
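
The mapping in step 5 assumes the decrypted payload is a JSON array of source objects; a representative shape, hypothetical but matching the keys the code reads ("file", "type", and the "tracks" entries from the embed data):

decrypted_data = [
    {"file": "https://example.net/master.m3u8", "type": "hls"},  # read as link/format
]
tracks = [
    {"file": "https://example.net/eng.vtt", "label": "English", "kind": "captions"},
]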


@@ -0,0 +1,149 @@
import re
from typing import List, Optional
from ....provider.anime.types import (
Anime,
AnimeEpisodes,
PageInfo,
SearchResult,
SearchResults,
)
from ....provider.scraping.html_parser import (
extract_attributes,
get_element_by_class,
get_elements_by_class,
)
def _parse_episodes(element_html: str) -> AnimeEpisodes:
"""Helper function to parse sub/dub episode counts from an anime item."""
sub_text = get_element_by_class("tick-sub", element_html)
dub_text = get_element_by_class("tick-dub", element_html)
sub_count = 0
dub_count = 0
if sub_text:
match = re.search(r"\d+", sub_text)
if match:
sub_count = int(match.group(0))
if dub_text:
match = re.search(r"\d+", dub_text)
if match:
dub_count = int(match.group(0))
# Generate a list of episode numbers as strings
sub_list = [str(i) for i in range(1, sub_count + 1)]
dub_list = [str(i) for i in range(1, dub_count + 1)]
return AnimeEpisodes(sub=sub_list, dub=dub_list, raw=[])
def map_to_search_results(
anime_elements: List[str], full_html: str
) -> Optional[SearchResults]:
"""
Maps a list of HTML elements from a HiAnime search page to a generic SearchResults object.
Args:
anime_elements: A list of raw HTML strings, each representing an anime (.flw-item).
full_html: The full HTML content of the search page for parsing pagination.
Returns:
A SearchResults object or None if parsing fails.
"""
results = []
for element in anime_elements:
title_element = get_element_by_class("dynamic-name", element)
if not title_element:
continue
attrs = extract_attributes(title_element)
title = title_element.split(">")[1].split("<")[0].strip()
anime_id = attrs.get("href", "").lstrip("/")
poster_element = get_element_by_class("film-poster-img", element)
poster_attrs = extract_attributes(poster_element or "")
results.append(
SearchResult(
id=anime_id,
title=title,
poster=poster_attrs.get("data-src"),
episodes=_parse_episodes(element),
)
)
# Parse pagination to determine total pages
total_pages = 1
# Use a simpler selector that is less prone to parsing issues.
pagination_elements = get_elements_by_class("page-item", full_html)
if pagination_elements:
# Find the last page number from all pagination links
last_page_num = 0
for el in pagination_elements:
attrs = extract_attributes(el)
href = attrs.get("href", "")
if "?page=" in href:
try:
num = int(href.split("?page=")[-1])
if num > last_page_num:
last_page_num = num
except (ValueError, IndexError):
continue
if last_page_num > 0:
total_pages = last_page_num
page_info = PageInfo(total=total_pages)
return SearchResults(page_info=page_info, results=results)
def map_to_anime_result(anime_id_slug: str, episode_list_html: str) -> Optional[Anime]:
"""
Maps the AJAX response for an episode list to a generic Anime object.
Args:
anime_id_slug: The anime's unique ID string (e.g., "steinsgate-3").
episode_list_html: The raw HTML snippet containing the list of episodes.
Returns:
An Anime object containing the episode list, or None.
"""
episodes = get_elements_by_class("ssl-item", episode_list_html)
episode_numbers_sub = []
# Note: HiAnime's episode list doesn't differentiate sub/dub, so we assume all are sub for now.
# The user selects sub/dub when choosing a server later.
for ep_element in episodes:
attrs = extract_attributes(ep_element)
ep_num = attrs.get("data-number")
if ep_num:
episode_numbers_sub.append(ep_num)
# The title isn't in this AJAX response, so we derive a placeholder from the slug.
# The application's state usually carries the real title from the search/list step.
placeholder_title = anime_id_slug.replace("-", " ").title()
return Anime(
id=anime_id_slug,
title=placeholder_title,
episodes=AnimeEpisodes(
sub=episode_numbers_sub,
dub=[], # We don't know dub count from this endpoint
raw=[],
),
)
def map_to_server_id(server_element_html: str) -> Optional[str]:
"""
Extracts the server's unique data-id from its HTML element.
Args:
server_element_html: The raw HTML of a server-item.
Returns:
The server ID string, or None.
"""
attrs = extract_attributes(server_element_html)
return attrs.get("data-id")


@@ -0,0 +1,168 @@
import logging
from typing import Iterator, Optional
from ....provider.anime.base import BaseAnimeProvider
from ....provider.anime.params import AnimeParams, EpisodeStreamsParams, SearchParams
from ....provider.anime.types import Anime, SearchResults, Server
from ....provider.scraping.html_parser import get_elements_by_class
from . import constants, mappers
from .extractors import extract_server
logger = logging.getLogger(__name__)
class HiAnime(BaseAnimeProvider):
"""
Provider for scraping anime data from HiAnime.
This provider implements the search, get, and episode_streams methods
to fetch anime information and video stream URLs from HiAnime's website
and internal AJAX APIs.
"""
HEADERS = {"Referer": constants.HIANIME_BASE_URL}
def search(self, params: SearchParams) -> Optional[SearchResults]:
"""
Searches HiAnime for a given query.
Args:
params: The search parameters containing the query.
Returns:
A SearchResults object containing the found anime, or None.
"""
search_url = f"{constants.SEARCH_URL}?keyword={params.query}"
try:
response = self.client.get(search_url, follow_redirects=True)
response.raise_for_status()
# The search results are rendered in the HTML. We use our HTML parser
# to find all elements with the class '.flw-item', which represent
# individual anime search results.
anime_elements = get_elements_by_class("flw-item", response.text)
if not anime_elements:
return None
# The mapper will convert the raw HTML elements into our generic SearchResults model.
return mappers.map_to_search_results(anime_elements, response.text)
except Exception as e:
logger.error(
f"Failed to perform search on HiAnime for query '{params.query}': {e}"
)
return None
def get(self, params: AnimeParams) -> Optional[Anime]:
"""
Retrieves detailed information and a list of episodes for a specific anime.
Args:
params: The parameters containing the anime ID (slug).
Returns:
An Anime object with a full episode list, or None.
"""
try:
# The numeric ID is the last part of the slug.
clean_id_slug = params.id.split("?")[0]
anime_id_numeric = clean_id_slug.split("-")[-1]
if not anime_id_numeric.isdigit():
raise ValueError("Could not extract numeric ID from anime slug.")
# HiAnime loads episodes via an AJAX request.
episodes_url = (
f"{constants.HIANIME_AJAX_URL}/v2/episode/list/{anime_id_numeric}"
)
response = self.client.get(
episodes_url,
headers={
"X-Requested-With": "XMLHttpRequest",
"Referer": constants.AJAX_REFERER_HEADER,
},
)
response.raise_for_status()
# The response is JSON containing an 'html' key with the episode list.
html_snippet = response.json().get("html", "")
if not html_snippet:
return None
# We pass the original anime ID (slug) and the HTML snippet to the mapper.
return mappers.map_to_anime_result(params.id, html_snippet)
except Exception as e:
logger.error(f"Failed to get anime details for '{params.id}': {e}")
return None
def episode_streams(
self, params: EpisodeStreamsParams
) -> Optional[Iterator[Server]]:
"""
Fetches the actual video stream URLs for a given episode.
This is a multi-step process:
1. Get the list of available servers (e.g., MegaCloud, StreamSB).
2. For each server, get the embed URL.
3. Pass the embed URL to an extractor to get the final stream URL.
Args:
params: The parameters containing the episode ID.
Yields:
A Server object for each available video source.
"""
try:
# The episode ID is in the format 'anime-slug?ep=12345'
episode_id_numeric = params.episode.split("?ep=")[-1]
if not episode_id_numeric.isdigit():
raise ValueError("Could not extract numeric episode ID.")
# 1. Get available servers for the episode.
servers_url = f"{constants.HIANIME_AJAX_URL}/v2/episode/servers?episodeId={episode_id_numeric}"
servers_response = self.client.get(
servers_url,
headers={
"X-Requested-With": "XMLHttpRequest",
"Referer": constants.AJAX_REFERER_HEADER,
},
)
servers_response.raise_for_status()
server_elements = get_elements_by_class(
"server-item", servers_response.json().get("html", "")
)
for server_element in server_elements:
try:
# 2. Extract the server's unique ID.
server_id = mappers.map_to_server_id(server_element)
if not server_id:
continue
# 3. Get the embed URL for this server.
sources_url = f"{constants.HIANIME_AJAX_URL}/v2/episode/sources?id={server_id}"
sources_response = self.client.get(
sources_url,
headers={
"X-Requested-With": "XMLHttpRequest",
"Referer": constants.AJAX_REFERER_HEADER,
},
)
sources_response.raise_for_status()
embed_url = sources_response.json().get("link")
if not embed_url:
continue
# 4. Use an extractor to get the final stream URLs from the embed page.
# The extractor handles the complex, host-specific logic.
server = extract_server(embed_url)
if server:
yield server
except Exception as e:
logger.warning(
f"Failed to process a server for episode '{params.episode}': {e}"
)
continue
except Exception as e:
logger.error(f"Failed to get episode streams for '{params.episode}': {e}")
return None
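
End to end, a hedged usage sketch (the constructor and the params field names are assumed from the base classes, which are not part of this diff; BaseAnimeProvider is expected to supply self.client, e.g. an httpx.Client):

provider = HiAnime()  # hypothetical construction; base-class wiring not shown here
results = provider.search(SearchParams(query="steinsgate"))
if results and results.results:
    anime = provider.get(AnimeParams(id=results.results[0].id))
    # anime.episodes.sub -> ["1", "2", ...]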


@@ -0,0 +1,33 @@
from typing import TypedDict
class HiAnimeEpisode(TypedDict):
"""
Represents a single episode entry returned by the
`/ajax/v2/episode/list/{anime_id}` endpoint.
"""
title: str | None
episodeId: str | None
number: int
isFiller: bool
class HiAnimeEpisodeServer(TypedDict):
"""
Represents a single server entry returned by the
`/ajax/v2/episode/servers?episodeId={episode_id}` endpoint.
"""
serverName: str
serverId: int | None
class HiAnimeSource(TypedDict):
"""
Represents the JSON response from the
`/ajax/v2/episode/sources?id={server_id}` endpoint,
which contains the link to the extractor's embed page.
"""
link: str
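
As an illustration, hypothetical payloads matching these shapes (values invented; the episodeId format follows the 'anime-slug?ep=12345' convention noted in provider.py):

episode: HiAnimeEpisode = {
    "title": "Turning Point",
    "episodeId": "steinsgate-3?ep=13499",
    "number": 1,
    "isFiller": False,
}
source: HiAnimeSource = {"link": "https://megacloud.tv/embed-2/e-1/abc123?k=1"}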


@@ -12,8 +12,6 @@ PROVIDERS_AVAILABLE = {
"allanime": "provider.AllAnime",
"animepahe": "provider.AnimePahe",
"hianime": "provider.HiAnime",
"nyaa": "provider.Nyaa",
"yugen": "provider.Yugen",
}


@@ -11,6 +11,7 @@ from pydantic import BaseModel, ConfigDict
class ProviderName(Enum):
ALLANIME = "allanime"
ANIMEPAHE = "animepahe"
HIANIME = "hianime"
class ProviderServer(Enum):