diff --git a/tags.json b/dev/generated/anilist/tags.json similarity index 100% rename from tags.json rename to dev/generated/anilist/tags.json diff --git a/fastanime/Utility/downloader/downloader.py b/fastanime/Utility/downloader/downloader.py deleted file mode 100644 index c48fa6e..0000000 --- a/fastanime/Utility/downloader/downloader.py +++ /dev/null @@ -1,237 +0,0 @@ -import logging -import os -import shutil -import subprocess -import tempfile -from queue import Queue -from threading import Thread - -import yt_dlp -from rich import print -from rich.prompt import Confirm -from yt_dlp.utils import sanitize_filename - -logger = logging.getLogger(__name__) - - -class YtDLPDownloader: - downloads_queue = Queue() - _thread = None - - def _worker(self): - while True: - task, args = self.downloads_queue.get() - try: - task(*args) - except Exception as e: - logger.error(f"Something went wrong {e}") - self.downloads_queue.task_done() - - def _download_file( - self, - url: str, - anime_title: str, - episode_title: str, - download_dir: str, - silent: bool, - progress_hooks=[], - vid_format: str = "best", - force_unknown_ext=False, - verbose=False, - headers={}, - sub="", - merge=False, - clean=False, - prompt=True, - force_ffmpeg=False, - hls_use_mpegts=False, - hls_use_h264=False, - nocheckcertificate=False, - ): - """Helper function that downloads anime given url and path details - - Args: - url: [TODO:description] - anime_title: [TODO:description] - episode_title: [TODO:description] - download_dir: [TODO:description] - silent: [TODO:description] - vid_format: [TODO:description] - """ - anime_title = sanitize_filename(anime_title) - episode_title = sanitize_filename(episode_title) - if url.endswith(".torrent"): - WEBTORRENT_CLI = shutil.which("webtorrent") - if not WEBTORRENT_CLI: - import time - - print( - "webtorrent cli is not installed which is required for downloading and streaming from nyaa\nplease install it or use another provider" - ) - time.sleep(120) - return - cmd = [ - WEBTORRENT_CLI, - "download", - url, - "--out", - os.path.join(download_dir, anime_title, episode_title), - ] - subprocess.run(cmd) - return - ydl_opts = { - # Specify the output path and template - "http_headers": headers, - "outtmpl": f"{download_dir}/{anime_title}/{episode_title}.%(ext)s", - "silent": silent, - "verbose": verbose, - "format": vid_format, - "compat_opts": ("allow-unsafe-ext",) if force_unknown_ext else tuple(), - "progress_hooks": progress_hooks, - "nocheckcertificate": nocheckcertificate, - } - urls = [url] - if sub: - urls.append(sub) - vid_path = "" - sub_path = "" - for i, url in enumerate(urls): - options = ydl_opts - if i == 0: - if force_ffmpeg: - options = options | { - "external_downloader": {"default": "ffmpeg"}, - "external_downloader_args": { - "ffmpeg_i1": ["-v", "error", "-stats"], - }, - } - if hls_use_mpegts: - options = options | { - "hls_use_mpegts": True, - "outtmpl": ".".join(options["outtmpl"].split(".")[:-1]) + ".ts", # force .ts extension - } - elif hls_use_h264: - options = options | { - "external_downloader_args": options["external_downloader_args"] | { - "ffmpeg_o1": [ - "-c:v", "copy", - "-c:a", "aac", - "-bsf:a", "aac_adtstoasc", - "-q:a", "1", - "-ac", "2", - "-af", "loudnorm=I=-22:TP=-2.5:LRA=11,alimiter=limit=-1.5dB", # prevent clipping from HE-AAC to AAC convertion - ], - } - } - - with yt_dlp.YoutubeDL(options) as ydl: - info = ydl.extract_info(url, download=True) - if not info: - continue - if i == 0: - vid_path: str = info["requested_downloads"][0]["filepath"] - if vid_path.endswith(".unknown_video"): - print("Normalizing path...") - _vid_path = vid_path.replace(".unknown_video", ".mp4") - shutil.move(vid_path, _vid_path) - vid_path = _vid_path - print("successfully normalized path") - - else: - sub_path = info["requested_downloads"][0]["filepath"] - if sub_path and vid_path and merge: - self.merge_subtitles(vid_path, sub_path, clean, prompt) - - def merge_subtitles(self, video_path, sub_path, clean, prompt): - # Extract the directory and filename - video_dir = os.path.dirname(video_path) - video_name = os.path.basename(video_path) - video_name, _ = os.path.splitext(video_name) - video_name += ".mkv" - - FFMPEG_EXECUTABLE = shutil.which("ffmpeg") - if not FFMPEG_EXECUTABLE: - print("[yellow bold]WARNING: [/]FFmpeg not found") - return - # Create a temporary directory - with tempfile.TemporaryDirectory() as temp_dir: - # Temporary output path in the temporary directory - temp_output_path = os.path.join(temp_dir, video_name) - # FFmpeg command to merge subtitles - command = [ - FFMPEG_EXECUTABLE, - "-hide_banner", - "-i", - video_path, - "-i", - sub_path, - "-c", - "copy", - "-map", - "0", - "-map", - "1", - temp_output_path, - ] - - # Run the command - try: - subprocess.run(command, check=True) - - # Move the file back to the original directory with the original name - final_output_path = os.path.join(video_dir, video_name) - - if os.path.exists(final_output_path): - if not prompt or Confirm.ask( - f"File exists({final_output_path}) would you like to overwrite it", - default=True, - ): - # move file to dest - os.remove(final_output_path) - shutil.move(temp_output_path, final_output_path) - else: - shutil.move(temp_output_path, final_output_path) - # clean up - if clean: - print("[cyan]Cleaning original files...[/]") - os.remove(video_path) - os.remove(sub_path) - - print( - f"[green bold]Subtitles merged successfully.[/] Output file: {final_output_path}" - ) - except subprocess.CalledProcessError as e: - print(f"[red bold]Error[/] during merging subtitles: {e}") - except Exception as e: - print(f"[red bold]An error[/] occurred: {e}") - - def download_file( - self, - url: str, - anime_title: str, - episode_title: str, - download_dir: str, - silent: bool = True, - **kwargs, - ): - """A helper that just does things in the background - - Args: - title ([TODO:parameter]): [TODO:description] - silent ([TODO:parameter]): [TODO:description] - url: [TODO:description] - """ - if not self._thread: - self._thread = Thread(target=self._worker) - self._thread.daemon = True - self._thread.start() - - self.downloads_queue.put( - ( - self._download_file, - (url, anime_title, episode_title, download_dir, silent), - ) - ) - - -downloader = YtDLPDownloader() diff --git a/fastanime/__init__.py b/fastanime/__init__.py index c27fcc8..0da8781 100644 --- a/fastanime/__init__.py +++ b/fastanime/__init__.py @@ -6,15 +6,7 @@ if sys.version_info < (3, 10): ) # noqa: F541 -__version__ = "v2.9.9" - -APP_NAME = "FastAnime" -AUTHOR = "Benexl" -GIT_REPO = "github.com" -REPO = f"{GIT_REPO}/{AUTHOR}/{APP_NAME}" - - -def FastAnime(): +def Cli(): from .cli import run_cli run_cli() diff --git a/fastanime/__main__.py b/fastanime/__main__.py index bc4cea3..bede7bc 100644 --- a/fastanime/__main__.py +++ b/fastanime/__main__.py @@ -9,6 +9,6 @@ if __package__ is None and not getattr(sys, "frozen", False): if __name__ == "__main__": - from . import FastAnime + from . import Cli - FastAnime() + Cli() diff --git a/fastanime/cli/interfaces/anilist_interfaces.py b/fastanime/cli/interfaces/anilist_interfaces.py deleted file mode 100644 index fcf07fa..0000000 --- a/fastanime/cli/interfaces/anilist_interfaces.py +++ /dev/null @@ -1,2103 +0,0 @@ -from __future__ import annotations - -import os -import random -import threading -from hashlib import sha256 -from typing import TYPE_CHECKING - -from click import clear -from InquirerPy import inquirer -from InquirerPy.validator import EmptyInputValidator -from rich import print -from rich.progress import Progress -from rich.prompt import Confirm, Prompt -from yt_dlp.utils import sanitize_filename - -from ...anilist import AniList -from ...constants import USER_CONFIG_PATH -from ...libs.discord import discord -from ...libs.fzf import fzf -from ...libs.rofi import Rofi -from ...Utility.data import anime_normalizer -from ...Utility.utils import anime_title_percentage_match -from ..utils.mpv import run_mpv -from ..utils.tools import exit_app -from ..utils.utils import ( - filter_by_quality, - fuzzy_inquirer, - move_preferred_subtitle_lang_to_top, -) -from .utils import aniskip - -if TYPE_CHECKING: - from ...libs.anilist.types import AnilistBaseMediaDataSchema - from ...libs.anime_provider.types import Anime, SearchResult, Server - from ..config import Config - from ..utils.tools import FastAnimeRuntimeState - - -def calculate_percentage_completion(start_time, end_time): - """helper function used to calculate the difference between two timestamps in seconds - - Args: - start_time ([TODO:parameter]): [TODO:description] - end_time ([TODO:parameter]): [TODO:description] - - Returns: - [TODO:return] - """ - - try: - start = start_time.split(":") - end = end_time.split(":") - start_secs = int(start[0]) * 3600 + int(start[1]) * 60 + int(start[2]) - end_secs = int(end[0]) * 3600 + int(end[1]) * 60 + int(end[2]) - return start_secs / end_secs * 100 - except Exception: - return 0 - - -def discord_updater(show, episode, switch): - discord.discord_connect(show, episode, switch) - - -def media_player_controls( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" -): - """Menu that that offers media player controls - - Args: - config: [TODO:description] - fastanime_runtime_state: [TODO:description] - """ - # user config - config.translation_type.lower() - - # internal config - current_episode_number: str = ( - fastanime_runtime_state.provider_current_episode_number - ) - available_episodes: list = sorted( - fastanime_runtime_state.provider_available_episodes, key=float - ) - server_episode_streams: list = ( - fastanime_runtime_state.provider_server_episode_streams - ) - current_episode_stream_link: str = ( - fastanime_runtime_state.provider_current_episode_stream_link - ) - provider_anime_title: str = fastanime_runtime_state.provider_anime_title - anime_id_anilist: int = fastanime_runtime_state.selected_anime_id_anilist - - def _servers(): - """Go to servers menu""" - config.server = "" - - provider_anime_episode_servers_menu(config, fastanime_runtime_state) - - def _replay(): - """replay the current media""" - selected_server: "Server" = fastanime_runtime_state.provider_current_server - print( - "[bold magenta]Now Replaying:[/]", - provider_anime_title, - "[bold magenta] Episode: [/]", - current_episode_number, - ) - - if ( - config.watch_history[str(anime_id_anilist)]["episode_no"] - == current_episode_number - ): - start_time = config.watch_history[str(anime_id_anilist)][ - "episode_stopped_at" - ] - print("[green]Continuing from:[/] ", start_time) - else: - start_time = "0" - custom_args = [] - if config.skip: - if args := aniskip( - fastanime_runtime_state.selected_anime_anilist["idMal"], - current_episode_number, - ): - custom_args.extend(args) - subtitles = move_preferred_subtitle_lang_to_top( - selected_server["subtitles"], config.sub_lang - ) - episode_title = selected_server["episode_title"] - if config.normalize_titles: - import re - - for episode_detail in fastanime_runtime_state.selected_anime_anilist[ - "streamingEpisodes" - ]: - if re.match( - f"Episode {current_episode_number} ", episode_detail["title"] - ): - episode_title = episode_detail["title"] - break - if config.sync_play: - from ..utils.syncplay import SyncPlayer - - stop_time, total_time = SyncPlayer( - current_episode_stream_link, - episode_title, - headers=selected_server["headers"], - subtitles=subtitles, - ) - elif config.use_python_mpv: - from ..utils.player import player - - player.create_player( - current_episode_stream_link, - config.anime_provider, - fastanime_runtime_state, - config, - episode_title, - start_time, - headers=selected_server["headers"], - subtitles=subtitles, - ) - stop_time = player.last_stop_time - total_time = player.last_total_time - else: - stop_time, total_time = run_mpv( - current_episode_stream_link, - episode_title, - start_time=start_time, - custom_args=custom_args, - headers=selected_server["headers"], - subtitles=subtitles, - player=config.player, - ) - - # either update the watch history to the next episode or current depending on progress - if stop_time == "0" or total_time == "0": - episode = str(int(current_episode_number) + 1) - else: - percentage_completion_of_episode = calculate_percentage_completion( - stop_time, total_time - ) - if percentage_completion_of_episode < config.episode_complete_at: - episode = current_episode_number - else: - episode = str(int(current_episode_number) + 1) - stop_time = "0" - total_time = "0" - - clear() - config.media_list_track( - anime_id_anilist, - episode_no=episode, - episode_stopped_at=stop_time, - episode_total_length=total_time, - progress_tracking=fastanime_runtime_state.progress_tracking, - ) - media_player_controls(config, fastanime_runtime_state) - - def _next_episode(): - """watch the next episode""" - # ensures you dont accidentally erase your progress for an in complete episode - stop_time = config.watch_history.get(str(anime_id_anilist), {}).get( - "episode_stopped_at", "0" - ) - - total_time = config.watch_history.get(str(anime_id_anilist), {}).get( - "episode_total_length", "0" - ) - - # compute if the episode is actually completed - if stop_time == "0" or total_time == "0": - percentage_completion_of_episode = 0 - else: - percentage_completion_of_episode = calculate_percentage_completion( - stop_time, total_time - ) - if percentage_completion_of_episode < config.episode_complete_at: - if config.auto_next: - if config.use_rofi: - if not Rofi.confirm( - "Are you sure you wish to continue to the next episode you haven't completed the current episode?" - ): - media_actions_menu(config, fastanime_runtime_state) - return - else: - if not Confirm.ask( - "Are you sure you wish to continue to the next episode you haven't completed the current episode?", - default=False, - ): - media_actions_menu(config, fastanime_runtime_state) - return - elif not config.use_rofi: - if not Confirm.ask( - "Are you sure you wish to continue to the next episode, your progress for the current episodes will be erased?", - default=True, - ): - media_actions_menu(config, fastanime_runtime_state) - return - - # all checks have passed lets go to the next episode - next_episode = available_episodes.index(current_episode_number) + 1 - if next_episode >= len(available_episodes): - next_episode = len(available_episodes) - 1 - - # updateinternal config - fastanime_runtime_state.provider_current_episode_number = available_episodes[ - next_episode - ] - - # update user config - config.media_list_track( - anime_id_anilist, - episode_no=available_episodes[next_episode], - progress_tracking=fastanime_runtime_state.progress_tracking, - ) - - # call interface - provider_anime_episode_servers_menu(config, fastanime_runtime_state) - - def _episodes(): - """Go to episodes menu""" - # reset watch_history - config.continue_from_history = False - - # call interface - provider_anime_episodes_menu(config, fastanime_runtime_state) - - def _previous_episode(): - """Watch previous episode""" - prev_episode = available_episodes.index(current_episode_number) - 1 - if prev_episode <= 0: - prev_episode = 0 - # fastanime_runtime_state.episode_title = episode["title"] - fastanime_runtime_state.provider_current_episode_number = available_episodes[ - prev_episode - ] - - # update user config - config.media_list_track( - anime_id_anilist, - episode_no=available_episodes[prev_episode], - progress_tracking=fastanime_runtime_state.progress_tracking, - ) - - # call interface - provider_anime_episode_servers_menu(config, fastanime_runtime_state) - - def _change_quality(): - """Change the quality of the media""" - # extract the actual link urls - options = [link["quality"] for link in server_episode_streams] - - # prompt for new quality - if config.use_fzf: - quality = fzf.run( - options, prompt="Select Quality", header="Quality Options" - ) - elif config.use_rofi: - quality = Rofi.run(options, "Select Quality") - else: - quality = fuzzy_inquirer( - options, - "Select Quality", - ) - config.quality = quality # set quality - media_player_controls(config, fastanime_runtime_state) - - def _change_translation_type(): - """change translation type""" - # prompt for new translation type - options = ["sub", "dub"] - if config.use_fzf: - translation_type = fzf.run( - options, prompt="Select Translation Type", header="Lang Options" - ).lower() - elif config.use_rofi: - translation_type = Rofi.run(options, "Select Translation Type") - else: - translation_type = fuzzy_inquirer( - options, - "Select Translation Type", - ).lower() - - # update internal config - config.translation_type = translation_type.lower() - - # reload to controls - media_player_controls(config, fastanime_runtime_state) - - icons = config.icons - options = {} - - # Only show Next Episode option if the current episode is not the last one - current_index = available_episodes.index(current_episode_number) - if current_index < len(available_episodes) - 1: - options[f"{'โญ ' if icons else ''}Next Episode"] = _next_episode - - def _toggle_auto_next( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" - ): - """helper function to toggle auto next - - Args: - config: [TODO:description] - fastanime_runtime_state: [TODO:description] - """ - config.auto_next = not config.auto_next - media_player_controls(config, fastanime_runtime_state) - - options.update( - { - f"{'๐Ÿ”‚ ' if icons else ''}Replay": _replay, - f"{'โฎ ' if icons else ''}Previous Episode": _previous_episode, - f"{'๐Ÿ—ƒ๏ธ ' if icons else ''}Episodes": _episodes, - f"{'๐Ÿ“€ ' if icons else ''}Change Quality": _change_quality, - f"{'๐ŸŽง ' if icons else ''}Change Translation Type": _change_translation_type, - f"{'๐Ÿ’  ' if icons else ''}Toggle auto next episode": lambda: _toggle_auto_next( - config, fastanime_runtime_state - ), - f"{'๐Ÿ’ฝ ' if icons else ''}Servers": _servers, - f"{'๐Ÿ“ฑ ' if icons else ''}Main Menu": lambda: fastanime_main_menu( - config, fastanime_runtime_state - ), - f"{'๐Ÿ“œ ' if icons else ''}Media Actions Menu": lambda: media_actions_menu( - config, fastanime_runtime_state - ), - f"{'๐Ÿ”Ž ' if icons else ''}Anilist Results Menu": lambda: anilist_results_menu( - config, fastanime_runtime_state - ), - f"{'โŒ ' if icons else ''}Exit": exit_app, - } - ) - - if config.auto_next: - if current_index < len(available_episodes) - 1: - print("Auto selecting next episode") - _next_episode() - return - else: - print("Last episode reached") - - choices = list(options.keys()) - if config.use_fzf: - action = fzf.run( - choices, - prompt="Select Action", - ) - elif config.use_rofi: - action = Rofi.run(choices, "Select Action") - else: - action = fuzzy_inquirer(choices, "Select Action") - options[action]() - - -def provider_anime_episode_servers_menu( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" -): - """Menu that enables selection of a server either manually or automatically based on user config then plays the stream link of the quality the user prefers - - Args: - config: [TODO:description] - fastanime_runtime_state: [TODO:description] - - Returns: - [TODO:return] - """ - # user config - quality: str = config.quality - translation_type = config.translation_type - anime_provider = config.anime_provider - - # runtime configuration - current_episode_number: str = ( - fastanime_runtime_state.provider_current_episode_number - ) - provider_anime_title: str = fastanime_runtime_state.provider_anime_title - anime_id_anilist: int = fastanime_runtime_state.selected_anime_id_anilist - provider_anime: "Anime" = fastanime_runtime_state.provider_anime - - server_name = "" - # get streams for episode from provider - with Progress() as progress: - progress.add_task("Fetching Episode Streams...", total=None) - episode_streams_generator = anime_provider.get_episode_streams( - provider_anime["id"], - current_episode_number, - translation_type, - ) - if not episode_streams_generator: - if not config.use_rofi: - print("Failed to fetch :cry:") - input("Enter to retry...") - else: - if not Rofi.confirm("Sth went wrong!!Enter to continue..."): - exit(1) - media_actions_menu(config, fastanime_runtime_state) - return - - if config.server == "top": - # no need to get all servers if top just works - with Progress() as progress: - progress.add_task("Fetching top server...", total=None) - selected_server = next(episode_streams_generator, None) - if not selected_server: - if config.use_rofi: - if Rofi.confirm("Sth went wrong enter to continue"): - media_actions_menu(config, fastanime_runtime_state) - else: - exit_app(1) - else: - print("Sth went wrong") - input("Enter to continue...") - media_actions_menu(config, fastanime_runtime_state) - return - else: - with Progress() as progress: - progress.add_task("Fetching servers...", total=None) - episode_streams_dict = { - episode_stream["server"]: episode_stream - for episode_stream in episode_streams_generator - } - - if not episode_streams_dict: - if config.use_rofi: - if Rofi.confirm("Sth went wrong enter to continue"): - media_actions_menu(config, fastanime_runtime_state) - else: - exit_app(1) - else: - print("Sth went wrong") - input("Enter to continue...") - media_actions_menu(config, fastanime_runtime_state) - return - # check if user server exists and is actually a valid serrver then sets it - if config.server and config.server in episode_streams_dict.keys(): - server_name = config.server - - # prompt for preferred server if not automatically set using config - if not server_name: - choices = [*episode_streams_dict.keys(), "top", "Back"] - if config.use_fzf: - server_name = fzf.run( - choices, - prompt="Select Server", - header="Servers", - ) - elif config.use_rofi: - server_name = Rofi.run(choices, "Select Server") - else: - server_name = fuzzy_inquirer( - choices, - "Select Server", - ) - if server_name == "Back": - # set continue_from_history to false in order for episodes menu to be shown or continue from history if true will prevent this from happening - config.continue_from_history = False - - provider_anime_episodes_menu(config, fastanime_runtime_state) - return - elif server_name == "top" and episode_streams_dict.keys(): - selected_server = episode_streams_dict[list(episode_streams_dict.keys())[0]] - else: - if server_name == "top" or server_name == "back": - if config.use_rofi: - if not Rofi.confirm("No severs available..."): - exit_app() - else: - media_actions_menu(config, fastanime_runtime_state) - return - else: - print("Failed to set server") - input("Enter to continue") - media_actions_menu(config, fastanime_runtime_state) - return - selected_server = episode_streams_dict[server_name] - - # get the stream of the preferred quality - provider_server_episode_streams = selected_server["links"] - provider_server_episode_stream = filter_by_quality( - quality, provider_server_episode_streams - ) - if not provider_server_episode_stream: - print("Quality not found") - input("Enter to continue...") - media_actions_menu(config, fastanime_runtime_state) - return - - current_stream_link = provider_server_episode_stream["link"] - - # update internal config - fastanime_runtime_state.provider_server_episode_streams = ( - provider_server_episode_streams - ) - fastanime_runtime_state.provider_current_episode_stream_link = current_stream_link - fastanime_runtime_state.provider_current_server = selected_server - fastanime_runtime_state.provider_current_server_name = server_name - - # play video - print( - "[bold magenta]Now playing:[/]", - provider_anime_title, - "[bold magenta] Episode: [/]", - current_episode_number, - ) - # update discord activity for user - switch = threading.Event() - if config.discord: - discord_proc = threading.Thread( - target=discord_updater, - args=(provider_anime_title, current_episode_number, switch), - ) - discord_proc.start() - - # try to get the timestamp you left off from if available - start_time = config.watch_history.get(str(anime_id_anilist), {}).get( - "episode_stopped_at", "0" - ) - episode_in_history = config.watch_history.get(str(anime_id_anilist), {}).get( - "episode_no", "" - ) - if start_time != "0" and episode_in_history == current_episode_number: - print("[green]Continuing from:[/] ", start_time) - else: - start_time = "0" - custom_args = [] - if config.skip: - if args := aniskip( - fastanime_runtime_state.selected_anime_anilist["idMal"], - current_episode_number, - ): - custom_args.extend(args) - subtitles = move_preferred_subtitle_lang_to_top( - selected_server["subtitles"], config.sub_lang - ) - episode_title = selected_server["episode_title"] - if config.normalize_titles: - import re - - for episode_detail in fastanime_runtime_state.selected_anime_anilist[ - "streamingEpisodes" - ]: - if re.match(f"Episode {current_episode_number} ", episode_detail["title"]): - episode_title = episode_detail["title"] - break - - if config.recent: - config.update_recent( - [ - fastanime_runtime_state.selected_anime_anilist, - *config.user_data["recent_anime"], - ] - ) - print("Updating recent anime...") - if config.sync_play: - from ..utils.syncplay import SyncPlayer - - stop_time, total_time = SyncPlayer( - current_stream_link, - episode_title, - headers=selected_server["headers"], - subtitles=subtitles, - ) - elif config.use_python_mpv: - from ..utils.player import player - - if start_time == "0" and episode_in_history != current_episode_number: - start_time = "0" - player.create_player( - current_stream_link, - anime_provider, - fastanime_runtime_state, - config, - episode_title, - start_time, - headers=selected_server["headers"], - subtitles=subtitles, - ) - - stop_time = player.last_stop_time - total_time = player.last_total_time - current_episode_number = fastanime_runtime_state.provider_current_episode_number - else: - if not episode_in_history == current_episode_number: - start_time = "0" - stop_time, total_time = run_mpv( - current_stream_link, - episode_title, - start_time=start_time, - custom_args=custom_args, - headers=selected_server["headers"], - subtitles=subtitles, - player=config.player, - ) - print("Finished at: ", stop_time) - - # stop discord activity updater - if config.discord: - switch.set() - - # update_watch_history - # this will try to update the episode to be the next episode if delta has reached a specific threshhold - # this update will only apply locally - # the remote(anilist) is only updated when its certain you are going to open the player - if stop_time == "0" or total_time == "0": - # increment the episodes - # next_episode = available_episodes.index(current_episode_number) + 1 - # if next_episode >= len(available_episodes): - # next_episode = len(available_episodes) - 1 - # episode = available_episodes[next_episode] - pass - else: - percentage_completion_of_episode = calculate_percentage_completion( - stop_time, total_time - ) - if percentage_completion_of_episode > config.episode_complete_at: - # -- update anilist progress if user -- - remote_progress = ( - fastanime_runtime_state.selected_anime_anilist["mediaListEntry"] or {} - ).get("progress") - disable_anilist_update = False - if remote_progress: - if ( - float(remote_progress) > float(current_episode_number) - and config.force_forward_tracking - ): - disable_anilist_update = True - if ( - fastanime_runtime_state.progress_tracking == "track" - and config.user - and not disable_anilist_update - and current_episode_number - ): - AniList.update_anime_list( - { - "mediaId": anime_id_anilist, - "progress": int(float(current_episode_number)), - } - ) - - # increment the episodes - # next_episode = available_episodes.index(current_episode_number) + 1 - # if next_episode >= len(available_episodes): - # next_episode = len(available_episodes) - 1 - # episode = available_episodes[next_episode] - # stop_time = "0" - # total_time = "0" - - config.media_list_track( - anime_id_anilist, - episode_no=current_episode_number, - episode_stopped_at=stop_time, - episode_total_length=total_time, - progress_tracking=fastanime_runtime_state.progress_tracking, - ) - - # switch to controls - clear() - - media_player_controls(config, fastanime_runtime_state) - - -def provider_anime_episodes_menu( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" -): - """A menu that handles selection of episode either manually or automatically based on either local episode progress or remote(anilist) progress - - Args: - config: [TODO:description] - fastanime_runtime_state: [TODO:description] - """ - # user config - translation_type: str = config.translation_type.lower() - continue_from_history: bool = config.continue_from_history - user_watch_history: dict = config.watch_history - - # runtime configuration - anime_id_anilist: int = fastanime_runtime_state.selected_anime_id_anilist - anime_title: str = fastanime_runtime_state.provider_anime_title - provider_anime: "Anime" = fastanime_runtime_state.provider_anime - selected_anime_anilist: "AnilistBaseMediaDataSchema" = ( - fastanime_runtime_state.selected_anime_anilist - ) - - # prompt for episode number - available_episodes = sorted( - provider_anime["availableEpisodesDetail"][translation_type], key=float - ) - current_episode_number = "" - - # auto select episode if continue from history otherwise prompt episode number - if continue_from_history: - # the user watch history thats locally available - # will be preferred over remote - if ( - config.preferred_history == "local" - or not selected_anime_anilist["mediaListEntry"] - ): - if ( - user_watch_history.get(str(anime_id_anilist), {}).get("episode_no") - in available_episodes - ): - current_episode_number = user_watch_history[str(anime_id_anilist)][ - "episode_no" - ] - - stop_time = user_watch_history.get(str(anime_id_anilist), {}).get( - "episode_stopped_at", "0" - ) - total_time = user_watch_history.get(str(anime_id_anilist), {}).get( - "episode_total_length", "0" - ) - if stop_time != "0" and total_time != "0": - percentage_completion_of_episode = calculate_percentage_completion( - stop_time, total_time - ) - if percentage_completion_of_episode > config.episode_complete_at: - # increment the episodes - next_episode = ( - available_episodes.index(current_episode_number) + 1 - ) - if next_episode >= len(available_episodes): - next_episode = len(available_episodes) - 1 - episode = available_episodes[next_episode] - stop_time = "0" - total_time = "0" - current_episode_number = episode - - else: - current_episode_number = str( - (selected_anime_anilist["mediaListEntry"] or {"progress": 0}).get( - "progress" - ) - ) - print( - f"[bold cyan]Continuing from Episode:[/] [bold]{current_episode_number}[/]" - ) - - # try to get the episode from anilist if present - elif selected_anime_anilist["mediaListEntry"]: - current_episode_number = str( - (selected_anime_anilist["mediaListEntry"] or {"progress": 0}).get( - "progress" - ) - ) - current_episode_number = str(int(current_episode_number) + 1) - if current_episode_number not in available_episodes: - current_episode_number = "" - print( - f"[bold cyan]Continuing from Episode:[/] [bold]{current_episode_number}[/]" - ) - # reset to none if not found - else: - current_episode_number = "" - - # prompt for episode number if not set - if not current_episode_number or current_episode_number not in available_episodes: - choices = [*available_episodes, "Back"] - preview = None - if config.use_fzf: - if config.preview: - from .utils import get_fzf_episode_preview - - e = fastanime_runtime_state.selected_anime_anilist["episodes"] - if e: - eps = range(0, e + 1) - else: - eps = available_episodes - preview = get_fzf_episode_preview( - fastanime_runtime_state.selected_anime_anilist, eps - ) - - if not preview: - print( - "Failed to find bash executable which is necessary for preview with fzf.\nIf you are on Windows, please make sure Git is installed and available in PATH." - ) - - current_episode_number = fzf.run( - choices, prompt="Select Episode", header=anime_title, preview=preview - ) - elif config.use_rofi: - current_episode_number = Rofi.run(choices, "Select Episode") - else: - current_episode_number = fuzzy_inquirer( - choices, - "Select Episode", - ) - - if current_episode_number == "Back": - media_actions_menu(config, fastanime_runtime_state) - return - # - # # try to get the start time and if not found default to "0" - # start_time = user_watch_history.get(str(anime_id_anilist), {}).get( - # "start_time", "0" - # ) - # config.update_watch_history( - # anime_id_anilist, current_episode_number, start_time=start_time - # ) - - # update runtime data - fastanime_runtime_state.provider_available_episodes = available_episodes - fastanime_runtime_state.provider_current_episode_number = current_episode_number - - # next interface - provider_anime_episode_servers_menu(config, fastanime_runtime_state) - - -def fetch_anime_episode( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" -): - selected_anime: "SearchResult" = ( - fastanime_runtime_state.provider_anime_search_result - ) - anime_provider = config.anime_provider - with Progress() as progress: - progress.add_task("Fetching Anime Info...", total=None) - provider_anime = anime_provider.get_anime( - selected_anime["id"], - ) - if not provider_anime: - print( - "Sth went wrong :cry: this could mean the provider is down or your internet" - ) - if not config.use_rofi: - input("Enter to continue...") - else: - if not Rofi.confirm("Sth went wrong!!Enter to continue..."): - exit(1) - return media_actions_menu(config, fastanime_runtime_state) - - fastanime_runtime_state.provider_anime = provider_anime - provider_anime_episodes_menu(config, fastanime_runtime_state) - - -# -# ---- ANIME PROVIDER SEARCH RESULTS MENU ---- -# - - -def set_prefered_progress_tracking( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState", update=False -): - if ( - fastanime_runtime_state.progress_tracking == "" - or update - or fastanime_runtime_state.progress_tracking == "prompt" - ): - if config.default_media_list_tracking == "track": - fastanime_runtime_state.progress_tracking = "track" - elif config.default_media_list_tracking == "disabled": - fastanime_runtime_state.progress_tracking = "disabled" - else: - options = ["disabled", "track"] - if config.use_fzf: - fastanime_runtime_state.progress_tracking = fzf.run( - options, - "Enter your preferred progress tracking for the current anime", - ) - elif config.use_rofi: - fastanime_runtime_state.progress_tracking = Rofi.run( - options, - "Enter your preferred progress tracking for the current anime", - ) - else: - fastanime_runtime_state.progress_tracking = fuzzy_inquirer( - options, - "Enter your preferred progress tracking for the current anime", - ) - - -def anime_provider_search_results_menu( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" -): - """A menu that handles searching and selecting provider results; either manually or through fuzzy matching - - Args: - config: [TODO:description] - fastanime_runtime_state: [TODO:description] - """ - # user config - translation_type = config.translation_type.lower() - - # runtime data - selected_anime_title = fastanime_runtime_state.selected_anime_title_anilist - - selected_anime_anilist: "AnilistBaseMediaDataSchema" = ( - fastanime_runtime_state.selected_anime_anilist - ) - anime_provider = config.anime_provider - - # search and get the requested title from provider - with Progress() as progress: - progress.add_task("Fetching Search Results...", total=None) - provider_search_results = anime_provider.search_for_anime( - selected_anime_title, - translation_type, - ) - if not provider_search_results: - print( - "Sth went wrong :cry: while fetching this could mean you have poor internet connection or the provider is down" - ) - if not config.use_rofi: - input("Enter to continue...") - else: - if not Rofi.confirm("Sth went wrong!!Enter to continue..."): - exit(1) - return media_actions_menu(config, fastanime_runtime_state) - - provider_search_results = { - anime["title"]: anime for anime in provider_search_results["results"] - } - _title = None - if _title := next( - ( - original - for original, normalized in anime_normalizer.items() - if normalized.lower() == selected_anime_title.lower() - ), - None, - ): - _title = _title - - if config.auto_select: - provider_anime_title = max( - provider_search_results.keys(), - key=lambda title: anime_title_percentage_match( - title, selected_anime_anilist - ), - ) - print(f"[cyan]Auto selecting[/]: {provider_anime_title}") - else: - choices = [*provider_search_results.keys(), "Back"] - if config.use_fzf: - provider_anime_title = fzf.run( - choices, - prompt="Select Search Result", - header="Anime Search Results", - ) - - elif config.use_rofi: - provider_anime_title = Rofi.run(choices, "Select Search Result") - else: - provider_anime_title = fuzzy_inquirer( - choices, - "Select Search Result", - ) - if provider_anime_title == "Back": - media_actions_menu(config, fastanime_runtime_state) - return - - # update runtime data - fastanime_runtime_state.provider_anime_title = ( - anime_normalizer.get(provider_anime_title) or provider_anime_title - ) - fastanime_runtime_state.provider_anime_search_result = provider_search_results[ - provider_anime_title - ] - - fastanime_runtime_state.progress_tracking = config.watch_history.get( - str(fastanime_runtime_state.selected_anime_id_anilist), {} - ).get("progress_tracking", "prompt") - set_prefered_progress_tracking(config, fastanime_runtime_state) - fetch_anime_episode(config, fastanime_runtime_state) - - -def download_anime(config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState"): - import time - - from rich.prompt import Confirm, Prompt - from thefuzz import fuzz - - from ...Utility.downloader.downloader import downloader - - download_dir = config.downloads_dir - force_unknown_ext = True - verbose = False - silent = True - merge = Confirm.ask("Merge audio and video", default=True) - clean = Confirm.ask("Clean up files", default=True) - prompt = Confirm.ask("Prompt incase for actions while downloading", default=True) - force_ffmpeg = Confirm.ask("Force ffmpeg", default=False) - hls_use_mpegts = Confirm.ask("Use mpegts", default=False) - hls_use_h264 = Confirm.ask("Use h264", default=False) - nocheckcertificate = True - - force_ffmpeg |= hls_use_mpegts or hls_use_h264 - anime_title = Prompt.ask( - "Anime title", default=fastanime_runtime_state.selected_anime_title_anilist - ) - translation_type = Prompt.ask("Translation type", default=config.translation_type) - anime_provider = config.anime_provider - anilist_anime_info = fastanime_runtime_state.selected_anime_anilist - print(f"[green bold]Now Downloading: [/] {anime_title}") - # ---- search for anime ---- - with Progress() as progress: - progress.add_task("Fetching Search Results...", total=None) - search_results = anime_provider.search_for_anime( - anime_title, translation_type=translation_type - ) - if not search_results: - print("Search results failed") - input("Enter to retry") - return - search_results = search_results["results"] - if not search_results: - print("Nothing muches your search term") - return - search_results_ = { - search_result["title"]: search_result for search_result in search_results - } - - if config.auto_select: - selected_anime_title = max( - search_results_.keys(), - key=lambda title: fuzz.ratio( - anime_normalizer.get(title, title), anime_title - ), - ) - print("[cyan]Auto selecting:[/] ", selected_anime_title) - else: - choices = list(search_results_.keys()) - if config.use_fzf: - selected_anime_title = fzf.run(choices, "Please Select title", "FastAnime") - else: - selected_anime_title = fuzzy_inquirer( - choices, - "Please Select title", - ) - - # ---- fetch anime ---- - with Progress() as progress: - progress.add_task("Fetching Anime...", total=None) - anime: Anime | None = anime_provider.get_anime( - search_results_[selected_anime_title]["id"] - ) - if not anime: - print("Sth went wring anime no found") - input("Enter to continue...") - return - - episodes = sorted( - anime["availableEpisodesDetail"][config.translation_type], key=float - ) - episode_ranges = Prompt.ask( - "Enter episode ranges (e.g 1:12 or 1:12:2 or 1: or :12 or 5)" - ).split(" ") - episodes_range = [] - if episode_ranges != [""]: - for episode_range in episode_ranges: - if ":" in episode_range: - ep_range_tuple = episode_range.split(":") - if len(ep_range_tuple) == 2 and all(ep_range_tuple): - episodes_start, episodes_end = ep_range_tuple - episodes_range += episodes[ - int(episodes_start) - 1 : int(episodes_end) - ] - elif len(ep_range_tuple) == 3 and all(ep_range_tuple): - episodes_start, episodes_end, step = ep_range_tuple - episodes_range += episodes[ - int(episodes_start) - 1 : int(episodes_end) : int(step) - ] - else: - episodes_start, episodes_end = ep_range_tuple - if episodes_start.strip(): - episodes_range += episodes[int(episodes_start) - 1 :] - elif episodes_end.strip(): - episodes_range += episodes[: int(episodes_end)] - else: - episodes_range += episodes - else: - episodes_range += [episode_range] if episode_range in episodes else [] - - else: - episodes_range = sorted(episodes, key=float) - - episodes_range = list( - dict.fromkeys(episodes_range) - ) # To preserve order while removing duplicates - print(f"[green bold]Downloading: [/] {episodes_range}") - - if config.normalize_titles: - from ...libs.common.mini_anilist import get_basic_anime_info_by_title - - anilist_anime_info = get_basic_anime_info_by_title(anime["title"]) - - # lets download em - for episode in episodes_range: - try: - episode = str(episode) - if episode not in episodes: - print(f"[cyan]Warning[/]: Episode {episode} not found, skipping") - continue - with Progress() as progress: - progress.add_task("Fetching Episode Streams...", total=None) - streams = anime_provider.get_episode_streams( - anime["id"], episode, config.translation_type - ) - if not streams: - print("No streams skipping") - continue - # ---- fetch servers ---- - if config.server == "top": - with Progress() as progress: - progress.add_task("Fetching top server...", total=None) - server_name = next(streams, None) - if not server_name: - print("Sth went wrong when fetching the server") - continue - stream_link = filter_by_quality(config.quality, server_name["links"]) - if not stream_link: - print("[yellow bold]WARNING:[/] No streams found") - time.sleep(1) - print("Continuing...") - continue - link = stream_link["link"] - provider_headers = server_name["headers"] - episode_title = server_name["episode_title"] - subtitles = server_name["subtitles"] - else: - with Progress() as progress: - progress.add_task("Fetching servers", total=None) - # prompt for server selection - servers = {server["server"]: server for server in streams} - servers_names = list(servers.keys()) - if config.server in servers_names: - server_name = config.server - else: - if config.use_fzf: - server_name = fzf.run(servers_names, "Select an link") - else: - server_name = fuzzy_inquirer( - servers_names, - "Select link", - ) - stream_link = filter_by_quality( - config.quality, servers[server_name]["links"] - ) - if not stream_link: - print("[yellow bold]WARNING:[/] No streams found") - time.sleep(1) - print("Continuing...") - continue - link = stream_link["link"] - provider_headers = servers[server_name]["headers"] - - subtitles = servers[server_name]["subtitles"] - episode_title = servers[server_name]["episode_title"] - - if anilist_anime_info: - selected_anime_title = ( - anilist_anime_info["title"][config.preferred_language] - or anilist_anime_info["title"]["romaji"] - or anilist_anime_info["title"]["english"] - ) - import re - - if anilist_anime_info.get("streamingEpisodes"): - for episode_detail in anilist_anime_info["streamingEpisodes"]: - if re.match(f"Episode {episode} ", episode_detail["title"]): - episode_title = episode_detail["title"] - break - print(f"[purple]Now Downloading:[/] {episode_title}") - subtitles = move_preferred_subtitle_lang_to_top(subtitles, config.sub_lang) - downloader._download_file( - link, - selected_anime_title, - episode_title, - download_dir, - silent, - vid_format=config.format, - force_unknown_ext=force_unknown_ext, - verbose=verbose, - headers=provider_headers, - sub=subtitles[0]["url"] if subtitles else "", - merge=merge, - clean=clean, - prompt=prompt, - force_ffmpeg=force_ffmpeg, - hls_use_mpegts=hls_use_mpegts, - hls_use_h264=hls_use_h264, - nocheckcertificate=nocheckcertificate, - ) - except Exception as e: - print(e) - time.sleep(1) - print("Continuing...") - - -# -# ---- ANILIST MEDIA ACTIONS MENU ---- -# -def media_actions_menu( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" -): - """The menu responsible for handling all media actions such as watching a trailer or streaming it - - Args: - config: [TODO:description] - fastanime_runtime_state: [TODO:description] - """ - selected_anime_anilist: "AnilistBaseMediaDataSchema" = ( - fastanime_runtime_state.selected_anime_anilist - ) - selected_anime_title_anilist: str = ( - fastanime_runtime_state.selected_anime_title_anilist - ) - - # the progress of the episode based on what anilist has not locally - progress = (selected_anime_anilist["mediaListEntry"] or {"progress": 0}).get( - "progress", 0 - ) - episodes_total = selected_anime_anilist["episodes"] or "Inf" - - def _watch_trailer( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" - ): - """Helper function to watch trailers with - - Args: - config: [TODO:description] - fastanime_runtime_state: [TODO:description] - """ - if trailer := selected_anime_anilist.get("trailer"): - trailer_url = "https://youtube.com/watch?v=" + trailer["id"] - print("[bold magenta]Watching Trailer of:[/]", selected_anime_title_anilist) - run_mpv( - trailer_url, - ytdl_format=config.format, - player=config.player, - ) - media_actions_menu(config, fastanime_runtime_state) - else: - if not config.use_rofi: - print("no trailer available :confused") - input("Enter to continue...") - else: - if not Rofi.confirm("No trailler found!!Enter to continue"): - exit(0) - media_actions_menu(config, fastanime_runtime_state) - - def _add_to_list( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" - ): - """Helper function to update an anime's media_list_type - - Args: - config: [TODO:description] - fastanime_runtime_state: [TODO:description] - """ - if not config.user: - print("You aint logged in") - input("Enter to continue") - media_actions_menu(config, fastanime_runtime_state) - return - - anime_lists = { - "Watching": "CURRENT", - "Paused": "PAUSED", - "Planning": "PLANNING", - "Dropped": "DROPPED", - "Rewatching": "REPEATING", - "Completed": "COMPLETED", - } - choices = list(anime_lists.keys()) - if config.use_fzf: - anime_list = fzf.run( - choices, - "Choose the list you want to add to", - "Add your animelist", - ) - elif config.use_rofi: - anime_list = Rofi.run(choices, "Choose list you want to add to") - else: - anime_list = fuzzy_inquirer( - choices, - "Choose the list you want to add to", - ) - result = AniList.update_anime_list( - {"status": anime_lists[anime_list], "mediaId": selected_anime_anilist["id"]} - ) - if not result[0]: - print("Failed to update", result) - else: - print( - f"Successfully added {selected_anime_title_anilist} to your {anime_list} list :smile:" - ) - if not config.use_rofi: - input("Enter to continue...") - media_actions_menu(config, fastanime_runtime_state) - - def _score_anime( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" - ): - """Helper function to score anime on anilist from terminal or rofi - - Args: - config: [TODO:description] - fastanime_runtime_state: [TODO:description] - """ - if not config.user: - print("You aint logged in") - input("Enter to continue") - media_actions_menu(config, fastanime_runtime_state) - return - if config.use_rofi: - score = Rofi.ask("Enter Score", is_int=True) - score = max(100, min(0, score)) - else: - score = inquirer.number( # pyright:ignore - message="Enter the score:", - min_allowed=0, - max_allowed=100, - validate=EmptyInputValidator(), - ).execute() - - result = AniList.update_anime_list( - {"scoreRaw": score, "mediaId": selected_anime_anilist["id"]} - ) - if not result[0]: - print("Failed to update", result) - else: - print(f"Successfully scored {selected_anime_title_anilist}; score: {score}") - if not config.use_rofi: - input("Enter to continue...") - media_actions_menu(config, fastanime_runtime_state) - - # FIX: For some reason this fails to delete - def _remove_from_list( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" - ): - """Remove an anime from your media list - - Args: - config: [TODO:description] - fastanime_runtime_state: [TODO:description] - """ - if Confirm.ask( - f"Are you sure you want to procede, the folowing action will permanently remove {selected_anime_title_anilist} from your list and your progress will be erased", - default=False, - ): - success, data = AniList.delete_medialist_entry(selected_anime_anilist["id"]) - if not success or not data: - print("Failed to delete", data) - elif not data.get("deleted"): - print("Failed to delete", data) - else: - print("Successfully deleted :cry:", selected_anime_title_anilist) - else: - print(selected_anime_title_anilist, ":relieved:") - if not config.use_rofi: - input("Enter to continue...") - media_actions_menu(config, fastanime_runtime_state) - - def _change_translation_type( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" - ): - """Change the translation type to use - - Args: - config: [TODO:description] - fastanime_runtime_state: [TODO:description] - """ - # prompt for new translation type - options = ["Sub", "Dub"] - if config.use_fzf: - translation_type = fzf.run( - options, prompt="Select Translation Type", header="Language Options" - ) - elif config.use_rofi: - translation_type = Rofi.run(options, "Select Translation Type") - else: - translation_type = fuzzy_inquirer( - options, - "Select translation type", - ) - - # update internal config - config.translation_type = translation_type.lower() - - media_actions_menu(config, fastanime_runtime_state) - - def _change_player( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" - ): - """Change the translation type to use - - Args: - config: [TODO:description] - fastanime_runtime_state: [TODO:description] - """ - # prompt for new translation type - options = ["syncplay", "mpv-mod", "default"] - if config.use_fzf: - player = fzf.run( - options, - prompt="Select Player", - ) - elif config.use_rofi: - player = Rofi.run(options, "Select Player") - else: - player = fuzzy_inquirer( - options, - "Select Player", - ) - - # update internal config - if player == "syncplay": - config.sync_play = True - config.use_python_mpv = False - else: - config.sync_play = False - if player == "mpv-mod": - config.use_python_mpv = True - else: - config.use_python_mpv = False - media_actions_menu(config, fastanime_runtime_state) - - def _view_info(config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState"): - """helper function to view info of an anime from terminal - - Args: - config ([TODO:parameter]): [TODO:description] - fastanime_runtime_state ([TODO:parameter]): [TODO:description] - """ - from rich.console import Console - from rich.prompt import Confirm - from yt_dlp.utils import clean_html - - from ...Utility import anilist_data_helper - from ..utils.print_img import print_img - - clear() - console = Console() - - print_img(selected_anime_anilist["coverImage"]["large"]) - console.print( - "[bold cyan]Title(jp): ", selected_anime_anilist["title"]["romaji"] - ) - console.print( - "[bold cyan]Title(eng): ", selected_anime_anilist["title"]["english"] - ) - console.print("[bold cyan]Popularity: ", selected_anime_anilist["popularity"]) - console.print("[bold cyan]Favourites: ", selected_anime_anilist["favourites"]) - console.print("[bold cyan]Status: ", selected_anime_anilist["status"]) - console.print( - "[bold cyan]Start Date: ", - anilist_data_helper.format_anilist_date_object( - selected_anime_anilist["startDate"] - ), - ) - console.print( - "[bold cyan]End Date: ", - anilist_data_helper.format_anilist_date_object( - selected_anime_anilist["endDate"] - ), - ) - # console.print("[bold cyan]Season: ", selected_anime["season"]) - console.print("[bold cyan]Episodes: ", selected_anime_anilist["episodes"]) - console.print( - "[bold cyan]Tags: ", - anilist_data_helper.format_list_data_with_comma( - [tag["name"] for tag in selected_anime_anilist["tags"]] - ), - ) - console.print( - "[bold cyan]Genres: ", - anilist_data_helper.format_list_data_with_comma( - selected_anime_anilist["genres"] - ), - ) - if selected_anime_anilist["nextAiringEpisode"]: - console.print( - "[bold cyan]Next Episode: ", - anilist_data_helper.extract_next_airing_episode( - selected_anime_anilist["nextAiringEpisode"] - ), - ) - console.print( - "[bold underline cyan]Description\n[/]", - clean_html(str(selected_anime_anilist["description"])), - ) - if Confirm.ask("Enter to continue...", default=True): - media_actions_menu(config, fastanime_runtime_state) - return - - def _toggle_auto_select( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" - ): - """helper function to toggle auto select anime title using fuzzy matching - - Args: - config: [TODO:description] - fastanime_runtime_state: [TODO:description] - """ - config.auto_select = not config.auto_select - media_actions_menu(config, fastanime_runtime_state) - - def _toggle_continue_from_history( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" - ): - """helper function to toggle continue from history - - Args: - config: [TODO:description] - fastanime_runtime_state: [TODO:description] - """ - config.continue_from_history = not config.continue_from_history - media_actions_menu(config, fastanime_runtime_state) - - def _toggle_auto_next( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" - ): - """helper function to toggle auto next - - Args: - config: [TODO:description] - fastanime_runtime_state: [TODO:description] - """ - config.auto_next = not config.auto_next - media_actions_menu(config, fastanime_runtime_state) - - def _change_provider( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" - ): - """Helper function to change provider to use - - Args: - config: [TODO:description] - fastanime_runtime_state: [TODO:description] - """ - from ...libs.anime_provider import anime_sources - - options = list(anime_sources.keys()) - if config.use_fzf: - provider = fzf.run( - options, prompt="Select Translation Type", header="Language Options" - ) - elif config.use_rofi: - provider = Rofi.run(options, "Select Translation Type") - else: - provider = fuzzy_inquirer( - options, - "Select translation type", - ) - - config.provider = provider - config.anime_provider.provider = provider - config.anime_provider.lazyload_provider(provider) - - media_actions_menu(config, fastanime_runtime_state) - - def _stream_anime( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" - ): - """helper function to go to the next menu respecting your config - - Args: - config: [TODO:description] - fastanime_runtime_state: [TODO:description] - """ - anime_provider_search_results_menu(config, fastanime_runtime_state) - - def _select_episode_to_stream( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" - ): - """Convinience function to disable continue from history and show the episodes menu - - Args: - config: [TODO:description] - fastanime_runtime_state: [TODO:description] - """ - config.continue_from_history = False - anime_provider_search_results_menu(config, fastanime_runtime_state) - - def _set_progress_tracking( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" - ): - set_prefered_progress_tracking(config, fastanime_runtime_state, update=True) - media_actions_menu(config, fastanime_runtime_state) - - def _relations(config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState"): - """Helper function to get anime recommendations - Args: - config: [TODO:description] - fastanime_runtime_state: [TODO:description] - """ - relations = AniList.get_related_anime_for( - fastanime_runtime_state.selected_anime_id_anilist - ) - if not relations[0]: - print("No recommendations found", relations[1]) - input("Enter to continue...") - media_actions_menu(config, fastanime_runtime_state) - return - - relations = relations[1]["data"]["Media"]["relations"] # pyright:ignore - relations["nodes"] = [ - node for node in relations["nodes"] if node.get("type") == "ANIME" - ] - fastanime_runtime_state.anilist_results_data = { - "data": {"Page": {"media": relations["nodes"]}} # pyright:ignore - } - anilist_results_menu(config, fastanime_runtime_state) - - def _recommendations( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" - ): - """Helper function to get anime recommendations - Args: - config: [TODO:description] - fastanime_runtime_state: [TODO:description] - """ - recommendations = AniList.get_recommended_anime_for( - fastanime_runtime_state.selected_anime_id_anilist - ) - if not recommendations[0]: - print("No recommendations found", recommendations[1]) - input("Enter to continue...") - media_actions_menu(config, fastanime_runtime_state) - return - - fastanime_runtime_state.anilist_results_data = { - "data": { - "Page": { - "media": [ - media["media"] - for media in recommendations[1]["data"]["Page"][ - "recommendations" # pyright:ignore - ] - ] - } - } - } - anilist_results_menu(config, fastanime_runtime_state) - - def _download_anime( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" - ): - download_anime(config, fastanime_runtime_state) - media_actions_menu(config, fastanime_runtime_state) - - icons = config.icons - options = { - f"{'๐Ÿ“ฝ๏ธ ' if icons else ''}Stream ({progress}/{episodes_total})": _stream_anime, - f"{'๐Ÿ“ฝ๏ธ ' if icons else ''}Episodes": _select_episode_to_stream, - f"{'๐Ÿ“ผ ' if icons else ''}Watch Trailer": _watch_trailer, - f"{'โœจ ' if icons else ''}Score Anime": _score_anime, - f"{'โœจ ' if icons else ''}Progress Tracking": _set_progress_tracking, - f"{'๐Ÿ“ฅ ' if icons else ''}Add to List": _add_to_list, - f"{'๐Ÿ“ค ' if icons else ''}Remove from List": _remove_from_list, - f"{'๐Ÿ“– ' if icons else ''}Recommendations": _recommendations, - f"{'๐Ÿ“– ' if icons else ''}Relations": _relations, - f"{'๐Ÿ“– ' if icons else ''}View Info": _view_info, - f"{'๐ŸŽง ' if icons else ''}Change Translation Type": _change_translation_type, - f"{'๐Ÿ’ฝ ' if icons else ''}Change Provider": _change_provider, - f"{'๐Ÿ’ฝ ' if icons else ''}Change Player": _change_player, - f"{'๐Ÿ“ฅ ' if icons else ''}Download Anime": _download_anime, - f"{'๐Ÿ”˜ ' if icons else ''}Toggle auto select anime": _toggle_auto_select, # WARN: problematic if you choose an anime that doesnt match id - f"{'๐Ÿ’  ' if icons else ''}Toggle auto next episode": _toggle_auto_next, - f"{'๐Ÿ”˜ ' if icons else ''}Toggle continue from history": _toggle_continue_from_history, - f"{'๐Ÿ”™ ' if icons else ''}Back": anilist_results_menu, - f"{'โŒ ' if icons else ''}Exit": lambda *_: exit_app(), - } - choices = list(options.keys()) - if config.use_fzf: - action = fzf.run(choices, prompt="Select Action", header="Anime Menu") - elif config.use_rofi: - action = Rofi.run(choices, "Select Action") - else: - action = fuzzy_inquirer( - choices, - "Select Action", - ) - options[action](config, fastanime_runtime_state) - - -# -# ---- ANILIST RESULTS MENU ---- -# -def anilist_results_menu( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" -): - """The menu that handles and displays the results of an anilist action enabling using to select anime of choice - - Args: - config: [TODO:description] - fastanime_runtime_state: [TODO:description] - """ - search_results = fastanime_runtime_state.anilist_results_data["data"]["Page"][ - "media" - ] - - anime_data = {} - for anime in search_results: - anime: "AnilistBaseMediaDataSchema" - - # determine the progress of watching the anime based on whats in anilist data !! NOT LOCALLY - progress = (anime["mediaListEntry"] or {"progress": 0}).get("progress", 0) - - # if the max episodes is none set it to inf meaning currently not determinable or infinity - episodes_total = anime["episodes"] or "Inf" - - # set the actual title and ensure its a string since even after this it may be none - title = str( - anime["title"][config.preferred_language] or anime["title"]["romaji"] - ) - # this process is mostly need inoder for the preview to work correctly - title = sanitize_filename(f"{title} ({progress} of {episodes_total})") - - # Check if the anime is currently airing and has new/unwatched episodes - if ( - anime["status"] == "RELEASING" - and anime["nextAiringEpisode"] - and progress > 0 - and (anime["mediaListEntry"] or {}).get("status", "") == "CURRENT" - ): - last_aired_episode = anime["nextAiringEpisode"]["episode"] - 1 - if last_aired_episode - progress > 0: - title += f" ๐Ÿ”น{last_aired_episode - progress} new episode(s)๐Ÿ”น" - - # add the anime to the anime data dict setting the key to the title - # this dict is used for promting the title and maps directly to the anime object of interest containing the actual data - anime_data[title] = anime - - # prompt for the anime of choice - choices = [*anime_data.keys(), "Next Page", "Previous Page", "Back"] - if config.use_fzf: - if config.preview: - from .utils import get_fzf_anime_preview - - preview = get_fzf_anime_preview(search_results, anime_data.keys()) - if not preview: - print( - "Failed to find bash executable which is necessary for preview with fzf.\nIf you are on Windows, please make sure Git is installed and available in PATH." - ) - - selected_anime_title = fzf.run( - choices, - prompt="Select Anime", - header="Search Results", - preview=preview, - ) - else: - selected_anime_title = fzf.run( - choices, - prompt="Select Anime", - header="Search Results", - ) - elif config.use_rofi: - if config.preview: - from .utils import IMAGES_CACHE_DIR, get_rofi_icons - - get_rofi_icons(search_results, anime_data.keys()) - choices = [] - for title in anime_data.keys(): - icon_path = os.path.join( - IMAGES_CACHE_DIR, sha256(title.encode("utf-8")).hexdigest() - ) - choices.append(f"{title}\0icon\x1f{icon_path}.png") - choices.append("Back") - selected_anime_title = Rofi.run_with_icons(choices, "Select Anime") - else: - selected_anime_title = Rofi.run(choices, "Select Anime") - else: - selected_anime_title = fuzzy_inquirer( - choices, - "Select Anime", - ) - if selected_anime_title == "Back": - fastanime_main_menu(config, fastanime_runtime_state) - return - if selected_anime_title == "Next Page": - fastanime_runtime_state.current_page = page = ( - fastanime_runtime_state.current_page + 1 - ) - success, data = fastanime_runtime_state.current_data_loader( - config=config, page=page - ) - if success: - fastanime_runtime_state.anilist_results_data = data - - anilist_results_menu(config, fastanime_runtime_state) - else: - print("Failed to get next page") - print(data) - input("Enter to continue...") - anilist_results_menu(config, fastanime_runtime_state) - - return - if selected_anime_title == "Previous Page": - fastanime_runtime_state.current_page = page = ( - (fastanime_runtime_state.current_page - 1) - if fastanime_runtime_state.current_page > 1 - else 1 - ) - success, data = fastanime_runtime_state.current_data_loader( - config=config, page=page - ) - if success: - fastanime_runtime_state.anilist_results_data = data - - anilist_results_menu(config, fastanime_runtime_state) - else: - print("Failed to get previous page") - print(data) - input("Enter to continue...") - anilist_results_menu(config, fastanime_runtime_state) - return - - selected_anime: "AnilistBaseMediaDataSchema" = anime_data[selected_anime_title] - fastanime_runtime_state.selected_anime_anilist = selected_anime - fastanime_runtime_state.selected_anime_title_anilist = ( - selected_anime["title"]["romaji"] or selected_anime["title"]["english"] - ) - fastanime_runtime_state.selected_anime_id_anilist = selected_anime["id"] - - media_actions_menu(config, fastanime_runtime_state) - - -# -# ---- FASTANIME MAIN MENU ---- -# -def _handle_animelist( - config: "Config", - fastanime_runtime_state: "FastAnimeRuntimeState", - list_type: str, - page=1, -): - """A helper function that handles user media lists - - Args: - fastanime_runtime_state ([TODO:parameter]): [TODO:description] - config: [TODO:description] - list_type: [TODO:description] - - Returns: - [TODO:return] - """ - if not config.user: - if not config.use_rofi: - print("You haven't logged in please run: fastanime anilist login") - input("Enter to continue...") - else: - if not Rofi.confirm("You haven't logged in!!Enter to continue"): - exit(1) - fastanime_main_menu(config, fastanime_runtime_state) - return - # determine the watch list to get - match list_type: - case "Watching": - status = "CURRENT" - case "Planned": - status = "PLANNING" - case "Completed": - status = "COMPLETED" - case "Dropped": - status = "DROPPED" - case "Paused": - status = "PAUSED" - case "Rewatching": - status = "REPEATING" - case _: - return - - # get the media list - anime_list = AniList.get_anime_list(status, page=page) - # handle null - if not anime_list: - print("Sth went wrong", anime_list) - if not config.use_rofi: - input("Enter to continue") - else: - if not Rofi.confirm("Sth went wrong!!Enter to continue..."): - exit(1) - fastanime_main_menu(config, fastanime_runtime_state) - return - # handle failure - if not anime_list[0] or not anime_list[1]: - print("Sth went wrong", anime_list) - if not config.use_rofi: - input("Enter to continue") - else: - if not Rofi.confirm("Sth went wrong!!Enter to continue..."): - exit(1) - # recall anilist menu - fastanime_main_menu(config, fastanime_runtime_state) - return - # injecting the data is the simplest way since the ui expects a field called media that should have media type - media = [ - mediaListItem["media"] - for mediaListItem in anime_list[1]["data"]["Page"]["mediaList"] - ] - anime_list[1]["data"]["Page"]["media"] = media # pyright:ignore - return anime_list - - -def _anilist_search(config: "Config", page=1): - """A function that enables seaching of an anime - - Returns: - [TODO:return] - """ - # TODO: Add filters and other search features - if config.use_rofi: - search_term = str(Rofi.ask("Search for")) - elif config.use_fzf and config.use_experimental_fzf_anilist_search: - search_term = fzf.search_for_anime() - else: - search_term = Prompt.ask("[cyan]Search for[/]") - - # Return to main menu if search term is empty - if not search_term.strip(): - return False, "Search canceled - return to main menu" - - return AniList.search(query=search_term, page=page) - - -def _anilist_random(config: "Config", page=1): - """A function that generates random anilist ids enabling random discovery of anime - - Returns: - [TODO:return] - """ - random_anime = range(1, 15000) - random_anime = random.sample(random_anime, k=50) - - return AniList.search(id_in=list(random_anime)) - - -def _watch_history(config: "Config", page=1): - """Function that lets you see all the anime that has locally been saved to your watch history - - Returns: - [TODO:return] - """ - watch_history = list(map(int, config.watch_history.keys())) - return AniList.search(id_in=watch_history, sort="TRENDING_DESC", page=page) - - -def _recent(config: "Config", page=1): - return ( - True, - {"data": {"Page": {"media": config.user_data["recent_anime"]}}}, - ) - - -# WARNING: Will probably be depracated -def _anime_list(config: "Config", page=1): - anime_list = config.anime_list - return AniList.search(id_in=anime_list, pages=page) - - -def fastanime_main_menu( - config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState" -): - """The main entry point to the anilist command - - Args: - config: An object containing cconfiguration data - fastanime_runtime_state: A query dict used to store data during navigation of the ui # initially this was very messy - """ - - def _edit_config(*args, **kwargs): - """Helper function to edit your config when the ui is still running""" - - from click import edit - - edit(filename=USER_CONFIG_PATH) - if config.use_rofi: - config.load_config() - config.use_rofi = True - config.use_fzf = False - else: - config.load_config() - - config.set_fastanime_config_environs() - - config.anime_provider.provider = config.provider - config.anime_provider.lazyload_provider(config.provider) - - fastanime_main_menu(config, fastanime_runtime_state) - - icons = config.icons - # each option maps to anilist data that is described by the option name - options = { - f"{'๐Ÿ”ฅ ' if icons else ''}Trending": AniList.get_trending, - f"{'๐ŸŽž๏ธ ' if icons else ''}Recent": _recent, - f"{'๐Ÿ“บ ' if icons else ''}Watching": lambda config, media_list_type="Watching", page=1: _handle_animelist( - config, fastanime_runtime_state, media_list_type, page=page - ), - f"{'โธ ' if icons else ''}Paused": lambda config, media_list_type="Paused", page=1: _handle_animelist( - config, fastanime_runtime_state, media_list_type, page=page - ), - f"{'๐Ÿšฎ ' if icons else ''}Dropped": lambda config, media_list_type="Dropped", page=1: _handle_animelist( - config, fastanime_runtime_state, media_list_type, page=page - ), - f"{'๐Ÿ“‘ ' if icons else ''}Planned": lambda config, media_list_type="Planned", page=1: _handle_animelist( - config, fastanime_runtime_state, media_list_type, page=page - ), - f"{'โœ… ' if icons else ''}Completed": lambda config, media_list_type="Completed", page=1: _handle_animelist( - config, fastanime_runtime_state, media_list_type, page=page - ), - f"{'๐Ÿ” ' if icons else ''}Rewatching": lambda config, media_list_type="Rewatching", page=1: _handle_animelist( - config, fastanime_runtime_state, media_list_type, page=page - ), - f"{'๐Ÿ”” ' if icons else ''}Recently Updated Anime": AniList.get_most_recently_updated, - f"{'๐Ÿ”Ž ' if icons else ''}Search": _anilist_search, - f"{'๐ŸŽž๏ธ ' if icons else ''}Watch History": _watch_history, - # "AnimeList": _anime_list๐Ÿ’ฏ, - f"{'๐ŸŽฒ ' if icons else ''}Random Anime": _anilist_random, - f"{'๐ŸŒŸ ' if icons else ''}Most Popular Anime": AniList.get_most_popular, - f"{'๐Ÿ’– ' if icons else ''}Most Favourite Anime": AniList.get_most_favourite, - f"{'โœจ ' if icons else ''}Most Scored Anime": AniList.get_most_scored, - f"{'๐ŸŽฌ ' if icons else ''}Upcoming Anime": AniList.get_upcoming_anime, - f"{'๐Ÿ“ ' if icons else ''}Edit Config": _edit_config, - f"{'โŒ ' if icons else ''}Exit": exit_app, - } - - # Load main menu order if set in config file - if config.menu_order: - menu_order_list = config.menu_order.split(",") - lookup = {key.split(" ", 1)[-1]: key for key in options} - ordered_dict = { - lookup[key]: options[lookup[key]] - for key in menu_order_list - if key in lookup - } - options = ordered_dict - - # prompt user to select an action - choices = list(options.keys()) - if config.use_fzf: - action = fzf.run( - choices, - prompt="Select Action", - header="Anilist Menu", - ) - elif config.use_rofi: - action = Rofi.run(choices, "Select Action") - else: - action = fuzzy_inquirer( - choices, - "Select Action", - ) - fastanime_runtime_state.current_data_loader = options[action] - fastanime_runtime_state.current_page = 1 - anilist_data = options[action](config=config) - # anilist data is a (bool,data) - # the bool indicated success - if anilist_data[0]: - fastanime_runtime_state.anilist_results_data = anilist_data[1] - anilist_results_menu(config, fastanime_runtime_state) - - else: - print(anilist_data[1]) - if not config.use_rofi: - input("Enter to continue...") - else: - if not Rofi.confirm("Sth went wrong!!Enter to continue..."): - exit(1) - # recall the anilist function for the user to reattempt their choice - fastanime_main_menu(config, fastanime_runtime_state) diff --git a/fastanime/fastanime.py b/fastanime/fastanime.py index 28f1419..75fce47 100755 --- a/fastanime/fastanime.py +++ b/fastanime/fastanime.py @@ -8,7 +8,7 @@ if getattr(sys, "frozen", False): sys.path.insert(0, application_path) # Import and run the main application -from fastanime import FastAnime +from fastanime import Cli if __name__ == "__main__": - FastAnime() + Cli() diff --git a/fastanime/libs/api/anilist/constants.py b/fastanime/libs/api/anilist/constants.py deleted file mode 100644 index e69de29..0000000