refactor: a whole lot of it

This commit is contained in:
Benex254
2024-08-11 14:58:49 +03:00
parent 912535d166
commit 5dbc3a16f7
23 changed files with 474 additions and 295 deletions

View File

@@ -1,6 +1,6 @@
import click
from ...utils.tools import QueryDict
from ...utils.tools import FastAnimeRuntimeState
from .__lazyloader__ import LazyGroup
commands = {
@@ -36,7 +36,9 @@ def anilist(ctx: click.Context):
from ....anilist import AniList
from ....AnimeProvider import AnimeProvider
from ...interfaces.anilist_interfaces import anilist as anilist_interface
from ...interfaces.anilist_interfaces import (
fastanime_main_menu as anilist_interface,
)
if TYPE_CHECKING:
from ...config import Config
@@ -45,5 +47,5 @@ def anilist(ctx: click.Context):
if user := ctx.obj.user:
AniList.update_login_info(user, user["token"])
if ctx.invoked_subcommand is None:
anilist_config = QueryDict()
anilist_interface(ctx.obj, anilist_config)
fastanime_runtime_state = FastAnimeRuntimeState()
anilist_interface(ctx.obj, fastanime_runtime_state)

View File

@@ -11,7 +11,7 @@ if TYPE_CHECKING:
def completed(config: "Config"):
from ....anilist import AniList
from ...interfaces import anilist_interfaces
from ...utils.tools import QueryDict, exit_app
from ...utils.tools import FastAnimeRuntimeState, exit_app
if not config.user:
print("Not authenticated")
@@ -27,6 +27,6 @@ def completed(config: "Config"):
for mediaListItem in anime_list[1]["data"]["Page"]["mediaList"]
] # pyright:ignore
anime_list[1]["data"]["Page"]["media"] = media # pyright:ignore
anilist_config = QueryDict()
anilist_config.data = anime_list[1]
anilist_interfaces.select_anime(config, anilist_config)
fastanime_runtime_state = FastAnimeRuntimeState()
fastanime_runtime_state.anilist_data = anime_list[1]
anilist_interfaces.anilist_results_menu(config, fastanime_runtime_state)

View File

@@ -11,7 +11,7 @@ if TYPE_CHECKING:
def dropped(config: "Config"):
from ....anilist import AniList
from ...interfaces import anilist_interfaces
from ...utils.tools import QueryDict, exit_app
from ...utils.tools import FastAnimeRuntimeState, exit_app
if not config.user:
print("Not authenticated")
@@ -27,6 +27,6 @@ def dropped(config: "Config"):
for mediaListItem in anime_list[1]["data"]["Page"]["mediaList"]
] # pyright:ignore
anime_list[1]["data"]["Page"]["media"] = media # pyright:ignore
anilist_config = QueryDict()
anilist_config.data = anime_list[1]
anilist_interfaces.select_anime(config, anilist_config)
fastanime_runtime_state = FastAnimeRuntimeState()
fastanime_runtime_state.anilist_data = anime_list[1]
anilist_interfaces.anilist_results_menu(config, fastanime_runtime_state)

View File

@@ -8,11 +8,11 @@ import click
@click.pass_obj
def favourites(config):
from ....anilist import AniList
from ...interfaces.anilist_interfaces import select_anime
from ...utils.tools import QueryDict
from ...interfaces.anilist_interfaces import anilist_results_menu
from ...utils.tools import FastAnimeRuntimeState
anime_data = AniList.get_most_favourite()
if anime_data[0]:
anilist_config = QueryDict()
anilist_config.data = anime_data[1]
select_anime(config, anilist_config)
fastanime_runtime_state = FastAnimeRuntimeState()
fastanime_runtime_state.anilist_data = anime_data[1]
anilist_results_menu(config, fastanime_runtime_state)

View File

@@ -11,7 +11,7 @@ if TYPE_CHECKING:
def paused(config: "Config"):
from ....anilist import AniList
from ...interfaces import anilist_interfaces
from ...utils.tools import QueryDict, exit_app
from ...utils.tools import FastAnimeRuntimeState, exit_app
if not config.user:
print("Not authenticated")
@@ -27,6 +27,6 @@ def paused(config: "Config"):
for mediaListItem in anime_list[1]["data"]["Page"]["mediaList"]
] # pyright:ignore
anime_list[1]["data"]["Page"]["media"] = media # pyright:ignore
anilist_config = QueryDict()
anilist_config = FastAnimeRuntimeState()
anilist_config.data = anime_list[1]
anilist_interfaces.select_anime(config, anilist_config)
anilist_interfaces.anilist_results_menu(config, anilist_config)

View File

@@ -11,7 +11,7 @@ if TYPE_CHECKING:
def planning(config: "Config"):
from ....anilist import AniList
from ...interfaces import anilist_interfaces
from ...utils.tools import QueryDict, exit_app
from ...utils.tools import FastAnimeRuntimeState, exit_app
if not config.user:
print("Not authenticated")
@@ -27,6 +27,6 @@ def planning(config: "Config"):
for mediaListItem in anime_list[1]["data"]["Page"]["mediaList"]
] # pyright:ignore
anime_list[1]["data"]["Page"]["media"] = media # pyright:ignore
anilist_config = QueryDict()
anilist_config.data = anime_list[1]
anilist_interfaces.select_anime(config, anilist_config)
fastanime_runtime_state = FastAnimeRuntimeState()
fastanime_runtime_state.anilist_data = anime_list[1]
anilist_interfaces.anilist_results_menu(config, fastanime_runtime_state)

View File

@@ -7,11 +7,11 @@ import click
@click.pass_obj
def popular(config):
from ....anilist import AniList
from ...interfaces.anilist_interfaces import select_anime
from ...utils.tools import QueryDict
from ...interfaces.anilist_interfaces import anilist_results_menu
from ...utils.tools import FastAnimeRuntimeState
anime_data = AniList.get_most_popular()
if anime_data[0]:
anilist_config = QueryDict()
anilist_config.data = anime_data[1]
select_anime(config, anilist_config)
fastanime_runtime_state = FastAnimeRuntimeState()
fastanime_runtime_state.anilist_data = anime_data[1]
anilist_results_menu(config, fastanime_runtime_state)

View File

@@ -10,8 +10,8 @@ def random_anime(config):
import random
from ....anilist import AniList
from ...interfaces.anilist_interfaces import select_anime
from ...utils.tools import QueryDict
from ...interfaces.anilist_interfaces import anilist_results_menu
from ...utils.tools import FastAnimeRuntimeState
random_anime = range(1, 15000)
@@ -20,8 +20,8 @@ def random_anime(config):
anime_data = AniList.search(id_in=list(random_anime))
if anime_data[0]:
anilist_config = QueryDict()
anilist_config.data = anime_data[1]
select_anime(config, anilist_config)
fastanime_runtime_state = FastAnimeRuntimeState()
fastanime_runtime_state.anilist_data = anime_data[1]
anilist_results_menu(config, fastanime_runtime_state)
else:
print(anime_data[1])

View File

@@ -8,11 +8,11 @@ import click
@click.pass_obj
def recent(config):
from ....anilist import AniList
from ...interfaces.anilist_interfaces import select_anime
from ...utils.tools import QueryDict
from ...interfaces.anilist_interfaces import anilist_results_menu
from ...utils.tools import FastAnimeRuntimeState
anime_data = AniList.get_most_recently_updated()
if anime_data[0]:
anilist_config = QueryDict()
anilist_config.data = anime_data[1]
select_anime(config, anilist_config)
fastanime_runtime_state = FastAnimeRuntimeState()
fastanime_runtime_state.anilist_data = anime_data[1]
anilist_results_menu(config, fastanime_runtime_state)

View File

