refactor:remove animdl dependency and dependants

This commit is contained in:
Benex254
2024-08-05 09:46:57 +03:00
parent 9edaad3069
commit 46413ef174
15 changed files with 40 additions and 1153 deletions

1
.gitignore vendored
View File

@@ -5,6 +5,7 @@
vids
data/
.project/
fastanime.ini
crashdump.txt
# Byte-compiled / optimized / DLL files
__pycache__/

View File

@@ -1,42 +0,0 @@
import os
from typing import TypedDict
import plyer
from .yaml_parser import YamlParser
class AnimdlConfig(TypedDict):
default_player: str
default_provider: str
quality_string: str
if local_data_path := os.getenv("LOCALAPPDATA"):
config_dir = os.path.join(local_data_path, ".config")
if not os.path.exists(config_dir):
os.mkdir(config_dir)
animdl_config_folder_location = os.path.join(config_dir, "animdl")
else:
user_profile_path = plyer.storagepath.get_home_dir() # type: ignore
animdl_config_folder_location = os.path.join(user_profile_path, ".animdl")
if not os.path.exists(animdl_config_folder_location):
os.mkdir(animdl_config_folder_location)
animdl_config_location = os.path.join(animdl_config_folder_location, "config.yml")
# print(animdl_config_location)
animdl_config = YamlParser(
animdl_config_location,
{"default_player": "mpv", "default_provider": "allanime", "quality_string": "best"},
AnimdlConfig,
)
def update_animdl_config(field_to_update: str, value):
current_data = animdl_config.data
current_data[f"{field_to_update}"] = value
animdl_config.write(current_data)
def get_animdl_config() -> AnimdlConfig:
return animdl_config.data

View File

@@ -9,21 +9,19 @@ from kivy.logger import Logger
from kivy.storage.jsonstore import JsonStore
app_dir = os.path.dirname(__file__)
data_folder = os.path.join(app_dir, "data")
from .. import data_folder
today = date.today()
now = datetime.now()
# TODO:confirm data integrity
if os.path.exists(os.path.join(data_folder, "user_data.json")):
user_data = JsonStore(os.path.join(data_folder, "user_data.json"))
else:
# os.makedir(data_folder, exist_ok=True)
print("op2")
user_data_path = os.path.join(data_folder, "user_data.json")
user_data = JsonStore(user_data_path)
print("op2")
# Get the user data
def get_user_anime_list() -> list:
@@ -42,35 +40,3 @@ def update_user_anime_list(updated_list: list):
user_data.put("user_anime_list", user_anime_list=updated_list_)
except Exception as e:
Logger.warning(f"User Data:Update failure:{e}")
# Get the user data
def get_user_downloads() -> list:
try:
return user_data.get("user_downloads")[
"user_downloads"
] # returns a list of anime ids
except Exception as e:
Logger.warning(f"User Data:Read failure:{e}")
return []
def update_user_downloads(updated_list: list):
try:
user_data.put("user_downloads", user_downloads=list(set(updated_list)))
except Exception as e:
Logger.warning(f"User Data:Update failure:{e}")
# Yt persistent anime trailer cache
t = 1
if now.hour <= 6:
t = 1
elif now.hour <= 12:
t = 2
elif now.hour <= 18:
t = 3
else:
t = 4
yt_anime_trailer_cache_name = f"{today}{t}"

View File

@@ -1,34 +0,0 @@
import os
import yaml
class YamlParser:
"""makes managing yaml files easier"""
data = {}
def __init__(self, file_path: str, default, data_type):
self.file_path: str = file_path
self.data: data_type
if os.path.exists(file_path):
try:
with open(self.file_path, "r") as yaml_file:
self.data = yaml.safe_load(yaml_file)
except Exception:
self.data = default
with open(file_path, "w") as yaml_file:
yaml.dump(default, yaml_file)
else:
self.data = default
with open(file_path, "w") as yaml_file:
yaml.dump(default, yaml_file)
def read(self):
with open(self.file_path, "r") as yaml_file:
self.data = yaml.safe_load(yaml_file)
return self.data
def write(self, new_obj):
with open(self.file_path, "w") as yaml_file:
yaml.dump(new_obj, yaml_file)

View File

@@ -69,8 +69,8 @@ class BaseScreenView(MDScreen, Observer):
super().__init__(**kw)
# Often you need to get access to the application object from the view
# class. You can do this using this attribute.
from ..__main__ import AniXStreamApp
from ..__main__ import FastAnime
self.app: AniXStreamApp = MDApp.get_running_app() # type: ignore
self.app: FastAnime = MDApp.get_running_app() # type: ignore
# Adding a view class as observer.
self.model.add_observer(self)

View File

