feat: update config logic with new philosophy

This commit is contained in:
Benexl
2025-07-05 17:13:21 +03:00
parent 759889acd4
commit 3af31a2dfd
59 changed files with 981 additions and 1610 deletions

View File

@@ -0,0 +1,95 @@
import logging
logger = logging.getLogger(__name__)
ANILIST_ENDPOINT = "https://graphql.anilist.co"
anime_title_query = """
query ($query: String) {
Page(perPage: 50) {
pageInfo {
total
}
media(search: $query, type: ANIME) {
id
idMal
title {
romaji
english
}
}
}
}
"""
def get_anime_titles(query: str, variables: dict = {}):
"""the abstraction over all none authenticated requests and that returns data of a similar type
Args:
query: the anilist query
variables: the anilist api variables
Returns:
a boolean indicating success and none or an anilist object depending on success
"""
from requests import post
try:
response = post(
ANILIST_ENDPOINT,
json={"query": query, "variables": variables},
timeout=10,
)
anilist_data = response.json()
if response.status_code == 200:
eng_titles = [
anime["title"]["english"]
for anime in anilist_data["data"]["Page"]["media"]
if anime["title"]["english"]
]
romaji_titles = [
anime["title"]["romaji"]
for anime in anilist_data["data"]["Page"]["media"]
if anime["title"]["romaji"]
]
return [*eng_titles, *romaji_titles]
else:
return []
except Exception as e:
logger.error(f"Something unexpected occurred {e}")
return []
def downloaded_anime_titles(ctx, param, incomplete):
import os
from ..constants import USER_VIDEOS_DIR
try:
titles = [
title
for title in os.listdir(USER_VIDEOS_DIR)
if title.lower().startswith(incomplete.lower()) or not incomplete
]
return titles
except Exception:
return []
def anime_titles_shell_complete(ctx, param, incomplete):
incomplete = incomplete.strip()
if not incomplete:
incomplete = None
variables = {}
else:
variables = {"query": incomplete}
return get_anime_titles(anime_title_query, variables)
if __name__ == "__main__":
t = input("Enter title")
results = get_anime_titles(anime_title_query, {"query": t})
print(results)

View File

@@ -9,4 +9,4 @@ def feh_manga_viewer(image_links: list[str], window_title: str):
print("feh not found")
exit(1)
commands = [FEH_EXECUTABLE, *image_links, "--title", window_title]
subprocess.run(commands)
subprocess.run(commands, check=False)

View File

@@ -5,9 +5,9 @@ import termios
import tty
from sys import exit
from rich.align import Align
from rich.console import Console
from rich.panel import Panel
from rich.align import Align
from rich.text import Text
console = Console()
@@ -72,7 +72,8 @@ def icat_manga_viewer(image_links: list[str], window_title: str):
"--z-index",
"-1",
image_links[idx],
]
],
check=False,
)
if show_banner:

View File

@@ -0,0 +1,40 @@
import importlib
import click
class LazyGroup(click.Group):
def __init__(self, root:str, *args, lazy_subcommands=None, **kwargs):
super().__init__(*args, **kwargs)
# lazy_subcommands is a map of the form:
#
# {command-name} -> {module-name}.{command-object-name}
#
self.root = root
self.lazy_subcommands = lazy_subcommands or {}
def list_commands(self, ctx):
base = super().list_commands(ctx)
lazy = sorted(self.lazy_subcommands.keys())
return base + lazy
def get_command(self, ctx, cmd_name): # pyright:ignore
if cmd_name in self.lazy_subcommands:
return self._lazy_load(cmd_name)
return super().get_command(ctx, cmd_name)
def _lazy_load(self, cmd_name: str):
# lazily loading a command, first get the module name and attribute name
import_path: str = self.lazy_subcommands[cmd_name]
modname, cmd_object_name = import_path.rsplit(".", 1)
# do the import
mod = importlib.import_module(f".{modname}", package=self.root)
# get the Command object from that module
cmd_object = getattr(mod, cmd_object_name)
# check the result to make debugging easier
if not isinstance(cmd_object, click.Command):
raise ValueError(
f"Lazy loading of {import_path} failed by returning "
"a non-command object"
)
return cmd_object

View File

@@ -0,0 +1,30 @@
import logging
from rich.traceback import install as rich_install
from ..constants import LOG_FILE_PATH
def setup_logging(log: bool, log_file: bool, rich_traceback: bool) -> None:
"""Configures the application's logging based on CLI flags."""
if rich_traceback:
rich_install(show_locals=True)
if log:
from rich.logging import RichHandler
logging.basicConfig(
level="DEBUG",
format="%(message)s",
datefmt="[%X]",
handlers=[RichHandler()],
)
logging.getLogger(__name__).info("Rich logging initialized.")
elif log_file:
logging.basicConfig(
level="DEBUG",
filename=LOG_FILE_PATH,
format="%(asctime)s %(levelname)s: %(message)s",
datefmt="[%d/%m/%Y@%H:%M:%S]",
filemode="w",
)
else:
logging.basicConfig(level="CRITICAL")

