feat: cleanup

Benexl committed 2025-07-22 14:55:38 +03:00
parent 60c583d115
commit 384d326fa8
27 changed files with 20 additions and 2457 deletions

View File

@@ -1,5 +1,17 @@
-{
-  "allanime": {
-    "1p": "One Piece"
-  }
-}
+{
+  "allanime": {
+    "1P": "one piece",
+    "Magia Record: Mahou Shoujo Madoka☆Magica Gaiden (TV)": "Mahou Shoujo Madoka☆Magica",
+    "Dungeon ni Deai o Motomeru no wa Machigatte Iru Darouka": "Dungeon ni Deai wo Motomeru no wa Machigatteiru Darou ka",
+    "Hazurewaku no \"Joutai Ijou Skill\" de Saikyou ni Natta Ore ga Subete wo Juurin suru made": "Hazure Waku no [Joutai Ijou Skill] de Saikyou ni Natta Ore ga Subete wo Juurin Suru made",
+    "Re:Zero kara Hajimeru Isekai Seikatsu Season 3": "Re:Zero kara Hajimeru Isekai Seikatsu 3rd Season"
+  },
+  "hianime": {
+    "My Star": "Oshi no Ko"
+  },
+  "animepahe": {
+    "Azumanga Daiou The Animation": "Azumanga Daioh",
+    "Mairimashita! Iruma-kun 2nd Season": "Mairimashita! Iruma-kun 2",
+    "Mairimashita! Iruma-kun 3rd Season": "Mairimashita! Iruma-kun 3"
+  }
+}

View File

@@ -27,12 +27,9 @@ if TYPE_CHECKING:
 commands = {
-    "config": ".config",
-    "search": ".search",
-    "download": ".download",
-    "anilist": ".anilist",
-    "queue": ".queue",
-    "service": ".service",
+    "config": "config.config",
+    "search": "search.search",
+    "anilist": "anilist.anilist",
 }
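The new values are dotted "module.attribute" paths, which match click's documented lazy-loading pattern for subcommands. A minimal sketch of a group that would resolve such a table at invocation time (the LazyGroup class and the "fastanime.cli.commands" package path below are assumptions based on click's docs, not code from this commit):

import importlib

import click


class LazyGroup(click.Group):
    """Resolve subcommands from 'module.attribute' strings on first use."""

    def __init__(self, *args, lazy_subcommands=None, **kwargs):
        super().__init__(*args, **kwargs)
        # e.g. {"search": "search.search"} -> module 'search', attribute 'search'
        self.lazy_subcommands = lazy_subcommands or {}

    def list_commands(self, ctx):
        return sorted(self.lazy_subcommands)

    def get_command(self, ctx, name):
        target = self.lazy_subcommands.get(name)
        if target is None:
            return None
        module_name, attr = target.rsplit(".", 1)
        # import the command module only when the subcommand is actually invoked
        module = importlib.import_module(f".{module_name}", package="fastanime.cli.commands")
        return getattr(module, attr)

With the table above, a group built as LazyGroup(lazy_subcommands=commands) would import search.search only when the search subcommand is run, keeping startup fast.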

View File

@@ -1,5 +0,0 @@
from .anilist import anilist
from .config import config
from .search import search
__all__ = ["config", "search", "anilist"]

View File

@@ -1,178 +0,0 @@
"""
Single download command for the anilist CLI.
Handles downloading specific episodes or continuing from watch history.
"""
import click
from pathlib import Path
from typing import List, Optional
from ....core.config.model import AppConfig
from ....libs.api.types import MediaItem
from ...services.downloads import get_download_manager
from ...services.watch_history.manager import WatchHistoryManager
def parse_episode_range(range_str: str) -> List[int]:
"""Parse episode range string into list of episode numbers."""
episodes = []
for part in range_str.split(','):
part = part.strip()
if '-' in part:
start, end = map(int, part.split('-', 1))
episodes.extend(range(start, end + 1))
else:
episodes.append(int(part))
return sorted(set(episodes)) # Remove duplicates and sort
@click.command(name="download")
@click.argument("query", required=False)
@click.option("--episode", "-e", type=int, help="Specific episode number")
@click.option("--range", "-r", help="Episode range (e.g., 1-12, 5,7,9)")
@click.option("--quality", "-q",
type=click.Choice(["360", "480", "720", "1080", "best"]),
help="Preferred download quality")
@click.option("--continue", "continue_watch", is_flag=True,
help="Continue from watch history")
@click.option("--background", "-b", is_flag=True,
help="Download in background")
@click.option("--path", type=click.Path(exists=True, file_okay=False, dir_okay=True),
help="Custom download location")
@click.option("--subtitles/--no-subtitles", default=None,
help="Include subtitles (overrides config)")
@click.option("--priority", type=int, default=0,
help="Download priority (higher number = higher priority)")
@click.pass_context
def download(ctx: click.Context, query: Optional[str], episode: Optional[int],
range: Optional[str], quality: Optional[str], continue_watch: bool,
background: bool, path: Optional[str], subtitles: Optional[bool],
priority: int):
"""
Download anime episodes with tracking.
Examples:
\b
# Download specific episode
fastanime anilist download "Attack on Titan" --episode 1
\b
# Download episode range
fastanime anilist download "Naruto" --range "1-5,10,15-20"
\b
# Continue from watch history
fastanime anilist download --continue
\b
# Download with custom quality
fastanime anilist download "One Piece" --episode 1000 --quality 720
"""
config: AppConfig = ctx.obj
download_manager = get_download_manager(config.downloads)
try:
# Handle continue from watch history
if continue_watch:
if query:
click.echo("--continue flag cannot be used with a search query", err=True)
ctx.exit(1)
# Get current watching anime from history
watch_manager = WatchHistoryManager()
current_watching = watch_manager.get_currently_watching()
if not current_watching:
click.echo("No anime currently being watched found in history", err=True)
ctx.exit(1)
if len(current_watching) == 1:
media_item = current_watching[0].media_item
next_episode = current_watching[0].last_watched_episode + 1
episodes_to_download = [next_episode]
else:
# Multiple anime, let user choose
click.echo("Multiple anime found in watch history:")
for i, entry in enumerate(current_watching):
title = entry.media_item.title.english or entry.media_item.title.romaji
next_ep = entry.last_watched_episode + 1
click.echo(f" {i + 1}. {title} (next episode: {next_ep})")
choice = click.prompt("Select anime to download", type=int)
if choice < 1 or choice > len(current_watching):
click.echo("Invalid selection", err=True)
ctx.exit(1)
selected_entry = current_watching[choice - 1]
media_item = selected_entry.media_item
next_episode = selected_entry.last_watched_episode + 1
episodes_to_download = [next_episode]
else:
# Search for anime
if not query:
click.echo("Query is required when not using --continue", err=True)
ctx.exit(1)
# TODO: Integrate with search functionality
# For now, this is a placeholder - you'll need to integrate with your existing search system
click.echo(f"Searching for: {query}")
click.echo("Note: Search integration not yet implemented in this example")
ctx.exit(1)
# Determine episodes to download
if episode:
episodes_to_download = [episode]
elif range:
try:
episodes_to_download = parse_episode_range(range)
except ValueError as e:
click.echo(f"Invalid episode range: {e}", err=True)
ctx.exit(1)
elif not continue_watch:
# Default to episode 1 if nothing specified
episodes_to_download = [1]
# Validate episodes
if not episodes_to_download:
click.echo("No episodes specified for download", err=True)
ctx.exit(1)
if media_item.episodes and max(episodes_to_download) > media_item.episodes:
click.echo(f"Episode {max(episodes_to_download)} exceeds total episodes ({media_item.episodes})", err=True)
ctx.exit(1)
# Use quality from config if not specified
if not quality:
quality = config.downloads.preferred_quality
# Add to download queue
success = download_manager.add_to_queue(
media_item=media_item,
episodes=episodes_to_download,
quality=quality,
priority=priority
)
if success:
title = media_item.title.english or media_item.title.romaji
episode_text = f"episode {episodes_to_download[0]}" if len(episodes_to_download) == 1 else f"{len(episodes_to_download)} episodes"
click.echo(f"✓ Added {episode_text} of '{title}' to download queue")
if background:
click.echo("Download will continue in the background")
else:
click.echo("Run 'fastanime anilist downloads status' to monitor progress")
else:
click.echo("Failed to add episodes to download queue", err=True)
ctx.exit(1)
except Exception as e:
click.echo(f"Error: {e}", err=True)
ctx.exit(1)
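As a quick sanity check of the range grammar documented by the --range option, illustrative doctest-style asserts against the parse_episode_range function defined above:

# inclusive dashes, comma-separated items, duplicates removed, result sorted
assert parse_episode_range("1-5,10,15-17") == [1, 2, 3, 4, 5, 10, 15, 16, 17]
assert parse_episode_range("3,1,2-2") == [1, 2, 3]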

View File

@@ -1,381 +0,0 @@
"""
Downloads management commands for the anilist CLI.
Provides comprehensive download management including listing, status monitoring,
cleanup, and verification operations.
"""
import click
import json
from datetime import datetime
from pathlib import Path
from typing import Optional
from ....core.config.model import AppConfig
from ...services.downloads import get_download_manager
from ...services.downloads.validator import DownloadValidator
def format_size(size_bytes: int) -> str:
"""Format file size in human-readable format."""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size_bytes < 1024.0:
return f"{size_bytes:.1f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.1f} PB"
def format_duration(seconds: Optional[float]) -> str:
"""Format duration in human-readable format."""
if seconds is None:
return "Unknown"
if seconds < 60:
return f"{seconds:.0f}s"
elif seconds < 3600:
return f"{seconds/60:.0f}m {seconds%60:.0f}s"
else:
hours = seconds // 3600
minutes = (seconds % 3600) // 60
return f"{hours:.0f}h {minutes:.0f}m"
@click.group(name="downloads")
@click.pass_context
def downloads(ctx: click.Context):
"""Manage downloaded anime."""
pass
@downloads.command()
@click.option("--status",
type=click.Choice(["all", "completed", "active", "failed", "paused"]),
default="all",
help="Filter by download status")
@click.option("--format", "output_format",
type=click.Choice(["table", "json", "simple"]),
default="table",
help="Output format")
@click.option("--limit", type=int, help="Limit number of results")
@click.pass_context
def list(ctx: click.Context, status: str, output_format: str, limit: Optional[int]):
"""List all downloads."""
config: AppConfig = ctx.obj
download_manager = get_download_manager(config.downloads)
try:
# Get download records
status_filter = None if status == "all" else status
records = download_manager.list_downloads(status_filter=status_filter, limit=limit)
if not records:
click.echo("No downloads found")
return
if output_format == "json":
# JSON output
output_data = []
for record in records:
output_data.append({
"media_id": record.media_item.id,
"title": record.display_title,
"status": record.status,
"episodes_downloaded": record.total_episodes_downloaded,
"total_episodes": record.media_item.episodes or 0,
"completion_percentage": record.completion_percentage,
"total_size_gb": record.total_size_gb,
"last_updated": record.last_updated.isoformat()
})
click.echo(json.dumps(output_data, indent=2))
elif output_format == "simple":
# Simple text output
for record in records:
title = record.display_title
status_emoji = {
"completed": "✓",
"active": "↓",
"failed": "✗",
"paused": "⏸"
}.get(record.status, "?")
click.echo(f"{status_emoji} {title} ({record.total_episodes_downloaded}/{record.media_item.episodes or 0} episodes)")
else:
# Table output (default)
click.echo()
click.echo("Downloads:")
click.echo("=" * 80)
# Header
header = f"{'Title':<30} {'Status':<10} {'Episodes':<12} {'Size':<10} {'Updated':<15}"
click.echo(header)
click.echo("-" * 80)
# Rows
for record in records:
title = record.display_title
if len(title) > 28:
title = title[:25] + "..."
status_display = record.status.capitalize()
episodes_display = f"{record.total_episodes_downloaded}/{record.media_item.episodes or '?'}"
size_display = format_size(record.total_size_bytes)
updated_display = record.last_updated.strftime("%Y-%m-%d")
row = f"{title:<30} {status_display:<10} {episodes_display:<12} {size_display:<10} {updated_display:<15}"
click.echo(row)
click.echo("-" * 80)
click.echo(f"Total: {len(records)} anime")
except Exception as e:
click.echo(f"Error listing downloads: {e}", err=True)
ctx.exit(1)
@downloads.command()
@click.pass_context
def status(ctx: click.Context):
"""Show download queue status and statistics."""
config: AppConfig = ctx.obj
download_manager = get_download_manager(config.downloads)
try:
# Get statistics
stats = download_manager.get_download_stats()
click.echo()
click.echo("Download Statistics:")
click.echo("=" * 40)
click.echo(f"Total Anime: {stats.get('total_anime', 0)}")
click.echo(f"Total Episodes: {stats.get('total_episodes', 0)}")
click.echo(f"Total Size: {stats.get('total_size_gb', 0):.2f} GB")
click.echo(f"Queue Size: {stats.get('queue_size', 0)}")
# Show completion stats
completion_stats = stats.get('completion_stats', {})
if completion_stats:
click.echo()
click.echo("Status Breakdown:")
click.echo("-" * 20)
for status, count in completion_stats.items():
click.echo(f" {status.capitalize()}: {count}")
# Show active downloads
queue = download_manager._load_queue()
if queue.items:
click.echo()
click.echo("Download Queue:")
click.echo("-" * 30)
for item in queue.items[:5]: # Show first 5 items
title = f"Media {item.media_id}" # Would need to lookup title
click.echo(f" Episode {item.episode_number} of {title} ({item.quality_preference})")
if len(queue.items) > 5:
click.echo(f" ... and {len(queue.items) - 5} more items")
except Exception as e:
click.echo(f"Error getting download status: {e}", err=True)
ctx.exit(1)
@downloads.command()
@click.option("--dry-run", is_flag=True, help="Show what would be cleaned without doing it")
@click.pass_context
def clean(ctx: click.Context, dry_run: bool):
"""Clean up failed downloads and orphaned entries."""
config: AppConfig = ctx.obj
download_manager = get_download_manager(config.downloads)
try:
if dry_run:
click.echo("Dry run mode - no changes will be made")
click.echo()
# Clean up failed downloads
if not dry_run:
failed_count = download_manager.cleanup_failed_downloads()
click.echo(f"Cleaned up {failed_count} failed downloads")
else:
click.echo("Would clean up failed downloads older than retention period")
# Clean up orphaned files
validator = DownloadValidator(download_manager)
if not dry_run:
orphaned_count = validator.cleanup_orphaned_files()
click.echo(f"Cleaned up {orphaned_count} orphaned files")
else:
click.echo("Would clean up orphaned files and fix index inconsistencies")
if dry_run:
click.echo()
click.echo("Run without --dry-run to perform actual cleanup")
except Exception as e:
click.echo(f"Error during cleanup: {e}", err=True)
ctx.exit(1)
@downloads.command()
@click.argument("media_id", type=int, required=False)
@click.option("--all", "verify_all", is_flag=True, help="Verify all downloads")
@click.pass_context
def verify(ctx: click.Context, media_id: Optional[int], verify_all: bool):
"""Verify download integrity for specific anime or all downloads."""
config: AppConfig = ctx.obj
download_manager = get_download_manager(config.downloads)
try:
validator = DownloadValidator(download_manager)
if verify_all:
click.echo("Generating comprehensive validation report...")
report = validator.generate_validation_report()
click.echo()
click.echo("Validation Report:")
click.echo("=" * 50)
click.echo(f"Total Records: {report['total_records']}")
click.echo(f"Valid Records: {report['valid_records']}")
click.echo(f"Invalid Records: {report['invalid_records']}")
click.echo(f"Integrity Issues: {report['integrity_issues']}")
click.echo(f"Path Issues: {report['path_issues']}")
click.echo(f"Orphaned Files: {report['orphaned_files']}")
if report['details']['invalid_files']:
click.echo()
click.echo("Invalid Files:")
for file_path in report['details']['invalid_files']:
click.echo(f" - {file_path}")
if report['details']['integrity_failures']:
click.echo()
click.echo("Integrity Failures:")
for failure in report['details']['integrity_failures']:
click.echo(f" - {failure['title']}: Episodes {failure['failed_episodes']}")
elif media_id:
record = download_manager.get_download_record(media_id)
if not record:
click.echo(f"No download record found for media ID {media_id}", err=True)
ctx.exit(1)
click.echo(f"Verifying downloads for: {record.display_title}")
# Verify integrity
integrity_results = validator.verify_file_integrity(record)
# Verify paths
path_issues = validator.validate_file_paths(record)
# Display results
click.echo()
click.echo("Episode Verification:")
click.echo("-" * 30)
for episode_num, episode_download in record.episodes.items():
status_emoji = "" if integrity_results.get(episode_num, False) else ""
click.echo(f" {status_emoji} Episode {episode_num} ({episode_download.status})")
if not integrity_results.get(episode_num, False):
if not episode_download.file_path.exists():
click.echo(f" - File missing: {episode_download.file_path}")
elif episode_download.checksum and not episode_download.verify_integrity():
click.echo(f" - Checksum mismatch")
if path_issues:
click.echo()
click.echo("Path Issues:")
for issue in path_issues:
click.echo(f" - {issue}")
else:
click.echo("Specify --all to verify all downloads or provide a media ID", err=True)
ctx.exit(1)
except Exception as e:
click.echo(f"Error during verification: {e}", err=True)
ctx.exit(1)
@downloads.command()
@click.argument("output_file", type=click.Path())
@click.option("--format", "export_format",
type=click.Choice(["json", "csv"]),
default="json",
help="Export format")
@click.pass_context
def export(ctx: click.Context, output_file: str, export_format: str):
"""Export download list to a file."""
config: AppConfig = ctx.obj
download_manager = get_download_manager(config.downloads)
try:
records = download_manager.list_downloads()
output_path = Path(output_file)
if export_format == "json":
export_data = []
for record in records:
export_data.append({
"media_id": record.media_item.id,
"title": record.display_title,
"status": record.status,
"episodes": {
str(ep_num): {
"episode_number": ep.episode_number,
"file_path": str(ep.file_path),
"file_size": ep.file_size,
"quality": ep.quality,
"status": ep.status,
"download_date": ep.download_date.isoformat()
}
for ep_num, ep in record.episodes.items()
},
"download_path": str(record.download_path),
"created_date": record.created_date.isoformat(),
"last_updated": record.last_updated.isoformat()
})
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(export_data, f, indent=2, ensure_ascii=False)
elif export_format == "csv":
import csv
with open(output_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# Write header
writer.writerow([
"Media ID", "Title", "Status", "Episodes Downloaded",
"Total Episodes", "Total Size (GB)", "Last Updated"
])
# Write data
for record in records:
writer.writerow([
record.media_item.id,
record.display_title,
record.status,
record.total_episodes_downloaded,
record.media_item.episodes or 0,
f"{record.total_size_gb:.2f}",
record.last_updated.strftime("%Y-%m-%d %H:%M:%S")
])
click.echo(f"Exported {len(records)} download records to {output_path}")
except Exception as e:
click.echo(f"Error exporting downloads: {e}", err=True)
ctx.exit(1)
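For reference, the two formatting helpers at the top of this file behave as follows (illustrative asserts, assuming the functions as defined above):

assert format_size(1536) == "1.5 KB"
assert format_size(3 * 1024**3) == "3.0 GB"
assert format_duration(None) == "Unknown"
assert format_duration(59) == "59s"
assert format_duration(3725) == "1h 2m"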

View File

@@ -1,56 +0,0 @@
import click
@click.command(
help="Helper command to manage cache",
epilog="""
\b
\b\bExamples:
# delete everything in the cache dir
fastanime cache --clean
\b
# print the path to the cache dir and exit
fastanime cache --path
\b
# print the current size of the cache dir and exit
fastanime cache --size
\b
# open the cache dir and exit
fastanime cache
""",
)
@click.option("--clean", help="Clean the cache dir", is_flag=True)
@click.option("--path", help="The path to the cache dir", is_flag=True)
@click.option("--size", help="The size of the cache dir", is_flag=True)
def cache(clean, path, size):
from ...constants import APP_CACHE_DIR
if path:
print(APP_CACHE_DIR)
elif clean:
import shutil
from rich.prompt import Confirm
if Confirm.ask(
f"Are you sure you want to clean the following path: {APP_CACHE_DIR};(NOTE: !!The action is irreversible and will clean your cache!!)",
default=False,
):
print("Cleaning...")
shutil.rmtree(APP_CACHE_DIR)
print("Successfully removed: ", APP_CACHE_DIR)
elif size:
import os
from ..utils.utils import format_bytes_to_human
total_size = 0
for dirpath, dirnames, filenames in os.walk(APP_CACHE_DIR):
for f in filenames:
fp = os.path.join(dirpath, f)
total_size += os.path.getsize(fp)
print("Total Size: ", format_bytes_to_human(total_size))
else:
click.launch(APP_CACHE_DIR)

View File

@@ -1,239 +0,0 @@
from typing import TYPE_CHECKING
import click
from ..utils.completions import anime_titles_shell_complete
if TYPE_CHECKING:
from ..config import Config
@click.command(
help="Helper command to get streams for anime to use externally in a non-python application",
short_help="Print anime streams to standard out",
epilog="""
\b
\b\bExamples:
# --- print anime info + episode streams ---
\b
# multiple titles can be specified with the -t option
fastanime grab -t <anime-title> -t <anime-title>
# -- or --
# print all available episodes
fastanime grab -t <anime-title> -r ':'
\b
# print the latest episode
fastanime grab -t <anime-title> -r '-1'
\b
# print a specific episode range
# be sure to observe the range syntax
fastanime grab -t <anime-title> -r '<start>:<stop>'
\b
fastanime grab -t <anime-title> -r '<start>:<stop>:<step>'
\b
fastanime grab -t <anime-title> -r '<start>:'
\b
fastanime grab -t <anime-title> -r ':<end>'
\b
# --- grab options ---
\b
# print search results only
fastanime grab -t <anime-title> -r <range> --search-results-only
\b
# print anime info only
fastanime grab -t <anime-title> -r <range> --anime-info-only
\b
# print episode streams only
fastanime grab -t <anime-title> -r <range> --episode-streams-only
""",
)
@click.option(
"--anime-titles",
"--anime_title",
"-t",
required=True,
shell_complete=anime_titles_shell_complete,
multiple=True,
help="Specify which anime to download",
)
@click.option(
"--episode-range",
"-r",
help="A range of episodes to download (start-end)",
)
@click.option(
"--search-results-only",
"-s",
help="print only the search results to stdout",
is_flag=True,
)
@click.option(
"--anime-info-only", "-i", help="print only selected anime title info", is_flag=True
)
@click.option(
"--episode-streams-only",
"-e",
help="print only selected anime episodes streams of given range",
is_flag=True,
)
@click.pass_obj
def grab(
config: "Config",
anime_titles: tuple,
episode_range,
search_results_only,
anime_info_only,
episode_streams_only,
):
import json
from logging import getLogger
from sys import exit
from thefuzz import fuzz
logger = getLogger(__name__)
if config.manga:
manga_title = anime_titles[0]
from ...MangaProvider import MangaProvider
manga_provider = MangaProvider()
search_data = manga_provider.search_for_manga(manga_title)
if not search_data:
exit(1)
if search_results_only:
print(json.dumps(search_data))
exit(0)
search_results = search_data["results"]
if not search_results:
logger.error("no results for your search")
exit(1)
search_results_ = {
search_result["title"]: search_result for search_result in search_results
}
search_result_anime_title = max(
search_results_.keys(), key=lambda title: fuzz.ratio(title, anime_titles[0])
)
manga_info = manga_provider.get_manga(
search_results_[search_result_anime_title]["id"]
)
if not manga_info:
return
if anime_info_only:
print(json.dumps(manga_info))
exit(0)
chapter_info = manga_provider.get_chapter_thumbnails(
manga_info["id"], str(episode_range)
)
if not chapter_info:
exit(1)
print(json.dumps(chapter_info))
else:
from ...BaseAnimeProvider import BaseAnimeProvider
anime_provider = BaseAnimeProvider(config.provider)
grabbed_animes = []
for anime_title in anime_titles:
# ---- search for anime ----
search_results = anime_provider.search_for_anime(
anime_title, translation_type=config.translation_type
)
if not search_results:
exit(1)
if search_results_only:
# grab only search results skipping all lines after this
grabbed_animes.append(search_results)
continue
search_results = search_results["results"]
if not search_results:
logger.error("no results for your search")
exit(1)
search_results_ = {
search_result["title"]: search_result
for search_result in search_results
}
search_result_anime_title = max(
search_results_.keys(), key=lambda title: fuzz.ratio(title, anime_title)
)
# ---- fetch anime ----
anime = anime_provider.get_anime(
search_results_[search_result_anime_title]["id"]
)
if not anime:
exit(1)
if anime_info_only:
# grab only the anime data skipping all lines after this
grabbed_animes.append(anime)
continue
episodes = sorted(
anime["availableEpisodesDetail"][config.translation_type], key=float
)
# where the magic happens
if episode_range:
if ":" in episode_range:
ep_range_tuple = episode_range.split(":")
if len(ep_range_tuple) == 2 and all(ep_range_tuple):
episodes_start, episodes_end = ep_range_tuple
episodes_range = episodes[
int(episodes_start) : int(episodes_end)
]
elif len(ep_range_tuple) == 3 and all(ep_range_tuple):
episodes_start, episodes_end, step = ep_range_tuple
episodes_range = episodes[
int(episodes_start) : int(episodes_end) : int(step)
]
else:
episodes_start, episodes_end = ep_range_tuple
if episodes_start.strip():
episodes_range = episodes[int(episodes_start) :]
elif episodes_end.strip():
episodes_range = episodes[: int(episodes_end)]
else:
# bare ':' means all episodes; without this branch it raised NameError
episodes_range = episodes
else:
episodes_range = episodes
else:
episodes_range = episodes[int(episode_range) :]
else:
episodes_range = sorted(episodes, key=float)
if not episode_streams_only:
grabbed_anime = dict(anime)
grabbed_anime["requested_episodes"] = episodes_range
grabbed_anime["translation_type"] = config.translation_type
grabbed_anime["episodes_streams"] = {}
else:
grabbed_anime = {}
# lets download em
for episode in episodes_range:
if episode not in episodes:
continue
streams = anime_provider.get_episode_streams(
anime["id"], episode, config.translation_type
)
if not streams:
continue
episode_streams = {server["server"]: server for server in streams}
if episode_streams_only:
grabbed_anime[episode] = episode_streams
else:
grabbed_anime["episodes_streams"][ # pyright:ignore
episode
] = episode_streams
# grab the full data for a single title and append it to the final result, or just the episode streams
grabbed_animes.append(grabbed_anime)
# print out the final result, either {} or [], depending on whether more than one title is requested
if len(grabbed_animes) == 1:
print(json.dumps(grabbed_animes[0]))
else:
print(json.dumps(grabbed_animes))
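Note that the -r option applies Python slice semantics over the sorted episode list, not episode numbers. A condensed, self-contained sketch of the same dispatch (the slice_episodes helper is a hypothetical name, written for illustration):

def slice_episodes(episodes: list[str], episode_range: str | None) -> list[str]:
    """Apply the grab command's ':'-based range grammar to an episode list."""
    if not episode_range:
        return episodes
    if ":" not in episode_range:
        # a lone integer is a start offset, e.g. '-1' -> latest episode
        return episodes[int(episode_range):]
    parts = episode_range.split(":")
    # ':' -> everything, '2:' -> from index 2, ':5' -> up to index 5, '1:9:2' -> stepped
    indices = [int(p) if p.strip() else None for p in parts[:3]]
    return episodes[slice(*indices)]

assert slice_episodes(["1", "2", "3", "4"], "-1") == ["4"]
assert slice_episodes(["1", "2", "3", "4"], "1:3") == ["2", "3"]
assert slice_episodes(["1", "2", "3", "4"], ":") == ["1", "2", "3", "4"]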

View File

@@ -1,38 +0,0 @@
import click
from ...core.config import AppConfig
from ...libs.api.factory import create_api_client
from ...libs.api.params import ApiSearchParams
@click.group(hidden=True)
def helpers_cmd():
"""A hidden group for helper commands called by shell scripts."""
pass
@helpers_cmd.command("search-as-you-type")
@click.argument("query", required=False, default="")
@click.pass_obj
def search_as_you_type(config: AppConfig, query: str):
"""
Performs a live search on AniList and prints results formatted for fzf.
Called by an fzf `reload` binding.
"""
if not query or len(query) < 3:
# Don't search for very short queries to avoid spamming the API
return
api_client = create_api_client(config.general.media_api, config)
search_params = ApiSearchParams(query=query, per_page=25)
results = api_client.search_media(search_params)
if not results or not results.media:
return
# Format output for fzf: one line per item.
for item in results.media:
title = item.title.english or item.title.romaji or "Unknown Title"
score = f"{item.average_score / 10 if item.average_score else 'N/A'}"
# Use a unique, parsable format. The title must come last for the preview helper.
click.echo(f"{item.id} | Score: {score} | {title}")

View File

@@ -1,55 +0,0 @@
import click
@click.command(
help="Helper command to update fastanime to latest",
epilog="""
\b
\b\bExamples:
# update fastanime to latest
fastanime update
\b
# check for latest release
fastanime update --check
# Force an update regardless of the current version
fastanime update --force
""",
)
@click.option("--check", "-c", help="Check for the latest release", is_flag=True)
@click.option("--force", "-c", help="Force update", is_flag=True)
def update(check, force):
from rich.console import Console
from rich.markdown import Markdown
from ... import __version__
from ..app_updater import check_for_updates, update_app
def _print_release(release_data):
console = Console()
body = Markdown(release_data["body"])
tag = release_data["tag_name"]
tag_title = release_data["name"]
github_page_url = release_data["html_url"]
console.print(f"Release Page: {github_page_url}")
console.print(f"Tag: {tag}")
console.print(f"Title: {tag_title}")
console.print(body)
if check:
is_latest, github_release_data = check_for_updates()
if not is_latest:
print(
f"You are running an older version ({__version__}) of fastanime please update to get the latest features"
)
_print_release(github_release_data)
else:
print(f"You are running the latest version ({__version__}) of fastanime")
_print_release(github_release_data)
else:
success, github_release_data = update_app(force)
_print_release(github_release_data)
if success:
print("Successfully updated")
else:
print("failed to update")

View File

@@ -1,26 +0,0 @@
SERVERS_AVAILABLE = ["HD1", "HD2", "StreamSB", "StreamTape"]
""""
| "hd-1"
| "hd-2"
| "megacloud"
| "streamsb"
| "streamtape";
"""
"""
VidStreaming = "hd-1",
MegaCloud = "megacloud",
StreamSB = "streamsb",
StreamTape = "streamtape",
VidCloud = "hd-2",
AsianLoad = "asianload",
GogoCDN = "gogocdn",
MixDrop = "mixdrop",
UpCloud = "upcloud",
VizCloud = "vizcloud",
MyCloud = "mycloud",
Filemoon = "filemoon",
"""

View File

@@ -1,191 +0,0 @@
import hashlib
import json
import re
import time
from base64 import b64decode
from typing import TYPE_CHECKING
from Crypto.Cipher import AES
if TYPE_CHECKING:
from ...common.requests_cacher import CachedRequestsSession
# Constants
megacloud = {
"script": "https://megacloud.tv/js/player/a/prod/e1-player.min.js?v=",
"sources": "https://megacloud.tv/embed-2/ajax/e-1/getSources?id=",
}
class HiAnimeError(Exception):
def __init__(self, message, context, status_code):
super().__init__(f"{context}: {message} (Status: {status_code})")
self.context = context
self.status_code = status_code
# Adapted from https://github.com/ghoshRitesh12/aniwatch
class MegaCloud:
def __init__(self, session):
self.session: CachedRequestsSession = session
def extract(self, video_url: str) -> dict:
try:
extracted_data = {
"tracks": [],
"intro": {"start": 0, "end": 0},
"outro": {"start": 0, "end": 0},
"sources": [],
}
video_id = video_url.split("/")[-1].split("?")[0]
response = self.session.get(
megacloud["sources"] + video_id,
headers={
"Accept": "*/*",
"X-Requested-With": "XMLHttpRequest",
"Referer": video_url,
},
fresh=1, # pyright: ignore
)
srcs_data = response.json()
if not srcs_data:
raise HiAnimeError(
"Url may have an invalid video id", "getAnimeEpisodeSources", 400
)
encrypted_string = srcs_data["sources"]
if not srcs_data["encrypted"] and isinstance(encrypted_string, list):
extracted_data.update(
{
"intro": srcs_data["intro"],
"outro": srcs_data["outro"],
"tracks": srcs_data["tracks"],
"sources": [
{"url": s["file"], "type": s["type"]}
for s in encrypted_string
],
}
)
return extracted_data
# Fetch decryption script
script_response = self.session.get(
megacloud["script"] + str(int(time.time() * 1000)),
fresh=1, # pyright: ignore
)
script_text = script_response.text
if not script_text:
raise HiAnimeError(
"Couldn't fetch script to decrypt resource",
"getAnimeEpisodeSources",
500,
)
vars_ = self.extract_variables(script_text)
if not vars_:
raise Exception(
"Can't find variables. Perhaps the extractor is outdated."
)
secret, encrypted_source = self.get_secret(encrypted_string, vars_)
decrypted = self.decrypt(encrypted_source, secret)
try:
sources = json.loads(decrypted)
extracted_data.update(
{
"intro": srcs_data["intro"],
"outro": srcs_data["outro"],
"tracks": srcs_data["tracks"],
"sources": [
{"url": s["file"], "type": s["type"]} for s in sources
],
}
)
return extracted_data
except Exception:
raise HiAnimeError(
"Failed to decrypt resource", "getAnimeEpisodeSources", 500
)
except Exception as err:
raise err
def extract_variables(self, text: str) -> list[list[int]]:
regex = r"case\s*0x[0-9a-f]+:(?![^;]*=partKey)\s*\w+\s*=\s*(\w+)\s*,\s*\w+\s*=\s*(\w+);"
matches = re.finditer(regex, text)
vars_ = []
for match in matches:
key1 = self.matching_key(match[1], text)
key2 = self.matching_key(match[2], text)
try:
vars_.append([int(key1, 16), int(key2, 16)])
except ValueError:
continue
return vars_
def get_secret(
self, encrypted_string: str, values: list[list[int]]
) -> tuple[str, str]:
secret = []
encrypted_source_array = list(encrypted_string)
current_index = 0
for start, length in values:
start += current_index
end = start + length
secret.extend(encrypted_string[start:end])
encrypted_source_array[start:end] = [""] * length
current_index += length
encrypted_source = "".join(encrypted_source_array) # .replace("\x00", "")
return ("".join(secret), encrypted_source)
def decrypt(self, encrypted: str, key_or_secret: str, maybe_iv: str = "") -> str:
if maybe_iv:
key = key_or_secret.encode()
iv = maybe_iv.encode()
contents = encrypted
else:
# Decode the Base64 string
cypher = b64decode(encrypted)
# Extract the salt from the cypher text
salt = cypher[8:16]
# Combine the key_or_secret with the salt
password = key_or_secret.encode() + salt
# Generate MD5 hashes
md5_hashes = []
digest = password
for _ in range(3):
md5 = hashlib.md5()
md5.update(digest)
md5_hashes.append(md5.digest())
digest = md5_hashes[-1] + password
# Derive the key and IV
key = md5_hashes[0] + md5_hashes[1]
iv = md5_hashes[2]
# Extract the encrypted contents
contents = cypher[16:]
# Initialize the AES decipher
decipher = AES.new(key, AES.MODE_CBC, iv)
# Decrypt and decode
decrypted = decipher.decrypt(contents).decode("utf-8") # pyright: ignore
# Remove any padding (PKCS#7)
pad = ord(decrypted[-1])
return decrypted[:-pad]
def matching_key(self, value: str, script: str) -> str:
match = re.search(rf",{value}=((?:0x)?[0-9a-fA-F]+)", script)
if match:
return match.group(1).replace("0x", "")
raise Exception("Failed to match the key")

View File

@@ -1,274 +0,0 @@
import logging
import re
from html.parser import HTMLParser
from itertools import cycle
from urllib.parse import quote_plus
from yt_dlp.utils import (
clean_html,
extract_attributes,
get_element_by_class,
get_element_html_by_class,
get_elements_by_class,
get_elements_html_by_class,
)
from ..base import BaseAnimeProvider
from ..decorators import debug_provider
from ..utils.utils import give_random_quality
from .constants import SERVERS_AVAILABLE
from .extractors import MegaCloud
from .types import HiAnimeStream
logger = logging.getLogger(__name__)
LINK_TO_STREAMS_REGEX = re.compile(r".*://(.*)/embed-(2|4|6)/e-([0-9])/(.*)\?.*")
IMAGE_HTML_ELEMENT_REGEX = re.compile(r"<img.*?>")
class ParseAnchorAndImgTag(HTMLParser):
def __init__(self):
super().__init__()
self.img_tag = None
self.a_tag = None
def handle_starttag(self, tag, attrs):
if tag == "img":
self.img_tag = {attr[0]: attr[1] for attr in attrs}
if tag == "a":
self.a_tag = {attr[0]: attr[1] for attr in attrs}
class HiAnime(BaseAnimeProvider):
# HEADERS = {"Referer": "https://hianime.to/home"}
@debug_provider
def search_for_anime(self, anime_title: str, translation_type, **kwargs):
query = quote_plus(anime_title)
url = f"https://hianime.to/search?keyword={query}"
response = self.session.get(url)
if not response.ok:
return
search_page = response.text
search_results_html_items = get_elements_by_class("flw-item", search_page)
results = []
for search_results_html_item in search_results_html_items:
film_poster_html = get_element_by_class(
"film-poster", search_results_html_item
)
if not film_poster_html:
continue
# get availableEpisodes
episodes_html = get_element_html_by_class("tick-sub", film_poster_html)
episodes = clean_html(episodes_html) or 12
# get anime id and poster image url
parser = ParseAnchorAndImgTag()
parser.feed(film_poster_html)
image_data = parser.img_tag
anime_link_data = parser.a_tag
if not image_data or not anime_link_data:
continue
episodes = int(episodes)
# finally!!
image_link = image_data["data-src"]
anime_id = anime_link_data["data-id"]
title = anime_link_data["title"]
result = {
"availableEpisodes": list(range(1, episodes)),
"id": anime_id,
"title": title,
"poster": image_link,
}
results.append(result)
self.store.set(result["id"], "search_result", result)
return {"pageInfo": {}, "results": results}
@debug_provider
def get_anime(self, hianime_id, **kwargs):
anime_result = {}
if d := self.store.get(str(hianime_id), "search_result"):
anime_result = d
anime_url = f"https://hianime.to/ajax/v2/episode/list/{hianime_id}"
response = self.session.get(anime_url, timeout=10)
if response.ok:
response_json = response.json()
hianime_anime_page = response_json["html"]
episodes_info_container_html = get_element_html_by_class(
"ss-list", hianime_anime_page
)
episodes_info_html_list = get_elements_html_by_class(
"ep-item", episodes_info_container_html
)
# keys: [ data-number: episode_number, data-id: episode_id, title: episode_title , href:episode_page_url]
episodes_info_dicts = [
extract_attributes(episode_dict)
for episode_dict in episodes_info_html_list
]
episodes = [episode["data-number"] for episode in episodes_info_dicts]
episodes_info = [
{
"id": episode["data-id"],
"title": (
(episode["title"] or "").replace(
f"Episode {episode['data-number']}", ""
)
or anime_result["title"]
)
+ f"; Episode {episode['data-number']}",
"episode": episode["data-number"],
}
for episode in episodes_info_dicts
]
self.store.set(
str(hianime_id),
"anime_info",
episodes_info,
)
return {
"id": hianime_id,
"availableEpisodesDetail": {
"dub": episodes,
"sub": episodes,
"raw": episodes,
},
"poster": anime_result["poster"],
"title": anime_result["title"],
"episodes_info": episodes_info,
}
@debug_provider
def get_episode_streams(self, anime_id, episode, translation_type, **kwargs):
if d := self.store.get(str(anime_id), "anime_info"):
episodes_info = d
episode_details = [
episode_details
for episode_details in episodes_info
if episode_details["episode"] == episode
]
if not episode_details:
return
episode_details = episode_details[0]
episode_url = f"https://hianime.to/ajax/v2/episode/servers?episodeId={episode_details['id']}"
response = self.session.get(episode_url)
if response.ok:
response_json = response.json()
episode_page_html = response_json["html"]
servers_containers_html = get_elements_html_by_class(
"ps__-list", episode_page_html
)
if not servers_containers_html:
return
# sub servers
try:
servers_html_sub = get_elements_html_by_class(
"server-item", servers_containers_html[0]
)
except Exception:
logger.warning("HiAnime: sub not found")
servers_html_sub = None
# dub servers
try:
servers_html_dub = get_elements_html_by_class(
"server-item", servers_containers_html[1]
)
except Exception:
logger.warning("HiAnime: dub not found")
servers_html_dub = None
if translation_type == "dub":
servers_html = servers_html_dub
else:
servers_html = servers_html_sub
if not servers_html:
return
@debug_provider
def _get_server(server_name, server_html):
# keys: [ data-type: translation_type, data-id: embed_id, data-server-id: server_id ]
servers_info = extract_attributes(server_html)
server_id = servers_info["data-id"]
embed_url = (
f"https://hianime.to/ajax/v2/episode/sources?id={server_id}"
)
embed_response = self.session.get(embed_url)
if embed_response.ok:
embed_json = embed_response.json()
raw_link_to_streams = embed_json["link"]
match server_name:
# TODO: Finish the other servers
case "HD2":
data = MegaCloud(self.session).extract(
raw_link_to_streams
)
return {
"headers": {},
"subtitles": [
{
"url": track["file"],
"language": track["label"],
}
for track in data["tracks"]
if track["kind"] == "captions"
],
"server": server_name,
"episode_title": episode_details["title"],
"links": give_random_quality(
[
{"link": link["url"]}
for link in data["sources"]
]
),
}
case _:
# NOTE: THIS METHOD DOESN'T WORK; will get the other servers later
match = LINK_TO_STREAMS_REGEX.match(raw_link_to_streams)
if not match:
return
provider_domain = match.group(1)
embed_type = match.group(2)
episode_number = match.group(3)
source_id = match.group(4)
link_to_streams = f"https://{provider_domain}/embed-{embed_type}/ajax/e-{episode_number}/getSources?id={source_id}"
link_to_streams_response = self.session.get(
link_to_streams
)
if link_to_streams_response.ok:
juicy_streams_json: HiAnimeStream = (
link_to_streams_response.json()
)
return {
"headers": {},
"subtitles": [
{
"url": track["file"],
"language": track["label"],
}
for track in juicy_streams_json["tracks"]
if track["kind"] == "captions"
],
"server": server_name,
"episode_title": episode_details["title"],
"links": give_random_quality(
[
{"link": link["file"]}
for link in juicy_streams_json["tracks"]
]
),
}
for server_name, server_html in zip(
cycle(SERVERS_AVAILABLE), servers_html
):
if server_name == "HD2":
if server := _get_server(server_name, server_html):
yield server

View File

@@ -1,26 +0,0 @@
from typing import Literal, TypedDict
class HiAnimeSkipTime(TypedDict):
start: int
end: int
class HiAnimeSource(TypedDict):
file: str
type: str
class HiAnimeTrack(TypedDict):
file: str
label: str
kind: Literal["captions", "thumbnails", "audio"]
class HiAnimeStream(TypedDict):
sources: list[HiAnimeSource]
tracks: list[HiAnimeTrack]
encrypted: bool
intro: HiAnimeSkipTime
outro: HiAnimeSkipTime
server: int

View File

@@ -1 +0,0 @@
NYAA_ENDPOINT = "https://nyaa.si"

View File

@@ -1,342 +0,0 @@
import os
import re
from logging import getLogger
from yt_dlp.utils import (
extract_attributes,
get_element_html_by_attribute,
get_element_html_by_class,
get_element_text_and_html_by_tag,
get_elements_html_by_class,
)
from ...common.mini_anilist import search_for_anime_with_anilist
from ..base import BaseAnimeProvider
from ..decorators import debug_provider
from ..types import SearchResults
from .constants import NYAA_ENDPOINT
logger = getLogger(__name__)
EXTRACT_USEFUL_INFO_PATTERN_1 = re.compile(
r"\[(\w+)\] (.+) - (\d+) [\[\(](\d+)p[\]\)].*"
)
EXTRACT_USEFUL_INFO_PATTERN_2 = re.compile(
r"\[(\w+)\] (.+)E(\d+) [\[\(]?(\d+)p.*[\]\)]?.*"
)
class Nyaa(BaseAnimeProvider):
search_results: SearchResults
@debug_provider
def search_for_anime(self, user_query: str, *args, **_):
self.search_results = search_for_anime_with_anilist(user_query, True) # pyright: ignore
self.user_query = user_query
return self.search_results
@debug_provider
def get_anime(self, anilist_id: str, *_):
for anime in self.search_results["results"]:
if anime["id"] == anilist_id:
self.titles = [anime["title"], *anime["otherTitles"], self.user_query]
return {
"id": anime["id"],
"title": anime["title"],
"poster": anime["poster"],
"availableEpisodesDetail": {
"dub": anime["availableEpisodes"],
"sub": anime["availableEpisodes"],
"raw": anime["availableEpisodes"],
},
}
@debug_provider
def get_episode_streams(
self,
anime_id: str,
episode_number: str,
translation_type: str,
trusted_only=bool(int(os.environ.get("FA_NYAA_TRUSTED_ONLY", "0"))),
allow_dangerous=bool(int(os.environ.get("FA_NYAA_ALLOW_DANGEROUS", "0"))),
sort_by="seeders",
*args,
):
anime_title = self.titles[0]
logger.debug(f"Searching nyaa for query: '{anime_title} {episode_number}'")
servers = {}
torrents_table = ""
for title in self.titles:
try:
url_arguments: dict[str, str] = {
"c": "1_2", # Language (English)
"q": f"{title} {'0' if len(episode_number) == 1 else ''}{episode_number}", # Search Query
}
# url_arguments["q"] = anime_title
# if trusted_only:
# url_arguments["f"] = "2" # Trusted uploaders only
# What to sort torrents by
if sort_by == "seeders":
url_arguments["s"] = "seeders"
elif sort_by == "date":
url_arguments["s"] = "id"
elif sort_by == "size":
url_arguments["s"] = "size"
elif sort_by == "comments":
url_arguments["s"] = "comments"
logger.debug(f"URL Arguments: {url_arguments}")
response = self.session.get(NYAA_ENDPOINT, params=url_arguments)
if not response.ok:
logger.error(f"[NYAA]: {response.text}")
return
try:
torrents_table = get_element_text_and_html_by_tag(
"table", response.text
)
except Exception as e:
logger.error(f"[NYAA]: {e}")
continue
if not torrents_table:
continue
for anime_torrent in get_elements_html_by_class(
"success", torrents_table[1]
):
td_title = get_element_html_by_attribute(
"colspan", "2", anime_torrent
)
if not td_title:
continue
title_anchor_tag = get_element_text_and_html_by_tag("a", td_title)
if not title_anchor_tag:
continue
title_anchor_tag_attrs = extract_attributes(title_anchor_tag[1])
if not title_anchor_tag_attrs:
continue
if "class" in title_anchor_tag_attrs:
td_title = td_title.replace(title_anchor_tag[1], "")
title_anchor_tag = get_element_text_and_html_by_tag(
"a", td_title
)
if not title_anchor_tag:
continue
title_anchor_tag_attrs = extract_attributes(title_anchor_tag[1])
if not title_anchor_tag_attrs:
continue
anime_title_info = title_anchor_tag_attrs["title"]
if not anime_title_info:
continue
match = EXTRACT_USEFUL_INFO_PATTERN_1.search(
anime_title_info.strip()
)
if not match:
continue
server = match[1]
# match[2] is the release title; it is not needed here
_episode_number = match[3]
quality = match[4]
if float(episode_number) != float(_episode_number):
continue
links_td = get_element_html_by_class("text-center", anime_torrent)
if not links_td:
continue
torrent_anchor_tag = get_element_text_and_html_by_tag("a", links_td)
if not torrent_anchor_tag:
continue
torrent_anchor_tag_atrrs = extract_attributes(torrent_anchor_tag[1])
if not torrent_anchor_tag_atrrs:
continue
torrent_file_url = (
f"{NYAA_ENDPOINT}{torrent_anchor_tag_atrrs['href']}"
)
if server in servers:
link = {
"translation_type": "sub",
"link": torrent_file_url,
"quality": quality,
}
if link not in servers[server]["links"]:
servers[server]["links"].append(link)
else:
servers[server] = {
"server": server,
"headers": {},
"episode_title": f"{anime_title}; Episode {episode_number}",
"subtitles": [],
"links": [
{
"translation_type": "sub",
"link": torrent_file_url,
"quality": quality,
}
],
}
for anime_torrent in get_elements_html_by_class(
"default", torrents_table[1]
):
td_title = get_element_html_by_attribute(
"colspan", "2", anime_torrent
)
if not td_title:
continue
title_anchor_tag = get_element_text_and_html_by_tag("a", td_title)
if not title_anchor_tag:
continue
title_anchor_tag_attrs = extract_attributes(title_anchor_tag[1])
if not title_anchor_tag_attrs:
continue
if "class" in title_anchor_tag_attrs:
td_title = td_title.replace(title_anchor_tag[1], "")
title_anchor_tag = get_element_text_and_html_by_tag(
"a", td_title
)
if not title_anchor_tag:
continue
title_anchor_tag_attrs = extract_attributes(title_anchor_tag[1])
if not title_anchor_tag_attrs:
continue
anime_title_info = title_anchor_tag_attrs["title"]
if not anime_title_info:
continue
match = EXTRACT_USEFUL_INFO_PATTERN_2.search(
anime_title_info.strip()
)
if not match:
continue
server = match[1]
# match[2] is the release title; it is not needed here
_episode_number = match[3]
quality = match[4]
if float(episode_number) != float(_episode_number):
continue
links_td = get_element_html_by_class("text-center", anime_torrent)
if not links_td:
continue
torrent_anchor_tag = get_element_text_and_html_by_tag("a", links_td)
if not torrent_anchor_tag:
continue
torrent_anchor_tag_atrrs = extract_attributes(torrent_anchor_tag[1])
if not torrent_anchor_tag_atrrs:
continue
torrent_file_url = (
f"{NYAA_ENDPOINT}{torrent_anchor_tag_atrrs['href']}"
)
if server in servers:
link = {
"translation_type": "sub",
"link": torrent_file_url,
"quality": quality,
}
if link not in servers[server]["links"]:
servers[server]["links"].append(link)
else:
servers[server] = {
"server": server,
"headers": {},
"episode_title": f"{anime_title}; Episode {episode_number}",
"subtitles": [],
"links": [
{
"translation_type": "sub",
"link": torrent_file_url,
"quality": quality,
}
],
}
if not allow_dangerous:
break
for anime_torrent in get_elements_html_by_class(
"danger", torrents_table[1]
):
td_title = get_element_html_by_attribute(
"colspan", "2", anime_torrent
)
if not td_title:
continue
title_anchor_tag = get_element_text_and_html_by_tag("a", td_title)
if not title_anchor_tag:
continue
title_anchor_tag_attrs = extract_attributes(title_anchor_tag[1])
if not title_anchor_tag_attrs:
continue
if "class" in title_anchor_tag_attrs:
td_title = td_title.replace(title_anchor_tag[1], "")
title_anchor_tag = get_element_text_and_html_by_tag(
"a", td_title
)
if not title_anchor_tag:
continue
title_anchor_tag_attrs = extract_attributes(title_anchor_tag[1])
if not title_anchor_tag_attrs:
continue
anime_title_info = title_anchor_tag_attrs["title"]
if not anime_title_info:
continue
match = EXTRACT_USEFUL_INFO_PATTERN_2.search(
anime_title_info.strip()
)
if not match:
continue
server = match[1]
# match[2] is the release title; it is not needed here
_episode_number = match[3]
quality = match[4]
if float(episode_number) != float(_episode_number):
continue
links_td = get_element_html_by_class("text-center", anime_torrent)
if not links_td:
continue
torrent_anchor_tag = get_element_text_and_html_by_tag("a", links_td)
if not torrent_anchor_tag:
continue
torrent_anchor_tag_atrrs = extract_attributes(torrent_anchor_tag[1])
if not torrent_anchor_tag_atrrs:
continue
torrent_file_url = (
f"{NYAA_ENDPOINT}{torrent_anchor_tag_atrrs['href']}"
)
if server in servers:
link = {
"translation_type": "sub",
"link": torrent_file_url,
"quality": quality,
}
if link not in servers[server]["links"]:
servers[server]["links"].append(link)
else:
servers[server] = {
"server": server,
"headers": {},
"episode_title": f"{anime_title}; Episode {episode_number}",
"subtitles": [],
"links": [
{
"translation_type": "sub",
"link": torrent_file_url,
"quality": quality,
}
],
}
except Exception as e:
logger.error(f"[NYAA]: {e}")
continue
for server in servers:
yield servers[server]
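The two regexes at the top of this file drive all three row types; sample release names (made up for illustration) show what each pattern captures: uploader group, title, episode number, and quality.

m = EXTRACT_USEFUL_INFO_PATTERN_1.search("[SubsPlease] Sousou no Frieren - 05 (1080p) [ABCD1234].mkv")
assert m and m.groups() == ("SubsPlease", "Sousou no Frieren", "05", "1080")

m = EXTRACT_USEFUL_INFO_PATTERN_2.search("[Judas] Frieren S01E05 [1080p][HEVC].mkv")
assert m and m.groups() == ("Judas", "Frieren S01", "05", "1080")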

View File

@@ -1,126 +0,0 @@
import logging
import os
import sys
import time
import libtorrent # pyright: ignore
from rich import print
from rich.progress import (
BarColumn,
DownloadColumn,
Progress,
TextColumn,
TimeRemainingColumn,
TransferSpeedColumn,
)
logger = logging.getLogger("nyaa")
def download_torrent(
filename: str,
result_filename: str | None = None,
show_progress: bool = True,
base_path: str = "Anime",
) -> str:
session = libtorrent.session({"listen_interfaces": "0.0.0.0:6881"})
logger.debug("Started libtorrent session")
base_path = os.path.expanduser(base_path)
logger.debug(f"Downloading output to: '{base_path}'")
info = libtorrent.torrent_info(filename)
logger.debug("Started downloading torrent")
handle: libtorrent.torrent_handle = session.add_torrent(
{"ti": info, "save_path": base_path}
)
status: libtorrent.torrent_status = handle.status()
progress_bar = Progress(
"[progress.description]{task.description}",
BarColumn(bar_width=None),
"[progress.percentage]{task.percentage:>3.1f}%",
"•",
DownloadColumn(),
"•",
TransferSpeedColumn(),
"•",
TimeRemainingColumn(),
"•",
TextColumn("[green]Peers: {task.fields[peers]}[/green]"),
)
if show_progress:
with progress_bar:
download_task = progress_bar.add_task(
"downloading",
filename=status.name,
total=status.total_wanted,
peers=0,
start=False,
)
while not status.total_done:
# Checking files
status = handle.status()
description = "[bold yellow]Checking files[/bold yellow]"
progress_bar.update(
download_task,
completed=status.total_done,
peers=status.num_peers,
description=description,
)
# Started download
progress_bar.start_task(download_task)
description = f"[bold blue]Downloading[/bold blue] [bold yellow]{result_filename}[/bold yellow]"
while not status.is_seeding:
status = handle.status()
progress_bar.update(
download_task,
completed=status.total_done,
peers=status.num_peers,
description=description,
)
alerts = session.pop_alerts()
alert: libtorrent.alert
for alert in alerts:
if (
alert.category()
& libtorrent.alert.category_t.error_notification
):
logger.debug(f"[Alert] {alert}")
time.sleep(1)
progress_bar.update(
download_task,
description=f"[bold blue]Finished Downloading[/bold blue] [bold green]{result_filename}[/bold green]",
completed=status.total_wanted,
)
if result_filename:
old_name = f"{base_path}/{status.name}"
new_name = f"{base_path}/{result_filename}"
os.rename(old_name, new_name)
logger.debug(f"Finished torrent download, renamed '{old_name}' to '{new_name}'")
return new_name
return ""
if __name__ == "__main__":
if len(sys.argv) < 2:
print("You need to pass in the .torrent file path.")
sys.exit(1)
download_torrent(sys.argv[1])

View File

@@ -7,7 +7,6 @@ from yt_dlp.utils.networking import random_user_agent
 from .allanime.constants import SERVERS_AVAILABLE as ALLANIME_SERVERS
 from .animepahe.constants import SERVERS_AVAILABLE as ANIMEPAHE_SERVERS
 from .base import BaseAnimeProvider
-from .hianime.constants import SERVERS_AVAILABLE as HIANIME_SERVERS
 logger = logging.getLogger(__name__)
@@ -18,7 +17,7 @@ PROVIDERS_AVAILABLE = {
     "nyaa": "provider.Nyaa",
     "yugen": "provider.Yugen",
 }
-SERVERS_AVAILABLE = ["TOP", *ALLANIME_SERVERS, *ANIMEPAHE_SERVERS, *HIANIME_SERVERS]
+SERVERS_AVAILABLE = ["TOP", *ALLANIME_SERVERS, *ANIMEPAHE_SERVERS]
 class AnimeProviderFactory:

View File

@@ -1,15 +0,0 @@
import logging
from requests import get
logger = logging.getLogger(__name__)
def fetch_anime_info_from_bal(anilist_id):
try:
url = f"https://raw.githubusercontent.com/bal-mackup/mal-backup/master/anilist/anime/{anilist_id}.json"
response = get(url, timeout=11)
if response.status_code == 200:
return response.json()
except Exception as e:
logger.error(e)

View File

@@ -1,33 +0,0 @@
"""
Just contains some useful data used across the codebase
"""
# useful in cases where the AniList title is too different from the provider title
anime_normalizer_raw = {
"allanime": {
"1P": "one piece",
"Magia Record: Mahou Shoujo Madoka☆Magica Gaiden (TV)": "Mahou Shoujo Madoka☆Magica",
"Dungeon ni Deai o Motomeru no wa Machigatte Iru Darouka": "Dungeon ni Deai wo Motomeru no wa Machigatteiru Darou ka",
'Hazurewaku no "Joutai Ijou Skill" de Saikyou ni Natta Ore ga Subete wo Juurin suru made': "Hazure Waku no [Joutai Ijou Skill] de Saikyou ni Natta Ore ga Subete wo Juurin Suru made",
"Re:Zero kara Hajimeru Isekai Seikatsu Season 3": "Re:Zero kara Hajimeru Isekai Seikatsu 3rd Season",
},
"hianime": {"My Star": "Oshi no Ko"},
"animepahe": {
"Azumanga Daiou The Animation": "Azumanga Daioh",
"Mairimashita! Iruma-kun 2nd Season": "Mairimashita! Iruma-kun 2",
"Mairimashita! Iruma-kun 3rd Season": "Mairimashita! Iruma-kun 3",
},
"nyaa": {},
"yugen": {},
}
def get_anime_normalizer():
"""Used because there are different providers"""
import os
current_provider = os.environ.get("FASTANIME_PROVIDER", "allanime")
return anime_normalizer_raw[current_provider]
anime_normalizer = get_anime_normalizer()
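Since get_anime_normalizer re-reads FASTANIME_PROVIDER on every call, the mapping can be switched at runtime, and lookups fall back to the original title when no override exists. A quick illustration against the data above:

import os

os.environ["FASTANIME_PROVIDER"] = "hianime"
assert get_anime_normalizer() == {"My Star": "Oshi no Ko"}

normalizer = get_anime_normalizer()
assert normalizer.get("My Star", "My Star") == "Oshi no Ko"
assert normalizer.get("Frieren", "Frieren") == "Frieren"  # no override -> unchanged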

View File

@@ -1,114 +0,0 @@
import json
import logging
import time
logger = logging.getLogger(__name__)
class ProviderStoreDB:
def __init__(
self,
provider_name,
cache_db_path: str,
max_lifetime: int = 604800,
max_size: int = (1024**2) * 10,
table_name: str = "fastanime_providers_store",
clean_db=False,
):
from ..common.sqlitedb_helper import SqliteDB
self.cache_db_path = cache_db_path
self.clean_db = clean_db
self.provider_name = provider_name
self.max_lifetime = max_lifetime
self.max_size = max_size
self.table_name = table_name
self.sqlite_db_connection = SqliteDB(self.cache_db_path)
# Prepare the cache table if it doesn't exist
self._create_store_table()
def _create_store_table(self):
"""Create cache table if it doesn't exist."""
with self.sqlite_db_connection as conn:
conn.execute(
f"""
CREATE TABLE IF NOT EXISTS {self.table_name} (
id TEXT,
data_type TEXT,
provider_name TEXT,
data TEXT,
cache_expiry INTEGER
)"""
)
def get(self, id: str, data_type: str, default=None):
with self.sqlite_db_connection as conn:
cursor = conn.cursor()
cursor.execute(
f"""
SELECT
data
FROM {self.table_name}
WHERE
id = ?
AND data_type = ?
AND provider_name = ?
AND cache_expiry > ?
""",
(id, data_type, self.provider_name, int(time.time())),
)
cached_data = cursor.fetchone()
if cached_data:
logger.debug("Found existing request in cache")
(json_data,) = cached_data
return json.loads(json_data)
return default
def set(self, id: str, data_type: str, data):
with self.sqlite_db_connection as connection:
cursor = connection.cursor()
cursor.execute(
f"""
INSERT INTO {self.table_name}
VALUES (?, ?, ?, ?, ?)
""",
(
id,
data_type,
self.provider_name,
json.dumps(data),
int(time.time()) + self.max_lifetime,
),
)
class ProviderStoreMem:
def __init__(self) -> None:
from collections import defaultdict
self._store = defaultdict(dict)
def get(self, id: str, data_type: str, default=None):
return self._store[id].get(data_type, default)
def set(self, id: str, data_type: str, data):
self._store[id][data_type] = data
def ProviderStore(store_type, *args, **kwargs):
if store_type == "persistent":
return ProviderStoreDB(*args, **kwargs)
else:
return ProviderStoreMem()
if __name__ == "__main__":
store = ProviderStore("persistent", "test_provider", "provider_store")
store.set("123", "test", {"hello": "world"})
print(store.get("123", "test"))
print("-------------------------------")
store = ProviderStore("memory")
store.set("1", "test", {"hello": "world"})
print(store.get("1", "test"))

View File

@@ -1,70 +0,0 @@
import re
from itertools import cycle
# Dictionary to map hex values to characters
hex_to_char = {
"01": "9",
"08": "0",
"05": "=",
"0a": "2",
"0b": "3",
"0c": "4",
"07": "?",
"00": "8",
"5c": "d",
"0f": "7",
"5e": "f",
"17": "/",
"54": "l",
"09": "1",
"48": "p",
"4f": "w",
"0e": "6",
"5b": "c",
"5d": "e",
"0d": "5",
"53": "k",
"1e": "&",
"5a": "b",
"59": "a",
"4a": "r",
"4c": "t",
"4e": "v",
"57": "o",
"51": "i",
}
def give_random_quality(links):
qualities = cycle(["1080", "720", "480", "360"])
return [
{**episode_stream, "quality": quality}
for episode_stream, quality in zip(links, qualities, strict=False)
]
def one_digit_symmetric_xor(password: int, target: str):
def genexp():
for segment in bytearray.fromhex(target):
yield segment ^ password
return bytes(genexp()).decode("utf-8")
def decode_hex_string(hex_string):
"""some of the sources encrypt the urls into hex codes this function decrypts the urls
Args:
hex_string ([TODO:parameter]): [TODO:description]
Returns:
[TODO:return]
"""
# Split the hex string into pairs of characters
hex_pairs = re.findall("..", hex_string)
# Decode each hex pair
decoded_chars = [hex_to_char.get(pair.lower(), pair) for pair in hex_pairs]
return "".join(decoded_chars)

View File

@@ -1,48 +0,0 @@
import logging
from typing import TYPE_CHECKING
from thefuzz import fuzz
from .data import anime_normalizer
if TYPE_CHECKING:
from ..libs.anilist.types import AnilistBaseMediaDataSchema
logger = logging.getLogger(__name__)
def sort_by_episode_number(filename: str):
import re
match = re.search(r"\d+", filename)
return int(match.group()) if match else 0
def anime_title_percentage_match(
possible_user_requested_anime_title: str, anime: "AnilistBaseMediaDataSchema"
) -> float:
"""Returns the percentage match between the possible title and user title
Args:
possible_user_requested_anime_title (str): an Animdl search result title
title (str): the anime title the user wants
Returns:
int: the percentage match
"""
possible_user_requested_anime_title = anime_normalizer.get(
possible_user_requested_anime_title, possible_user_requested_anime_title
)
# compares both the romaji and english names and gets highest Score
title_a = str(anime["title"]["romaji"])
title_b = str(anime["title"]["english"])
percentage_ratio = max(
*[
fuzz.ratio(title.lower(), possible_user_requested_anime_title.lower())
for title in anime["synonyms"]
],
fuzz.ratio(title_a.lower(), possible_user_requested_anime_title.lower()),
fuzz.ratio(title_b.lower(), possible_user_requested_anime_title.lower()),
)
logger.info(f"{locals()}")
return percentage_ratio
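fuzz.ratio is a plain Levenshtein similarity percentage, so taking the max over romaji, English, and synonyms picks whichever rendering of the title the provider happened to use. A small illustration with made-up candidates:

from thefuzz import fuzz

candidates = ["Frieren: Beyond Journey's End", "Sousou no Frieren"]
provider_title = "Sousou no Frieren"
best = max(fuzz.ratio(c.lower(), provider_title.lower()) for c in candidates)
assert best == 100  # exact match against the romaji rendering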

View File

@@ -1,4 +0,0 @@
YUGEN_ENDPOINT: str = "https://yugenanime.tv"
SEARCH_URL = YUGEN_ENDPOINT + "/api/discover/"
SERVERS_AVAILABLE = ["gogoanime"]

View File

@@ -1,223 +0,0 @@
import base64
import re
from itertools import cycle
from yt_dlp.utils import (
extract_attributes,
get_element_by_attribute,
get_element_text_and_html_by_tag,
get_elements_text_and_html_by_attribute,
)
from yt_dlp.utils.traversal import get_element_html_by_attribute
from ..base import BaseAnimeProvider
from ..decorators import debug_provider
from .constants import SEARCH_URL, YUGEN_ENDPOINT
# ** Adapted from anipy-cli **
class Yugen(BaseAnimeProvider):
"""
Provides a fast and effective interface to the Yugen site.
"""
api_endpoint = YUGEN_ENDPOINT
# HEADERS = {
# "Referer": ALLANIME_REFERER,
# }
@debug_provider
def search_for_anime(
self,
user_query: str,
translation_type: str = "sub",
nsfw=True,
unknown=True,
**kwargs,
):
results = []
has_next = True
page = 0
while has_next:
page += 1
response = self.session.get(
SEARCH_URL, params={"q": user_query, "page": page}
)
search_results = response.json()
has_next = search_results["hasNext"]
results_html = search_results["query"]
anime = get_elements_text_and_html_by_attribute(
"class", "anime-meta", results_html, tag="a"
)
id_regex = re.compile(r"(\d+)\/([^\/]+)")
for _a in anime:
if not _a:
continue
a = extract_attributes(_a[1])
if not a:
continue
uri = a["href"]
identifier = id_regex.search(uri) # pyright:ignore
if identifier is None:
continue
if len(identifier.groups()) != 2:
continue
identifier = base64.b64encode(
f"{identifier.group(1)}/{identifier.group(2)}".encode()
).decode()
anime_title = a["title"]
languages = {"sub": 1, "dub": 0}
excl = get_element_by_attribute(
"class", "ani-exclamation", _a[1], tag="div"
)
if excl is not None:
if "dub" in excl.lower():
languages["dub"] = 1
results.append(
{
"id": identifier,
"title": anime_title,
"availableEpisodes": languages,
}
)
return {
"pageInfo": {"total": len(results)},
"results": results,
}
@debug_provider
def get_anime(self, anime_id: str, **kwargs):
identifier = base64.b64decode(anime_id).decode()
response = self.session.get(f"{YUGEN_ENDPOINT}/anime/{identifier}")
html_page = response.text
data_map = {
"id": anime_id,
"title": None,
"poster": None,
"genres": [],
"synopsis": None,
"release_year": None,
"status": None,
"otherTitles": [],
"availableEpisodesDetail": {},
}
sub_match = re.search(
r'<div class="ap-.+?">Episodes</div><span class="description" .+?>(\d+)</span></div>',
html_page,
)
if sub_match:
eps = int(sub_match.group(1))
data_map["availableEpisodesDetail"]["sub"] = list(
map(str, range(1, eps + 1))
)
dub_match = re.search(
r'<div class="ap-.+?">Episodes \(Dub\)</div><span class="description" .+?>(\d+)</span></div>',
html_page,
)
if dub_match:
eps = int(dub_match.group(1))
data_map["availableEpisodesDetail"]["dub"] = list(
map(str, range(1, eps + 1))
)
name = get_element_text_and_html_by_tag("h1", html_page)
if name is not None:
data_map["title"] = name[0].strip()
synopsis = get_element_by_attribute("class", "description", html_page, tag="p")
if synopsis is not None:
data_map["synopsis"] = synopsis
# FIXME: This is not working because ytdl is too strict on also getting a closing tag
try:
image = get_element_html_by_attribute(
"class", "cover", html_page, tag="img"
)
img_attrs = extract_attributes(image)
if img_attrs is not None:
data_map["image"] = img_attrs.get("src")
except Exception:
pass
data = get_elements_text_and_html_by_attribute(
"class", "data", html_page, tag="div"
)
for d in data:
title = get_element_text_and_html_by_tag("div", d[1])
desc = get_element_text_and_html_by_tag("span", d[1])
if title is None or desc is None:
continue
title = title[0]
desc = desc[0]
if title in ["Native", "Romaji"]:
data_map["alternative_names"].append(desc)
elif title == "Synonyms":
data_map["alternative_names"].extend(desc.split(","))
elif title == "Premiered":
try:
data_map["release_year"] = int(desc.split()[-1])
except (ValueError, TypeError):
pass
elif title == "Status":
data_map["status"] = title
elif title == "Genres":
data_map["genres"].extend([g.strip() for g in desc.split(",")])
return data_map
@debug_provider
def get_episode_streams(
self, anime_id, episode_number: str, translation_type="sub"
):
"""get the streams of an episode
Args:
translation_type ([TODO:parameter]): [TODO:description]
anime: [TODO:description]
episode_number: [TODO:description]
Yields:
[TODO:description]
"""
identifier = base64.b64decode(anime_id).decode()
id_num, anime_title = identifier.split("/")
if translation_type == "dub":
video_query = f"{id_num}|{episode_number}|dub"
else:
video_query = f"{id_num}|{episode_number}"
res = self.session.post(
f"{YUGEN_ENDPOINT}/api/embed/",
data={
"id": base64.b64encode(video_query.encode()).decode(),
"ac": "0",
},
headers={"x-requested-with": "XMLHttpRequest"},
)
res = res.json()
yield {
"server": "gogoanime",
"episode_title": f"{anime_title}; Episode {episode_number}",
"headers": {},
"subtitles": [],
"links": [
{"quality": quality, "link": link}
for quality, link in zip(
cycle(["1080", "720", "480", "360"]), res["hls"]
)
],
}
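Yugen IDs round-trip through base64 as '{numeric_id}/{slug}', which is what the split in get_episode_streams relies on, and the embed request wraps 'id|episode' (plus '|dub' for dubs) in another base64 layer. A quick check with an illustrative slug:

import base64

identifier = base64.b64encode(b"1234/sousou-no-frieren").decode()
id_num, slug = base64.b64decode(identifier).decode().split("/")
assert (id_num, slug) == ("1234", "sousou-no-frieren")

video_query = f"{id_num}|5"
assert base64.b64decode(base64.b64encode(video_query.encode())).decode() == "1234|5"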