Merge branch 'master' into master

Elliott Ashby, 2025-07-26 19:01:19 +09:00 (committed by GitHub)
56 changed files with 3948 additions and 2009 deletions

View File

@@ -10,6 +10,7 @@ commands = {
"download": "download.download",
# "downloads": "downloads.downloads",
"auth": "auth.auth",
"stats": "stats.stats",
}

View File

@@ -68,10 +68,10 @@ if TYPE_CHECKING:
epilog=examples.download,
)
@click.option(
"--title",
"-t",
"--title",
"-t",
shell_complete=anime_titles_shell_complete,
help="Title of the anime to search for"
help="Title of the anime to search for",
)
@click.option(
"--episode-range",
@@ -239,7 +239,9 @@ def download(config: AppConfig, **options: "Unpack[DownloadOptions]"):
# Initialize services
feedback.info("Initializing services...")
api_client, provider, selector, media_registry, download_service = _initialize_services(config)
api_client, provider, selector, media_registry, download_service = (
_initialize_services(config)
)
feedback.info(f"Using provider: {provider.__class__.__name__}")
feedback.info(f"Using media API: {config.general.media_api}")
feedback.info(f"Translation type: {config.stream.translation_type}")
@@ -256,16 +258,22 @@ def download(config: AppConfig, **options: "Unpack[DownloadOptions]"):
# Process each selected anime
for selected_anime in selected_anime_list:
feedback.info(f"Processing: {selected_anime.title.english or selected_anime.title.romaji}")
feedback.info(
f"Processing: {selected_anime.title.english or selected_anime.title.romaji}"
)
feedback.info(f"AniList ID: {selected_anime.id}")
# Get available episodes from provider
episodes_result = _get_available_episodes(provider, selected_anime, config, feedback)
episodes_result = _get_available_episodes(
provider, selected_anime, config, feedback
)
if not episodes_result:
feedback.warning(f"No episodes found for {selected_anime.title.english or selected_anime.title.romaji}")
feedback.warning(
f"No episodes found for {selected_anime.title.english or selected_anime.title.romaji}"
)
_suggest_alternatives(selected_anime, provider, config, feedback)
continue
# Unpack the result
if len(episodes_result) == 2:
available_episodes, provider_anime_data = episodes_result
@@ -282,32 +290,51 @@ def download(config: AppConfig, **options: "Unpack[DownloadOptions]"):
feedback.warning("No episodes selected for download")
continue
feedback.info(f"About to download {len(episodes_to_download)} episodes: {', '.join(episodes_to_download)}")
feedback.info(
f"About to download {len(episodes_to_download)} episodes: {', '.join(episodes_to_download)}"
)
# Test stream availability before attempting download (using provider anime data)
if episodes_to_download and provider_anime_data:
test_episode = episodes_to_download[0]
feedback.info(f"Testing stream availability for episode {test_episode}...")
success = _test_episode_stream_availability(provider, provider_anime_data, test_episode, config, feedback)
feedback.info(
f"Testing stream availability for episode {test_episode}..."
)
success = _test_episode_stream_availability(
provider, provider_anime_data, test_episode, config, feedback
)
if not success:
feedback.warning(f"Stream test failed for episode {test_episode}.")
feedback.info("Possible solutions:")
feedback.info("1. Try a different provider (check your config)")
feedback.info("2. Check if the episode number is correct")
feedback.info("3. Try a different translation type (sub/dub)")
feedback.info("4. The anime might not be available on this provider")
feedback.info(
"4. The anime might not be available on this provider"
)
# Ask user if they want to continue anyway
continue_anyway = input("\nContinue with download anyway? (y/N): ").strip().lower()
if continue_anyway not in ['y', 'yes']:
continue_anyway = (
input("\nContinue with download anyway? (y/N): ")
.strip()
.lower()
)
if continue_anyway not in ["y", "yes"]:
feedback.info("Download cancelled by user")
continue
# Download episodes (using provider anime data if available, otherwise AniList data)
anime_for_download = provider_anime_data if provider_anime_data else selected_anime
anime_for_download = (
provider_anime_data if provider_anime_data else selected_anime
)
_download_episodes(
download_service, anime_for_download, episodes_to_download,
quality, force_redownload, max_concurrent, feedback
download_service,
anime_for_download,
episodes_to_download,
quality,
force_redownload,
max_concurrent,
feedback,
)
# Show final statistics
@@ -333,18 +360,36 @@ def _validate_options(options: "DownloadOptions") -> None:
end_date_lesser = options.get("end_date_lesser")
# Score validation
if score_greater is not None and score_lesser is not None and score_greater > score_lesser:
if (
score_greater is not None
and score_lesser is not None
and score_greater > score_lesser
):
raise FastAnimeError("Minimum score cannot be higher than maximum score")
# Popularity validation
if popularity_greater is not None and popularity_lesser is not None and popularity_greater > popularity_lesser:
raise FastAnimeError("Minimum popularity cannot be higher than maximum popularity")
if (
popularity_greater is not None
and popularity_lesser is not None
and popularity_greater > popularity_lesser
):
raise FastAnimeError(
"Minimum popularity cannot be higher than maximum popularity"
)
# Date validation
if start_date_greater is not None and start_date_lesser is not None and start_date_greater > start_date_lesser:
if (
start_date_greater is not None
and start_date_lesser is not None
and start_date_greater > start_date_lesser
):
raise FastAnimeError("Minimum start date cannot be after maximum start date")
if end_date_greater is not None and end_date_lesser is not None and end_date_greater > end_date_lesser:
if (
end_date_greater is not None
and end_date_lesser is not None
and end_date_greater > end_date_lesser
):
raise FastAnimeError("Minimum end date cannot be after maximum end date")
@@ -353,27 +398,47 @@ def _initialize_services(config: AppConfig) -> tuple:
api_client = create_api_client(config.general.media_api, config)
provider = create_provider(config.general.provider)
selector = create_selector(config)
media_registry = MediaRegistryService(config.general.media_api, config.media_registry)
media_registry = MediaRegistryService(
config.general.media_api, config.media_registry
)
download_service = DownloadService(config, media_registry, provider)
return api_client, provider, selector, media_registry, download_service
def _build_search_params(options: "DownloadOptions", config: AppConfig) -> MediaSearchParams:
def _build_search_params(
options: "DownloadOptions", config: AppConfig
) -> MediaSearchParams:
"""Build MediaSearchParams from command options."""
return MediaSearchParams(
query=options.get("title"),
page=options.get("page", 1),
per_page=options.get("per_page") or config.anilist.per_page or 50,
sort=MediaSort(options.get("sort")) if options.get("sort") else None,
status_in=[MediaStatus(s) for s in options.get("status", ())] if options.get("status") else None,
status_not_in=[MediaStatus(s) for s in options.get("status_not", ())] if options.get("status_not") else None,
genre_in=[MediaGenre(g) for g in options.get("genres", ())] if options.get("genres") else None,
genre_not_in=[MediaGenre(g) for g in options.get("genres_not", ())] if options.get("genres_not") else None,
tag_in=[MediaTag(t) for t in options.get("tags", ())] if options.get("tags") else None,
tag_not_in=[MediaTag(t) for t in options.get("tags_not", ())] if options.get("tags_not") else None,
format_in=[MediaFormat(f) for f in options.get("media_format", ())] if options.get("media_format") else None,
type=MediaType(options.get("media_type")) if options.get("media_type") else None,
status_in=[MediaStatus(s) for s in options.get("status", ())]
if options.get("status")
else None,
status_not_in=[MediaStatus(s) for s in options.get("status_not", ())]
if options.get("status_not")
else None,
genre_in=[MediaGenre(g) for g in options.get("genres", ())]
if options.get("genres")
else None,
genre_not_in=[MediaGenre(g) for g in options.get("genres_not", ())]
if options.get("genres_not")
else None,
tag_in=[MediaTag(t) for t in options.get("tags", ())]
if options.get("tags")
else None,
tag_not_in=[MediaTag(t) for t in options.get("tags_not", ())]
if options.get("tags_not")
else None,
format_in=[MediaFormat(f) for f in options.get("media_format", ())]
if options.get("media_format")
else None,
type=MediaType(options.get("media_type"))
if options.get("media_type")
else None,
season=MediaSeason(options.get("season")) if options.get("season") else None,
seasonYear=int(year) if (year := options.get("year")) else None,
popularity_greater=options.get("popularity_greater"),
@@ -393,20 +458,24 @@ def _search_anime(api_client, search_params, feedback):
from rich.progress import Progress, SpinnerColumn, TextColumn
# Check if we have any search criteria at all
has_criteria = any([
search_params.query,
search_params.genre_in,
search_params.tag_in,
search_params.status_in,
search_params.season,
search_params.seasonYear,
search_params.format_in,
search_params.popularity_greater,
search_params.averageScore_greater,
])
has_criteria = any(
[
search_params.query,
search_params.genre_in,
search_params.tag_in,
search_params.status_in,
search_params.season,
search_params.seasonYear,
search_params.format_in,
search_params.popularity_greater,
search_params.averageScore_greater,
]
)
if not has_criteria:
raise FastAnimeError("Please provide at least one search criterion (title, genre, tag, status, etc.)")
raise FastAnimeError(
"Please provide at least one search criterion (title, genre, tag, status, etc.)"
)
with Progress(
SpinnerColumn(),
@@ -426,7 +495,9 @@ def _select_anime(search_result, selector, feedback):
"""Let user select anime from search results."""
if len(search_result.media) == 1:
selected_anime = search_result.media[0]
feedback.info(f"Auto-selected: {selected_anime.title.english or selected_anime.title.romaji}")
feedback.info(
f"Auto-selected: {selected_anime.title.english or selected_anime.title.romaji}"
)
return [selected_anime]
# Create choice strings with additional info
@@ -467,41 +538,53 @@ def _get_available_episodes(provider, anime, config, feedback):
try:
# Search for anime in provider first
media_title = anime.title.english or anime.title.romaji
feedback.info(f"Searching provider '{provider.__class__.__name__}' for: '{media_title}'")
feedback.info(
f"Searching provider '{provider.__class__.__name__}' for: '{media_title}'"
)
feedback.info(f"Using translation type: '{config.stream.translation_type}'")
provider_search_results = provider.search(
SearchParams(query=media_title, translation_type=config.stream.translation_type)
SearchParams(
query=media_title, translation_type=config.stream.translation_type
)
)
if not provider_search_results or not provider_search_results.results:
feedback.warning(f"Could not find '{media_title}' on provider '{provider.__class__.__name__}'")
feedback.warning(
f"Could not find '{media_title}' on provider '{provider.__class__.__name__}'"
)
return []
feedback.info(f"Found {len(provider_search_results.results)} results on provider")
feedback.info(
f"Found {len(provider_search_results.results)} results on provider"
)
# Show the first few results for debugging
for i, result in enumerate(provider_search_results.results[:3]):
feedback.info(f"Result {i+1}: ID={result.id}, Title='{getattr(result, 'title', 'Unknown')}'")
feedback.info(
f"Result {i + 1}: ID={result.id}, Title='{getattr(result, 'title', 'Unknown')}'"
)
# Get the first result (could be enhanced with fuzzy matching)
first_result = provider_search_results.results[0]
feedback.info(f"Using first result: ID={first_result.id}")
# Now get the full anime data using the PROVIDER'S ID, not AniList ID
provider_anime_data = provider.get(
AnimeParams(id=first_result.id, query=media_title)
)
if not provider_anime_data:
feedback.warning(f"Failed to get anime details from provider")
feedback.warning("Failed to get anime details from provider")
return []
# Check all available translation types
translation_types = ['sub', 'dub']
translation_types = ["sub", "dub"]
for trans_type in translation_types:
episodes = getattr(provider_anime_data.episodes, trans_type, [])
feedback.info(f"Translation '{trans_type}': {len(episodes)} episodes available")
feedback.info(
f"Translation '{trans_type}': {len(episodes)} episodes available"
)
available_episodes = getattr(
provider_anime_data.episodes, config.stream.translation_type, []
@@ -512,33 +595,46 @@ def _get_available_episodes(provider, anime, config, feedback):
# Suggest alternative translation type if available
for trans_type in translation_types:
if trans_type != config.stream.translation_type:
other_episodes = getattr(provider_anime_data.episodes, trans_type, [])
other_episodes = getattr(
provider_anime_data.episodes, trans_type, []
)
if other_episodes:
feedback.info(f"Suggestion: Try using translation type '{trans_type}' (has {len(other_episodes)} episodes)")
feedback.info(
f"Suggestion: Try using translation type '{trans_type}' (has {len(other_episodes)} episodes)"
)
return []
feedback.info(f"Found {len(available_episodes)} episodes available for download")
feedback.info(
f"Found {len(available_episodes)} episodes available for download"
)
# Return both episodes and the provider anime data for later use
return available_episodes, provider_anime_data
except Exception as e:
feedback.error(f"Error getting episodes from provider: {e}")
import traceback
feedback.error("Full traceback", traceback.format_exc())
return []
def _determine_episodes_to_download(episode_range, available_episodes, selector, feedback):
def _determine_episodes_to_download(
episode_range, available_episodes, selector, feedback
):
"""Determine which episodes to download based on range or user selection."""
if not available_episodes:
feedback.warning("No episodes available to download")
return []
if episode_range:
try:
episodes_to_download = list(parse_episode_range(episode_range, available_episodes))
feedback.info(f"Episodes from range '{episode_range}': {', '.join(episodes_to_download)}")
episodes_to_download = list(
parse_episode_range(episode_range, available_episodes)
)
feedback.info(
f"Episodes from range '{episode_range}': {', '.join(episodes_to_download)}"
)
return episodes_to_download
except (ValueError, IndexError) as e:
feedback.error(f"Invalid episode range '{episode_range}': {e}")
@@ -550,10 +646,10 @@ def _determine_episodes_to_download(episode_range, available_episodes, selector,
choices=available_episodes,
header="Use TAB to select multiple episodes, ENTER to confirm",
)
if selected_episodes:
feedback.info(f"Selected episodes: {', '.join(selected_episodes)}")
return selected_episodes
@@ -563,13 +659,17 @@ def _suggest_alternatives(anime, provider, config, feedback):
feedback.info(f"1. Current provider: {provider.__class__.__name__}")
feedback.info(f"2. AniList ID being used: {anime.id}")
feedback.info(f"3. Translation type: {config.stream.translation_type}")
# Special message for AllAnime provider
if provider.__class__.__name__ == "AllAnimeProvider":
feedback.info("4. AllAnime ID mismatch: AllAnime uses different IDs than AniList")
feedback.info(
"4. AllAnime ID mismatch: AllAnime uses different IDs than AniList"
)
feedback.info(" The provider searches by title, but episodes use AniList ID")
feedback.info(" This can cause episodes to not be found even if the anime exists")
feedback.info(
" This can cause episodes to not be found even if the anime exists"
)
# Check if provider has different ID mapping
anime_titles = []
if anime.title.english:
@@ -578,7 +678,7 @@ def _suggest_alternatives(anime, provider, config, feedback):
anime_titles.append(anime.title.romaji)
if anime.title.native:
anime_titles.append(anime.title.native)
feedback.info(f"5. Available titles: {', '.join(anime_titles)}")
feedback.info("6. Possible solutions:")
feedback.info(" - Try a different provider (GogoAnime, 9anime, etc.)")
@@ -588,7 +688,15 @@ def _suggest_alternatives(anime, provider, config, feedback):
feedback.info(" - Check if anime is available in your region")
def _download_episodes(download_service, anime, episodes, quality, force_redownload, max_concurrent, feedback):
def _download_episodes(
download_service,
anime,
episodes,
quality,
force_redownload,
max_concurrent,
feedback,
):
"""Download the specified episodes."""
from concurrent.futures import ThreadPoolExecutor, as_completed
from rich.console import Console
@@ -607,18 +715,19 @@ def _download_episodes(download_service, anime, episodes, quality, force_redownl
anime_title = anime.title.english or anime.title.romaji
console.print(f"\n[bold green]Starting downloads for: {anime_title}[/bold green]")
# Set up logging capture to get download errors
log_messages = []
class ListHandler(logging.Handler):
def emit(self, record):
log_messages.append(self.format(record))
handler = ListHandler()
handler.setLevel(logging.ERROR)
logger = logging.getLogger('fastanime')
logger = logging.getLogger("fastanime")
logger.addHandler(handler)
try:
with Progress(
SpinnerColumn(),
@@ -628,18 +737,19 @@ def _download_episodes(download_service, anime, episodes, quality, force_redownl
TaskProgressColumn(),
TimeElapsedColumn(),
) as progress:
task = progress.add_task("Downloading episodes...", total=len(episodes))
if max_concurrent == 1:
# Sequential downloads
results = {}
for episode in episodes:
progress.update(task, description=f"Downloading episode {episode}...")
progress.update(
task, description=f"Downloading episode {episode}..."
)
# Clear previous log messages for this episode
log_messages.clear()
try:
success = download_service.download_episode(
media_item=anime,
@@ -648,19 +758,26 @@ def _download_episodes(download_service, anime, episodes, quality, force_redownl
force_redownload=force_redownload,
)
results[episode] = success
if not success:
# Try to get more detailed error from registry
error_msg = _get_episode_error_details(download_service, anime, episode)
error_msg = _get_episode_error_details(
download_service, anime, episode
)
if error_msg:
feedback.error(f"Episode {episode}", error_msg)
elif log_messages:
# Show any log messages that were captured
for msg in log_messages[-3:]: # Show last 3 error messages
for msg in log_messages[
-3:
]: # Show last 3 error messages
feedback.error(f"Episode {episode}", msg)
else:
feedback.error(f"Episode {episode}", "Download failed - check logs for details")
feedback.error(
f"Episode {episode}",
"Download failed - check logs for details",
)
except Exception as e:
results[episode] = False
feedback.error(f"Episode {episode} failed", str(e))
@@ -681,7 +798,7 @@ def _download_episodes(download_service, anime, episodes, quality, force_redownl
): episode
for episode in episodes
}
# Process completed downloads
for future in as_completed(future_to_episode):
episode = future_to_episode[future]
@@ -690,15 +807,22 @@ def _download_episodes(download_service, anime, episodes, quality, force_redownl
results[episode] = success
if not success:
# Try to get more detailed error from registry
error_msg = _get_episode_error_details(download_service, anime, episode)
error_msg = _get_episode_error_details(
download_service, anime, episode
)
if error_msg:
feedback.error(f"Episode {episode}", error_msg)
else:
feedback.error(f"Episode {episode}", "Download failed - check logs for details")
feedback.error(
f"Episode {episode}",
"Download failed - check logs for details",
)
except Exception as e:
results[episode] = False
feedback.error(f"Download failed for episode {episode}", str(e))
feedback.error(
f"Download failed for episode {episode}", str(e)
)
progress.advance(task)
finally:
# Remove the log handler
@@ -715,13 +839,13 @@ def _get_episode_error_details(download_service, anime, episode_number):
media_record = download_service.media_registry.get_record(anime.id)
if not media_record:
return None
# Find the episode in the record
for episode_record in media_record.episodes:
if episode_record.episode_number == episode_number:
if episode_record.error_message:
error_msg = episode_record.error_message
# Provide more helpful error messages for common issues
if "Failed to get server for episode" in error_msg:
return f"Episode {episode_number} not available on current provider. Try a different provider or check episode number."
@@ -732,20 +856,24 @@ def _get_episode_error_details(download_service, anime, episode_number):
elif episode_record.download_status:
return f"Download status: {episode_record.download_status.value}"
break
return None
except Exception:
return None
def _test_episode_stream_availability(provider, anime, episode_number, config, feedback):
def _test_episode_stream_availability(
provider, anime, episode_number, config, feedback
):
"""Test if streams are available for a specific episode."""
try:
from .....libs.provider.anime.params import EpisodeStreamsParams
media_title = anime.title.english or anime.title.romaji
feedback.info(f"Testing stream availability for '{media_title}' episode {episode_number}")
feedback.info(
f"Testing stream availability for '{media_title}' episode {episode_number}"
)
# Test episode streams
streams = provider.episode_streams(
EpisodeStreamsParams(
@@ -755,29 +883,39 @@ def _test_episode_stream_availability(provider, anime, episode_number, config, f
translation_type=config.stream.translation_type,
)
)
if not streams:
feedback.warning(f"No streams found for episode {episode_number}")
return False
# Convert to list to check actual availability
stream_list = list(streams)
if not stream_list:
feedback.warning(f"No stream servers available for episode {episode_number}")
feedback.warning(
f"No stream servers available for episode {episode_number}"
)
return False
feedback.info(f"Found {len(stream_list)} stream server(s) for episode {episode_number}")
feedback.info(
f"Found {len(stream_list)} stream server(s) for episode {episode_number}"
)
# Show details about the first server for debugging
first_server = stream_list[0]
feedback.info(f"First server: name='{first_server.name}', type='{type(first_server).__name__}'")
feedback.info(
f"First server: name='{first_server.name}', type='{type(first_server).__name__}'"
)
return True
except TypeError as e:
if "'NoneType' object is not subscriptable" in str(e):
feedback.warning(f"Episode {episode_number} not available on provider (API returned null)")
feedback.info("This usually means the episode doesn't exist on this provider or isn't accessible")
feedback.warning(
f"Episode {episode_number} not available on provider (API returned null)"
)
feedback.info(
"This usually means the episode doesn't exist on this provider or isn't accessible"
)
return False
else:
feedback.error(f"Type error testing stream availability: {e}")
@@ -785,6 +923,7 @@ def _test_episode_stream_availability(provider, anime, episode_number, config, f
except Exception as e:
feedback.error(f"Error testing stream availability: {e}")
import traceback
feedback.error("Stream test traceback", traceback.format_exc())
return False
@@ -793,25 +932,31 @@ def _display_download_results(console, results: dict[str, bool], anime):
"""Display download results in a formatted table."""
from rich.table import Table
table = Table(title=f"Download Results for {anime.title.english or anime.title.romaji}")
table = Table(
title=f"Download Results for {anime.title.english or anime.title.romaji}"
)
table.add_column("Episode", justify="center", style="cyan")
table.add_column("Status", justify="center")
for episode, success in sorted(results.items(), key=lambda x: float(x[0])):
status = "[green]✓ Success[/green]" if success else "[red]✗ Failed[/red]"
table.add_row(episode, status)
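# Note: the float(...) sort key above orders episode strings numerically,
# e.g. "2" before "10" (a plain string sort would put "10" first).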
console.print(table)
# Summary
total = len(results)
successful = sum(results.values())
failed = total - successful
if failed == 0:
console.print(f"\n[bold green]All {total} episodes downloaded successfully![/bold green]")
console.print(
f"\n[bold green]All {total} episodes downloaded successfully![/bold green]"
)
else:
console.print(f"\n[yellow]Download complete: {successful}/{total} successful, {failed} failed[/yellow]")
console.print(
f"\n[yellow]Download complete: {successful}/{total} successful, {failed} failed[/yellow]"
)
def _show_final_statistics(download_service, feedback):
@@ -820,17 +965,17 @@ def _show_final_statistics(download_service, feedback):
console = Console()
stats = download_service.get_download_statistics()
if stats:
console.print(f"\n[bold blue]Overall Download Statistics:[/bold blue]")
console.print("\n[bold blue]Overall Download Statistics:[/bold blue]")
console.print(f"Total episodes tracked: {stats.get('total_episodes', 0)}")
console.print(f"Successfully downloaded: {stats.get('downloaded', 0)}")
console.print(f"Failed downloads: {stats.get('failed', 0)}")
console.print(f"Queued downloads: {stats.get('queued', 0)}")
if stats.get('total_size_bytes', 0) > 0:
size_mb = stats['total_size_bytes'] / (1024 * 1024)
if stats.get("total_size_bytes", 0) > 0:
size_mb = stats["total_size_bytes"] / (1024 * 1024)
if size_mb > 1024:
console.print(f"Total size: {size_mb/1024:.2f} GB")
console.print(f"Total size: {size_mb / 1024:.2f} GB")
else:
console.print(f"Total size: {size_mb:.2f} MB")

View File

@@ -229,17 +229,39 @@ def search(config: AppConfig, **options: "Unpack[SearchOptions]"):
on_list = options.get("on_list")
# Validate logical relationships
if score_greater is not None and score_lesser is not None and score_greater > score_lesser:
if (
score_greater is not None
and score_lesser is not None
and score_greater > score_lesser
):
raise FastAnimeError("Minimum score cannot be higher than maximum score")
if popularity_greater is not None and popularity_lesser is not None and popularity_greater > popularity_lesser:
raise FastAnimeError("Minimum popularity cannot be higher than maximum popularity")
if start_date_greater is not None and start_date_lesser is not None and start_date_greater > start_date_lesser:
raise FastAnimeError("Start date greater cannot be later than start date lesser")
if end_date_greater is not None and end_date_lesser is not None and end_date_greater > end_date_lesser:
raise FastAnimeError("End date greater cannot be later than end date lesser")
if (
popularity_greater is not None
and popularity_lesser is not None
and popularity_greater > popularity_lesser
):
raise FastAnimeError(
"Minimum popularity cannot be higher than maximum popularity"
)
if (
start_date_greater is not None
and start_date_lesser is not None
and start_date_greater > start_date_lesser
):
raise FastAnimeError(
"Start date greater cannot be later than start date lesser"
)
if (
end_date_greater is not None
and end_date_lesser is not None
and end_date_greater > end_date_lesser
):
raise FastAnimeError(
"End date greater cannot be later than end date lesser"
)
# Build search parameters
search_params = MediaSearchParams(
@@ -287,7 +309,7 @@ def search(config: AppConfig, **options: "Unpack[SearchOptions]"):
feedback.info(
f"Found {len(search_result.media)} anime matching your search. Launching interactive mode..."
)
# Create initial state with search results
initial_state = State(
menu_name=MenuName.RESULTS,
@@ -299,7 +321,7 @@ def search(config: AppConfig, **options: "Unpack[SearchOptions]"):
page_info=search_result.page_info,
),
)
session.load_menus_from_folder("media")
session.run(config, history=[initial_state])

View File

@@ -16,7 +16,6 @@ def stats(config: "AppConfig"):
from rich.markdown import Markdown
from rich.panel import Panel
from .....core.exceptions import FastAnimeError
from .....libs.media_api.api import create_api_client
from ....service.auth import AuthService
from ....service.feedback import FeedbackService
@@ -93,4 +92,4 @@ def stats(config: "AppConfig"):
raise click.Abort()
except Exception as e:
feedback.error("Unexpected error occurred", str(e))
raise click.Abort()
raise click.Abort()

View File

@@ -150,20 +150,19 @@ def download(config: AppConfig, **options: "Unpack[Options]"):
if not anime:
raise FastAnimeError(f"Failed to fetch anime {anime_result.title}")
available_episodes: list[str] = sorted(
getattr(anime.episodes, config.stream.translation_type), key=float
)
if options["episode_range"]:
from ..utils.parser import parse_episode_range
try:
episodes_range = parse_episode_range(
options["episode_range"],
available_episodes
options["episode_range"], available_episodes
)
for episode in episodes_range:
download_anime(
config, options, provider, selector, anime, anime_title, episode

View File

@@ -2,7 +2,6 @@
Registry backup command - create full backups of the registry
"""
import shutil
import tarfile
from pathlib import Path
from datetime import datetime
@@ -19,31 +18,22 @@ from ....utils.feedback import create_feedback_manager
"--output",
"-o",
type=click.Path(),
help="Output backup file path (auto-generated if not specified)"
)
@click.option(
"--compress",
"-c",
is_flag=True,
help="Compress the backup archive"
)
@click.option(
"--include-cache",
is_flag=True,
help="Include cache files in backup"
help="Output backup file path (auto-generated if not specified)",
)
@click.option("--compress", "-c", is_flag=True, help="Compress the backup archive")
@click.option("--include-cache", is_flag=True, help="Include cache files in backup")
@click.option(
"--format",
"backup_format",
type=click.Choice(["tar", "zip"], case_sensitive=False),
default="tar",
help="Backup archive format"
help="Backup archive format",
)
@click.option(
"--api",
default="anilist",
type=click.Choice(["anilist"], case_sensitive=False),
help="Media API registry to backup"
help="Media API registry to backup",
)
@click.pass_obj
def backup(
@@ -52,35 +42,37 @@ def backup(
compress: bool,
include_cache: bool,
backup_format: str,
api: str
api: str,
):
"""
Create a complete backup of your media registry.
Includes all media records, index files, and optionally cache data.
Backups can be compressed and are suitable for restoration.
"""
feedback = create_feedback_manager(config.general.icons)
try:
registry_service = MediaRegistryService(api, config.registry)
# Generate output filename if not specified
if not output:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
extension = "tar.gz" if compress and backup_format == "tar" else backup_format
extension = (
"tar.gz" if compress and backup_format == "tar" else backup_format
)
if backup_format == "zip":
extension = "zip"
output = f"fastanime_registry_backup_{api}_{timestamp}.{extension}"
output_path = Path(output)
# Get backup statistics before starting
stats = registry_service.get_registry_stats()
total_media = stats.get('total_media', 0)
total_media = stats.get("total_media", 0)
feedback.info("Starting Backup", f"Backing up {total_media} media entries...")
# Create backup based on format
if backup_format.lower() == "tar":
_create_tar_backup(
@@ -90,101 +82,111 @@ def backup(
_create_zip_backup(
registry_service, output_path, include_cache, feedback, api
)
# Get final backup size
backup_size = _format_file_size(output_path)
feedback.success(
"Backup Complete",
f"Registry backed up to {output_path} ({backup_size})"
"Backup Complete", f"Registry backed up to {output_path} ({backup_size})"
)
# Show backup contents summary
_show_backup_summary(output_path, backup_format, feedback)
except Exception as e:
feedback.error("Backup Error", f"Failed to create backup: {e}")
raise click.Abort()
def _create_tar_backup(registry_service, output_path: Path, compress: bool, include_cache: bool, feedback, api: str):
def _create_tar_backup(
registry_service,
output_path: Path,
compress: bool,
include_cache: bool,
feedback,
api: str,
):
"""Create a tar-based backup."""
mode = "w:gz" if compress else "w"
with tarfile.open(output_path, mode) as tar:
# Add registry directory
registry_dir = registry_service.config.media_dir / api
if registry_dir.exists():
tar.add(registry_dir, arcname=f"registry/{api}")
feedback.info("Added to backup", f"Registry data ({api})")
# Add index directory
index_dir = registry_service.config.index_dir
if index_dir.exists():
tar.add(index_dir, arcname="index")
feedback.info("Added to backup", "Registry index")
# Add cache if requested
if include_cache:
cache_dir = registry_service.config.media_dir.parent / "cache"
if cache_dir.exists():
tar.add(cache_dir, arcname="cache")
feedback.info("Added to backup", "Cache data")
# Add metadata file
metadata = _create_backup_metadata(registry_service, api, include_cache)
metadata_path = output_path.parent / "backup_metadata.json"
try:
import json
with open(metadata_path, 'w', encoding='utf-8') as f:
with open(metadata_path, "w", encoding="utf-8") as f:
json.dump(metadata, f, indent=2, default=str)
tar.add(metadata_path, arcname="backup_metadata.json")
metadata_path.unlink() # Clean up temp file
except Exception as e:
feedback.warning("Metadata Error", f"Failed to add metadata: {e}")
def _create_zip_backup(registry_service, output_path: Path, include_cache: bool, feedback, api: str):
def _create_zip_backup(
registry_service, output_path: Path, include_cache: bool, feedback, api: str
):
"""Create a zip-based backup."""
import zipfile
with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zip_file:
# Add registry directory
registry_dir = registry_service.config.media_dir / api
if registry_dir.exists():
for file_path in registry_dir.rglob('*'):
for file_path in registry_dir.rglob("*"):
if file_path.is_file():
arcname = f"registry/{api}/{file_path.relative_to(registry_dir)}"
zip_file.write(file_path, arcname)
feedback.info("Added to backup", f"Registry data ({api})")
# Add index directory
index_dir = registry_service.config.index_dir
if index_dir.exists():
for file_path in index_dir.rglob('*'):
for file_path in index_dir.rglob("*"):
if file_path.is_file():
arcname = f"index/{file_path.relative_to(index_dir)}"
zip_file.write(file_path, arcname)
feedback.info("Added to backup", "Registry index")
# Add cache if requested
if include_cache:
cache_dir = registry_service.config.media_dir.parent / "cache"
if cache_dir.exists():
for file_path in cache_dir.rglob('*'):
for file_path in cache_dir.rglob("*"):
if file_path.is_file():
arcname = f"cache/{file_path.relative_to(cache_dir)}"
zip_file.write(file_path, arcname)
feedback.info("Added to backup", "Cache data")
# Add metadata
metadata = _create_backup_metadata(registry_service, api, include_cache)
try:
import json
metadata_json = json.dumps(metadata, indent=2, default=str)
zip_file.writestr("backup_metadata.json", metadata_json)
except Exception as e:
@@ -194,13 +196,13 @@ def _create_zip_backup(registry_service, output_path: Path, include_cache: bool,
def _create_backup_metadata(registry_service, api: str, include_cache: bool) -> dict:
"""Create backup metadata."""
stats = registry_service.get_registry_stats()
return {
"backup_timestamp": datetime.now().isoformat(),
"fastanime_version": "unknown", # You might want to get this from somewhere
"registry_version": stats.get('version'),
"registry_version": stats.get("version"),
"api": api,
"total_media": stats.get('total_media', 0),
"total_media": stats.get("total_media", 0),
"include_cache": include_cache,
"registry_stats": stats,
"backup_type": "full",
@@ -209,22 +211,23 @@ def _create_backup_metadata(registry_service, api: str, include_cache: bool) ->
def _show_backup_summary(backup_path: Path, format_type: str, feedback):
"""Show summary of backup contents."""
try:
if format_type.lower() == "tar":
with tarfile.open(backup_path, 'r:*') as tar:
with tarfile.open(backup_path, "r:*") as tar:
members = tar.getmembers()
file_count = len([m for m in members if m.isfile()])
dir_count = len([m for m in members if m.isdir()])
else: # zip
import zipfile
with zipfile.ZipFile(backup_path, 'r') as zip_file:
with zipfile.ZipFile(backup_path, "r") as zip_file:
info_list = zip_file.infolist()
file_count = len([info for info in info_list if not info.is_dir()])
dir_count = len([info for info in info_list if info.is_dir()])
feedback.info("Backup Contents", f"{file_count} files, {dir_count} directories")
except Exception as e:
feedback.warning("Summary Error", f"Could not analyze backup contents: {e}")
@@ -233,7 +236,7 @@ def _format_file_size(file_path: Path) -> str:
"""Format file size in human-readable format."""
try:
size = file_path.stat().st_size
for unit in ['B', 'KB', 'MB', 'GB']:
for unit in ["B", "KB", "MB", "GB"]:
if size < 1024.0:
return f"{size:.1f} {unit}"
size /= 1024.0
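# Worked example: 1536 bytes -> 1536 >= 1024, so size becomes 1.5 on the
# first division and the "KB" pass returns "1.5 KB".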

View File

@@ -13,41 +13,26 @@ from ....utils.feedback import create_feedback_manager
@click.command(help="Clean up orphaned entries and invalid data from registry")
@click.option(
"--dry-run",
is_flag=True,
help="Show what would be cleaned without making changes"
"--dry-run", is_flag=True, help="Show what would be cleaned without making changes"
)
@click.option(
"--orphaned",
is_flag=True,
help="Remove orphaned media records (index entries without files)"
help="Remove orphaned media records (index entries without files)",
)
@click.option("--invalid", is_flag=True, help="Remove invalid or corrupted entries")
@click.option("--duplicates", is_flag=True, help="Remove duplicate entries")
@click.option(
"--old-format", is_flag=True, help="Clean entries from old registry format versions"
)
@click.option(
"--invalid",
is_flag=True,
help="Remove invalid or corrupted entries"
)
@click.option(
"--duplicates",
is_flag=True,
help="Remove duplicate entries"
)
@click.option(
"--old-format",
is_flag=True,
help="Clean entries from old registry format versions"
)
@click.option(
"--force",
"-f",
is_flag=True,
help="Force cleanup without confirmation prompts"
"--force", "-f", is_flag=True, help="Force cleanup without confirmation prompts"
)
@click.option(
"--api",
default="anilist",
type=click.Choice(["anilist"], case_sensitive=False),
help="Media API registry to clean"
help="Media API registry to clean",
)
@click.pass_obj
def clean(
@@ -58,73 +43,86 @@ def clean(
duplicates: bool,
old_format: bool,
force: bool,
api: str
api: str,
):
"""
Clean up your local media registry.
Can remove orphaned entries, invalid data, duplicates, and entries
from old format versions. Use --dry-run to preview changes.
"""
feedback = create_feedback_manager(config.general.icons)
console = Console()
# Default to all cleanup types if none specified
if not any([orphaned, invalid, duplicates, old_format]):
orphaned = invalid = duplicates = old_format = True
try:
registry_service = MediaRegistryService(api, config.registry)
cleanup_results = {
"orphaned": [],
"invalid": [],
"duplicates": [],
"old_format": []
"old_format": [],
}
# Analyze registry for cleanup opportunities
_analyze_registry(registry_service, cleanup_results, orphaned, invalid, duplicates, old_format)
_analyze_registry(
registry_service, cleanup_results, orphaned, invalid, duplicates, old_format
)
# Show cleanup summary
_display_cleanup_summary(console, cleanup_results, config.general.icons)
# Confirm cleanup if not dry run and not forced
total_items = sum(len(items) for items in cleanup_results.values())
if total_items == 0:
feedback.info("Registry Clean", "No cleanup needed - registry is already clean!")
feedback.info(
"Registry Clean", "No cleanup needed - registry is already clean!"
)
return
if not dry_run:
if not force:
if not click.confirm(f"Clean up {total_items} items from registry?"):
feedback.info("Cleanup Cancelled", "No changes were made")
return
# Perform cleanup
_perform_cleanup(registry_service, cleanup_results, feedback)
feedback.success("Cleanup Complete", f"Cleaned up {total_items} items from registry")
feedback.success(
"Cleanup Complete", f"Cleaned up {total_items} items from registry"
)
else:
feedback.info("Dry Run Complete", f"Would clean up {total_items} items")
except Exception as e:
feedback.error("Cleanup Error", f"Failed to clean registry: {e}")
raise click.Abort()
def _analyze_registry(registry_service, results: dict, check_orphaned: bool, check_invalid: bool, check_duplicates: bool, check_old_format: bool):
def _analyze_registry(
registry_service,
results: dict,
check_orphaned: bool,
check_invalid: bool,
check_duplicates: bool,
check_old_format: bool,
):
"""Analyze registry for cleanup opportunities."""
if check_orphaned:
results["orphaned"] = _find_orphaned_entries(registry_service)
if check_invalid:
results["invalid"] = _find_invalid_entries(registry_service)
if check_duplicates:
results["duplicates"] = _find_duplicate_entries(registry_service)
if check_old_format:
results["old_format"] = _find_old_format_entries(registry_service)
@@ -132,65 +130,77 @@ def _analyze_registry(registry_service, results: dict, check_orphaned: bool, che
def _find_orphaned_entries(registry_service) -> list:
"""Find index entries that don't have corresponding media files."""
orphaned = []
try:
index = registry_service._load_index()
for entry_key, entry in index.media_index.items():
media_file = registry_service._get_media_file_path(entry.media_id)
if not media_file.exists():
orphaned.append({
"type": "orphaned_index",
"id": entry.media_id,
"key": entry_key,
"reason": "Media file missing"
})
orphaned.append(
{
"type": "orphaned_index",
"id": entry.media_id,
"key": entry_key,
"reason": "Media file missing",
}
)
except Exception:
pass
return orphaned
def _find_invalid_entries(registry_service) -> list:
"""Find invalid or corrupted entries."""
invalid = []
try:
# Check all media files
for media_file in registry_service.media_registry_dir.iterdir():
if not media_file.name.endswith('.json'):
if not media_file.name.endswith(".json"):
continue
try:
media_id = int(media_file.stem)
record = registry_service.get_media_record(media_id)
# Check for invalid record structure
if not record or not record.media_item:
invalid.append({
"type": "invalid_record",
"id": media_id,
"file": media_file,
"reason": "Invalid record structure"
})
elif not record.media_item.title or not record.media_item.title.english and not record.media_item.title.romaji:
invalid.append({
"type": "invalid_title",
"id": media_id,
"file": media_file,
"reason": "Missing or invalid title"
})
invalid.append(
{
"type": "invalid_record",
"id": media_id,
"file": media_file,
"reason": "Invalid record structure",
}
)
elif (
not record.media_item.title
or not record.media_item.title.english
and not record.media_item.title.romaji
):
invalid.append(
{
"type": "invalid_title",
"id": media_id,
"file": media_file,
"reason": "Missing or invalid title",
}
)
except Exception as e:
invalid.append({
"type": "corrupted_file",
"id": media_file.stem,
"file": media_file,
"reason": f"File corruption: {e}"
})
invalid.append(
{
"type": "corrupted_file",
"id": media_file.stem,
"file": media_file,
"reason": f"File corruption: {e}",
}
)
except Exception:
pass
return invalid
@@ -198,76 +208,81 @@ def _find_duplicate_entries(registry_service) -> list:
"""Find duplicate entries (same media ID appearing multiple times)."""
duplicates = []
seen_ids = set()
try:
index = registry_service._load_index()
for entry_key, entry in index.media_index.items():
if entry.media_id in seen_ids:
duplicates.append({
"type": "duplicate_index",
"id": entry.media_id,
"key": entry_key,
"reason": "Duplicate media ID in index"
})
duplicates.append(
{
"type": "duplicate_index",
"id": entry.media_id,
"key": entry_key,
"reason": "Duplicate media ID in index",
}
)
else:
seen_ids.add(entry.media_id)
except Exception:
pass
return duplicates
def _find_old_format_entries(registry_service) -> list:
"""Find entries from old registry format versions."""
old_format = []
try:
index = registry_service._load_index()
current_version = index.version
# Check for entries that might be from old formats
# This is a placeholder - you'd implement specific checks based on your version history
for media_file in registry_service.media_registry_dir.iterdir():
if not media_file.name.endswith('.json'):
if not media_file.name.endswith(".json"):
continue
try:
import json
with open(media_file, 'r') as f:
with open(media_file, "r") as f:
data = json.load(f)
# Check for old format indicators
if 'version' in data and data['version'] < current_version:
old_format.append({
"type": "old_version",
"id": media_file.stem,
"file": media_file,
"reason": f"Old format version {data.get('version')}"
})
if "version" in data and data["version"] < current_version:
old_format.append(
{
"type": "old_version",
"id": media_file.stem,
"file": media_file,
"reason": f"Old format version {data.get('version')}",
}
)
except Exception:
pass
except Exception:
pass
return old_format
def _display_cleanup_summary(console: Console, results: dict, icons: bool):
"""Display summary of cleanup opportunities."""
table = Table(title=f"{'🧹 ' if icons else ''}Registry Cleanup Summary")
table.add_column("Category", style="cyan", no_wrap=True)
table.add_column("Count", style="magenta", justify="right")
table.add_column("Description", style="white")
categories = {
"orphaned": "Orphaned Entries",
"invalid": "Invalid Entries",
"invalid": "Invalid Entries",
"duplicates": "Duplicate Entries",
"old_format": "Old Format Entries"
"old_format": "Old Format Entries",
}
for category, display_name in categories.items():
count = len(results[category])
if count > 0:
@@ -278,52 +293,50 @@ def _display_cleanup_summary(console: Console, results: dict, icons: bool):
description += "..."
else:
description = "None found"
table.add_row(display_name, str(count), description)
console.print(table)
console.print()
# Show detailed breakdown if there are items to clean
for category, items in results.items():
if items:
_display_category_details(console, category, items, icons)
def _display_category_details(console: Console, category: str, items: list, icons: bool):
def _display_category_details(
console: Console, category: str, items: list, icons: bool
):
"""Display detailed breakdown for a cleanup category."""
category_names = {
"orphaned": "🔗 Orphaned Entries" if icons else "Orphaned Entries",
"invalid": "❌ Invalid Entries" if icons else "Invalid Entries",
"duplicates": "👥 Duplicate Entries" if icons else "Duplicate Entries",
"old_format": "📼 Old Format Entries" if icons else "Old Format Entries"
"duplicates": "👥 Duplicate Entries" if icons else "Duplicate Entries",
"old_format": "📼 Old Format Entries" if icons else "Old Format Entries",
}
table = Table(title=category_names.get(category, category.title()))
table.add_column("ID", style="cyan", no_wrap=True)
table.add_column("Type", style="magenta")
table.add_column("Reason", style="yellow")
for item in items[:10]: # Show max 10 items
table.add_row(
str(item["id"]),
item["type"],
item["reason"]
)
table.add_row(str(item["id"]), item["type"], item["reason"])
if len(items) > 10:
table.add_row("...", "...", f"And {len(items) - 10} more")
console.print(table)
console.print()
def _perform_cleanup(registry_service, results: dict, feedback):
"""Perform the actual cleanup operations."""
cleaned_count = 0
# Clean orphaned entries
for item in results["orphaned"]:
try:
@@ -334,25 +347,29 @@ def _perform_cleanup(registry_service, results: dict, feedback):
registry_service._save_index(index)
cleaned_count += 1
except Exception as e:
feedback.warning("Cleanup Error", f"Failed to clean orphaned entry {item['id']}: {e}")
feedback.warning(
"Cleanup Error", f"Failed to clean orphaned entry {item['id']}: {e}"
)
# Clean invalid entries
for item in results["invalid"]:
try:
if "file" in item:
item["file"].unlink() # Delete the file
cleaned_count += 1
# Also remove from index if present
index = registry_service._load_index()
entry_key = f"{registry_service._media_api}_{item['id']}"
if entry_key in index.media_index:
del index.media_index[entry_key]
registry_service._save_index(index)
except Exception as e:
feedback.warning("Cleanup Error", f"Failed to clean invalid entry {item['id']}: {e}")
feedback.warning(
"Cleanup Error", f"Failed to clean invalid entry {item['id']}: {e}"
)
# Clean duplicates
for item in results["duplicates"]:
try:
@@ -363,8 +380,10 @@ def _perform_cleanup(registry_service, results: dict, feedback):
registry_service._save_index(index)
cleaned_count += 1
except Exception as e:
feedback.warning("Cleanup Error", f"Failed to clean duplicate entry {item['id']}: {e}")
feedback.warning(
"Cleanup Error", f"Failed to clean duplicate entry {item['id']}: {e}"
)
# Clean old format entries
for item in results["old_format"]:
try:
@@ -374,6 +393,8 @@ def _perform_cleanup(registry_service, results: dict, feedback):
item["file"].unlink()
cleaned_count += 1
except Exception as e:
feedback.warning("Cleanup Error", f"Failed to clean old format entry {item['id']}: {e}")
feedback.warning(
"Cleanup Error", f"Failed to clean old format entry {item['id']}: {e}"
)
feedback.info("Cleanup Results", f"Successfully cleaned {cleaned_count} items")

View File

@@ -20,37 +20,32 @@ from ....utils.feedback import create_feedback_manager
"output_format",
type=click.Choice(["json", "csv", "xml"], case_sensitive=False),
default="json",
help="Export format"
help="Export format",
)
@click.option(
"--output",
"-o",
type=click.Path(),
help="Output file path (auto-generated if not specified)"
help="Output file path (auto-generated if not specified)",
)
@click.option(
"--include-metadata",
is_flag=True,
help="Include detailed media metadata in export"
"--include-metadata", is_flag=True, help="Include detailed media metadata in export"
)
@click.option(
"--status",
multiple=True,
type=click.Choice([
"watching", "completed", "planning", "dropped", "paused", "repeating"
], case_sensitive=False),
help="Only export specific status lists"
)
@click.option(
"--compress",
is_flag=True,
help="Compress the output file"
type=click.Choice(
["watching", "completed", "planning", "dropped", "paused", "repeating"],
case_sensitive=False,
),
help="Only export specific status lists",
)
@click.option("--compress", is_flag=True, help="Compress the output file")
@click.option(
"--api",
default="anilist",
type=click.Choice(["anilist"], case_sensitive=False),
help="Media API registry to export"
help="Media API registry to export",
)
@click.pass_obj
def export(
@@ -60,19 +55,19 @@ def export(
include_metadata: bool,
status: tuple[str, ...],
compress: bool,
api: str
api: str,
):
"""
Export your local media registry to various formats.
Supports JSON, CSV, and XML formats. Can optionally include
detailed metadata and compress the output.
"""
feedback = create_feedback_manager(config.general.icons)
try:
registry_service = MediaRegistryService(api, config.registry)
# Generate output filename if not specified
if not output:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
@@ -80,14 +75,12 @@ def export(
if compress:
extension += ".gz"
output = f"fastanime_registry_{api}_{timestamp}.{extension}"
output_path = Path(output)
# Get export data
export_data = _prepare_export_data(
registry_service, include_metadata, status
)
export_data = _prepare_export_data(registry_service, include_metadata, status)
# Export based on format
if output_format.lower() == "json":
_export_json(export_data, output_path, compress, feedback)
@@ -95,22 +88,25 @@ def export(
_export_csv(export_data, output_path, compress, feedback)
elif output_format.lower() == "xml":
_export_xml(export_data, output_path, compress, feedback)
feedback.success(
"Export Complete",
f"Registry exported to {output_path} ({_format_file_size(output_path)})"
f"Registry exported to {output_path} ({_format_file_size(output_path)})",
)
except Exception as e:
feedback.error("Export Error", f"Failed to export registry: {e}")
raise click.Abort()
def _prepare_export_data(registry_service, include_metadata: bool, status_filter: tuple[str, ...]) -> dict:
def _prepare_export_data(
registry_service, include_metadata: bool, status_filter: tuple[str, ...]
) -> dict:
"""Prepare data for export based on options."""
# Convert status filter to enums
from .....libs.media_api.types import UserMediaListStatus
status_map = {
"watching": UserMediaListStatus.WATCHING,
"completed": UserMediaListStatus.COMPLETED,
@@ -119,9 +115,9 @@ def _prepare_export_data(registry_service, include_metadata: bool, status_filter
"paused": UserMediaListStatus.PAUSED,
"repeating": UserMediaListStatus.REPEATING,
}
status_enums = [status_map[s] for s in status_filter] if status_filter else None
export_data = {
"metadata": {
"export_timestamp": datetime.now().isoformat(),
@@ -130,19 +126,19 @@ def _prepare_export_data(registry_service, include_metadata: bool, status_filter
"filtered_status": list(status_filter) if status_filter else None,
},
"statistics": registry_service.get_registry_stats(),
"media": []
"media": [],
}
# Get all records and filter by status if specified
all_records = registry_service.get_all_media_records()
for record in all_records:
index_entry = registry_service.get_media_index_entry(record.media_item.id)
# Skip if status filter is specified and doesn't match
if status_enums and (not index_entry or index_entry.status not in status_enums):
continue
media_data = {
"id": record.media_item.id,
"title": {
@@ -151,36 +147,63 @@ def _prepare_export_data(registry_service, include_metadata: bool, status_filter
"native": record.media_item.title.native,
},
"user_status": {
"status": index_entry.status.value if index_entry and index_entry.status else None,
"status": index_entry.status.value
if index_entry and index_entry.status
else None,
"progress": index_entry.progress if index_entry else None,
"score": index_entry.score if index_entry else None,
"last_watched": index_entry.last_watched.isoformat() if index_entry and index_entry.last_watched else None,
"last_watched": index_entry.last_watched.isoformat()
if index_entry and index_entry.last_watched
else None,
"notes": index_entry.notes if index_entry else None,
}
},
}
if include_metadata:
media_data.update({
"format": record.media_item.format.value if record.media_item.format else None,
"episodes": record.media_item.episodes,
"duration": record.media_item.duration,
"status": record.media_item.status.value if record.media_item.status else None,
"start_date": record.media_item.start_date.isoformat() if record.media_item.start_date else None,
"end_date": record.media_item.end_date.isoformat() if record.media_item.end_date else None,
"average_score": record.media_item.average_score,
"popularity": record.media_item.popularity,
"genres": [genre.value for genre in record.media_item.genres],
"tags": [{"name": tag.name.value, "rank": tag.rank} for tag in record.media_item.tags],
"studios": [studio.name for studio in record.media_item.studios if studio.name],
"description": record.media_item.description,
"cover_image": {
"large": record.media_item.cover_image.large if record.media_item.cover_image else None,
"medium": record.media_item.cover_image.medium if record.media_item.cover_image else None,
} if record.media_item.cover_image else None,
})
media_data.update(
{
"format": record.media_item.format.value
if record.media_item.format
else None,
"episodes": record.media_item.episodes,
"duration": record.media_item.duration,
"status": record.media_item.status.value
if record.media_item.status
else None,
"start_date": record.media_item.start_date.isoformat()
if record.media_item.start_date
else None,
"end_date": record.media_item.end_date.isoformat()
if record.media_item.end_date
else None,
"average_score": record.media_item.average_score,
"popularity": record.media_item.popularity,
"genres": [genre.value for genre in record.media_item.genres],
"tags": [
{"name": tag.name.value, "rank": tag.rank}
for tag in record.media_item.tags
],
"studios": [
studio.name
for studio in record.media_item.studios
if studio.name
],
"description": record.media_item.description,
"cover_image": {
"large": record.media_item.cover_image.large
if record.media_item.cover_image
else None,
"medium": record.media_item.cover_image.medium
if record.media_item.cover_image
else None,
}
if record.media_item.cover_image
else None,
}
)
export_data["media"].append(media_data)
return export_data
@@ -188,10 +211,11 @@ def _export_json(data: dict, output_path: Path, compress: bool, feedback):
"""Export data to JSON format."""
if compress:
import gzip
with gzip.open(output_path, 'wt', encoding='utf-8') as f:
with gzip.open(output_path, "wt", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
else:
with open(output_path, 'w', encoding='utf-8') as f:
with open(output_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
@@ -199,21 +223,38 @@ def _export_csv(data: dict, output_path: Path, compress: bool, feedback):
"""Export data to CSV format."""
# Flatten media data for CSV
fieldnames = [
"id", "title_english", "title_romaji", "title_native",
"status", "progress", "score", "last_watched", "notes"
"id",
"title_english",
"title_romaji",
"title_native",
"status",
"progress",
"score",
"last_watched",
"notes",
]
# Add metadata fields if included
if data["metadata"]["include_metadata"]:
fieldnames.extend([
"format", "episodes", "duration", "media_status", "start_date", "end_date",
"average_score", "popularity", "genres", "description"
])
fieldnames.extend(
[
"format",
"episodes",
"duration",
"media_status",
"start_date",
"end_date",
"average_score",
"popularity",
"genres",
"description",
]
)
def write_csv(file_obj):
writer = csv.DictWriter(file_obj, fieldnames=fieldnames)
writer.writeheader()
for media in data["media"]:
row = {
"id": media["id"],
@@ -226,29 +267,32 @@ def _export_csv(data: dict, output_path: Path, compress: bool, feedback):
"last_watched": media["user_status"]["last_watched"],
"notes": media["user_status"]["notes"],
}
if data["metadata"]["include_metadata"]:
row.update({
"format": media.get("format"),
"episodes": media.get("episodes"),
"duration": media.get("duration"),
"media_status": media.get("status"),
"start_date": media.get("start_date"),
"end_date": media.get("end_date"),
"average_score": media.get("average_score"),
"popularity": media.get("popularity"),
"genres": ",".join(media.get("genres", [])),
"description": media.get("description"),
})
row.update(
{
"format": media.get("format"),
"episodes": media.get("episodes"),
"duration": media.get("duration"),
"media_status": media.get("status"),
"start_date": media.get("start_date"),
"end_date": media.get("end_date"),
"average_score": media.get("average_score"),
"popularity": media.get("popularity"),
"genres": ",".join(media.get("genres", [])),
"description": media.get("description"),
}
)
writer.writerow(row)
if compress:
import gzip
with gzip.open(output_path, 'wt', encoding='utf-8', newline='') as f:
with gzip.open(output_path, "wt", encoding="utf-8", newline="") as f:
write_csv(f)
else:
with open(output_path, 'w', encoding='utf-8', newline='') as f:
with open(output_path, "w", encoding="utf-8", newline="") as f:
write_csv(f)
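# newline="" matters in both branches: csv.DictWriter performs its own line
# ending handling, and omitting it would double-translate newlines on Windows.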
@@ -259,43 +303,43 @@ def _export_xml(data: dict, output_path: Path, compress: bool, feedback):
except ImportError:
feedback.error("XML Export Error", "XML export requires Python's xml module")
raise click.Abort()
root = ET.Element("fastanime_registry")
# Add metadata
metadata_elem = ET.SubElement(root, "metadata")
for key, value in data["metadata"].items():
if value is not None:
elem = ET.SubElement(metadata_elem, key)
elem.text = str(value)
# Add statistics
stats_elem = ET.SubElement(root, "statistics")
for key, value in data["statistics"].items():
if value is not None:
elem = ET.SubElement(stats_elem, key)
elem.text = str(value)
# Add media
media_list_elem = ET.SubElement(root, "media_list")
for media in data["media"]:
media_elem = ET.SubElement(media_list_elem, "media")
media_elem.set("id", str(media["id"]))
# Add titles
titles_elem = ET.SubElement(media_elem, "titles")
for title_type, title_value in media["title"].items():
if title_value:
title_elem = ET.SubElement(titles_elem, title_type)
title_elem.text = title_value
# Add user status
status_elem = ET.SubElement(media_elem, "user_status")
for key, value in media["user_status"].items():
if value is not None:
elem = ET.SubElement(status_elem, key)
elem.text = str(value)
# Add metadata if included
if data["metadata"]["include_metadata"]:
for key, value in media.items():
@@ -314,22 +358,23 @@ def _export_xml(data: dict, output_path: Path, compress: bool, feedback):
else:
elem = ET.SubElement(media_elem, key)
elem.text = str(value)
# Write XML
tree = ET.ElementTree(root)
if compress:
import gzip
with gzip.open(output_path, 'wb') as f:
tree.write(f, encoding='utf-8', xml_declaration=True)
with gzip.open(output_path, "wb") as f:
tree.write(f, encoding="utf-8", xml_declaration=True)
else:
tree.write(output_path, encoding='utf-8', xml_declaration=True)
tree.write(output_path, encoding="utf-8", xml_declaration=True)
def _format_file_size(file_path: Path) -> str:
"""Format file size in human-readable format."""
try:
size = file_path.stat().st_size
for unit in ['B', 'KB', 'MB', 'GB']:
for unit in ["B", "KB", "MB", "GB"]:
if size < 1024.0:
return f"{size:.1f} {unit}"
size /= 1024.0

View File

@@ -22,34 +22,26 @@ from ....utils.feedback import create_feedback_manager
"input_format",
type=click.Choice(["json", "csv", "xml", "auto"], case_sensitive=False),
default="auto",
help="Input format (auto-detect if not specified)"
help="Input format (auto-detect if not specified)",
)
@click.option(
"--merge",
is_flag=True,
help="Merge with existing registry (default: replace)"
"--merge", is_flag=True, help="Merge with existing registry (default: replace)"
)
@click.option(
"--dry-run",
is_flag=True,
help="Show what would be imported without making changes"
"--dry-run", is_flag=True, help="Show what would be imported without making changes"
)
@click.option(
"--force",
"-f",
is_flag=True,
help="Force import even if format version doesn't match"
)
@click.option(
"--backup",
is_flag=True,
help="Create backup before importing"
help="Force import even if format version doesn't match",
)
@click.option("--backup", is_flag=True, help="Create backup before importing")
@click.option(
"--api",
default="anilist",
type=click.Choice(["anilist"], case_sensitive=False),
help="Media API registry to import to"
help="Media API registry to import to",
)
@click.pass_obj
def import_(
@@ -60,50 +52,50 @@ def import_(
dry_run: bool,
force: bool,
backup: bool,
api: str
api: str,
):
"""
Import media registry data from various formats.
Supports JSON, CSV, and XML formats exported by the export command
or compatible third-party tools.
"""
feedback = create_feedback_manager(config.general.icons)
try:
registry_service = MediaRegistryService(api, config.registry)
# Create backup if requested
if backup and not dry_run:
_create_backup(registry_service, feedback)
# Auto-detect format if needed
if input_format == "auto":
input_format = _detect_format(input_file)
feedback.info("Format Detection", f"Detected format: {input_format.upper()}")
feedback.info(
"Format Detection", f"Detected format: {input_format.upper()}"
)
# Parse input file
import_data = _parse_input_file(input_file, input_format, feedback)
# Validate import data
_validate_import_data(import_data, force, feedback)
# Import data
_import_data(
registry_service, import_data, merge, dry_run, feedback
)
_import_data(registry_service, import_data, merge, dry_run, feedback)
if not dry_run:
feedback.success(
"Import Complete",
f"Successfully imported {len(import_data.get('media', []))} media entries"
f"Successfully imported {len(import_data.get('media', []))} media entries",
)
else:
feedback.info(
"Dry Run Complete",
f"Would import {len(import_data.get('media', []))} media entries"
f"Would import {len(import_data.get('media', []))} media entries",
)
except Exception as e:
feedback.error("Import Error", f"Failed to import registry: {e}")
raise click.Abort()
@@ -112,40 +104,40 @@ def import_(
def _create_backup(registry_service, feedback):
"""Create a backup before importing."""
from .export import _prepare_export_data, _export_json
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = Path(f"fastanime_registry_backup_{timestamp}.json")
export_data = _prepare_export_data(registry_service, True, ())
_export_json(export_data, backup_path, False, feedback)
feedback.info("Backup Created", f"Registry backed up to {backup_path}")
def _detect_format(file_path: Path) -> str:
"""Auto-detect file format based on extension and content."""
extension = file_path.suffix.lower()
if extension in ['.json', '.gz']:
if extension in [".json", ".gz"]:
return "json"
elif extension == '.csv':
elif extension == ".csv":
return "csv"
elif extension == '.xml':
elif extension == ".xml":
return "xml"
# Try to detect by content
try:
with open(file_path, 'r', encoding='utf-8') as f:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read(100).strip()
if content.startswith('{') or content.startswith('['):
if content.startswith("{") or content.startswith("["):
return "json"
elif content.startswith('<?xml') or content.startswith('<'):
elif content.startswith("<?xml") or content.startswith("<"):
return "xml"
elif ',' in content: # Very basic CSV detection
elif "," in content: # Very basic CSV detection
return "csv"
    except Exception:
pass
raise click.ClickException(f"Could not detect format for {file_path}")
@@ -164,12 +156,13 @@ def _parse_input_file(file_path: Path, format_type: str, feedback) -> dict:
def _parse_json(file_path: Path) -> dict:
"""Parse JSON input file."""
try:
if file_path.suffix.lower() == '.gz':
if file_path.suffix.lower() == ".gz":
import gzip
with gzip.open(file_path, 'rt', encoding='utf-8') as f:
with gzip.open(file_path, "rt", encoding="utf-8") as f:
return json.load(f)
else:
with open(file_path, 'r', encoding='utf-8') as f:
with open(file_path, "r", encoding="utf-8") as f:
return json.load(f)
except json.JSONDecodeError as e:
raise click.ClickException(f"Invalid JSON format: {e}")
@@ -182,11 +175,11 @@ def _parse_csv(file_path: Path) -> dict:
"import_timestamp": datetime.now().isoformat(),
"source_format": "csv",
},
"media": []
"media": [],
}
try:
with open(file_path, 'r', encoding='utf-8') as f:
with open(file_path, "r", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
media_data = {
@@ -198,33 +191,47 @@ def _parse_csv(file_path: Path) -> dict:
},
"user_status": {
"status": row.get("status"),
"progress": int(row["progress"]) if row.get("progress") else None,
"progress": int(row["progress"])
if row.get("progress")
else None,
"score": float(row["score"]) if row.get("score") else None,
"last_watched": row.get("last_watched"),
"notes": row.get("notes"),
}
},
}
# Add metadata fields if present
if "format" in row:
media_data.update({
"format": row.get("format"),
"episodes": int(row["episodes"]) if row.get("episodes") else None,
"duration": int(row["duration"]) if row.get("duration") else None,
"media_status": row.get("media_status"),
"start_date": row.get("start_date"),
"end_date": row.get("end_date"),
"average_score": float(row["average_score"]) if row.get("average_score") else None,
"popularity": int(row["popularity"]) if row.get("popularity") else None,
"genres": row.get("genres", "").split(",") if row.get("genres") else [],
"description": row.get("description"),
})
media_data.update(
{
"format": row.get("format"),
"episodes": int(row["episodes"])
if row.get("episodes")
else None,
"duration": int(row["duration"])
if row.get("duration")
else None,
"media_status": row.get("media_status"),
"start_date": row.get("start_date"),
"end_date": row.get("end_date"),
"average_score": float(row["average_score"])
if row.get("average_score")
else None,
"popularity": int(row["popularity"])
if row.get("popularity")
else None,
"genres": row.get("genres", "").split(",")
if row.get("genres")
else [],
"description": row.get("description"),
}
)
import_data["media"].append(media_data)
except (ValueError, KeyError) as e:
raise click.ClickException(f"Invalid CSV format: {e}")
return import_data
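# Expected CSV header, mirroring the export command's fieldnames (the
# bracketed metadata columns are optional):
#   id,title_english,title_romaji,title_native,status,progress,score,
#   last_watched,notes[,format,episodes,duration,media_status,start_date,
#   end_date,average_score,popularity,genres,description]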
@@ -234,22 +241,19 @@ def _parse_xml(file_path: Path) -> dict:
import xml.etree.ElementTree as ET
except ImportError:
raise click.ClickException("XML import requires Python's xml module")
try:
tree = ET.parse(file_path)
root = tree.getroot()
import_data = {
"metadata": {},
"media": []
}
import_data = {"metadata": {}, "media": []}
# Parse metadata
metadata_elem = root.find("metadata")
if metadata_elem is not None:
for child in metadata_elem:
import_data["metadata"][child.tag] = child.text
# Parse media
media_list_elem = root.find("media_list")
if media_list_elem is not None:
@@ -257,15 +261,15 @@ def _parse_xml(file_path: Path) -> dict:
media_data = {
"id": int(media_elem.get("id")),
"title": {},
"user_status": {}
"user_status": {},
}
# Parse titles
titles_elem = media_elem.find("titles")
if titles_elem is not None:
for title_elem in titles_elem:
media_data["title"][title_elem.tag] = title_elem.text
# Parse user status
status_elem = media_elem.find("user_status")
if status_elem is not None:
@@ -273,32 +277,38 @@ def _parse_xml(file_path: Path) -> dict:
value = child.text
if child.tag in ["progress", "score"] and value:
try:
value = float(value) if child.tag == "score" else int(value)
value = (
float(value) if child.tag == "score" else int(value)
)
except ValueError:
pass
media_data["user_status"][child.tag] = value
# Parse other metadata
for child in media_elem:
if child.tag not in ["titles", "user_status"]:
if child.tag in ["episodes", "duration", "popularity"]:
try:
media_data[child.tag] = int(child.text) if child.text else None
media_data[child.tag] = (
int(child.text) if child.text else None
)
except ValueError:
media_data[child.tag] = child.text
elif child.tag == "average_score":
try:
media_data[child.tag] = float(child.text) if child.text else None
media_data[child.tag] = (
float(child.text) if child.text else None
)
except ValueError:
media_data[child.tag] = child.text
else:
media_data[child.tag] = child.text
import_data["media"].append(media_data)
except ET.ParseError as e:
raise click.ClickException(f"Invalid XML format: {e}")
return import_data
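# Sketch of the XML layout this parser consumes, as produced by _export_xml
# (values illustrative):
#   <fastanime_registry>
#     <metadata>...</metadata>
#     <media_list>
#       <media id="1">
#         <titles><english>...</english></titles>
#         <user_status><status>watching</status><progress>12</progress></user_status>
#         <episodes>24</episodes>
#       </media>
#     </media_list>
#   </fastanime_registry>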
@@ -306,36 +316,43 @@ def _validate_import_data(data: dict, force: bool, feedback):
"""Validate import data structure and compatibility."""
if "media" not in data:
raise click.ClickException("Import data missing 'media' section")
if not isinstance(data["media"], list):
raise click.ClickException("'media' section must be a list")
# Check if any media entries exist
if not data["media"]:
feedback.warning("No Media", "Import file contains no media entries")
return
# Validate media entries
required_fields = ["id", "title"]
for i, media in enumerate(data["media"]):
for field in required_fields:
if field not in media:
raise click.ClickException(f"Media entry {i} missing required field: {field}")
raise click.ClickException(
f"Media entry {i} missing required field: {field}"
)
if not isinstance(media.get("title"), dict):
raise click.ClickException(f"Media entry {i} has invalid title format")
feedback.info("Validation", f"Import data validated - {len(data['media'])} media entries")
feedback.info(
"Validation", f"Import data validated - {len(data['media'])} media entries"
)
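# Minimal document that passes the checks shown above (illustrative):
#   {"media": [{"id": 1, "title": {"romaji": "Example"}}]}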
def _import_data(registry_service, data: dict, merge: bool, dry_run: bool, feedback):
"""Import data into the registry."""
from .....libs.media_api.types import MediaFormat, MediaGenre, MediaStatus, MediaType
from .....libs.media_api.types import (
MediaFormat,
MediaType,
)
imported_count = 0
updated_count = 0
error_count = 0
status_map = {
"watching": UserMediaListStatus.WATCHING,
"completed": UserMediaListStatus.COMPLETED,
@@ -344,47 +361,47 @@ def _import_data(registry_service, data: dict, merge: bool, dry_run: bool, feedb
"paused": UserMediaListStatus.PAUSED,
"repeating": UserMediaListStatus.REPEATING,
}
for media_data in data["media"]:
try:
media_id = media_data["id"]
if not media_id:
error_count += 1
continue
title_data = media_data.get("title", {})
title = MediaTitle(
english=title_data.get("english") or "",
romaji=title_data.get("romaji"),
native=title_data.get("native"),
)
# Create minimal MediaItem for registry
media_item = MediaItem(
id=media_id,
title=title,
type=MediaType.ANIME, # Default to anime
)
# Add additional metadata if available
if "format" in media_data and media_data["format"]:
try:
media_item.format = getattr(MediaFormat, media_data["format"])
except (AttributeError, TypeError):
pass
if "episodes" in media_data:
media_item.episodes = media_data["episodes"]
if "average_score" in media_data:
media_item.average_score = media_data["average_score"]
if dry_run:
title_str = title.english or title.romaji or f"ID:{media_id}"
feedback.info("Would import", title_str)
imported_count += 1
continue
# Check if record exists
existing_record = registry_service.get_media_record(media_id)
if existing_record and not merge:
@@ -394,11 +411,11 @@ def _import_data(registry_service, data: dict, merge: bool, dry_run: bool, feedb
updated_count += 1
else:
imported_count += 1
# Create or update record
record = registry_service.get_or_create_record(media_item)
registry_service.save_media_record(record)
# Update user status if provided
user_status = media_data.get("user_status", {})
if user_status.get("status"):
@@ -412,14 +429,17 @@ def _import_data(registry_service, data: dict, merge: bool, dry_run: bool, feedb
score=user_status.get("score"),
notes=user_status.get("notes"),
)
except Exception as e:
error_count += 1
feedback.warning("Import Error", f"Failed to import media {media_data.get('id', 'unknown')}: {e}")
feedback.warning(
"Import Error",
f"Failed to import media {media_data.get('id', 'unknown')}: {e}",
)
continue
if not dry_run:
feedback.info(
"Import Summary",
f"Imported: {imported_count}, Updated: {updated_count}, Errors: {error_count}"
f"Imported: {imported_count}, Updated: {updated_count}, Errors: {error_count}",
)
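# Usage sketch; the "registry import" command path is an assumption based on
# this module's location, and only flags visible above are shown:
#   fastanime registry import export.json --dry-run
#   fastanime registry import export.json --merge --backup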

View File

@@ -17,26 +17,19 @@ from ....utils.feedback import create_feedback_manager
@click.command(help="Restore registry from a backup file")
@click.argument("backup_file", type=click.Path(exists=True, path_type=Path))
@click.option(
"--force",
"-f",
is_flag=True,
help="Force restore even if current registry exists"
"--force", "-f", is_flag=True, help="Force restore even if current registry exists"
)
@click.option(
"--backup-current",
is_flag=True,
help="Create backup of current registry before restoring"
)
@click.option(
"--verify",
is_flag=True,
help="Verify backup integrity before restoring"
help="Create backup of current registry before restoring",
)
@click.option("--verify", is_flag=True, help="Verify backup integrity before restoring")
@click.option(
"--api",
default="anilist",
type=click.Choice(["anilist"], case_sensitive=False),
help="Media API registry to restore to"
help="Media API registry to restore to",
)
@click.pass_obj
def restore(
@@ -45,57 +38,66 @@ def restore(
force: bool,
backup_current: bool,
verify: bool,
api: str
api: str,
):
"""
Restore your media registry from a backup file.
Can restore from tar or zip backups created by the backup command.
Optionally creates a backup of the current registry before restoring.
"""
feedback = create_feedback_manager(config.general.icons)
try:
# Detect backup format
backup_format = _detect_backup_format(backup_file)
feedback.info("Backup Format", f"Detected {backup_format.upper()} format")
# Verify backup if requested
if verify:
if not _verify_backup(backup_file, backup_format, feedback):
feedback.error("Verification Failed", "Backup file appears to be corrupted")
feedback.error(
"Verification Failed", "Backup file appears to be corrupted"
)
raise click.Abort()
feedback.success("Verification", "Backup file integrity verified")
# Check if current registry exists
registry_service = MediaRegistryService(api, config.registry)
registry_exists = _check_registry_exists(registry_service)
if registry_exists and not force:
if not click.confirm("Current registry exists. Continue with restore?"):
feedback.info("Restore Cancelled", "No changes were made")
return
# Create backup of current registry if requested
if backup_current and registry_exists:
_backup_current_registry(registry_service, api, feedback)
# Show restore summary
_show_restore_summary(backup_file, backup_format, feedback)
# Perform restore
_perform_restore(backup_file, backup_format, config, api, feedback)
feedback.success("Restore Complete", "Registry has been successfully restored from backup")
feedback.success(
"Restore Complete", "Registry has been successfully restored from backup"
)
# Verify restored registry
try:
restored_service = MediaRegistryService(api, config.registry)
stats = restored_service.get_registry_stats()
feedback.info("Restored Registry", f"Contains {stats.get('total_media', 0)} media entries")
feedback.info(
"Restored Registry",
f"Contains {stats.get('total_media', 0)} media entries",
)
except Exception as e:
feedback.warning("Verification Warning", f"Could not verify restored registry: {e}")
feedback.warning(
"Verification Warning", f"Could not verify restored registry: {e}"
)
except Exception as e:
feedback.error("Restore Error", f"Failed to restore registry: {e}")
raise click.Abort()
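# Usage sketch; the "registry restore" command path is an assumption based on
# this module's location:
#   fastanime registry restore backup.tar.gz --verify --backup-current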
@@ -103,27 +105,28 @@ def restore(
def _detect_backup_format(backup_file: Path) -> str:
"""Detect backup file format."""
if backup_file.suffix.lower() in ['.tar', '.gz']:
if backup_file.suffix.lower() in [".tar", ".gz"]:
return "tar"
elif backup_file.suffix.lower() == '.zip':
elif backup_file.suffix.lower() == ".zip":
return "zip"
elif backup_file.name.endswith('.tar.gz'):
elif backup_file.name.endswith(".tar.gz"):
return "tar"
else:
# Try to detect by content
try:
with tarfile.open(backup_file, 'r:*'):
with tarfile.open(backup_file, "r:*"):
return "tar"
        except Exception:
pass
try:
import zipfile
with zipfile.ZipFile(backup_file, 'r'):
with zipfile.ZipFile(backup_file, "r"):
return "zip"
        except Exception:
pass
raise click.ClickException(f"Could not detect backup format for {backup_file}")
@@ -131,53 +134,68 @@ def _verify_backup(backup_file: Path, format_type: str, feedback) -> bool:
"""Verify backup file integrity."""
try:
if format_type == "tar":
with tarfile.open(backup_file, 'r:*') as tar:
with tarfile.open(backup_file, "r:*") as tar:
# Check if essential files exist
names = tar.getnames()
has_registry = any('registry/' in name for name in names)
has_index = any('index/' in name for name in names)
has_metadata = 'backup_metadata.json' in names
has_registry = any("registry/" in name for name in names)
has_index = any("index/" in name for name in names)
has_metadata = "backup_metadata.json" in names
if not (has_registry and has_index):
return False
# Try to read metadata if it exists
if has_metadata:
try:
metadata_member = tar.getmember('backup_metadata.json')
metadata_member = tar.getmember("backup_metadata.json")
metadata_file = tar.extractfile(metadata_member)
if metadata_file:
import json
metadata = json.load(metadata_file)
feedback.info("Backup Info", f"Created: {metadata.get('backup_timestamp', 'Unknown')}")
feedback.info("Backup Info", f"Total Media: {metadata.get('total_media', 'Unknown')}")
feedback.info(
"Backup Info",
f"Created: {metadata.get('backup_timestamp', 'Unknown')}",
)
feedback.info(
"Backup Info",
f"Total Media: {metadata.get('total_media', 'Unknown')}",
)
                    except Exception:
pass
else: # zip
import zipfile
with zipfile.ZipFile(backup_file, 'r') as zip_file:
with zipfile.ZipFile(backup_file, "r") as zip_file:
names = zip_file.namelist()
has_registry = any('registry/' in name for name in names)
has_index = any('index/' in name for name in names)
has_metadata = 'backup_metadata.json' in names
has_registry = any("registry/" in name for name in names)
has_index = any("index/" in name for name in names)
has_metadata = "backup_metadata.json" in names
if not (has_registry and has_index):
return False
# Try to read metadata
if has_metadata:
try:
with zip_file.open('backup_metadata.json') as metadata_file:
with zip_file.open("backup_metadata.json") as metadata_file:
import json
metadata = json.load(metadata_file)
feedback.info("Backup Info", f"Created: {metadata.get('backup_timestamp', 'Unknown')}")
feedback.info("Backup Info", f"Total Media: {metadata.get('total_media', 'Unknown')}")
feedback.info(
"Backup Info",
f"Created: {metadata.get('backup_timestamp', 'Unknown')}",
)
feedback.info(
"Backup Info",
f"Total Media: {metadata.get('total_media', 'Unknown')}",
)
                    except Exception:
pass
return True
except Exception:
return False
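# A backup is treated as structurally valid when it contains both a
# "registry/" tree and an "index/" tree; backup_metadata.json is optional
# and, when present, is only used to report the creation timestamp and
# media count.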
@@ -186,7 +204,7 @@ def _check_registry_exists(registry_service) -> bool:
"""Check if a registry already exists."""
try:
stats = registry_service.get_registry_stats()
return stats.get('total_media', 0) > 0
return stats.get("total_media", 0) > 0
    except Exception:
return False
@@ -194,10 +212,10 @@ def _check_registry_exists(registry_service) -> bool:
def _backup_current_registry(registry_service, api: str, feedback):
"""Create backup of current registry before restoring."""
from .backup import _create_tar_backup
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = Path(f"fastanime_registry_pre_restore_{api}_{timestamp}.tar.gz")
try:
_create_tar_backup(registry_service, backup_path, True, False, feedback, api)
feedback.info("Current Registry Backed Up", f"Saved to {backup_path}")
@@ -209,72 +227,89 @@ def _show_restore_summary(backup_file: Path, format_type: str, feedback):
"""Show summary of what will be restored."""
try:
if format_type == "tar":
with tarfile.open(backup_file, 'r:*') as tar:
with tarfile.open(backup_file, "r:*") as tar:
members = tar.getmembers()
file_count = len([m for m in members if m.isfile()])
# Count media files
media_files = len([m for m in members if m.name.startswith('registry/') and m.name.endswith('.json')])
media_files = len(
[
m
for m in members
if m.name.startswith("registry/") and m.name.endswith(".json")
]
)
else: # zip
import zipfile
with zipfile.ZipFile(backup_file, 'r') as zip_file:
with zipfile.ZipFile(backup_file, "r") as zip_file:
info_list = zip_file.infolist()
file_count = len([info for info in info_list if not info.is_dir()])
# Count media files
media_files = len([info for info in info_list if info.filename.startswith('registry/') and info.filename.endswith('.json')])
media_files = len(
[
info
for info in info_list
if info.filename.startswith("registry/")
and info.filename.endswith(".json")
]
)
feedback.info("Restore Preview", f"Will restore {file_count} files")
feedback.info("Media Records", f"Contains {media_files} media entries")
except Exception as e:
feedback.warning("Preview Error", f"Could not analyze backup: {e}")
def _perform_restore(backup_file: Path, format_type: str, config: AppConfig, api: str, feedback):
def _perform_restore(
backup_file: Path, format_type: str, config: AppConfig, api: str, feedback
):
"""Perform the actual restore operation."""
# Create temporary extraction directory
temp_dir = Path(config.registry.media_dir.parent / "restore_temp")
temp_dir.mkdir(exist_ok=True)
try:
# Extract backup
if format_type == "tar":
with tarfile.open(backup_file, 'r:*') as tar:
with tarfile.open(backup_file, "r:*") as tar:
tar.extractall(temp_dir)
else: # zip
import zipfile
with zipfile.ZipFile(backup_file, 'r') as zip_file:
with zipfile.ZipFile(backup_file, "r") as zip_file:
zip_file.extractall(temp_dir)
feedback.info("Extraction", "Backup extracted to temporary directory")
# Remove existing registry if it exists
registry_dir = config.registry.media_dir / api
index_dir = config.registry.index_dir
if registry_dir.exists():
shutil.rmtree(registry_dir)
feedback.info("Cleanup", "Removed existing registry data")
if index_dir.exists():
shutil.rmtree(index_dir)
feedback.info("Cleanup", "Removed existing index data")
# Move extracted files to proper locations
extracted_registry = temp_dir / "registry" / api
extracted_index = temp_dir / "index"
if extracted_registry.exists():
shutil.move(str(extracted_registry), str(registry_dir))
feedback.info("Restore", "Registry data restored")
if extracted_index.exists():
shutil.move(str(extracted_index), str(index_dir))
feedback.info("Restore", "Index data restored")
# Restore cache if it exists
extracted_cache = temp_dir / "cache"
if extracted_cache.exists():
@@ -283,7 +318,7 @@ def _perform_restore(backup_file: Path, format_type: str, config: AppConfig, api
shutil.rmtree(cache_dir)
shutil.move(str(extracted_cache), str(cache_dir))
feedback.info("Restore", "Cache data restored")
finally:
# Clean up temporary directory
if temp_dir.exists():

View File

@@ -17,63 +17,44 @@ from ....utils.feedback import create_feedback_manager
@click.argument("query", required=False)
@click.option(
"--status",
type=click.Choice([
"watching", "completed", "planning", "dropped", "paused", "repeating"
], case_sensitive=False),
help="Filter by watch status"
type=click.Choice(
["watching", "completed", "planning", "dropped", "paused", "repeating"],
case_sensitive=False,
),
help="Filter by watch status",
)
@click.option(
"--genre",
multiple=True,
help="Filter by genre (can be used multiple times)"
"--genre", multiple=True, help="Filter by genre (can be used multiple times)"
)
@click.option(
"--format",
type=click.Choice([
"TV", "TV_SHORT", "MOVIE", "SPECIAL", "OVA", "ONA", "MUSIC"
], case_sensitive=False),
help="Filter by format"
)
@click.option(
"--year",
type=int,
help="Filter by release year"
)
@click.option(
"--min-score",
type=float,
help="Minimum average score (0.0 - 10.0)"
)
@click.option(
"--max-score",
type=float,
help="Maximum average score (0.0 - 10.0)"
type=click.Choice(
["TV", "TV_SHORT", "MOVIE", "SPECIAL", "OVA", "ONA", "MUSIC"],
case_sensitive=False,
),
help="Filter by format",
)
@click.option("--year", type=int, help="Filter by release year")
@click.option("--min-score", type=float, help="Minimum average score (0.0 - 10.0)")
@click.option("--max-score", type=float, help="Maximum average score (0.0 - 10.0)")
@click.option(
"--sort",
type=click.Choice([
"title", "score", "popularity", "year", "episodes", "updated"
], case_sensitive=False),
type=click.Choice(
["title", "score", "popularity", "year", "episodes", "updated"],
case_sensitive=False,
),
default="title",
help="Sort results by field"
help="Sort results by field",
)
@click.option("--limit", type=int, default=20, help="Maximum number of results to show")
@click.option(
"--limit",
type=int,
default=20,
help="Maximum number of results to show"
)
@click.option(
"--json",
"output_json",
is_flag=True,
help="Output results in JSON format"
"--json", "output_json", is_flag=True, help="Output results in JSON format"
)
@click.option(
"--api",
default="anilist",
type=click.Choice(["anilist"], case_sensitive=False),
help="Media API registry to search"
help="Media API registry to search",
)
@click.pass_obj
def search(
@@ -88,39 +69,40 @@ def search(
sort: str,
limit: int,
output_json: bool,
api: str
api: str,
):
"""
Search through your local media registry.
You can search by title and filter by various criteria like status,
genre, format, year, and score range.
"""
feedback = create_feedback_manager(config.general.icons)
console = Console()
try:
registry_service = MediaRegistryService(api, config.registry)
# Build search parameters
search_params = _build_search_params(
query, status, genre, format, year, min_score, max_score, sort, limit
)
# Perform search
result = registry_service.search_for_media(search_params)
if not result or not result.media:
feedback.info("No Results", "No media found matching your criteria")
return
if output_json:
import json
print(json.dumps(result.model_dump(), indent=2, default=str))
return
_display_search_results(console, result, config.general.icons)
except Exception as e:
feedback.error("Search Error", f"Failed to search registry: {e}")
raise click.Abort()
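# Usage sketch; the "registry search" command path is an assumption, while
# the option names come from the decorators above:
#   fastanime registry search "frieren" --status watching --min-score 7.5
#   fastanime registry search --genre Action --sort score --limit 10 --json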
@@ -130,20 +112,20 @@ def _build_search_params(
query, status, genre, format, year, min_score, max_score, sort, limit
) -> MediaSearchParams:
"""Build MediaSearchParams from command options."""
# Convert status string to enum
status_enum = None
if status:
status_map = {
"watching": UserMediaListStatus.WATCHING,
"completed": UserMediaListStatus.COMPLETED,
"completed": UserMediaListStatus.COMPLETED,
"planning": UserMediaListStatus.PLANNING,
"dropped": UserMediaListStatus.DROPPED,
"paused": UserMediaListStatus.PAUSED,
"repeating": UserMediaListStatus.REPEATING,
}
status_enum = status_map.get(status.lower())
# Convert sort string to enum
sort_map = {
"title": MediaSort.TITLE_ROMAJI,
@@ -154,29 +136,33 @@ def _build_search_params(
"updated": MediaSort.UPDATED_AT_DESC,
}
sort_enum = sort_map.get(sort.lower(), MediaSort.TITLE_ROMAJI)
# Convert format string to enum if provided
format_enum = None
if format:
from .....libs.media_api.types import MediaFormat
format_enum = getattr(MediaFormat, format.upper(), None)
# Convert genre strings to enums
genre_enums = []
if genre:
from .....libs.media_api.types import MediaGenre
for g in genre:
# Try to find matching genre enum
for genre_enum in MediaGenre:
if genre_enum.value.lower() == g.lower():
genre_enums.append(genre_enum)
break
return MediaSearchParams(
query=query,
per_page=limit,
sort=[sort_enum],
averageScore_greater=min_score * 10 if min_score else None, # Convert to AniList scale
averageScore_greater=min_score * 10
if min_score
else None, # Convert to AniList scale
averageScore_lesser=max_score * 10 if max_score else None,
genre_in=genre_enums if genre_enums else None,
format_in=[format_enum] if format_enum else None,
@@ -187,8 +173,10 @@ def _build_search_params(
def _display_search_results(console: Console, result, icons: bool):
"""Display search results in a formatted table."""
table = Table(title=f"{'🔍 ' if icons else ''}Search Results ({len(result.media)} found)")
table = Table(
title=f"{'🔍 ' if icons else ''}Search Results ({len(result.media)} found)"
)
table.add_column("Title", style="cyan", min_width=30)
table.add_column("Year", style="dim", justify="center", min_width=6)
table.add_column("Format", style="magenta", justify="center", min_width=8)
@@ -196,31 +184,35 @@ def _display_search_results(console: Console, result, icons: bool):
table.add_column("Score", style="yellow", justify="center", min_width=6)
table.add_column("Status", style="blue", justify="center", min_width=10)
table.add_column("Progress", style="white", justify="center", min_width=8)
for media in result.media:
# Get title (prefer English, fallback to Romaji)
title = media.title.english or media.title.romaji or "Unknown"
if len(title) > 40:
title = title[:37] + "..."
# Get year from start date
year = ""
if media.start_date:
year = str(media.start_date.year)
# Format episodes
episodes = str(media.episodes) if media.episodes else "?"
# Format score
score = f"{media.average_score/10:.1f}" if media.average_score else "N/A"
score = f"{media.average_score / 10:.1f}" if media.average_score else "N/A"
# Get user status
status = "Not Listed"
progress = "0"
if media.user_status:
status = media.user_status.status.value.title() if media.user_status.status else "Unknown"
status = (
media.user_status.status.value.title()
if media.user_status.status
else "Unknown"
)
progress = f"{media.user_status.progress or 0}/{episodes}"
table.add_row(
title,
year,
@@ -228,11 +220,11 @@ def _display_search_results(console: Console, result, icons: bool):
episodes,
score,
status,
progress
progress,
)
console.print(table)
# Show pagination info if applicable
if result.page_info.total > len(result.media):
console.print(

View File

@@ -17,45 +17,43 @@ from ....utils.feedback import create_feedback_manager
"--detailed",
"-d",
is_flag=True,
help="Show detailed breakdown by genre, format, and year"
help="Show detailed breakdown by genre, format, and year",
)
@click.option(
"--json",
"output_json",
is_flag=True,
help="Output statistics in JSON format"
"--json", "output_json", is_flag=True, help="Output statistics in JSON format"
)
@click.option(
"--api",
default="anilist",
type=click.Choice(["anilist"], case_sensitive=False),
help="Media API to show stats for"
help="Media API to show stats for",
)
@click.pass_obj
def stats(config: AppConfig, detailed: bool, output_json: bool, api: str):
"""
Display comprehensive statistics about your local media registry.
Shows total counts, status breakdown, and optionally detailed
Shows total counts, status breakdown, and optionally detailed
analysis by genre, format, and release year.
"""
feedback = create_feedback_manager(config.general.icons)
console = Console()
try:
registry_service = MediaRegistryService(api, config.registry)
stats_data = registry_service.get_registry_stats()
if output_json:
import json
print(json.dumps(stats_data, indent=2, default=str))
return
_display_stats_overview(console, stats_data, api, config.general.icons)
if detailed:
_display_detailed_stats(console, stats_data, config.general.icons)
except Exception as e:
feedback.error("Stats Error", f"Failed to generate statistics: {e}")
raise click.Abort()
@@ -63,118 +61,122 @@ def stats(config: AppConfig, detailed: bool, output_json: bool, api: str):
def _display_stats_overview(console: Console, stats: dict, api: str, icons: bool):
"""Display basic registry statistics overview."""
# Main overview panel
overview_text = f"[bold cyan]Media API:[/bold cyan] {api.title()}\n"
overview_text += f"[bold cyan]Total Media:[/bold cyan] {stats.get('total_media', 0)}\n"
overview_text += f"[bold cyan]Registry Version:[/bold cyan] {stats.get('version', 'Unknown')}\n"
overview_text += f"[bold cyan]Last Updated:[/bold cyan] {stats.get('last_updated', 'Never')}\n"
overview_text += f"[bold cyan]Storage Size:[/bold cyan] {stats.get('storage_size', 'Unknown')}"
overview_text += (
f"[bold cyan]Total Media:[/bold cyan] {stats.get('total_media', 0)}\n"
)
overview_text += (
f"[bold cyan]Registry Version:[/bold cyan] {stats.get('version', 'Unknown')}\n"
)
overview_text += (
f"[bold cyan]Last Updated:[/bold cyan] {stats.get('last_updated', 'Never')}\n"
)
overview_text += (
f"[bold cyan]Storage Size:[/bold cyan] {stats.get('storage_size', 'Unknown')}"
)
panel = Panel(
overview_text,
title=f"{'📊 ' if icons else ''}Registry Overview",
border_style="cyan"
border_style="cyan",
)
console.print(panel)
console.print()
# Status breakdown table
status_breakdown = stats.get('status_breakdown', {})
status_breakdown = stats.get("status_breakdown", {})
if status_breakdown:
table = Table(title=f"{'📋 ' if icons else ''}Status Breakdown")
table.add_column("Status", style="cyan", no_wrap=True)
table.add_column("Count", style="magenta", justify="right")
table.add_column("Percentage", style="green", justify="right")
total = sum(status_breakdown.values())
for status, count in sorted(status_breakdown.items()):
percentage = (count / total * 100) if total > 0 else 0
table.add_row(
status.title(),
str(count),
f"{percentage:.1f}%"
)
table.add_row(status.title(), str(count), f"{percentage:.1f}%")
console.print(table)
console.print()
# Download status breakdown
download_stats = stats.get('download_stats', {})
download_stats = stats.get("download_stats", {})
if download_stats:
table = Table(title=f"{'💾 ' if icons else ''}Download Status")
table.add_column("Status", style="cyan", no_wrap=True)
table.add_column("Count", style="magenta", justify="right")
for status, count in download_stats.items():
table.add_row(status.title(), str(count))
console.print(table)
console.print()
def _display_detailed_stats(console: Console, stats: dict, icons: bool):
"""Display detailed breakdown by various categories."""
# Genre breakdown
genre_breakdown = stats.get('genre_breakdown', {})
genre_breakdown = stats.get("genre_breakdown", {})
if genre_breakdown:
table = Table(title=f"{'🎭 ' if icons else ''}Top Genres")
table.add_column("Genre", style="cyan")
table.add_column("Count", style="magenta", justify="right")
# Sort by count and show top 10
top_genres = sorted(genre_breakdown.items(), key=lambda x: x[1], reverse=True)[:10]
top_genres = sorted(genre_breakdown.items(), key=lambda x: x[1], reverse=True)[
:10
]
for genre, count in top_genres:
table.add_row(genre, str(count))
console.print(table)
console.print()
# Format breakdown
format_breakdown = stats.get('format_breakdown', {})
format_breakdown = stats.get("format_breakdown", {})
if format_breakdown:
table = Table(title=f"{'📺 ' if icons else ''}Format Breakdown")
table.add_column("Format", style="cyan")
table.add_column("Count", style="magenta", justify="right")
table.add_column("Percentage", style="green", justify="right")
total = sum(format_breakdown.values())
for format_type, count in sorted(format_breakdown.items()):
percentage = (count / total * 100) if total > 0 else 0
table.add_row(
format_type,
str(count),
f"{percentage:.1f}%"
)
table.add_row(format_type, str(count), f"{percentage:.1f}%")
console.print(table)
console.print()
# Year breakdown
year_breakdown = stats.get('year_breakdown', {})
year_breakdown = stats.get("year_breakdown", {})
if year_breakdown:
table = Table(title=f"{'📅 ' if icons else ''}Release Years (Top 10)")
table.add_column("Year", style="cyan", justify="center")
table.add_column("Count", style="magenta", justify="right")
# Sort by year descending and show top 10
top_years = sorted(year_breakdown.items(), key=lambda x: x[0], reverse=True)[:10]
top_years = sorted(year_breakdown.items(), key=lambda x: x[0], reverse=True)[
:10
]
for year, count in top_years:
table.add_row(str(year), str(count))
console.print(table)
console.print()
# Rating breakdown
rating_breakdown = stats.get('rating_breakdown', {})
rating_breakdown = stats.get("rating_breakdown", {})
if rating_breakdown:
table = Table(title=f"{'' if icons else ''}Score Distribution")
table.add_column("Score Range", style="cyan")
table.add_column("Count", style="magenta", justify="right")
for score_range, count in sorted(rating_breakdown.items()):
table.add_row(score_range, str(count))
console.print(table)
console.print()
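# Keys read from get_registry_stats() by the two display helpers (all
# optional; the breakdowns are dicts of category -> count):
#   total_media, version, last_updated, storage_size, status_breakdown,
#   download_stats, genre_breakdown, format_breakdown, year_breakdown,
#   rating_breakdown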

View File

@@ -89,22 +89,23 @@ def search(config: AppConfig, **options: "Unpack[Options]"):
if not anime:
raise FastAnimeError(f"Failed to fetch anime {anime_result.title}")
available_episodes: list[str] = sorted(
getattr(anime.episodes, config.stream.translation_type), key=float
)
if options["episode_range"]:
from ..utils.parser import parse_episode_range
try:
episodes_range = parse_episode_range(
options["episode_range"],
available_episodes
options["episode_range"], available_episodes
)
for episode in episodes_range:
stream_anime(config, provider, selector, anime, episode, anime_title)
stream_anime(
config, provider, selector, anime, episode, anime_title
)
except (ValueError, IndexError) as e:
raise FastAnimeError(f"Invalid episode range: {e}") from e
else:

View File

@@ -53,14 +53,20 @@ if TYPE_CHECKING:
)
@click.pass_context
@click.pass_obj
def update(config: "AppConfig", ctx: click.Context, force: bool, check_only: bool, release_notes: bool) -> None:
def update(
config: "AppConfig",
ctx: click.Context,
force: bool,
check_only: bool,
release_notes: bool,
) -> None:
"""
Update FastAnime to the latest version.
This command checks for available updates and optionally updates
the application to the latest version from the configured sources
(pip, uv, pipx, git, or nix depending on installation method).
Args:
config: The application configuration object
ctx: The click context containing CLI options
@@ -72,73 +78,83 @@ def update(config: "AppConfig", ctx: click.Context, force: bool, check_only: boo
if release_notes:
print("[cyan]Fetching latest release notes...[/]")
is_latest, release_json = check_for_updates()
if not release_json:
print("[yellow]Could not fetch release information. Please check your internet connection.[/]")
print(
"[yellow]Could not fetch release information. Please check your internet connection.[/]"
)
sys.exit(1)
version = release_json.get('tag_name', 'unknown')
release_name = release_json.get('name', version)
release_body = release_json.get('body', 'No release notes available.')
published_at = release_json.get('published_at', 'unknown')
version = release_json.get("tag_name", "unknown")
release_name = release_json.get("name", version)
release_body = release_json.get("body", "No release notes available.")
published_at = release_json.get("published_at", "unknown")
console = Console()
print(f"[bold cyan]Release: {release_name}[/]")
print(f"[dim]Version: {version}[/]")
print(f"[dim]Published: {published_at}[/]")
print()
# Display release notes as markdown if available
if release_body.strip():
markdown = Markdown(release_body)
console.print(markdown)
else:
print("[dim]No release notes available for this version.[/]")
return
elif check_only:
print("[cyan]Checking for updates...[/]")
is_latest, release_json = check_for_updates()
if not release_json:
print("[yellow]Could not check for updates. Please check your internet connection.[/]")
print(
"[yellow]Could not check for updates. Please check your internet connection.[/]"
)
sys.exit(1)
if is_latest:
print("[green]FastAnime is up to date![/]")
print(f"[dim]Current version: {release_json.get('tag_name', 'unknown')}[/]")
print(
f"[dim]Current version: {release_json.get('tag_name', 'unknown')}[/]"
)
else:
latest_version = release_json.get('tag_name', 'unknown')
latest_version = release_json.get("tag_name", "unknown")
print(f"[yellow]Update available: {latest_version}[/]")
print(f"[dim]Run 'fastanime update' to update[/]")
print("[dim]Run 'fastanime update' to update[/]")
sys.exit(1)
else:
print("[cyan]Checking for updates and updating if necessary...[/]")
success, release_json = update_app(force=force)
if not release_json:
print("[red]Could not check for updates. Please check your internet connection.[/]")
print(
"[red]Could not check for updates. Please check your internet connection.[/]"
)
sys.exit(1)
if success:
latest_version = release_json.get('tag_name', 'unknown')
latest_version = release_json.get("tag_name", "unknown")
print(f"[green]Successfully updated to version {latest_version}![/]")
else:
if force:
print("[red]Update failed. Please check the error messages above.[/]")
print(
"[red]Update failed. Please check the error messages above.[/]"
)
sys.exit(1)
                # If the update was not forced and it failed, the app may
                # simply be up to date already; update_app prints the
                # appropriate messages either way.
except KeyboardInterrupt:
print("\n[yellow]Update cancelled by user.[/]")
sys.exit(1)
except Exception as e:
print(f"[red]An error occurred during update: {e}[/]")
# Get trace option from parent context
trace = ctx.parent.params.get('trace', False) if ctx.parent else False
trace = ctx.parent.params.get("trace", False) if ctx.parent else False
if trace:
raise
sys.exit(1)
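# Usage sketch; the flag spellings are assumed from the parameter names,
# since the option decorators are not shown above:
#   fastanime update --check-only      # exits 1 when an update is available
#   fastanime update --release-notes   # prints the latest release notes
#   fastanime update --force           # attempt the update even when current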