View File

@@ -64,6 +64,7 @@ def stream_video(MPV, url, mpv_args, custom_args, pre_args=[]):
capture_output=True,
text=True,
encoding="utf-8",
check=False,
)
if proc.stdout:
for line in reversed(proc.stdout.split("\n")):
@@ -101,7 +102,7 @@ def run_mpv(
time.sleep(120)
return "0", "0"
cmd = [WEBTORRENT_CLI, link, f"--{player}"]
subprocess.run(cmd, encoding="utf-8")
subprocess.run(cmd, encoding="utf-8", check=False)
return "0", "0"
if player == "vlc":
VLC = shutil.which("vlc")
@@ -141,7 +142,7 @@ def run_mpv(
title,
]
subprocess.run(args)
subprocess.run(args, check=False)
return "0", "0"
else:
args = ["vlc", link]
@@ -152,7 +153,7 @@ def run_mpv(
if title:
args.append("--video-title")
args.append(title)
subprocess.run(args, encoding="utf-8")
subprocess.run(args, encoding="utf-8", check=False)
return "0", "0"
else:
# Determine if mpv is available
@@ -191,7 +192,7 @@ def run_mpv(
"is.xyz.mpv/.MPVActivity",
]
subprocess.run(args)
subprocess.run(args, check=False)
return "0", "0"
else:
# General mpv command with custom arguments

View File

@@ -20,7 +20,7 @@ def format_time(duration_in_secs: float):
return f"{int(h):2d}:{int(m):2d}:{int(s):2d}".replace(" ", "0")
class MpvPlayer(object):
class MpvPlayer:
anime_provider: "AnimeProvider"
config: "Config"
subs = []
@@ -97,8 +97,7 @@ class MpvPlayer(object):
else:
self.mpv_player.show_text("Fetching previous episode...")
prev_episode = total_episodes.index(current_episode_number) - 1
if prev_episode <= 0:
prev_episode = 0
prev_episode = max(0, prev_episode)
fastanime_runtime_state.provider_current_episode_number = total_episodes[
prev_episode
]

View File

@@ -11,7 +11,7 @@ def print_img(url: str):
url: [TODO:description]
"""
if EXECUTABLE := shutil.which("icat"):
subprocess.run([EXECUTABLE, url])
subprocess.run([EXECUTABLE, url], check=False)
else:
EXECUTABLE = shutil.which("chafa")
@@ -30,4 +30,4 @@ def print_img(url: str):
subprocess.run([EXECUTABLE, url, "--size=15x15"], input=img_bytes)
"""
subprocess.run([EXECUTABLE, "--size=15x15"], input=img_bytes)
subprocess.run([EXECUTABLE, "--size=15x15"], input=img_bytes, check=False)

View File

@@ -27,7 +27,8 @@ def SyncPlayer(url: str, anime_title=None, headers={}, subtitles=[], *args):
[
SYNCPLAY_EXECUTABLE,
url,
]
],
check=False,
)
else:
subprocess.run(
@@ -37,7 +38,8 @@ def SyncPlayer(url: str, anime_title=None, headers={}, subtitles=[], *args):
"--",
f"--force-media-title={anime_title}",
*mpv_args,
]
],
check=False,
)
# for compatability

View File

@@ -1,13 +1,14 @@
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any, Callable
from collections.abc import Callable
from typing import Any
from ...libs.anilist.types import AnilistBaseMediaDataSchema
from ...libs.anime_provider.types import Anime, EpisodeStream, SearchResult, Server
class FastAnimeRuntimeState(object):
class FastAnimeRuntimeState:
"""A class that manages fastanime runtime during anilist command runtime"""
provider_current_episode_stream_link: str

View File

@@ -0,0 +1,148 @@
import os
import pathlib
import re
import shlex
import shutil
import subprocess
import sys
import requests
from rich import print
from ... import APP_NAME, AUTHOR, GIT_REPO, __version__
API_URL = f"https://api.{GIT_REPO}/repos/{AUTHOR}/{APP_NAME}/releases/latest"
def check_for_updates():
USER_AGENT = f"{APP_NAME} user"
try:
request = requests.get(
API_URL,
headers={
"User-Agent": USER_AGENT,
"X-GitHub-Api-Version": "2022-11-28",
"Accept": "application/vnd.github+json",
},
)
except Exception:
print("You are not connected to the internet")
return True, {}
if request.status_code == 200:
release_json = request.json()
remote_tag = list(
map(int, release_json["tag_name"].replace("v", "").split("."))
)
local_tag = list(map(int, __version__.replace("v", "").split(".")))
if (
(remote_tag[0] > local_tag[0])
or (remote_tag[1] > local_tag[1] and remote_tag[0] == local_tag[0])
or (
remote_tag[2] > local_tag[2]
and remote_tag[0] == local_tag[0]
and remote_tag[1] == local_tag[1]
)
):
is_latest = False
else:
is_latest = True
return (is_latest, release_json)
else:
print("Failed to check for updates")
print(request.text)
return (True, {})
def is_git_repo(author, repository):
# Check if the current directory contains a .git folder
git_dir = pathlib.Path(".git")
if not git_dir.exists() or not git_dir.is_dir():
return False
# Check if the config file exists
config_path = git_dir / "config"
if not config_path.exists():
return False
try:
# Read the .git/config file to find the remote repository URL
with config_path.open("r") as git_config:
git_config_content = git_config.read()
except (FileNotFoundError, PermissionError):
return False
# Use regex to find the repository URL in the config file
repo_name_pattern = r"url\s*=\s*.+/([^/]+/[^/]+)\.git"
match = re.search(repo_name_pattern, git_config_content)
# Return True if match found and repository name matches
return bool(match) and match.group(1) == f"{author}/{repository}"
def update_app(force=False):
is_latest, release_json = check_for_updates()
if is_latest and not force:
print("[green]App is up to date[/]")
return False, release_json
tag_name = release_json["tag_name"]
print("[cyan]Updating app to version %s[/]" % tag_name)
if os.path.exists("/nix/store") and os.path.exists("/run/current-system"):
NIX = shutil.which("nix")
if not NIX:
print("[red]Cannot find nix, it looks like your system is broken.[/]")
return False, release_json
process = subprocess.run(
[NIX, "profile", "upgrade", APP_NAME.lower()], check=False
)
elif is_git_repo(AUTHOR, APP_NAME):
GIT_EXECUTABLE = shutil.which("git")
args = [
GIT_EXECUTABLE,
"pull",
]
print(f"Pulling latest changes from the repository via git: {shlex.join(args)}")
if not GIT_EXECUTABLE:
print("[red]Cannot find git please install it.[/]")
return False, release_json
process = subprocess.run(
args,
check=False,
)
elif UV := shutil.which("uv"):
process = subprocess.run([UV, "tool", "upgrade", APP_NAME], check=False)
elif PIPX := shutil.which("pipx"):
process = subprocess.run([PIPX, "upgrade", APP_NAME], check=False)
else:
PYTHON_EXECUTABLE = sys.executable
args = [
PYTHON_EXECUTABLE,
"-m",
"pip",
"install",
APP_NAME,
"-U",
"--no-warn-script-location",
]
if sys.prefix == sys.base_prefix:
# ensure NOT in a venv, where --user flag can cause an error.
# TODO: Get value of 'include-system-site-packages' in pyenv.cfg.
args.append("--user")
process = subprocess.run(args, check=False)
if process.returncode == 0:
print(
"[green]Its recommended to run the following after updating:\n\tfastanime config --update (to get the latest config docs)\n\tfastanime cache --clean (to get rid of any potential issues)[/]",
file=sys.stderr,
)
return True, release_json
else:
return False, release_json

View File

@@ -39,8 +39,7 @@ def get_requested_quality_or_default_to_first(url, quality):
m3u8_format["height"] < quality_u and m3u8_format["height"] > quality_l
):
return m3u8_format["url"]
else:
return m3u8_formats[0]["url"]
return m3u8_formats[0]["url"]
def move_preferred_subtitle_lang_to_top(sub_list, lang_str):
@@ -78,20 +77,19 @@ def filter_by_quality(quality: str, stream_links: "list[EpisodeStream]", default
# some providers have inaccurate/weird/non-standard eg qualities 718 instead of 720
if Q <= q + 80 and Q >= q - 80:
return stream_link
else:
if stream_links and default:
from rich import print
if stream_links and default:
from rich import print
try:
print("[yellow bold]WARNING Qualities were:[/] ", stream_links)
print(
"[cyan bold]Using default of quality:[/] ",
stream_links[0]["quality"],
)
return stream_links[0]
except Exception as e:
print(e)
return
try:
print("[yellow bold]WARNING Qualities were:[/] ", stream_links)
print(
"[cyan bold]Using default of quality:[/] ",
stream_links[0]["quality"],
)
return stream_links[0]
except Exception as e:
print(e)
return
def format_bytes_to_human(num_of_bytes: float, suffix: str = "B"):
@@ -195,4 +193,8 @@ def which_bashlike():
Returns:
the path to the bash executable or None if not found
"""
return (shutil.which("bash") or "bash") if S_PLATFORM != "win32" else which_win32_gitbash()
return (
(shutil.which("bash") or "bash")
if S_PLATFORM != "win32"
else which_win32_gitbash()
)