@@ -11,7 +11,7 @@ if TYPE_CHECKING:
def rewatching(config: "Config"):
from ....anilist import AniList
from ...interfaces import anilist_interfaces
from ...utils.tools import QueryDict, exit_app
from ...utils.tools import FastAnimeRuntimeState, exit_app
if not config.user:
print("Not authenticated")
@@ -27,6 +27,6 @@ def rewatching(config: "Config"):
for mediaListItem in anime_list[1]["data"]["Page"]["mediaList"]
] # pyright:ignore
anime_list[1]["data"]["Page"]["media"] = media # pyright:ignore
anilist_config = QueryDict()
anilist_config.data = anime_list[1]
anilist_interfaces.select_anime(config, anilist_config)
fastanime_runtime_state = FastAnimeRuntimeState()
fastanime_runtime_state.anilist_data = anime_list[1]
anilist_interfaces.anilist_results_menu(config, fastanime_runtime_state)

View File

@@ -7,11 +7,11 @@ import click
@click.pass_obj
def scores(config):
from ....anilist import AniList
from ...interfaces.anilist_interfaces import select_anime
from ...utils.tools import QueryDict
from ...interfaces.anilist_interfaces import anilist_results_menu
from ...utils.tools import FastAnimeRuntimeState
anime_data = AniList.get_most_scored()
if anime_data[0]:
anilist_config = QueryDict()
anilist_config.data = anime_data[1]
select_anime(config, anilist_config)
fastanime_runtime_state = FastAnimeRuntimeState()
fastanime_runtime_state.data = anime_data[1]
anilist_results_menu(config, fastanime_runtime_state)

View File

@@ -11,11 +11,11 @@ import click
@click.pass_obj
def search(config, title):
from ....anilist import AniList
from ...interfaces.anilist_interfaces import select_anime
from ...utils.tools import QueryDict
from ...interfaces.anilist_interfaces import anilist_results_menu
from ...utils.tools import FastAnimeRuntimeState
success, search_results = AniList.search(title)
if success:
anilist_config = QueryDict()
anilist_config.data = search_results
select_anime(config, anilist_config)
fastanime_runtime_state = FastAnimeRuntimeState()
fastanime_runtime_state.anilist_data = search_results
anilist_results_menu(config, fastanime_runtime_state)

View File

@@ -8,11 +8,11 @@ import click
@click.pass_obj
def trending(config):
from ....anilist import AniList
from ...interfaces.anilist_interfaces import select_anime
from ...utils.tools import QueryDict
from ...interfaces.anilist_interfaces import anilist_results_menu
from ...utils.tools import FastAnimeRuntimeState
success, data = AniList.get_trending()
if success:
anilist_config = QueryDict()
anilist_config.data = data
select_anime(config, anilist_config)
fastanime_runtime_state = FastAnimeRuntimeState()
fastanime_runtime_state.anilist_data = data
anilist_results_menu(config, fastanime_runtime_state)

View File

@@ -7,11 +7,11 @@ import click
@click.pass_obj
def upcoming(config):
from ....anilist import AniList
from ...interfaces.anilist_interfaces import select_anime
from ...utils.tools import QueryDict
from ...interfaces.anilist_interfaces import anilist_results_menu
from ...utils.tools import FastAnimeRuntimeState
success, data = AniList.get_upcoming_anime()
if success:
anilist_config = QueryDict()
anilist_config.data = data
select_anime(config, anilist_config)
fastanime_runtime_state = FastAnimeRuntimeState()
fastanime_runtime_state.anilist_data = data
anilist_results_menu(config, fastanime_runtime_state)

View File

@@ -11,7 +11,7 @@ if TYPE_CHECKING:
def watching(config: "Config"):
from ....anilist import AniList
from ...interfaces import anilist_interfaces
from ...utils.tools import QueryDict, exit_app
from ...utils.tools import FastAnimeRuntimeState, exit_app
if not config.user:
print("Not authenticated")
@@ -27,6 +27,6 @@ def watching(config: "Config"):
for mediaListItem in anime_list[1]["data"]["Page"]["mediaList"]
] # pyright:ignore
anime_list[1]["data"]["Page"]["media"] = media # pyright:ignore
anilist_config = QueryDict()
anilist_config.data = anime_list[1]
anilist_interfaces.select_anime(config, anilist_config)
fastanime_runtime_state = FastAnimeRuntimeState()
fastanime_runtime_state.anilist_data = anime_list[1]
anilist_interfaces.anilist_results_menu(config, fastanime_runtime_state)

View File

@@ -25,14 +25,14 @@ def cache(clean, path, size):
elif size:
import os
from ..utils.utils import sizeof_fmt
from ..utils.utils import format_bytes_to_human
total_size = 0
for dirpath, dirnames, filenames in os.walk(APP_CACHE_DIR):
for f in filenames:
fp = os.path.join(dirpath, f)
total_size += os.path.getsize(fp)
print("Total Size: ", sizeof_fmt(total_size))
print("Total Size: ", format_bytes_to_human(total_size))
else:
import click

View File

@@ -70,7 +70,10 @@ def download(config: "Config", anime_title, episode_range, highest_priority):
if config.use_fzf:
search_result = fzf.run(choices, "Please Select title: ", "FastAnime")
else:
search_result = fuzzy_inquirer("Please Select title", choices)
search_result = fuzzy_inquirer(
choices,
"Please Select title",
)
# ---- fetch anime ----
with Progress() as progress:
@@ -125,7 +128,10 @@ def download(config: "Config", anime_title, episode_range, highest_priority):
if config.use_fzf:
server = fzf.run(servers_names, "Select an link: ")
else:
server = fuzzy_inquirer("Select link", servers_names)
server = fuzzy_inquirer(
servers_names,
"Select link",
)
stream_link = filter_by_quality(
config.quality, servers[server]["links"]
)

View File

@@ -63,8 +63,8 @@ def search(config: Config, anime_title: str, episode_range: str):
search_result = Rofi.run(choices, "Please Select Title")
else:
search_result = fuzzy_inquirer(
"Please Select Title",
choices,
"Please Select Title",
)
# ---- fetch selected anime ----
@@ -110,7 +110,10 @@ def search(config: Config, anime_title: str, episode_range: str):
elif config.use_rofi:
episode = Rofi.run(episodes, "Select an episode")
else:
episode = fuzzy_inquirer("Select episode", episodes)
episode = fuzzy_inquirer(
episodes,
"Select episode",
)
# ---- fetch streams ----
with Progress() as progress:
@@ -147,7 +150,10 @@ def search(config: Config, anime_title: str, episode_range: str):
elif config.use_rofi:
server = Rofi.run(servers_names, "Select an link")
else:
server = fuzzy_inquirer("Select link", servers_names)
server = fuzzy_inquirer(
servers_names,
"Select link",
)
stream_link = filter_by_quality(
config.quality, servers[server]["links"]
)

View File