@@ -0,0 +1,26 @@
import os
import plyer
from kivy.resources import resource_add_path
app_dir = os.path.abspath(os.path.dirname(__file__))
data_folder = os.path.join(app_dir, "data")
if not os.path.exists(data_folder):
os.mkdir(data_folder)
if vid_path := plyer.storagepath.get_videos_dir(): # type: ignore
downloads_dir = os.path.join(vid_path, "FastAnime")
if not os.path.exists(downloads_dir):
os.mkdir(downloads_dir)
else:
downloads_dir = os.path.join(".", "videos")
if not os.path.exists(downloads_dir):
os.mkdir(downloads_dir)
assets_folder = os.path.join(app_dir, "assets")
resource_add_path(assets_folder)
conigs_folder = os.path.join(app_dir, "configs")
resource_add_path(conigs_folder)

View File

@@ -1,28 +1,22 @@
import os
import random
from queue import Queue
from subprocess import Popen
from threading import Thread
import plyer
from dotenv import load_dotenv
from kivy.config import Config
from kivy.loader import Loader
from kivy.logger import Logger
from kivy.resources import resource_add_path, resource_find, resource_remove_path
from kivy.resources import resource_find
from kivy.uix.screenmanager import FadeTransition, ScreenManager
from kivy.uix.settings import Settings, SettingsWithSidebar
from kivymd.app import MDApp
from dotenv import load_dotenv
from .libs.animdl import AnimdlApi
from . import downloads_dir
from .libs.mpv.player import mpv_player
from .Utility import (
animdl_config_manager,
show_notification,
themes_available,
user_data_helper,
)
from .Utility.utils import write_crash
from .libs.mpv.player import mpv_player
from .View.components.media_card.components.media_popup import MediaPopup
from .View.screens import screens
@@ -32,22 +26,9 @@ os.environ["KIVY_VIDEO"] = "ffpyplayer" # noqa: E402
Config.set("graphics", "width", "1000") # noqa: E402
Config.set("graphics", "minimum_width", "1000") # noqa: E402
Config.set("kivy", "window_icon", resource_find("logo.ico")) # noqa: E402
Config.set("graphics", "fullscreen", 0)
Config.set("graphics", "window_state", "visible")
Config.write() # noqa: E402
# resource_add_path("_internal")
app_dir = os.path.dirname(__file__)
# make sure we aint searching dist folder
dist_folder = os.path.join(app_dir, "dist")
resource_remove_path(dist_folder)
assets_folder = os.path.join(app_dir, "assets")
resource_add_path(assets_folder)
conigs_folder = os.path.join(app_dir, "configs")
resource_add_path(conigs_folder)
# from kivy.core.window import Window
Loader.num_workers = 5
Loader.max_upload_per_frame = 10
@@ -57,34 +38,9 @@ if not (user_data_helper.user_data.exists("user_anime_list")):
user_data_helper.update_user_anime_list([])
# TODO: Confirm data integrity from user_data and yt_cache
class AniXStreamApp(MDApp):
# some initialize
queue = Queue()
downloads_queue = Queue()
animdl_streaming_subprocess: Popen | None = None
class FastAnime(MDApp):
default_anime_image = resource_find(random.choice(["default_1.jpg", "default.jpg"]))
default_banner_image = resource_find(random.choice(["banner_1.jpg", "banner.jpg"]))
# default_video = resource_find("Billyhan_When you cant afford Crunchyroll to watch anime.mp4")
def worker(self, queue: Queue):
while True:
task = queue.get() # task should be a function
try:
task()
except Exception as e:
show_notification("An error occured while streaming", f"{e}")
self.queue.task_done()
def downloads_worker(self, queue: Queue):
while True:
download_task = queue.get() # task should be a function
try:
download_task()
except Exception as e:
show_notification("An error occured while downloading", f"{e}")
self.downloads_queue.task_done()
def __init__(self, **kwargs):
super().__init__(**kwargs)
@@ -96,20 +52,6 @@ class AniXStreamApp(MDApp):
self.manager_screens = ScreenManager()
self.manager_screens.transition = FadeTransition()
# initialize worker
self.worker_thread = Thread(target=self.worker, args=(self.queue,))
self.worker_thread.daemon = True
self.worker_thread.start()
Logger.info("AniXStream:Successfully started background tasks worker")
# initialize downloads worker
self.downloads_worker_thread = Thread(
target=self.downloads_worker, args=(self.downloads_queue,)
)
self.downloads_worker_thread.daemon = True
self.downloads_worker_thread.start()
Logger.info("AniXStream:Successfully started download worker")
def build(self) -> ScreenManager:
self.settings_cls = SettingsWithSidebar
@@ -141,14 +83,6 @@ class AniXStreamApp(MDApp):
def build_config(self, config):
# General settings setup
if vid_path := plyer.storagepath.get_videos_dir(): # type: ignore
downloads_dir = os.path.join(vid_path, "anixstream")
if not os.path.exists(downloads_dir):
os.mkdir(downloads_dir)
else:
downloads_dir = os.path.join(".", "videos")
if not os.path.exists(downloads_dir):
os.mkdir(downloads_dir)
config.setdefaults(
"Preferences",
{
@@ -159,34 +93,10 @@ class AniXStreamApp(MDApp):
},
)
# animdl config settings setup
animdl_config = animdl_config_manager.get_animdl_config()
config.setdefaults(
"Providers",
{
"default_provider": animdl_config["default_provider"],
},
)
config.setdefaults(
"Quality",
{
"quality_string": animdl_config["quality_string"],
},
)
config.setdefaults(
"PlayerSelection",
{
"default_player": animdl_config["default_player"],
},
)
def build_settings(self, settings: Settings):
settings.add_json_panel(
"Settings", self.config, resource_find("general_settings_panel.json")
)
settings.add_json_panel(
"Animdl Config", self.config, resource_find("animdl_config_panel.json")
)
def on_config_change(self, config, section, key, value):
# TODO: Change to match case
@@ -203,23 +113,10 @@ class AniXStreamApp(MDApp):
config.write()
case "theme_style":
self.theme_cls.theme_style = value
elif section == "Providers":
animdl_config_manager.update_animdl_config("default_provider", value)
elif section == "Quality":
animdl_config_manager.update_animdl_config("quality_string", value)
elif section == "PlayerSelection":
animdl_config_manager.update_animdl_config("default_player", value)
def on_stop(self):
del self.downloads_worker_thread
if self.animdl_streaming_subprocess:
self.stop_streaming = True
self.animdl_streaming_subprocess.terminate()
del self.worker_thread
pass
Logger.info("Animdl:Successfully terminated existing animdl subprocess")
# custom methods
def search_for_anime(self, search_field, **kwargs):
if self.manager_screens.current != "search screen":
self.manager_screens.current = "search screen"
@@ -236,90 +133,10 @@ class AniXStreamApp(MDApp):
updated_list.remove(id)
user_data_helper.update_user_anime_list(updated_list)
def add_anime_to_user_downloads_list(self, id: int):
updated_list = user_data_helper.get_user_downloads()
updated_list.append(id)
user_data_helper.get_user_downloads()
def show_anime_screen(self, id: int, title, caller_screen_name: str):
self.manager_screens.current = "anime screen"
self.anime_screen.controller.update_anime_view(id, title, caller_screen_name)
def download_anime_complete(
self, successful_downloads: list, failed_downloads: list, anime_title: str
):
show_notification(
f"Finished Dowloading {anime_title}",
f"There were {len(successful_downloads)} successful downloads and {len(failed_downloads)} failed downloads",
)
Logger.info(
f"Downloader:Finished Downloading {anime_title} and there were {len(failed_downloads)} failed downloads"
)
def download_anime(self, anime_id: int, default_cmds: dict):
show_notification(
"New Download Task Queued",
f"{default_cmds.get('title')} has been queued for downloading",
)
self.add_anime_to_user_downloads_list(anime_id)
# TODO:Add custom download cmds functionality
def on_progress(*args):
return self.download_screen.on_episode_download_progress(*args)
output_path = self.config.get("Preferences", "downloads_dir") # type: ignore
self.download_screen.on_new_download_task(
default_cmds["title"], default_cmds.get("episodes_range")
)
if episodes_range := default_cmds.get("episodes_range"):
def download_task():
return AnimdlApi.download_anime_by_title(
default_cmds["title"],
on_progress,
lambda anime_title, episode: show_notification(
"Finished installing an episode", f"{anime_title}-{episode}"
),
self.download_anime_complete,
output_path,
episodes_range,
) # ,default_cmds["quality"]
self.downloads_queue.put(download_task)
Logger.info(
f"Downloader:Successfully Queued {default_cmds['title']} for downloading"
)
else:
def download_task():
return AnimdlApi.download_anime_by_title(
default_cmds["title"],
on_progress,
lambda anime_title, episode: show_notification(
"Finished installing an episode", f"{anime_title}-{episode}"
),
self.download_anime_complete,
output_path,
) # ,default_cmds.get("quality")
self.downloads_queue.put(download_task)
Logger.info(
f"Downloader:Successfully Queued {default_cmds['title']} for downloading"
)
def stream_anime_with_custom_input_cmds(self, *cmds):
self.animdl_streaming_subprocess = (
AnimdlApi._run_animdl_command_and_get_subprocess(["stream", *cmds])
)
def stream_anime_by_title_with_animdl(
self, title, episodes_range: str | None = None
):
self.stop_streaming = False
self.animdl_streaming_subprocess = AnimdlApi.stream_anime_by_title_on_animdl(
title, episodes_range
)
def play_on_mpv(self, anime_video_url: str):
if mpv_player.mpv_process:
mpv_player.stop_mpv()
@@ -327,12 +144,11 @@ class AniXStreamApp(MDApp):
def run_app():
AniXStreamApp().run()
FastAnime().run()
if __name__ == "__main__":
in_development = bool(os.environ.get("IN_DEVELOPMENT", False))
print("In Development {}".format(in_development))
if in_development:
run_app()
else:

View File

@@ -1,48 +0,0 @@
[
{
"type": "title",
"title": "Providers"
},
{
"type": "options",
"title": "Default Provider",
"desc": "Sets the default provider animdl should use",
"section": "Providers",
"key": "default_provider",
"options": [
"9anime",
"allanime",
"animepahe",
"animeout",
"animtime",
"kawaifu",
"gogoanime",
"haho",
"marin",
"zoro"
]
},
{
"type": "title",
"title": "Quality"
},
{
"type": "string",
"title": "Quality String",
"desc": "Sets the animdl quality string",
"section": "Quality",
"key": "quality_string"
},
{
"type": "title",
"title": "PlayerSelection"
},
{
"type": "options",
"title": "Default Player",
"desc": "Sets the animdl default player to use",
"section": "PlayerSelection",
"key": "default_player",
"options": ["mpv", "vlc", "ffplay", "celluloid", "iina"]
}
]

View File

@@ -1,5 +0,0 @@
from .animdl_api import AnimdlApi
# import extras
# import animdl_data_helper
# import animdl_types
# import animdl_exceptions

View File

@@ -1,541 +0,0 @@
import os
import re
import shutil
from subprocess import PIPE, CompletedProcess, Popen, run
from typing import Callable
from .animdl_data_helper import (
anime_title_percentage_match,
filter_broken_streams,
filter_streams_by_quality,
parse_stream_urls_data,
path_parser,
search_output_parser,
)
from .animdl_exceptions import (
AnimdlAnimeUrlNotFoundException,
MPVNotFoundException,
NoValidAnimeStreamsException,
Python310NotFoundException,
)
from .animdl_types import AnimdlAnimeEpisode, AnimdlAnimeUrlAndTitle, AnimdlData
from .extras import Logger
broken_link_pattern = r"https://tools.fast4speed.rsvp/\w*"
def run_mpv_command(*cmds) -> Popen:
if mpv := shutil.which("mpv"):
Logger.debug({"Animdl Api: Started mpv command"})
child_process = Popen(
[mpv, *cmds],
stderr=PIPE,
text=True,
stdout=PIPE,
)
return child_process
else:
raise MPVNotFoundException("MPV is required to be on path for this to work")
# TODO: WRITE Docs for each method
class AnimdlApi:
@classmethod
def _run_animdl_command(cls, cmds: list[str], capture=True) -> CompletedProcess:
"""The core abstraction over the animdl cli that executes valid animdl commands
Args:
cmds (list): a list of valid animdl commands and options
capture (bool, optional): whether to capture the command output or not. Defaults to True.
Raises:
Python310NotFoundException: An exception raised when the machine doesn't have python 3.10 in path which is required by animdls dependencies
Returns:
CompletedProcess: the completed animdl process
"""
if py_path := shutil.which("python"):
Logger.debug("Animdl Api: Started Animdl command")
if capture:
return run(
[py_path, "-m", "animdl", *cmds],
capture_output=True,
stdin=PIPE,
text=True,
)
else:
return run([py_path, "-m", "animdl", *cmds])
else:
raise Python310NotFoundException(
"Python 3.10 is required to be in path for this to work"
)
@classmethod
def _run_animdl_command_and_get_subprocess(cls, cmds: list[str]) -> Popen:
"""An abstraction over animdl cli but offers more control as compered to _run_animdl_command
Args:
cmds (list[str]): valid animdl commands and options
Raises:
Python310NotFoundException: An exception raised when the machine doesn't have python 3.10 in path which is required by animdls dependencies
Returns:
Popen: returns a subprocess in order to offer more control
"""
# TODO: parse the commands
parsed_cmds = list(cmds)
if py_path := shutil.which("python"):
Logger.debug("Animdl Api: Started Animdl command")
base_cmds = [py_path, "-m", "animdl"]
cmds_ = [*base_cmds, *parsed_cmds]
child_process = Popen(cmds_)
return child_process
else:
raise Python310NotFoundException(
"Python 3.10 is required to be in path for this to work"
)
@classmethod
def get_anime_url_by_title(
cls, actual_user_requested_title: str
) -> AnimdlAnimeUrlAndTitle:
"""Searches for the title using animdl and gets the animdl anime url associated with a particular title which is used by animdl for scraping
Args:
actual_user_requested_title (str): any anime title the user wants
Raises:
AnimdlAnimeUrlNotFoundException: raised if no anime title is found
Returns:
AnimdlAnimeTitleAndUrl: The animdl anime url and title for the most likely one the user wants.NOTE: not always correct
"""
result = cls._run_animdl_command(["search", actual_user_requested_title])
possible_animes = search_output_parser(result.stderr)
if possible_animes:
most_likely_anime_url_and_title = max(
possible_animes,
key=lambda possible_data: anime_title_percentage_match(
possible_data.anime_title, actual_user_requested_title
),
)
return most_likely_anime_url_and_title # ("title","anime url")
else:
raise AnimdlAnimeUrlNotFoundException(
"The anime your searching for doesnt exist or animdl provider is broken or animdl not in your system path\nTry changing the default provider"
)
@classmethod
def stream_anime_by_title_on_animdl(
cls, title: str, episodes_range: str | None = None, quality: str = "best"
) -> Popen:
"""Streams the anime title on animdl
Args:
title (str): the anime title you want to stream
episodes_range (str, optional): the episodes you want to stream; should be a valid animdl range. Defaults to None.
quality (str, optional): the quality of the stream. Defaults to "best".
Returns:
Popen: the stream child subprocess for mor control
"""
anime = cls.get_anime_url_by_title(title)
base_cmds = ["stream", anime[1], "-q", quality]
cmd = [*base_cmds, "-r", episodes_range] if episodes_range else base_cmds
return cls._run_animdl_command_and_get_subprocess(cmd)
@classmethod
def stream_anime_with_mpv(
cls, title: str, episodes_range: str | None = None, quality: str = "best"
):
"""Stream an anime directly with mpv without having to interact with animdl cli
Args:
title (str): the anime title you want to stream
episodes_range (str | None, optional): a valid animdl episodes range you want ito watch. Defaults to None.
quality (str, optional): the quality of the stream. Defaults to "best".
Yields:
Popen: the child subprocess you currently are watching
"""
anime_data = cls.get_all_stream_urls_by_anime_title(title, episodes_range)
stream = []
for episode in anime_data.episodes:
if streams := filter_broken_streams(episode["streams"]):
stream = filter_streams_by_quality(streams, quality)
episode_title = str(episode["episode"])
if e_title := stream.get("title"):
episode_title = f"{episode_title}-{e_title}"
window_title = (
f"{anime_data.anime_title} episode {episode_title}".title()
)
cmds = [stream["stream_url"], f"--title={window_title}"]
if audio_tracks := stream.get("audio_tracks"):
tracks = ";".join(audio_tracks)
cmds = [*cmds, f"--audio-files={tracks}"]
if subtitles := stream.get("subtitle"):
subs = ";".join(subtitles)
cmds = [*cmds, f"--sub-files={subs}"]
Logger.debug(
f"Animdl Api Mpv Streamer: Starting to stream on mpv with commands: {cmds}"
)
yield run_mpv_command(*cmds)
Logger.debug(
f"Animdl Api Mpv Streamer: Finished to stream episode {episode['episode']} on mpv"
)
else:
Logger.debug(
f"Animdl Api Mpv Streamer: Failed to stream episode {episode['episode']} no valid streams"
)
yield f"Epiosde {episode['episode']} doesnt have any valid stream links"
@classmethod
def get_all_anime_stream_urls_by_anime_url(
cls, anime_url: str, episodes_range: str | None = None
) -> list[AnimdlAnimeEpisode]:
"""gets all the streams for the animdl url
Args:
anime_url (str): an animdl url used in scraping
episodes_range (str | None, optional): a valid animdl episodes range. Defaults to None.
Returns:
list[AnimdlAnimeEpisode]: A list of anime episodes gotten from animdl
"""
cmd = (
["grab", anime_url, "-r", episodes_range]
if episodes_range
else ["grab", anime_url]
)
result = cls._run_animdl_command(cmd)
return parse_stream_urls_data(result.stdout) # type: ignore
@classmethod
def get_all_stream_urls_by_anime_title(
cls, title: str, episodes_range: str | None = None
) -> AnimdlData:
"""retrieves all anime stream urls of the given episode range from animdl
Args:
title (str): the anime title
episodes_range (str, optional): an animdl episodes range. Defaults to None.
Returns:
AnimdlData: The parsed data from animdl grab
"""
possible_anime = cls.get_anime_url_by_title(title)
return AnimdlData(
possible_anime.anime_title,
cls.get_all_anime_stream_urls_by_anime_url(
possible_anime.animdl_anime_url, episodes_range
),
)
# TODO: Should i finish??
@classmethod
def get_stream_urls_by_anime_title_and_quality(
cls, title: str, quality="best", episodes_range=None
):
(cls.get_all_stream_urls_by_anime_title(title))
@classmethod
def download_anime_by_title(
cls,
_anime_title: str,
on_episode_download_progress: Callable,
on_episode_download_complete: Callable,
on_complete: Callable,
output_path: str,
episodes_range: str | None = None,
quality: str = "best",
) -> tuple[list[int], list[int]]:
"""Downloads anime either adaptive, progressive, or .m3u streams and uses mpv to achieve this
Args:
_anime_title (str): the anime title you want to download
on_episode_download_progress (Callable): the callback when a chunk of an episode is downloaded
on_episode_download_complete (Callable): the callback when an episode has been successfully downloaded
on_complete (Callable): callback when the downloading process is complete
output_path (str): the directory | folder to download the anime
episodes_range (str | None, optional): a valid animdl episode range. Defaults to None.
quality (str, optional): the anime quality. Defaults to "best".
Raises:
NoValidAnimeStreamsException: raised when no valid streams were found for a particular episode
Returns:
tuple[list[int], list[int]]: a tuple containing successful, and failed downloads list
"""
anime_streams_data = cls.get_all_stream_urls_by_anime_title(
_anime_title, episodes_range
)
failed_downloads = []
successful_downloads = []
anime_title = anime_streams_data.anime_title.capitalize()
# determine and parse download location
parsed_anime_title = path_parser(anime_title)
download_location = os.path.join(output_path, parsed_anime_title)
if not os.path.exists(download_location):
os.mkdir(download_location)
Logger.debug(f"Animdl Api Downloader: Started downloading: {anime_title}")
for episode in anime_streams_data.episodes:
episode_number = episode["episode"]
episode_title = f"Episode {episode_number}"
try:
streams = filter_broken_streams(episode["streams"])
# raises an exception if no streams for current episodes
if not streams:
raise NoValidAnimeStreamsException(
f"No valid streams were found for episode {episode_number}"
)
episode_stream = filter_streams_by_quality(streams, quality)
# determine episode_title
if _episode_title := episode_stream.get("title"):
episode_title = f"{episode_title} - {path_parser(_episode_title)}"
# determine episode download location
parsed_episode_title = path_parser(episode_title)
episode_download_location = os.path.join(
download_location, parsed_episode_title
)
if not os.path.exists(episode_download_location):
os.mkdir(episode_download_location)
# init download process
stream_url = episode_stream["stream_url"]
audio_tracks = episode_stream.get("audio_tracks")
subtitles = episode_stream.get("subtitle")
episode_info = {
"episode": episode_title,
"anime_title": anime_title,
}
# check if its adaptive or progressive and call the appropriate downloader
if stream_url and subtitles and audio_tracks:
Logger.debug(
f"Animdl api Downloader: Downloading adaptive episode {anime_title}-{episode_title}"
)
cls.download_adaptive(
stream_url,
audio_tracks[0],
subtitles[0],
episode_download_location,
on_episode_download_progress,
episode_info,
)
elif stream_url and subtitles:
# probably wont occur
Logger.debug(
f"Animdl api Downloader: downloading !? episode {anime_title}-{episode_title}"
)
cls.download_video_and_subtitles(
stream_url,
subtitles[0],
episode_download_location,
on_episode_download_progress,
episode_info,
)
else:
Logger.debug(
f"Animdl api Downloader: Downloading progressive episode {anime_title}-{episode_title}"
)
cls.download_progressive(
stream_url,
episode_download_location,
episode_info,
on_episode_download_progress,
)
# epiosode download complete
on_episode_download_complete(anime_title, episode_title)
successful_downloads.append(episode_number)
Logger.debug(
f"Animdl api Downloader: Success in dowloading {anime_title}-{episode_title}"
)
except Exception as e:
Logger.debug(
f"Animdl api Downloader: Failed in dowloading {anime_title}-{episode_title}; reason {e}"
)
failed_downloads.append(episode_number)
Logger.debug(
f"Animdl api Downloader: Completed in dowloading {anime_title}-{episodes_range}; Successful:{len(successful_downloads)}, Failed:{len(failed_downloads)}"
)
on_complete(successful_downloads, failed_downloads, anime_title)
return (successful_downloads, failed_downloads)
@classmethod
def download_with_mpv(cls, url: str, output_path: str, on_progress: Callable):
"""The method used to download a remote resource with mpv
Args:
url (str): the url of the remote resource to download
output_path (str): the location to download the resource to
on_progress (Callable): the callback when a chunk of the resource is downloaded
Returns:
subprocess return code: the return code of the mpv subprocess
"""
mpv_child_process = run_mpv_command(url, f"--stream-dump={output_path}")
progress_regex = re.compile(r"\d+/\d+") # eg Dumping 2044776/125359745
# extract progress info from mpv
for stream in mpv_child_process.stderr: # type: ignore
# Logger.info(f"Animdl Api Downloader: {stream}")
if progress_matches := progress_regex.findall(stream):
current_bytes, total_bytes = [
float(val) for val in progress_matches[0].split("/")
]
on_progress(current_bytes, total_bytes)
return mpv_child_process.returncode
@classmethod
def download_progressive(
cls,
video_url: str,
output_path: str,
episode_info: dict[str, str],
on_progress: Callable,
):
"""the progressive downloader of mpv
Args:
video_url (str): a video url
output_path (str): download location
episode_info (dict[str, str]): the details of the episode we downloading
on_progress (Callable): the callback when a chunk is downloaded
Raises:
Exception: exception raised when anything goes wrong
"""
episode = (
path_parser(episode_info["anime_title"])
+ " - "
+ path_parser(episode_info["episode"])
)
file_name = episode + ".mp4"
download_location = os.path.join(output_path, file_name)
def on_progress_(current_bytes, total_bytes):
return on_progress(current_bytes, total_bytes, episode_info)
isfailure = cls.download_with_mpv(video_url, download_location, on_progress_)
if isfailure:
raise Exception
@classmethod
def download_adaptive(
cls,
video_url: str,
audio_url: str,
sub_url: str,
output_path: str,
on_progress: Callable,
episode_info: dict[str, str],
):
"""the adaptive downloader
Args:
video_url (str): url of video you want ot download
audio_url (str): url of audio file you want ot download
sub_url (str): url of sub file you want ot download
output_path (str): download location
on_progress (Callable): the callback when a chunk is downloaded
episode_info (dict[str, str]): episode details
Raises:
Exception: incase anything goes wrong
"""
def on_progress_(current_bytes, total_bytes):
return on_progress(current_bytes, total_bytes, episode_info)
episode = (
path_parser(episode_info["anime_title"])
+ " - "
+ path_parser(episode_info["episode"])
)
sub_filename = episode + ".ass"
sub_filepath = os.path.join(output_path, sub_filename)
cls.download_with_mpv(sub_url, sub_filepath, on_progress_)
audio_filename = episode + ".mp3"
audio_filepath = os.path.join(output_path, audio_filename)
cls.download_with_mpv(audio_url, audio_filepath, on_progress_)
video_filename = episode + ".mp4"
video_filepath = os.path.join(output_path, video_filename)
is_video_failure = cls.download_with_mpv(
video_url, video_filepath, on_progress_
)
if is_video_failure:
raise Exception
@classmethod
def download_video_and_subtitles(
cls,
video_url: str,
sub_url: str,
output_path: str,
on_progress: Callable,
episode_info: dict[str, str],
):
"""only downloads video and subs
Args:
video_url (str): url of video you want ot download
sub_url (str): url of sub you want ot download
output_path (str): the download location
on_progress (Callable): the callback for when a chunk is downloaded
episode_info (dict[str, str]): episode details
Raises:
Exception: when anything goes wrong
"""
def on_progress_(current_bytes, total_bytes):
return on_progress(current_bytes, total_bytes, episode_info)
episode = (
path_parser(episode_info["anime_title"])
+ " - "
+ path_parser(episode_info["episode"])
)
sub_filename = episode + ".ass"
sub_filepath = os.path.join(output_path, sub_filename)
cls.download_with_mpv(sub_url, sub_filepath, on_progress_)
video_filename = episode + ".mp4"
video_filepath = os.path.join(output_path, video_filename)
is_video_failure = cls.download_with_mpv(
video_url, video_filepath, on_progress_
)
if is_video_failure:
raise Exception

View File

@@ -1,201 +0,0 @@
import json
import re
from fuzzywuzzy import fuzz
from .animdl_types import (
AnimdlAnimeEpisode,
AnimdlAnimeUrlAndTitle,
AnimdlEpisodeStream,
)
from .extras import Logger
# Currently this links don't work so we filter it out
broken_link_pattern = r"https://tools.fast4speed.rsvp/\w*"
def path_parser(path: str) -> str:
"""Parses a string and removes path unsafe characters
Args:
path (str): a path literal
Returns:
str: a parsed string that can be used as a valid path
"""
return (
path.replace(":", "")
.replace("/", "")
.replace("\\", "")
.replace('"', "")
.replace("'", "")
.replace("<", "")
.replace(">", "")
.replace("|", "")
.replace("?", "")
.replace(".", "")
.replace("*", "")
)
def string_contains_only_spaces(input_string: str) -> bool:
"""Checks if the string is a string of spaces
Args:
input_string (str): any string
Returns:
bool: a boolean in indicating whether it does contain only spaces or not
"""
return all(char.isspace() for char in input_string)
def anime_title_percentage_match(
possible_user_requested_anime_title: str, title: str
) -> int:
"""Returns the percentage match between the possible title and user title
Args:
possible_user_requested_anime_title (str): an Animdl search result title
title (str): the anime title the user wants
Returns:
int: the percentage match
"""
percentage_ratio = fuzz.ratio(title, possible_user_requested_anime_title)
Logger.debug(
f"Animdl Api Fuzzy: Percentage match of {possible_user_requested_anime_title} against {title}: {percentage_ratio}%"
)
return percentage_ratio
def filter_broken_streams(
streams: list[AnimdlEpisodeStream],
) -> list[AnimdlEpisodeStream]:
"""filters the streams that the project has evaluated doesnt work
Args:
streams (list[AnimdlEpisodeStream]): the streams to filter
Returns:
list[AnimdlEpisodeStream]: the valid streams
"""
def stream_filter(stream):
return (
True if not re.match(broken_link_pattern, stream["stream_url"]) else False
)
return list(filter(stream_filter, streams))
def filter_streams_by_quality(
anime_episode_streams: list[AnimdlEpisodeStream], quality: str | int, strict=False
) -> AnimdlEpisodeStream:
"""filters streams by quality
Args:
anime_episode_streams (list[AnimdlEpisodeStream]): the streams to filter
quality (str | int): the quality you want to get
strict (bool, optional): whether to always return an episode if quality isn,t found. Defaults to False.
Returns:
AnimdlEpisodeStream: the stream of specified quality
"""
# get the appropriate stream or default to best
def get_quality_func(stream_):
return stream_.get("quality") if stream_.get("quality") else 0
match quality:
case "best":
return max(anime_episode_streams, key=get_quality_func)
case "worst":
return min(anime_episode_streams, key=get_quality_func)
case _:
for episode_stream in anime_episode_streams:
if str(episode_stream.get("quality")) == str(quality):
return episode_stream
else:
# if not strict:
Logger.debug("Animdl Api: Not strict so defaulting to best")
return max(anime_episode_streams, key=get_quality_func)
# else:
# Logger.warning(
# f"Animdl Api: No stream matching the given quality was found"
# )
# return AnimdlEpisodeStream({})
def parse_stream_urls_data(raw_stream_urls_data: str) -> list[AnimdlAnimeEpisode]:
"""parses the streams data gotten from animdl grab
Args:
raw_stream_urls_data (str): the animdl grab data to parse
Returns:
list[AnimdlAnimeEpisode]: the parsed streams for all episode
"""
try:
return [
AnimdlAnimeEpisode(json.loads(episode.strip()))
for episode in raw_stream_urls_data.strip().split("\n")
]
except Exception as e:
Logger.error(f"Animdl Api Parser {e}")
return []
def search_output_parser(raw_data: str) -> list[AnimdlAnimeUrlAndTitle]:
"""Parses the recieved raw search animdl data and makes it more easy to use
Args:
raw_data (str): valid animdl data
Returns:
AnimdlAnimeUrlAndTitle: parsed animdl data containing an animdl anime url and anime title
"""
# get each line of dat and ignore those that contain unwanted data
data = raw_data.split("\n")[3:]
parsed_data = []
pass_next = False
# loop through all lines and return an appropriate AnimdlAimeUrlAndTitle
for i, data_item in enumerate(data[:]):
# continue if current was used in creating previous animdlanimeurlandtitle
if pass_next:
pass_next = False
continue
# there is no data or its just spaces so ignore and continue
if not data_item or string_contains_only_spaces(data_item):
continue
# split title? from url?
item = data_item.split(" / ")
numbering_pattern = r"^\d*\.\s*"
# attempt to parse
try:
# remove numbering from search results
anime_title = re.sub(numbering_pattern, "", item[0]).lower()
# special case for onepiece since allanime labels it as 1p instead of onepiece
one_piece_regex = re.compile(r"1p", re.IGNORECASE)
if one_piece_regex.match(anime_title):
anime_title = "one piece"
# checks if the data is already structure like anime title, animdl url if not makes it that way
if item[1] == "" or string_contains_only_spaces(item[1]):
pass_next = True
parsed_data.append(AnimdlAnimeUrlAndTitle(anime_title, data[(i + 1)]))
else:
parsed_data.append(AnimdlAnimeUrlAndTitle(anime_title, item[1]))
except Exception:
pass
return parsed_data # anime title,url

View File

@@ -1,18 +0,0 @@
class MPVNotFoundException(Exception):
pass
class Python310NotFoundException(Exception):
pass
class AnimdlAnimeUrlNotFoundException(Exception):
pass
class NoValidAnimeStreamsException(Exception):
pass
class InvalidAnimdlCommandsException(Exception):
pass

View File

@@ -1,24 +0,0 @@
from typing import NamedTuple, TypedDict
class AnimdlAnimeUrlAndTitle(NamedTuple):
anime_title: str
animdl_anime_url: str
class AnimdlEpisodeStream(TypedDict):
stream_url: str
quality: int
subtitle: list[str] | None
audio_tracks: list[str] | None
title: str | None
class AnimdlAnimeEpisode(TypedDict):
episode: int
streams: list[AnimdlEpisodeStream]
class AnimdlData(NamedTuple):
anime_title: str
episodes: list[AnimdlAnimeEpisode]

View File

@@ -1,9 +0,0 @@
import logging
Logger = logging.getLogger(__name__)
# Logger.setLevel(logging.DEBUG)
# formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
# console_handler = logging.StreamHandler()
# console_handler.setLevel(logging.INFO)
# console_handler.setFormatter(formatter)
# Logger.addHandler(console_handler)