feat(animepahe): remove use of node and implement custom logic to decode the string

This commit is contained in:
Benex254
2024-08-17 22:46:10 +03:00
parent 5bc0e52179
commit 5b3b9f740b
2 changed files with 155 additions and 101 deletions

View File

@@ -1,15 +1,12 @@
import logging
import random
import re
import shutil
import subprocess
import time
from typing import TYPE_CHECKING
from yt_dlp.utils import (
extract_attributes,
get_element_by_id,
get_element_text_and_html_by_tag,
get_elements_html_by_class,
)
@@ -20,6 +17,7 @@ from .constants import (
REQUEST_HEADERS,
SERVER_HEADERS,
)
from .utils import process_animepahe_embed_page
if TYPE_CHECKING:
from ..types import Anime
@@ -27,6 +25,8 @@ if TYPE_CHECKING:
JUICY_STREAM_REGEX = re.compile(r"source='(.*)';")
logger = logging.getLogger(__name__)
KWIK_RE = re.compile(r"Player\|(.+?)'")
# TODO: hack this to completion
class AnimePaheApi(AnimeProvider):
@@ -153,106 +153,79 @@ class AnimePaheApi(AnimeProvider):
def get_episode_streams(
self, anime: "Anime", episode_number: str, translation_type, *args
):
# extract episode details from memory
episode = [
episode
for episode in self.anime["data"]
if float(episode["episode"]) == float(episode_number)
]
try:
# extract episode details from memory
episode = [
episode
for episode in self.anime["data"]
if float(episode["episode"]) == float(episode_number)
]
if not episode:
logger.error(f"AnimePahe(streams): episode {episode_number} doesn't exist")
return []
episode = episode[0]
anime_id = anime["id"]
# fetch the episode page
url = f"{ANIMEPAHE_BASE}/play/{anime_id}/{episode['session']}"
response = self.session.get(url, headers=REQUEST_HEADERS)
# get the element containing links to juicy streams
c = get_element_by_id("resolutionMenu", response.text)
resolutionMenuItems = get_elements_html_by_class("dropdown-item", c)
# convert the elements containing embed links to a neat dict containing:
# data-src
# data-audio
# data-resolution
res_dicts = [extract_attributes(item) for item in resolutionMenuItems]
# get the episode title
episode_title = (
episode["title"] or f"{anime['title']}; Episode {episode['episode']}"
)
# get all links
streams = {
"server": "kwik",
"links": [],
"episode_title": episode_title,
"headers": {},
}
for res_dict in res_dicts:
# get embed url
embed_url = res_dict["data-src"]
data_audio = "dub" if res_dict["data-audio"] == "eng" else "sub"
# filter streams by translation_type
if data_audio != translation_type:
continue
if not embed_url:
logger.warn(
"AnimePahe: embed url not found please report to the developers"
if not episode:
logger.error(
f"AnimePahe(streams): episode {episode_number} doesn't exist"
)
return []
# get embed page
embed_response = self.session.get(embed_url, headers=SERVER_HEADERS)
embed = embed_response.text
# search for the encoded js
encoded_js = None
for _ in range(7):
content, html = get_element_text_and_html_by_tag("script", embed)
if not content:
embed = embed.replace(html, "")
episode = episode[0]
anime_id = anime["id"]
# fetch the episode page
url = f"{ANIMEPAHE_BASE}/play/{anime_id}/{episode['session']}"
response = self.session.get(url, headers=REQUEST_HEADERS)
# get the element containing links to juicy streams
c = get_element_by_id("resolutionMenu", response.text)
resolutionMenuItems = get_elements_html_by_class("dropdown-item", c)
# convert the elements containing embed links to a neat dict containing:
# data-src
# data-audio
# data-resolution
res_dicts = [extract_attributes(item) for item in resolutionMenuItems]
# get the episode title
episode_title = (
episode["title"] or f"{anime['title']}; Episode {episode['episode']}"
)
# get all links
streams = {
"server": "kwik",
"links": [],
"episode_title": episode_title,
"headers": {},
}
for res_dict in res_dicts:
# get embed url
embed_url = res_dict["data-src"]
data_audio = "dub" if res_dict["data-audio"] == "eng" else "sub"
# filter streams by translation_type
if data_audio != translation_type:
continue
encoded_js = content
break
if not encoded_js:
logger.warn(
"AnimePahe: Encoded js not found please report to the developers"
if not embed_url:
logger.warn(
"AnimePahe: embed url not found please report to the developers"
)
return []
# get embed page
embed_response = self.session.get(embed_url, headers=SERVER_HEADERS)
embed_page = embed_response.text
decoded_js = process_animepahe_embed_page(embed_page)
if not decoded_js:
logger.error("Animepahe: failed to decode embed page")
return
juicy_stream = JUICY_STREAM_REGEX.search(decoded_js)
if not juicy_stream:
logger.error("Animepahe: failed to find juicy stream")
return
juicy_stream = juicy_stream.group(1)
# add the link
streams["links"].append(
{
"quality": res_dict["data-resolution"],
"translation_type": data_audio,
"link": juicy_stream,
}
)
return []
# execute the encoded js with node for now or maybe forever in odrder to get a more workable info
NODE = shutil.which("node")
if not NODE:
logger.warn(
"AnimePahe: animepahe currently requires node js to extract them juicy streams"
)
return []
result = subprocess.run(
[NODE, "-e", encoded_js],
text=True,
capture_output=True,
)
# decoded js
evaluted_js = result.stderr
if not evaluted_js:
logger.warn(
"AnimePahe: could not decode encoded js using node please report to developers"
)
return []
# get that juicy stream
match = JUICY_STREAM_REGEX.search(evaluted_js)
if not match:
logger.warn(
"AnimePahe: could not find the juicy stream please report to developers"
)
return []
# get the actual hls stream link
juicy_stream = match.group(1)
# add the link
streams["links"].append(
{
"quality": res_dict["data-resolution"],
"translation_type": data_audio,
"link": juicy_stream,
}
)
yield streams
yield streams
except Exception as e:
logger.error(f"Animepahe: {e}")