@@ -20,7 +20,7 @@ from ...libs.rofi import Rofi
from ...Utility.data import anime_normalizer
from ...Utility.utils import anime_title_percentage_match
from ..utils.mpv import run_mpv
from ..utils.tools import QueryDict, exit_app
from ..utils.tools import FastAnimeRuntimeState, exit_app
from ..utils.utils import filter_by_quality, fuzzy_inquirer
from .utils import aniskip
@@ -43,25 +43,25 @@ def calculate_time_delta(start_time, end_time):
return delta
def player_controls(config: "Config", anilist_config: QueryDict):
def player_controls(config: "Config", fastanime_runtime_state: FastAnimeRuntimeState):
# user config
config.translation_type.lower()
# internal config
current_episode: str = anilist_config.episode_number
episodes: list = sorted(anilist_config.episodes, key=float)
links: list = anilist_config.current_stream_links
current_link: str = anilist_config.current_stream_link
anime_title: str = anilist_config.anime_title
anime_id: int = anilist_config.anime_id
current_episode: str = fastanime_runtime_state.episode_number
episodes: list = sorted(fastanime_runtime_state.episodes, key=float)
links: list = fastanime_runtime_state.current_stream_links
current_link: str = fastanime_runtime_state.current_stream_link
anime_title: str = fastanime_runtime_state.anime_title
anime_id: int = fastanime_runtime_state.anime_id
def _servers():
config.server = ""
fetch_streams(config, anilist_config)
fetch_streams(config, fastanime_runtime_state)
def _replay():
selected_server: "Server" = anilist_config.current_server
selected_server: "Server" = fastanime_runtime_state.current_server
print(
"[bold magenta]Now Replaying:[/]",
anime_title,
@@ -74,7 +74,7 @@ def player_controls(config: "Config", anilist_config: QueryDict):
custom_args = []
if config.skip:
if args := aniskip(
anilist_config.selected_anime_anilist["idMal"], current_episode
fastanime_runtime_state.selected_anime_anilist["idMal"], current_episode
):
custom_args.extend(args)
if config.use_mpv_mod:
@@ -83,7 +83,7 @@ def player_controls(config: "Config", anilist_config: QueryDict):
mpv = player.create_player(
current_link,
config.anime_provider,
anilist_config,
fastanime_runtime_state,
config,
selected_server["episode_title"],
)
@@ -119,7 +119,7 @@ def player_controls(config: "Config", anilist_config: QueryDict):
clear()
config.update_watch_history(anime_id, episode, stop_time, total_time)
player_controls(config, anilist_config)
player_controls(config, fastanime_runtime_state)
def _next_episode():
# ensures you dont accidentally erase your progress for an in complete episode
@@ -139,21 +139,21 @@ def player_controls(config: "Config", anilist_config: QueryDict):
if not Rofi.confirm(
"Are you sure you wish to continue to the next episode you haven't completed the current episode?"
):
anilist_options(config, anilist_config)
anilist_options(config, fastanime_runtime_state)
return
else:
if not Confirm.ask(
"Are you sure you wish to continue to the next episode you haven't completed the current episode?",
default=False,
):
anilist_options(config, anilist_config)
anilist_options(config, fastanime_runtime_state)
return
elif not config.use_rofi:
if not Confirm.ask(
"Are you sure you wish to continue to the next episode, your progress for the current episodes will be erased?",
default=True,
):
player_controls(config, anilist_config)
player_controls(config, fastanime_runtime_state)
return
# all checks have passed lets go to the next episode
@@ -162,33 +162,33 @@ def player_controls(config: "Config", anilist_config: QueryDict):
next_episode = len(episodes) - 1
# updateinternal config
anilist_config.episode_number = episodes[next_episode]
fastanime_runtime_state.episode_number = episodes[next_episode]
# update user config
config.update_watch_history(anime_id, episodes[next_episode])
# call interface
fetch_streams(config, anilist_config)
fetch_streams(config, fastanime_runtime_state)
def _episodes():
# reset watch_history
config.continue_from_history = False
# call interface
fetch_episode(config, anilist_config)
fetch_episode(config, fastanime_runtime_state)
def _previous_episode():
prev_episode = episodes.index(current_episode) - 1
if prev_episode <= 0:
prev_episode = 0
# anilist_config.episode_title = episode["title"]
anilist_config.episode_number = episodes[prev_episode]
# fastanime_runtime_state.episode_title = episode["title"]
fastanime_runtime_state.episode_number = episodes[prev_episode]
# update user config
config.update_watch_history(anime_id, episodes[prev_episode])
# call interface
fetch_streams(config, anilist_config)
fetch_streams(config, fastanime_runtime_state)
def _change_quality():
# extract the actual link urls
@@ -202,9 +202,12 @@ def player_controls(config: "Config", anilist_config: QueryDict):
elif config.use_rofi:
quality = Rofi.run(options, "Select Quality")
else:
quality = fuzzy_inquirer("Select Quality", options)
quality = fuzzy_inquirer(
options,
"Select Quality",
)
config.quality = quality # set quality
player_controls(config, anilist_config)
player_controls(config, fastanime_runtime_state)
def _change_translation_type():
# prompt for new translation type
@@ -217,14 +220,15 @@ def player_controls(config: "Config", anilist_config: QueryDict):
translation_type = Rofi.run(options, "Select Translation Type")
else:
translation_type = fuzzy_inquirer(
"Select Translation Type", options
options,
"Select Translation Type",
).lower()
# update internal config
config.translation_type = translation_type.lower()
# reload to controls
player_controls(config, anilist_config)
player_controls(config, fastanime_runtime_state)
icons = config.icons
options = {
@@ -235,14 +239,14 @@ def player_controls(config: "Config", anilist_config: QueryDict):
f"{'📀 ' if icons else ''}Change Quality": _change_quality,
f"{'🎧 ' if icons else ''}Change Translation Type": _change_translation_type,
f"{'💽 ' if icons else ''}Servers": _servers,
f"{'📱 ' if icons else ''}Main Menu": lambda: anilist_menu(
config, anilist_config
f"{'📱 ' if icons else ''}Main Menu": lambda: fastanime_main_menu(
config, fastanime_runtime_state
),
f"{'📜 ' if icons else ''}Anime Options Menu": lambda: anilist_options(
config, anilist_config
config, fastanime_runtime_state
),
f"{'🔎 ' if icons else ''}Search Results": lambda: select_anime(
config, anilist_config
f"{'🔎 ' if icons else ''}Search Results": lambda: anilist_results_menu(
config, fastanime_runtime_state
),
f"{'' if icons else ''}Exit": exit_app,
}
@@ -251,26 +255,26 @@ def player_controls(config: "Config", anilist_config: QueryDict):
print("Auto selecting next episode")
_next_episode()
return
choices = list(options.keys())
if config.use_fzf:
action = fzf.run(
list(options.keys()), prompt="Select Action:", header="Player Controls"
)
action = fzf.run(choices, prompt="Select Action:", header="Player Controls")
elif config.use_rofi:
action = Rofi.run(list(options.keys()), "Select Action")
action = Rofi.run(choices, "Select Action")
else:
action = fuzzy_inquirer("Select Action", options.keys())
action = fuzzy_inquirer(choices, "Select Action")
options[action]()
def fetch_streams(config: "Config", anilist_config: QueryDict):
def fetch_streams(config: "Config", fastanime_runtime_state: FastAnimeRuntimeState):
# user config
quality: str = config.quality
# internal config
episode_number: str = anilist_config.episode_number
anime_title: str = anilist_config.anime_title
anime_id: int = anilist_config.anime_id
anime: "Anime" = anilist_config.anime
episode_number: str = fastanime_runtime_state.episode_number
anime_title: str = fastanime_runtime_state.anime_title
anime_id: int = fastanime_runtime_state.anime_id
anime: "Anime" = fastanime_runtime_state.anime
translation_type = config.translation_type
anime_provider = config.anime_provider
@@ -282,7 +286,7 @@ def fetch_streams(config: "Config", anilist_config: QueryDict):
anime,
episode_number,
translation_type,
anilist_config.selected_anime_anilist,
fastanime_runtime_state.selected_anime_anilist,
)
if not episode_streams:
if not config.use_rofi:
@@ -291,7 +295,7 @@ def fetch_streams(config: "Config", anilist_config: QueryDict):
else:
if not Rofi.confirm("Sth went wrong!!Enter to continue..."):
exit(1)
return fetch_streams(config, anilist_config)
return fetch_streams(config, fastanime_runtime_state)
if config.server == "top":
# no need to get all servers if top just works
@@ -321,12 +325,15 @@ def fetch_streams(config: "Config", anilist_config: QueryDict):
elif config.use_rofi:
server = Rofi.run(choices, "Select Server")
else:
server = fuzzy_inquirer("Select Server", choices)
server = fuzzy_inquirer(
choices,
"Select Server",
)
if server == "Back":
# reset watch_history
config.update_watch_history(anime_id, None)
fetch_episode(config, anilist_config)
fetch_episode(config, fastanime_runtime_state)
return
elif server == "top":
selected_server = episode_streams_dict[list(episode_streams_dict.keys())[0]]
@@ -339,14 +346,14 @@ def fetch_streams(config: "Config", anilist_config: QueryDict):
if not stream_link_:
print("Quality not found")
input("Enter to continue...")
anilist_options(config, anilist_config)
anilist_options(config, fastanime_runtime_state)
return
stream_link = stream_link_["link"]
# update internal config
anilist_config.current_stream_links = links
anilist_config.current_stream_link = stream_link
anilist_config.current_server = selected_server
anilist_config.current_server_name = server
fastanime_runtime_state.current_stream_links = links
fastanime_runtime_state.current_stream_link = stream_link
fastanime_runtime_state.current_server = selected_server
fastanime_runtime_state.current_server_name = server
# play video
print(
@@ -371,7 +378,7 @@ def fetch_streams(config: "Config", anilist_config: QueryDict):
custom_args = []
if config.skip:
if args := aniskip(
anilist_config.selected_anime_anilist["idMal"], episode_number
fastanime_runtime_state.selected_anime_anilist["idMal"], episode_number
):
custom_args.extend(args)
if config.use_mpv_mod:
@@ -380,7 +387,7 @@ def fetch_streams(config: "Config", anilist_config: QueryDict):
mpv = player.create_player(
stream_link,
anime_provider,
anilist_config,
fastanime_runtime_state,
config,
selected_server["episode_title"],
)
@@ -425,22 +432,22 @@ def fetch_streams(config: "Config", anilist_config: QueryDict):
# switch to controls
clear()
player_controls(config, anilist_config)
player_controls(config, fastanime_runtime_state)
def fetch_episode(config: "Config", anilist_config: QueryDict):
def fetch_episode(config: "Config", fastanime_runtime_state: FastAnimeRuntimeState):
# user config
translation_type: str = config.translation_type.lower()
continue_from_history: bool = config.continue_from_history
user_watch_history: dict = config.watch_history
anime_id: int = anilist_config.anime_id
anime_title: str = anilist_config.anime_title
anime_id: int = fastanime_runtime_state.anime_id
anime_title: str = fastanime_runtime_state.anime_title
# internal config
anime: "Anime" = anilist_config.anime
_anime: "SearchResult" = anilist_config._anime
anime: "Anime" = fastanime_runtime_state.anime
_anime: "SearchResult" = fastanime_runtime_state._anime
selected_anime_anilist: "AnilistBaseMediaDataSchema" = (
anilist_config.selected_anime_anilist
fastanime_runtime_state.selected_anime_anilist
)
# prompt for episode number
episodes = anime["availableEpisodesDetail"][translation_type]
@@ -472,32 +479,35 @@ def fetch_episode(config: "Config", anilist_config: QueryDict):
elif config.use_rofi:
episode_number = Rofi.run(choices, "Select Episode")
else:
episode_number = fuzzy_inquirer("Select Episode", choices)
episode_number = fuzzy_inquirer(
choices,
"Select Episode",
)
if episode_number == "Back":
anilist_options(config, anilist_config)
anilist_options(config, fastanime_runtime_state)
return
start_time = user_watch_history.get(str(anime_id), {}).get("start_time", "0")
config.update_watch_history(anime_id, episode_number, start_time=start_time)
# update internal config
anilist_config.episodes = episodes
# anilist_config.episode_title = episode["title"]
anilist_config.episode_number = episode_number
fastanime_runtime_state.episodes = episodes
# fastanime_runtime_state.episode_title = episode["title"]
fastanime_runtime_state.episode_number = episode_number
# next interface
fetch_streams(config, anilist_config)
fetch_streams(config, fastanime_runtime_state)
def fetch_anime_episode(config, anilist_config: QueryDict):
selected_anime: "SearchResult" = anilist_config._anime
def fetch_anime_episode(config, fastanime_runtime_state: FastAnimeRuntimeState):
selected_anime: "SearchResult" = fastanime_runtime_state._anime
anime_provider = config.anime_provider
with Progress() as progress:
progress.add_task("Fetching Anime Info...", total=None)
anilist_config.anime = anime_provider.get_anime(
selected_anime["id"], anilist_config.selected_anime_anilist
fastanime_runtime_state.anime = anime_provider.get_anime(
selected_anime["id"], fastanime_runtime_state.selected_anime_anilist
)
if not anilist_config.anime:
if not fastanime_runtime_state.anime:
print(
"Sth went wrong :cry: this could mean the provider is down or your internet"
)
@@ -506,20 +516,22 @@ def fetch_anime_episode(config, anilist_config: QueryDict):
else:
if not Rofi.confirm("Sth went wrong!!Enter to continue..."):
exit(1)
fetch_anime_episode(config, anilist_config)
fetch_anime_episode(config, fastanime_runtime_state)
return
fetch_episode(config, anilist_config)
fetch_episode(config, fastanime_runtime_state)
def provide_anime(config: "Config", anilist_config: QueryDict):
def provide_anime(config: "Config", fastanime_runtime_state: FastAnimeRuntimeState):
# user config
translation_type = config.translation_type.lower()
# internal config
selected_anime_title = anilist_config.selected_anime_title
selected_anime_title = fastanime_runtime_state.selected_anime_title_anilist
anime_data: "AnilistBaseMediaDataSchema" = anilist_config.selected_anime_anilist
anime_data: "AnilistBaseMediaDataSchema" = (
fastanime_runtime_state.selected_anime_anilist
)
anime_provider = config.anime_provider
# search and get the requested title from provider
@@ -528,7 +540,7 @@ def provide_anime(config: "Config", anilist_config: QueryDict):
search_results = anime_provider.search_for_anime(
selected_anime_title,
translation_type,
anilist_config.selected_anime_anilist,
fastanime_runtime_state.selected_anime_anilist,
)
if not search_results:
print(
@@ -539,7 +551,7 @@ def provide_anime(config: "Config", anilist_config: QueryDict):
else:
if not Rofi.confirm("Sth went wrong!!Enter to continue..."):
exit(1)
provide_anime(config, anilist_config)
provide_anime(config, fastanime_runtime_state)
return
search_results = {anime["title"]: anime for anime in search_results["results"]}
@@ -572,22 +584,31 @@ def provide_anime(config: "Config", anilist_config: QueryDict):
elif config.use_rofi:
anime_title = Rofi.run(choices, "Select Search Result")
else:
anime_title = fuzzy_inquirer("Select Search Result", choices)
anime_title = fuzzy_inquirer(
choices,
"Select Search Result",
)
if anime_title == "Back":
anilist_options(config, anilist_config)
anilist_options(config, fastanime_runtime_state)
return
anilist_config.anime_title = anime_normalizer.get(anime_title) or anime_title
anilist_config._anime = search_results[anime_title]
fetch_anime_episode(config, anilist_config)
fastanime_runtime_state.anime_title = (
anime_normalizer.get(anime_title) or anime_title
)
fastanime_runtime_state._anime = search_results[anime_title]
fetch_anime_episode(config, fastanime_runtime_state)
def anilist_options(config, anilist_config: QueryDict):
selected_anime: "AnilistBaseMediaDataSchema" = anilist_config.selected_anime_anilist
selected_anime_title: str = anilist_config.selected_anime_title
def anilist_options(config, fastanime_runtime_state: FastAnimeRuntimeState):
selected_anime: "AnilistBaseMediaDataSchema" = (
fastanime_runtime_state.selected_anime_anilist
)
selected_anime_title: str = fastanime_runtime_state.selected_anime_title_anilist
progress = (selected_anime["mediaListEntry"] or {"progress": 0}).get("progress", 0)
episodes_total = selected_anime["episodes"] or "Inf"
def _watch_trailer(config: "Config", anilist_config: QueryDict):
def _watch_trailer(
config: "Config", fastanime_runtime_state: FastAnimeRuntimeState
):
if trailer := selected_anime.get("trailer"):
trailer_url = "https://youtube.com/watch?v=" + trailer["id"]
print("[bold magenta]Watching Trailer of:[/]", selected_anime_title)
@@ -595,7 +616,7 @@ def anilist_options(config, anilist_config: QueryDict):
trailer_url,
ytdl_format=config.format,
)
anilist_options(config, anilist_config)
anilist_options(config, fastanime_runtime_state)
else:
if not config.use_rofi:
print("no trailer available :confused:")
@@ -603,10 +624,10 @@ def anilist_options(config, anilist_config: QueryDict):
else:
if not Rofi.confirm("No trailler found!!Enter to continue"):
exit(0)
anilist_options(config, anilist_config)
anilist_options(config, fastanime_runtime_state)
def _add_to_list(config: "Config", anilist_config: QueryDict):
# config.update_anime_list(anilist_config.anime_id)
def _add_to_list(config: "Config", fastanime_runtime_state: FastAnimeRuntimeState):
# config.update_anime_list(fastanime_runtime_state.anime_id)
anime_lists = {
"Watching": "CURRENT",
"Paused": "PAUSED",
@@ -615,19 +636,19 @@ def anilist_options(config, anilist_config: QueryDict):
"Rewatching": "REPEATING",
"Completed": "COMPLETED",
}
choices = list(anime_lists.keys())
if config.use_fzf:
anime_list = fzf.run(
list(anime_lists.keys()),
choices,
"Choose the list you want to add to",
"Add your animelist",
)
elif config.use_rofi:
anime_list = Rofi.run(
list(anime_lists.keys()), "Choose list you want to add to"
)
anime_list = Rofi.run(choices, "Choose list you want to add to")
else:
anime_list = fuzzy_inquirer(
"Choose the list you want to add to", list(anime_lists.keys())
choices,
"Choose the list you want to add to",
)
result = AniList.update_anime_list(
{"status": anime_lists[anime_list], "mediaId": selected_anime["id"]}
@@ -640,9 +661,9 @@ def anilist_options(config, anilist_config: QueryDict):
)
if not config.use_rofi:
input("Enter to continue...")
anilist_options(config, anilist_config)
anilist_options(config, fastanime_runtime_state)
def _score_anime(config: "Config", anilist_config: QueryDict):
def _score_anime(config: "Config", fastanime_runtime_state: FastAnimeRuntimeState):
if config.use_rofi:
score = Rofi.ask("Enter Score", is_int=True)
score = max(100, min(0, score))
@@ -663,9 +684,11 @@ def anilist_options(config, anilist_config: QueryDict):
print(f"Successfully scored {selected_anime_title}; score: {score}")
if not config.use_rofi:
input("Enter to continue...")
anilist_options(config, anilist_config)
anilist_options(config, fastanime_runtime_state)
def _remove_from_list(config: "Config", anilist_config: QueryDict):
def _remove_from_list(
config: "Config", fastanime_runtime_state: FastAnimeRuntimeState
):
if Confirm.ask(
f"Are you sure you want to procede, the folowing action will permanently remove {selected_anime_title} from your list and your progress will be erased",
default=False,
@@ -681,9 +704,11 @@ def anilist_options(config, anilist_config: QueryDict):
print(selected_anime_title, ":relieved:")
if not config.use_rofi:
input("Enter to continue...")
anilist_options(config, anilist_config)
anilist_options(config, fastanime_runtime_state)
def _change_translation_type(config: "Config", anilist_config: QueryDict):
def _change_translation_type(
config: "Config", fastanime_runtime_state: FastAnimeRuntimeState
):
# prompt for new translation type
options = ["Sub", "Dub"]
if config.use_fzf:
@@ -693,14 +718,17 @@ def anilist_options(config, anilist_config: QueryDict):
elif config.use_rofi:
translation_type = Rofi.run(options, "Select Translation Type")
else:
translation_type = fuzzy_inquirer("Select translation type", options)
translation_type = fuzzy_inquirer(
options,
"Select translation type",
)
# update internal config
config.translation_type = translation_type.lower()
anilist_options(config, anilist_config)
anilist_options(config, fastanime_runtime_state)
def _view_info(config, anilist_config):
def _view_info(config, fastanime_runtime_state):
from rich.console import Console
from rich.prompt import Confirm
@@ -750,18 +778,18 @@ def anilist_options(config, anilist_config: QueryDict):
remove_html_tags(str(selected_anime["description"])),
)
if Confirm.ask("Enter to continue...", default=True):
anilist_options(config, anilist_config)
anilist_options(config, fastanime_runtime_state)
return
def _toggle_auto_select(config, anilist_config):
def _toggle_auto_select(config, fastanime_runtime_state):
config.auto_select = not config.auto_select
anilist_options(config, anilist_config)
anilist_options(config, fastanime_runtime_state)
def _toggle_auto_next(config, anilist_config):
def _toggle_auto_next(config, fastanime_runtime_state):
config.auto_next = not config.auto_next
anilist_options(config, anilist_config)
anilist_options(config, fastanime_runtime_state)
def _change_provider(config: "Config", anilist_config):
def _change_provider(config: "Config", fastanime_runtime_state):
options = ["allanime", "animepahe"]
if config.use_fzf:
provider = fzf.run(
@@ -770,13 +798,16 @@ def anilist_options(config, anilist_config: QueryDict):
elif config.use_rofi:
provider = Rofi.run(options, "Select Translation Type")
else:
provider = fuzzy_inquirer("Select translation type", options)
provider = fuzzy_inquirer(
options,
"Select translation type",
)
config.provider = provider
config.anime_provider.provider = provider
config.anime_provider.lazyload_provider()
anilist_options(config, anilist_config)
anilist_options(config, fastanime_runtime_state)
icons = config.icons
options = {
@@ -790,32 +821,50 @@ def anilist_options(config, anilist_config: QueryDict):
f"{'💽 ' if icons else ''}Change Provider": _change_provider,
f"{'🔘 ' if icons else ''}Toggle auto select anime": _toggle_auto_select, # problematic if you choose an anime that doesnt match id
f"{'💠 ' if icons else ''}Toggle auto next episode": _toggle_auto_next,
f"{'🔙 ' if icons else ''}Back": select_anime,
f"{'🔙 ' if icons else ''}Back": anilist_results_menu,
f"{'' if icons else ''}Exit": exit_app,
}
choices = list(options.keys())
if config.use_fzf:
action = fzf.run(
list(options.keys()), prompt="Select Action:", header="Anime Menu"
)
action = fzf.run(choices, prompt="Select Action:", header="Anime Menu")
elif config.use_rofi:
action = Rofi.run(list(options.keys()), "Select Action")
action = Rofi.run(choices, "Select Action")
else:
action = fuzzy_inquirer("Select Action", options.keys())
options[action](config, anilist_config)
action = fuzzy_inquirer(
choices,
"Select Action",
)
options[action](config, fastanime_runtime_state)
def select_anime(config: "Config", anilist_config: QueryDict):
search_results = anilist_config.data["data"]["Page"]["media"]
def anilist_results_menu(
config: "Config", fastanime_runtime_state: FastAnimeRuntimeState
):
"""The menu that handles and displays the results of an anilist action enabling using to select anime of choice
Args:
config: [TODO:description]
fastanime_runtime_state: [TODO:description]
"""
search_results = fastanime_runtime_state.anilist_data["data"]["Page"]["media"]
anime_data = {}
for anime in search_results:
anime: "AnilistBaseMediaDataSchema"
# determine the progress of watching the anime based on whats in anilist data !! NOT LOCALLY
progress = (anime["mediaListEntry"] or {"progress": 0}).get("progress", 0)
# if the max episodes is none set it to inf meaning currently not determinable or infinity
episodes_total = anime["episodes"] or "Inf"
# set the actual title and ensure its a string since even after this it may be none
title = str(
anime["title"][config.preferred_language] or anime["title"]["romaji"]
)
# this process is mostly need inoder for the preview to work correctly
title = sanitize_filename(f"{title} ({progress} of {episodes_total})")
# Check if the anime is currently airing and has new/unwatched episodes
if (
anime["status"] == "RELEASING"
@@ -825,14 +874,18 @@ def select_anime(config: "Config", anilist_config: QueryDict):
last_aired_episode = anime["nextAiringEpisode"]["episode"] - 1
if last_aired_episode - progress > 0:
title += f" 🔹{last_aired_episode - progress} new episode(s)🔹"
# add the anime to the anime data dict setting the key to the title
# this dict is used for promting the title and maps directly to the anime object of interest containing the actual data
anime_data[title] = anime
# prompt for the anime of choice
choices = [*anime_data.keys(), "Back"]
if config.use_fzf:
if config.preview:
from .utils import get_preview
from .utils import get_fzf_preview
preview = get_preview(search_results, anime_data.keys())
preview = get_fzf_preview(search_results, anime_data.keys())
selected_anime_title = fzf.run(
choices,
prompt="Select Anime: ",
@@ -846,44 +899,47 @@ def select_anime(config: "Config", anilist_config: QueryDict):
header="Search Results",
)
elif config.use_rofi:
# TODO: Make this faster
if config.preview:
from .utils import IMAGES_DIR, get_icons
from .utils import IMAGES_CACHE_DIR, get_rofi_icons
get_icons(search_results, anime_data.keys())
get_rofi_icons(search_results, anime_data.keys())
choices = []
for title in anime_data.keys():
icon_path = os.path.join(IMAGES_DIR, title)
icon_path = os.path.join(IMAGES_CACHE_DIR, title)
choices.append(f"{title}\0icon\x1f{icon_path}")
choices.append("Back")
selected_anime_title = Rofi.run_with_icons(choices, "Select Anime")
else:
selected_anime_title = Rofi.run(choices, "Select Anime")
else:
selected_anime_title = fuzzy_inquirer("Select Anime", choices)
# "bat %s/{}" % SEARCH_RESULTS_CACHE
selected_anime_title = fuzzy_inquirer(
choices,
"Select Anime",
)
if selected_anime_title == "Back":
anilist_menu(config, anilist_config)
fastanime_main_menu(config, fastanime_runtime_state)
return
selected_anime: "AnilistBaseMediaDataSchema" = anime_data[selected_anime_title]
anilist_config.selected_anime_anilist = selected_anime
anilist_config.selected_anime_title = (
fastanime_runtime_state.selected_anime_anilist = selected_anime
fastanime_runtime_state.selected_anime_title_anilist = (
selected_anime["title"]["romaji"] or selected_anime["title"]["english"]
)
anilist_config.anime_id = selected_anime["id"]
fastanime_runtime_state.anime_id = selected_anime["id"]
anilist_options(config, anilist_config)
anilist_options(config, fastanime_runtime_state)
#
# ---- ANILIST MENU ----
# ---- FASTANIME MAIN MENU ----
#
def handle_animelist(config: "Config", anilist_config: "QueryDict", list_type: str):
def handle_animelist(
config: "Config", fastanime_runtime_state: "FastAnimeRuntimeState", list_type: str
):
"""A helper function that handles user media lists
Args:
anilist_config ([TODO:parameter]): [TODO:description]
fastanime_runtime_state ([TODO:parameter]): [TODO:description]
config: [TODO:description]
list_type: [TODO:description]
@@ -897,7 +953,7 @@ def handle_animelist(config: "Config", anilist_config: "QueryDict", list_type: s
else:
if not Rofi.confirm("You haven't logged in!!Enter to continue"):
exit(1)
anilist_menu(config, anilist_config)
fastanime_main_menu(config, fastanime_runtime_state)
return
# determine the watch list to get
match list_type:
@@ -926,7 +982,7 @@ def handle_animelist(config: "Config", anilist_config: "QueryDict", list_type: s
else:
if not Rofi.confirm("Sth went wrong!!Enter to continue..."):
exit(1)
anilist_menu(config, anilist_config)
fastanime_main_menu(config, fastanime_runtime_state)
return
# handle failure
if not anime_list[0] or not anime_list[1]:
@@ -937,8 +993,9 @@ def handle_animelist(config: "Config", anilist_config: "QueryDict", list_type: s
if not Rofi.confirm("Sth went wrong!!Enter to continue..."):
exit(1)
# recall anilist menu
anilist_menu(config, anilist_config)
fastanime_main_menu(config, fastanime_runtime_state)
return
# injecting the data is the simplest way since the ui expects a field called media that should have media type
media = [
mediaListItem["media"]
for mediaListItem in anime_list[1]["data"]["Page"]["mediaList"]
@@ -947,12 +1004,14 @@ def handle_animelist(config: "Config", anilist_config: "QueryDict", list_type: s
return anime_list
def anilist_menu(config: "Config", anilist_config: QueryDict):
def fastanime_main_menu(
config: "Config", fastanime_runtime_state: FastAnimeRuntimeState
):
"""The main entry point to the anilist command
Args:
config: An object containing cconfiguration data
anilist_config: A query dict used to store data during navigation of the ui # initially this was very messy
fastanime_runtime_state: A query dict used to store data during navigation of the ui # initially this was very messy
"""
def _anilist_search():
@@ -1007,29 +1066,29 @@ def anilist_menu(config: "Config", anilist_config: QueryDict):
else:
config.load_config()
anilist_menu(config, anilist_config)
fastanime_main_menu(config, fastanime_runtime_state)
icons = config.icons
# each option maps to anilist data that is described by the option name
options = {
f"{'🔥 ' if icons else ''}Trending": AniList.get_trending,
f"{'📺 ' if icons else ''}Watching": lambda media_list_type="Watching": handle_animelist(
config, anilist_config, media_list_type
config, fastanime_runtime_state, media_list_type
),
f"{'' if icons else ''}Paused": lambda media_list_type="Paused": handle_animelist(
config, anilist_config, media_list_type
config, fastanime_runtime_state, media_list_type
),
f"{'🚮 ' if icons else ''}Dropped": lambda media_list_type="Dropped": handle_animelist(
config, anilist_config, media_list_type
config, fastanime_runtime_state, media_list_type
),
f"{'📑 ' if icons else ''}Planned": lambda media_list_type="Planned": handle_animelist(
config, anilist_config, media_list_type
config, fastanime_runtime_state, media_list_type
),
f"{'' if icons else ''}Completed": lambda media_list_type="Completed": handle_animelist(
config, anilist_config, media_list_type
config, fastanime_runtime_state, media_list_type
),
f"{'🔁 ' if icons else ''}Rewatching": lambda media_list_type="Repeating": handle_animelist(
config, anilist_config, media_list_type
config, fastanime_runtime_state, media_list_type
),
f"{'🔔 ' if icons else ''}Recently Updated Anime": AniList.get_most_recently_updated,
f"{'🔎 ' if icons else ''}Search": _anilist_search,
@@ -1044,22 +1103,26 @@ def anilist_menu(config: "Config", anilist_config: QueryDict):
f"{'' if icons else ''}Exit": exit_app,
}
# prompt user to select an action
choices = list(options.keys())
if config.use_fzf:
action = fzf.run(
list(options.keys()),
choices,
prompt="Select Action: ",
header="Anilist Menu",
)
elif config.use_rofi:
action = Rofi.run(list(options.keys()), "Select Action")
action = Rofi.run(choices, "Select Action")
else:
action = fuzzy_inquirer("Select Action", options.keys())
action = fuzzy_inquirer(
choices,
"Select Action",
)
anilist_data = options[action]()
# anilist data is a (bool,data)
# the bool indicated success
if anilist_data[0]:
anilist_config.data = anilist_data[1]
select_anime(config, anilist_config)
fastanime_runtime_state.anilist_data = anilist_data[1]
anilist_results_menu(config, fastanime_runtime_state)
else:
print(anilist_data[1])
@@ -1069,4 +1132,4 @@ def anilist_menu(config: "Config", anilist_config: QueryDict):
if not Rofi.confirm("Sth went wrong!!Enter to continue..."):
exit(1)
# recall the anilist function for the user to reattempt their choice
anilist_menu(config, anilist_config)
fastanime_main_menu(config, fastanime_runtime_state)

View File

@@ -16,6 +16,8 @@ from ..utils.utils import get_true_fg
logger = logging.getLogger(__name__)
# this script was written by the fzf devs as an example on how to preview images
# its only here for convinience
fzf_preview = r"""
#
# The purpose of this script is to demonstrate how to preview a file or an
@@ -95,7 +97,16 @@ fzf-preview(){
# ---- aniskip intergration ----
def aniskip(mal_id, episode):
def aniskip(mal_id: int, episode: str):
"""helper function to be used for setting and getting skip data
Args:
mal_id: mal id of the anime
episode: episode number
Returns:
mpv chapter options
"""
ANISKIP = shutil.which("ani-skip")
if not ANISKIP:
print("Aniskip not found, please install and try again")
@@ -111,37 +122,61 @@ def aniskip(mal_id, episode):
# ---- prevew stuff ----
# import tempfile
# NOTE: May change this to a temp dir but there were issues so later
WORKING_DIR = APP_CACHE_DIR # tempfile.gettempdir()
IMAGES_DIR = os.path.join(WORKING_DIR, "images")
if not os.path.exists(IMAGES_DIR):
os.mkdir(IMAGES_DIR)
INFO_DIR = os.path.join(WORKING_DIR, "info")
if not os.path.exists(INFO_DIR):
os.mkdir(INFO_DIR)
IMAGES_CACHE_DIR = os.path.join(WORKING_DIR, "images")
if not os.path.exists(IMAGES_CACHE_DIR):
os.mkdir(IMAGES_CACHE_DIR)
ANIME_INFO_CACHE_DIR = os.path.join(WORKING_DIR, "info")
if not os.path.exists(ANIME_INFO_CACHE_DIR):
os.mkdir(ANIME_INFO_CACHE_DIR)
def save_image_from_url(url: str, file_name: str):
"""Helper function that downloads an image to the FastAnime images cache dir given its url and filename
Args:
url: image url to download
file_name: filename to use
"""
image = requests.get(url)
with open(f"{IMAGES_DIR}/{file_name}", "wb") as f:
with open(f"{IMAGES_CACHE_DIR}/{file_name}", "wb") as f:
f.write(image.content)
def save_info_from_str(info: str, file_name: str):
with open(f"{INFO_DIR}/{file_name}", "w") as f:
"""Helper function that writes text (anime details and info) to a file given its filename
Args:
info: the information anilist has on the anime
file_name: the filename to use
"""
with open(f"{ANIME_INFO_CACHE_DIR}/{file_name}", "w") as f:
f.write(info)
def write_search_results(
search_results: list[AnilistBaseMediaDataSchema],
titles,
workers=None,
anilist_results: list[AnilistBaseMediaDataSchema],
titles: list[str],
workers: int | None = None,
):
H_COLOR = 215, 0, 95
S_COLOR = 208, 208, 208
S_WIDTH = 45
"""A helper function used by and run in a background thread by get_fzf_preview function inorder to get the actual preview data to be displayed by fzf
Args:
anilist_results: the anilist results from an anilist action
titles: sanitized anime titles
workers:number of threads to use defaults to as many as possible
"""
# NOTE: Will probably make this a configuraable option
HEADER_COLOR = 215, 0, 95
SEPARATOR_COLOR = 208, 208, 208
SEPARATOR_WIDTH = 45
# use concurency to download and write as fast as possible
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
future_to_task = {}
for anime, title in zip(search_results, titles):
for anime, title in zip(anilist_results, titles):
# actual image url
image_url = anime["coverImage"]["large"]
future_to_task[executor.submit(save_image_from_url, image_url, title)] = (
image_url
@@ -149,19 +184,19 @@ def write_search_results(
# handle the text data
template = f"""
{get_true_fg("-"*S_WIDTH,*S_COLOR,bold=False)}
{get_true_fg('Title(jp):',*H_COLOR)} {anime['title']['romaji']}
{get_true_fg('Title(eng):',*H_COLOR)} {anime['title']['english']}
{get_true_fg('Popularity:',*H_COLOR)} {anime['popularity']}
{get_true_fg('Favourites:',*H_COLOR)} {anime['favourites']}
{get_true_fg('Status:',*H_COLOR)} {anime['status']}
{get_true_fg('Episodes:',*H_COLOR)} {anime['episodes']}
{get_true_fg('Genres:',*H_COLOR)} {anilist_data_helper.format_list_data_with_comma(anime['genres'])}
{get_true_fg('Next Episode:',*H_COLOR)} {anilist_data_helper.extract_next_airing_episode(anime['nextAiringEpisode'])}
{get_true_fg('Start Date:',*H_COLOR)} {anilist_data_helper.format_anilist_date_object(anime['startDate'])}
{get_true_fg('End Date:',*H_COLOR)} {anilist_data_helper.format_anilist_date_object(anime['endDate'])}
{get_true_fg("-"*S_WIDTH,*S_COLOR,bold=False)}
{get_true_fg('Description:',*H_COLOR)}
{get_true_fg("-"*SEPARATOR_WIDTH,*SEPARATOR_COLOR,bold=False)}
{get_true_fg('Title(jp):',*HEADER_COLOR)} {anime['title']['romaji']}
{get_true_fg('Title(eng):',*HEADER_COLOR)} {anime['title']['english']}
{get_true_fg('Popularity:',*HEADER_COLOR)} {anime['popularity']}
{get_true_fg('Favourites:',*HEADER_COLOR)} {anime['favourites']}
{get_true_fg('Status:',*HEADER_COLOR)} {anime['status']}
{get_true_fg('Episodes:',*HEADER_COLOR)} {anime['episodes']}
{get_true_fg('Genres:',*HEADER_COLOR)} {anilist_data_helper.format_list_data_with_comma(anime['genres'])}
{get_true_fg('Next Episode:',*HEADER_COLOR)} {anilist_data_helper.extract_next_airing_episode(anime['nextAiringEpisode'])}
{get_true_fg('Start Date:',*HEADER_COLOR)} {anilist_data_helper.format_anilist_date_object(anime['startDate'])}
{get_true_fg('End Date:',*HEADER_COLOR)} {anilist_data_helper.format_anilist_date_object(anime['endDate'])}
{get_true_fg("-"*SEPARATOR_WIDTH,*SEPARATOR_COLOR,bold=False)}
{get_true_fg('Description:',*HEADER_COLOR)}
"""
template = textwrap.dedent(template)
template = f"""
@@ -181,11 +216,22 @@ def write_search_results(
# get rofi icons
def get_icons(search_results: list[AnilistBaseMediaDataSchema], titles, workers=None):
def get_rofi_icons(
anilist_results: list[AnilistBaseMediaDataSchema], titles, workers=None
):
"""A helper function to make sure that the images are downloaded so they can be used as icons
Args:
titles (list[str]): sanitized titles of the anime; NOTE: its important that they are sanitized since they are used as the filenames of the images
workers ([TODO:parameter]): Number of threads to use to download the images; defaults to as many as possible
anilist_results: the anilist results from an anilist action
"""
# use concurrency to download the images as fast as possible
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
# load the jobs
future_to_url = {}
for anime, title in zip(search_results, titles):
for anime, title in zip(anilist_results, titles):
# actual link to download image from
image_url = anime["coverImage"]["large"]
future_to_url[executor.submit(save_image_from_url, image_url, title)] = (
image_url
@@ -196,19 +242,32 @@ def get_icons(search_results: list[AnilistBaseMediaDataSchema], titles, workers=
url = future_to_url[future]
try:
future.result()
except Exception as exc:
logger.error("%r generated an exception: %s" % (url, exc))
except Exception as e:
logger.error("%r generated an exception: %s" % (url, e))
def get_preview(search_results: list[AnilistBaseMediaDataSchema], titles, wait=False):
def get_fzf_preview(
anilist_results: list[AnilistBaseMediaDataSchema], titles, wait=False
):
"""A helper function that constructs data to be used for the fzf preview
Args:
titles (list[str]): The sanitized titles to use, NOTE: its important that they are sanitized since thay will be used as filenames
wait (bool): whether to block the ui as we wait for preview defaults to false
anilist_results: the anilist results got from an anilist action
Returns:
THe fzf preview script to use
"""
# ensure images and info exists
background_worker = Thread(
target=write_search_results, args=(search_results, titles)
target=write_search_results, args=(anilist_results, titles)
)
background_worker.daemon = True
background_worker.start()
os.environ["SHELL"] = shutil.which("bash") or "sh"
# the preview script is in bash so making sure fzf doesnt use any other shell lang to process the preview script
os.environ["SHELL"] = shutil.which("bash") or "bash"
preview = """
%s
if [ -s %s/{} ]; then fzf-preview %s/{}
@@ -219,12 +278,11 @@ def get_preview(search_results: list[AnilistBaseMediaDataSchema], titles, wait=F
fi
""" % (
fzf_preview,
IMAGES_DIR,
IMAGES_DIR,
INFO_DIR,
INFO_DIR,
IMAGES_CACHE_DIR,
IMAGES_CACHE_DIR,
ANIME_INFO_CACHE_DIR,
ANIME_INFO_CACHE_DIR,
)
# preview.replace("\n", ";")
if wait:
background_worker.join()
return preview

View File

@@ -36,13 +36,13 @@ class MpvPlayer(object):
ep_no=None,
server="top",
):
anilist_config = self.anilist_config
fastanime_runtime_state = self.fastanime_runtime_state
config = self.config
episode_number: str = anilist_config.episode_number
episode_number: str = fastanime_runtime_state.episode_number
quality = config.quality
episodes: list = sorted(anilist_config.episodes, key=float)
anime_id: int = anilist_config.anime_id
anime = anilist_config.anime
episodes: list = sorted(fastanime_runtime_state.episodes, key=float)
anime_id: int = fastanime_runtime_state.anime_id
anime = fastanime_runtime_state.anime
translation_type = config.translation_type
anime_provider = config.anime_provider
self.last_stop_time: str = "0"
@@ -56,8 +56,8 @@ class MpvPlayer(object):
next_episode = episodes.index(episode_number) + 1
if next_episode >= len(episodes):
next_episode = len(episodes) - 1
anilist_config.episode_number = episodes[next_episode]
episode_number = anilist_config.episode_number
fastanime_runtime_state.episode_number = episodes[next_episode]
episode_number = fastanime_runtime_state.episode_number
config.update_watch_history(anime_id, str(episode_number))
elif type == "reload":
if episode_number not in episodes:
@@ -75,14 +75,14 @@ class MpvPlayer(object):
self.mpv_player.show_text(f"Fetching episode {ep_no}")
episode_number = ep_no
config.update_watch_history(anime_id, str(ep_no))
anilist_config.episode_number = str(ep_no)
fastanime_runtime_state.episode_number = str(ep_no)
else:
self.mpv_player.show_text("Fetching previous episode...")
prev_episode = episodes.index(episode_number) - 1
if prev_episode <= 0:
prev_episode = 0
anilist_config.episode_number = episodes[prev_episode]
episode_number = anilist_config.episode_number
fastanime_runtime_state.episode_number = episodes[prev_episode]
episode_number = fastanime_runtime_state.episode_number
config.update_watch_history(anime_id, str(episode_number))
# update episode progress
if config.user and episode_number:
@@ -97,7 +97,7 @@ class MpvPlayer(object):
anime,
episode_number,
translation_type,
anilist_config.selected_anime_anilist,
fastanime_runtime_state.selected_anime_anilist,
)
if not episode_streams:
self.mpv_player.show_text("No streams were found")
@@ -131,12 +131,12 @@ class MpvPlayer(object):
self,
stream_link,
anime_provider: "AnimeProvider",
anilist_config,
fastanime_runtime_state,
config: "Config",
title,
):
self.anime_provider = anime_provider
self.anilist_config = anilist_config
self.fastanime_runtime_state = fastanime_runtime_state
self.config = config
self.last_stop_time: str = "0"
self.last_total_time: str = "0"
@@ -219,13 +219,15 @@ class MpvPlayer(object):
def _toggle_translation_type():
translation_type = "sub" if config.translation_type == "dub" else "dub"
anime = anime_provider.get_anime(
anilist_config._anime["id"],
anilist_config.selected_anime_anilist,
fastanime_runtime_state._anime["id"],
fastanime_runtime_state.selected_anime_anilist,
)
if not anime:
mpv_player.show_text("Failed to update translation type")
return
anilist_config.episodes = anime["availableEpisodesDetail"][translation_type]
fastanime_runtime_state.episodes = anime["availableEpisodesDetail"][
translation_type
]
config.translation_type = translation_type
if config.translation_type == "dub":
@@ -276,7 +278,7 @@ class MpvPlayer(object):
return
q = ["360", "720", "1080"]
quality = quality_raw.decode()
links: list = anilist_config.current_stream_links
links: list = fastanime_runtime_state.current_stream_links
q = [link["quality"] for link in links]
if quality in q:
config.quality = quality

View File

@@ -1,4 +1,4 @@
class QueryDict(dict):
class FastAnimeRuntimeState(dict):
"""dot.notation access to dictionary attributes"""
def __getattr__(self, attr):

View File

@@ -23,20 +23,51 @@ GREEN = "\033[38;2;45;24;45;m"
def filter_by_quality(quality: str, stream_links: "list[EpisodeStream]"):
"""Helper function used to filter a list of EpisodeStream objects to one that has a corresponding quality
Args:
quality: the quality to use
stream_links: a list of EpisodeStream objects
Returns:
an EpisodeStream object or None incase the quality was not found
"""
for stream_link in stream_links:
if stream_link["quality"] == quality:
return stream_link
def sizeof_fmt(num, suffix="B"):
def format_bytes_to_human(num_of_bytes: float, suffix: str = "B"):
"""Helper function usedd to format bytes to human
Args:
num_of_bytes: the number of bytes to format
suffix: the suffix to use
Returns:
formated bytes
"""
for unit in ("", "K", "M", "G", "T", "P", "E", "Z"):
if abs(num) < 1024.0:
return f"{num:3.1f}{unit}{suffix}"
num /= 1024.0
return f"{num:.1f}Yi{suffix}"
if abs(num_of_bytes) < 1024.0:
return f"{num_of_bytes:3.1f}{unit}{suffix}"
num_of_bytes /= 1024.0
return f"{num_of_bytes:.1f}Yi{suffix}"
def get_true_fg(string: str, r: int, g: int, b: int, bold=True) -> str:
def get_true_fg(string: str, r: int, g: int, b: int, bold: bool = True) -> str:
"""Custom helper function that enables colored text in the terminal
Args:
bold: whether to bolden the text
string: string to color
r: red
g: green
b: blue
Returns:
colored string
"""
# NOTE: Currently only supports terminals that support true color
if bold:
return f"{BOLD}\033[38;2;{r};{g};{b};m{string}{RESET}"
else:
@@ -47,7 +78,17 @@ def get_true_bg(string, r: int, g: int, b: int) -> str:
return f"\033[48;2;{r};{g};{b};m{string}{RESET}"
def fuzzy_inquirer(prompt: str, choices, **kwargs):
def fuzzy_inquirer(choices: list, prompt: str, **kwargs):
"""helper function that enables easier interaction with InquirerPy lib
Args:
choices: the choices to prompt
prompt: the prompt string to use
**kwargs: other options to pass to fuzzy_inquirer
Returns:
a choice
"""
from click import clear
clear()
@@ -62,6 +103,7 @@ def fuzzy_inquirer(prompt: str, choices, **kwargs):
return action
# WARNING: This function is depracated use the one from the main utility package
def anime_title_percentage_match(
possible_user_requested_anime_title: str, title: tuple
) -> float: