diff --git a/fastanime/cli/cli.py b/fastanime/cli/cli.py index a3db373..70ccc51 100644 --- a/fastanime/cli/cli.py +++ b/fastanime/cli/cli.py @@ -32,6 +32,7 @@ commands = { "anilist": "anilist.anilist", "download": "download.download", "update": "update.update", + "registry": "registry.cmd.registry", } diff --git a/fastanime/cli/commands/registry/__init__.py b/fastanime/cli/commands/registry/__init__.py new file mode 100644 index 0000000..6561b40 --- /dev/null +++ b/fastanime/cli/commands/registry/__init__.py @@ -0,0 +1 @@ +# Registry package diff --git a/fastanime/cli/commands/registry/cmd.py b/fastanime/cli/commands/registry/cmd.py new file mode 100644 index 0000000..1d8ac01 --- /dev/null +++ b/fastanime/cli/commands/registry/cmd.py @@ -0,0 +1,65 @@ +import click + +from ...utils.lazyloader import LazyGroup +from . import examples + +commands = { + "sync": "sync.sync", + "stats": "stats.stats", + "search": "search.search", + "export": "export.export", + "import": "import_.import_", + "clean": "clean.clean", + "backup": "backup.backup", + "restore": "restore.restore", +} + + +@click.group( + cls=LazyGroup, + name="registry", + root="fastanime.cli.commands.registry.commands", + invoke_without_command=True, + help="Manage your local media registry - sync, search, backup and maintain your anime database", + short_help="Local media registry management", + lazy_subcommands=commands, + epilog=examples.main, +) +@click.option( + "--api", + default="anilist", + help="Media API to use (default: anilist)", + type=click.Choice(["anilist"], case_sensitive=False) +) +@click.pass_context +def registry(ctx: click.Context, api: str): + """ + The entry point for the 'registry' command. If no subcommand is invoked, + it shows registry information and statistics. + """ + from ...service.registry.service import MediaRegistryService + from ...utils.feedback import create_feedback_manager + + config = ctx.obj + feedback = create_feedback_manager(config.general.icons) + + if ctx.invoked_subcommand is None: + # Show registry overview and statistics + try: + registry_service = MediaRegistryService(api, config.registry) + stats = registry_service.get_registry_stats() + + feedback.info("Registry Overview", f"API: {api}") + feedback.info("Total Media", f"{stats.get('total_media', 0)} entries") + feedback.info("Recently Updated", f"{stats.get('recently_updated', 0)} entries in last 7 days") + feedback.info("Storage Path", str(config.registry.media_dir)) + + # Show status breakdown if available + status_breakdown = stats.get('status_breakdown', {}) + if status_breakdown: + feedback.info("Status Breakdown:") + for status, count in status_breakdown.items(): + feedback.info(f" {status.title()}", f"{count} entries") + + except Exception as e: + feedback.error("Registry Error", f"Failed to load registry: {e}") diff --git a/fastanime/cli/commands/registry/commands/__init__.py b/fastanime/cli/commands/registry/commands/__init__.py new file mode 100644 index 0000000..e58511c --- /dev/null +++ b/fastanime/cli/commands/registry/commands/__init__.py @@ -0,0 +1 @@ +# Registry commands package diff --git a/fastanime/cli/commands/registry/commands/backup.py b/fastanime/cli/commands/registry/commands/backup.py new file mode 100644 index 0000000..9b18798 --- /dev/null +++ b/fastanime/cli/commands/registry/commands/backup.py @@ -0,0 +1,242 @@ +""" +Registry backup command - create full backups of the registry +""" + +import shutil +import tarfile +from pathlib import Path +from datetime import datetime + +import click + +from .....core.config import AppConfig +from ....service.registry.service import MediaRegistryService +from ....utils.feedback import create_feedback_manager + + +@click.command(help="Create a full backup of the registry") +@click.option( + "--output", + "-o", + type=click.Path(), + help="Output backup file path (auto-generated if not specified)" +) +@click.option( + "--compress", + "-c", + is_flag=True, + help="Compress the backup archive" +) +@click.option( + "--include-cache", + is_flag=True, + help="Include cache files in backup" +) +@click.option( + "--format", + "backup_format", + type=click.Choice(["tar", "zip"], case_sensitive=False), + default="tar", + help="Backup archive format" +) +@click.option( + "--api", + default="anilist", + type=click.Choice(["anilist"], case_sensitive=False), + help="Media API registry to backup" +) +@click.pass_obj +def backup( + config: AppConfig, + output: str | None, + compress: bool, + include_cache: bool, + backup_format: str, + api: str +): + """ + Create a complete backup of your media registry. + + Includes all media records, index files, and optionally cache data. + Backups can be compressed and are suitable for restoration. + """ + feedback = create_feedback_manager(config.general.icons) + + try: + registry_service = MediaRegistryService(api, config.registry) + + # Generate output filename if not specified + if not output: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + extension = "tar.gz" if compress and backup_format == "tar" else backup_format + if backup_format == "zip": + extension = "zip" + output = f"fastanime_registry_backup_{api}_{timestamp}.{extension}" + + output_path = Path(output) + + # Get backup statistics before starting + stats = registry_service.get_registry_stats() + total_media = stats.get('total_media', 0) + + feedback.info("Starting Backup", f"Backing up {total_media} media entries...") + + # Create backup based on format + if backup_format.lower() == "tar": + _create_tar_backup( + registry_service, output_path, compress, include_cache, feedback, api + ) + elif backup_format.lower() == "zip": + _create_zip_backup( + registry_service, output_path, include_cache, feedback, api + ) + + # Get final backup size + backup_size = _format_file_size(output_path) + + feedback.success( + "Backup Complete", + f"Registry backed up to {output_path} ({backup_size})" + ) + + # Show backup contents summary + _show_backup_summary(output_path, backup_format, feedback) + + except Exception as e: + feedback.error("Backup Error", f"Failed to create backup: {e}") + raise click.Abort() + + +def _create_tar_backup(registry_service, output_path: Path, compress: bool, include_cache: bool, feedback, api: str): + """Create a tar-based backup.""" + + mode = "w:gz" if compress else "w" + + with tarfile.open(output_path, mode) as tar: + # Add registry directory + registry_dir = registry_service.config.media_dir / api + if registry_dir.exists(): + tar.add(registry_dir, arcname=f"registry/{api}") + feedback.info("Added to backup", f"Registry data ({api})") + + # Add index directory + index_dir = registry_service.config.index_dir + if index_dir.exists(): + tar.add(index_dir, arcname="index") + feedback.info("Added to backup", "Registry index") + + # Add cache if requested + if include_cache: + cache_dir = registry_service.config.media_dir.parent / "cache" + if cache_dir.exists(): + tar.add(cache_dir, arcname="cache") + feedback.info("Added to backup", "Cache data") + + # Add metadata file + metadata = _create_backup_metadata(registry_service, api, include_cache) + metadata_path = output_path.parent / "backup_metadata.json" + + try: + import json + with open(metadata_path, 'w', encoding='utf-8') as f: + json.dump(metadata, f, indent=2, default=str) + + tar.add(metadata_path, arcname="backup_metadata.json") + metadata_path.unlink() # Clean up temp file + + except Exception as e: + feedback.warning("Metadata Error", f"Failed to add metadata: {e}") + + +def _create_zip_backup(registry_service, output_path: Path, include_cache: bool, feedback, api: str): + """Create a zip-based backup.""" + import zipfile + + with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zip_file: + # Add registry directory + registry_dir = registry_service.config.media_dir / api + if registry_dir.exists(): + for file_path in registry_dir.rglob('*'): + if file_path.is_file(): + arcname = f"registry/{api}/{file_path.relative_to(registry_dir)}" + zip_file.write(file_path, arcname) + feedback.info("Added to backup", f"Registry data ({api})") + + # Add index directory + index_dir = registry_service.config.index_dir + if index_dir.exists(): + for file_path in index_dir.rglob('*'): + if file_path.is_file(): + arcname = f"index/{file_path.relative_to(index_dir)}" + zip_file.write(file_path, arcname) + feedback.info("Added to backup", "Registry index") + + # Add cache if requested + if include_cache: + cache_dir = registry_service.config.media_dir.parent / "cache" + if cache_dir.exists(): + for file_path in cache_dir.rglob('*'): + if file_path.is_file(): + arcname = f"cache/{file_path.relative_to(cache_dir)}" + zip_file.write(file_path, arcname) + feedback.info("Added to backup", "Cache data") + + # Add metadata + metadata = _create_backup_metadata(registry_service, api, include_cache) + try: + import json + metadata_json = json.dumps(metadata, indent=2, default=str) + zip_file.writestr("backup_metadata.json", metadata_json) + except Exception as e: + feedback.warning("Metadata Error", f"Failed to add metadata: {e}") + + +def _create_backup_metadata(registry_service, api: str, include_cache: bool) -> dict: + """Create backup metadata.""" + stats = registry_service.get_registry_stats() + + return { + "backup_timestamp": datetime.now().isoformat(), + "fastanime_version": "unknown", # You might want to get this from somewhere + "registry_version": stats.get('version'), + "api": api, + "total_media": stats.get('total_media', 0), + "include_cache": include_cache, + "registry_stats": stats, + "backup_type": "full", + } + + +def _show_backup_summary(backup_path: Path, format_type: str, feedback): + """Show summary of backup contents.""" + + try: + if format_type.lower() == "tar": + with tarfile.open(backup_path, 'r:*') as tar: + members = tar.getmembers() + file_count = len([m for m in members if m.isfile()]) + dir_count = len([m for m in members if m.isdir()]) + else: # zip + import zipfile + with zipfile.ZipFile(backup_path, 'r') as zip_file: + info_list = zip_file.infolist() + file_count = len([info for info in info_list if not info.is_dir()]) + dir_count = len([info for info in info_list if info.is_dir()]) + + feedback.info("Backup Contents", f"{file_count} files, {dir_count} directories") + + except Exception as e: + feedback.warning("Summary Error", f"Could not analyze backup contents: {e}") + + +def _format_file_size(file_path: Path) -> str: + """Format file size in human-readable format.""" + try: + size = file_path.stat().st_size + for unit in ['B', 'KB', 'MB', 'GB']: + if size < 1024.0: + return f"{size:.1f} {unit}" + size /= 1024.0 + return f"{size:.1f} TB" + except: + return "Unknown size" diff --git a/fastanime/cli/commands/registry/commands/clean.py b/fastanime/cli/commands/registry/commands/clean.py new file mode 100644 index 0000000..f866dc9 --- /dev/null +++ b/fastanime/cli/commands/registry/commands/clean.py @@ -0,0 +1,379 @@ +""" +Registry clean command - clean up orphaned entries and invalid data +""" + +import click +from rich.console import Console +from rich.table import Table + +from .....core.config import AppConfig +from ....service.registry.service import MediaRegistryService +from ....utils.feedback import create_feedback_manager + + +@click.command(help="Clean up orphaned entries and invalid data from registry") +@click.option( + "--dry-run", + is_flag=True, + help="Show what would be cleaned without making changes" +) +@click.option( + "--orphaned", + is_flag=True, + help="Remove orphaned media records (index entries without files)" +) +@click.option( + "--invalid", + is_flag=True, + help="Remove invalid or corrupted entries" +) +@click.option( + "--duplicates", + is_flag=True, + help="Remove duplicate entries" +) +@click.option( + "--old-format", + is_flag=True, + help="Clean entries from old registry format versions" +) +@click.option( + "--force", + "-f", + is_flag=True, + help="Force cleanup without confirmation prompts" +) +@click.option( + "--api", + default="anilist", + type=click.Choice(["anilist"], case_sensitive=False), + help="Media API registry to clean" +) +@click.pass_obj +def clean( + config: AppConfig, + dry_run: bool, + orphaned: bool, + invalid: bool, + duplicates: bool, + old_format: bool, + force: bool, + api: str +): + """ + Clean up your local media registry. + + Can remove orphaned entries, invalid data, duplicates, and entries + from old format versions. Use --dry-run to preview changes. + """ + feedback = create_feedback_manager(config.general.icons) + console = Console() + + # Default to all cleanup types if none specified + if not any([orphaned, invalid, duplicates, old_format]): + orphaned = invalid = duplicates = old_format = True + + try: + registry_service = MediaRegistryService(api, config.registry) + + cleanup_results = { + "orphaned": [], + "invalid": [], + "duplicates": [], + "old_format": [] + } + + # Analyze registry for cleanup opportunities + _analyze_registry(registry_service, cleanup_results, orphaned, invalid, duplicates, old_format) + + # Show cleanup summary + _display_cleanup_summary(console, cleanup_results, config.general.icons) + + # Confirm cleanup if not dry run and not forced + total_items = sum(len(items) for items in cleanup_results.values()) + if total_items == 0: + feedback.info("Registry Clean", "No cleanup needed - registry is already clean!") + return + + if not dry_run: + if not force: + if not click.confirm(f"Clean up {total_items} items from registry?"): + feedback.info("Cleanup Cancelled", "No changes were made") + return + + # Perform cleanup + _perform_cleanup(registry_service, cleanup_results, feedback) + + feedback.success("Cleanup Complete", f"Cleaned up {total_items} items from registry") + else: + feedback.info("Dry Run Complete", f"Would clean up {total_items} items") + + except Exception as e: + feedback.error("Cleanup Error", f"Failed to clean registry: {e}") + raise click.Abort() + + +def _analyze_registry(registry_service, results: dict, check_orphaned: bool, check_invalid: bool, check_duplicates: bool, check_old_format: bool): + """Analyze registry for cleanup opportunities.""" + + if check_orphaned: + results["orphaned"] = _find_orphaned_entries(registry_service) + + if check_invalid: + results["invalid"] = _find_invalid_entries(registry_service) + + if check_duplicates: + results["duplicates"] = _find_duplicate_entries(registry_service) + + if check_old_format: + results["old_format"] = _find_old_format_entries(registry_service) + + +def _find_orphaned_entries(registry_service) -> list: + """Find index entries that don't have corresponding media files.""" + orphaned = [] + + try: + index = registry_service._load_index() + + for entry_key, entry in index.media_index.items(): + media_file = registry_service._get_media_file_path(entry.media_id) + if not media_file.exists(): + orphaned.append({ + "type": "orphaned_index", + "id": entry.media_id, + "key": entry_key, + "reason": "Media file missing" + }) + except Exception: + pass + + return orphaned + + +def _find_invalid_entries(registry_service) -> list: + """Find invalid or corrupted entries.""" + invalid = [] + + try: + # Check all media files + for media_file in registry_service.media_registry_dir.iterdir(): + if not media_file.name.endswith('.json'): + continue + + try: + media_id = int(media_file.stem) + record = registry_service.get_media_record(media_id) + + # Check for invalid record structure + if not record or not record.media_item: + invalid.append({ + "type": "invalid_record", + "id": media_id, + "file": media_file, + "reason": "Invalid record structure" + }) + elif not record.media_item.title or not record.media_item.title.english and not record.media_item.title.romaji: + invalid.append({ + "type": "invalid_title", + "id": media_id, + "file": media_file, + "reason": "Missing or invalid title" + }) + + except (ValueError, Exception) as e: + invalid.append({ + "type": "corrupted_file", + "id": media_file.stem, + "file": media_file, + "reason": f"File corruption: {e}" + }) + except Exception: + pass + + return invalid + + +def _find_duplicate_entries(registry_service) -> list: + """Find duplicate entries (same media ID appearing multiple times).""" + duplicates = [] + seen_ids = set() + + try: + index = registry_service._load_index() + + for entry_key, entry in index.media_index.items(): + if entry.media_id in seen_ids: + duplicates.append({ + "type": "duplicate_index", + "id": entry.media_id, + "key": entry_key, + "reason": "Duplicate media ID in index" + }) + else: + seen_ids.add(entry.media_id) + except Exception: + pass + + return duplicates + + +def _find_old_format_entries(registry_service) -> list: + """Find entries from old registry format versions.""" + old_format = [] + + try: + index = registry_service._load_index() + current_version = registry_service._index.version + + # Check for entries that might be from old formats + # This is a placeholder - you'd implement specific checks based on your version history + for media_file in registry_service.media_registry_dir.iterdir(): + if not media_file.name.endswith('.json'): + continue + + try: + import json + with open(media_file, 'r') as f: + data = json.load(f) + + # Check for old format indicators + if 'version' in data and data['version'] < current_version: + old_format.append({ + "type": "old_version", + "id": media_file.stem, + "file": media_file, + "reason": f"Old format version {data.get('version')}" + }) + except Exception: + pass + except Exception: + pass + + return old_format + + +def _display_cleanup_summary(console: Console, results: dict, icons: bool): + """Display summary of cleanup opportunities.""" + + table = Table(title=f"{'๐Ÿงน ' if icons else ''}Registry Cleanup Summary") + table.add_column("Category", style="cyan", no_wrap=True) + table.add_column("Count", style="magenta", justify="right") + table.add_column("Description", style="white") + + categories = { + "orphaned": "Orphaned Entries", + "invalid": "Invalid Entries", + "duplicates": "Duplicate Entries", + "old_format": "Old Format Entries" + } + + for category, display_name in categories.items(): + count = len(results[category]) + if count > 0: + # Get sample reasons + reasons = set(item["reason"] for item in results[category][:3]) + description = "; ".join(list(reasons)[:2]) + if len(reasons) > 2: + description += "..." + else: + description = "None found" + + table.add_row(display_name, str(count), description) + + console.print(table) + console.print() + + # Show detailed breakdown if there are items to clean + for category, items in results.items(): + if items: + _display_category_details(console, category, items, icons) + + +def _display_category_details(console: Console, category: str, items: list, icons: bool): + """Display detailed breakdown for a cleanup category.""" + + category_names = { + "orphaned": "๐Ÿ”— Orphaned Entries" if icons else "Orphaned Entries", + "invalid": "โŒ Invalid Entries" if icons else "Invalid Entries", + "duplicates": "๐Ÿ‘ฅ Duplicate Entries" if icons else "Duplicate Entries", + "old_format": "๐Ÿ“ผ Old Format Entries" if icons else "Old Format Entries" + } + + table = Table(title=category_names.get(category, category.title())) + table.add_column("ID", style="cyan", no_wrap=True) + table.add_column("Type", style="magenta") + table.add_column("Reason", style="yellow") + + for item in items[:10]: # Show max 10 items + table.add_row( + str(item["id"]), + item["type"], + item["reason"] + ) + + if len(items) > 10: + table.add_row("...", "...", f"And {len(items) - 10} more") + + console.print(table) + console.print() + + +def _perform_cleanup(registry_service, results: dict, feedback): + """Perform the actual cleanup operations.""" + + cleaned_count = 0 + + # Clean orphaned entries + for item in results["orphaned"]: + try: + if item["type"] == "orphaned_index": + index = registry_service._load_index() + if item["key"] in index.media_index: + del index.media_index[item["key"]] + registry_service._save_index(index) + cleaned_count += 1 + except Exception as e: + feedback.warning("Cleanup Error", f"Failed to clean orphaned entry {item['id']}: {e}") + + # Clean invalid entries + for item in results["invalid"]: + try: + if "file" in item: + item["file"].unlink() # Delete the file + cleaned_count += 1 + + # Also remove from index if present + index = registry_service._load_index() + entry_key = f"{registry_service._media_api}_{item['id']}" + if entry_key in index.media_index: + del index.media_index[entry_key] + registry_service._save_index(index) + + except Exception as e: + feedback.warning("Cleanup Error", f"Failed to clean invalid entry {item['id']}: {e}") + + # Clean duplicates + for item in results["duplicates"]: + try: + if item["type"] == "duplicate_index": + index = registry_service._load_index() + if item["key"] in index.media_index: + del index.media_index[item["key"]] + registry_service._save_index(index) + cleaned_count += 1 + except Exception as e: + feedback.warning("Cleanup Error", f"Failed to clean duplicate entry {item['id']}: {e}") + + # Clean old format entries + for item in results["old_format"]: + try: + if "file" in item: + # You might want to migrate instead of delete + # For now, we'll just remove old format files + item["file"].unlink() + cleaned_count += 1 + except Exception as e: + feedback.warning("Cleanup Error", f"Failed to clean old format entry {item['id']}: {e}") + + feedback.info("Cleanup Results", f"Successfully cleaned {cleaned_count} items") diff --git a/fastanime/cli/commands/registry/commands/export.py b/fastanime/cli/commands/registry/commands/export.py new file mode 100644 index 0000000..87eafda --- /dev/null +++ b/fastanime/cli/commands/registry/commands/export.py @@ -0,0 +1,338 @@ +""" +Registry export command - export registry data to various formats +""" + +import json +import csv +from pathlib import Path +from datetime import datetime + +import click + +from .....core.config import AppConfig +from ....service.registry.service import MediaRegistryService +from ....utils.feedback import create_feedback_manager + + +@click.command(help="Export registry data to various formats") +@click.option( + "--format", + "output_format", + type=click.Choice(["json", "csv", "xml"], case_sensitive=False), + default="json", + help="Export format" +) +@click.option( + "--output", + "-o", + type=click.Path(), + help="Output file path (auto-generated if not specified)" +) +@click.option( + "--include-metadata", + is_flag=True, + help="Include detailed media metadata in export" +) +@click.option( + "--status", + multiple=True, + type=click.Choice([ + "watching", "completed", "planning", "dropped", "paused", "repeating" + ], case_sensitive=False), + help="Only export specific status lists" +) +@click.option( + "--compress", + is_flag=True, + help="Compress the output file" +) +@click.option( + "--api", + default="anilist", + type=click.Choice(["anilist"], case_sensitive=False), + help="Media API registry to export" +) +@click.pass_obj +def export( + config: AppConfig, + output_format: str, + output: str | None, + include_metadata: bool, + status: tuple[str, ...], + compress: bool, + api: str +): + """ + Export your local media registry to various formats. + + Supports JSON, CSV, and XML formats. Can optionally include + detailed metadata and compress the output. + """ + feedback = create_feedback_manager(config.general.icons) + + try: + registry_service = MediaRegistryService(api, config.registry) + + # Generate output filename if not specified + if not output: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + extension = output_format.lower() + if compress: + extension += ".gz" + output = f"fastanime_registry_{api}_{timestamp}.{extension}" + + output_path = Path(output) + + # Get export data + export_data = _prepare_export_data( + registry_service, include_metadata, status + ) + + # Export based on format + if output_format.lower() == "json": + _export_json(export_data, output_path, compress, feedback) + elif output_format.lower() == "csv": + _export_csv(export_data, output_path, compress, feedback) + elif output_format.lower() == "xml": + _export_xml(export_data, output_path, compress, feedback) + + feedback.success( + "Export Complete", + f"Registry exported to {output_path} ({_format_file_size(output_path)})" + ) + + except Exception as e: + feedback.error("Export Error", f"Failed to export registry: {e}") + raise click.Abort() + + +def _prepare_export_data(registry_service, include_metadata: bool, status_filter: tuple[str, ...]) -> dict: + """Prepare data for export based on options.""" + + # Convert status filter to enums + from .....libs.media_api.types import UserMediaListStatus + status_map = { + "watching": UserMediaListStatus.WATCHING, + "completed": UserMediaListStatus.COMPLETED, + "planning": UserMediaListStatus.PLANNING, + "dropped": UserMediaListStatus.DROPPED, + "paused": UserMediaListStatus.PAUSED, + "repeating": UserMediaListStatus.REPEATING, + } + + status_enums = [status_map[s] for s in status_filter] if status_filter else None + + export_data = { + "metadata": { + "export_timestamp": datetime.now().isoformat(), + "registry_version": registry_service._load_index().version, + "include_metadata": include_metadata, + "filtered_status": list(status_filter) if status_filter else None, + }, + "statistics": registry_service.get_registry_stats(), + "media": [] + } + + # Get all records and filter by status if specified + all_records = registry_service.get_all_media_records() + + for record in all_records: + index_entry = registry_service.get_media_index_entry(record.media_item.id) + + # Skip if status filter is specified and doesn't match + if status_enums and (not index_entry or index_entry.status not in status_enums): + continue + + media_data = { + "id": record.media_item.id, + "title": { + "english": record.media_item.title.english, + "romaji": record.media_item.title.romaji, + "native": record.media_item.title.native, + }, + "user_status": { + "status": index_entry.status.value if index_entry and index_entry.status else None, + "progress": index_entry.progress if index_entry else None, + "score": index_entry.score if index_entry else None, + "last_watched": index_entry.last_watched.isoformat() if index_entry and index_entry.last_watched else None, + "notes": index_entry.notes if index_entry else None, + } + } + + if include_metadata: + media_data.update({ + "format": record.media_item.format.value if record.media_item.format else None, + "episodes": record.media_item.episodes, + "duration": record.media_item.duration, + "status": record.media_item.status.value if record.media_item.status else None, + "start_date": record.media_item.start_date.isoformat() if record.media_item.start_date else None, + "end_date": record.media_item.end_date.isoformat() if record.media_item.end_date else None, + "average_score": record.media_item.average_score, + "popularity": record.media_item.popularity, + "genres": [genre.value for genre in record.media_item.genres], + "tags": [{"name": tag.name.value, "rank": tag.rank} for tag in record.media_item.tags], + "studios": [studio.name for studio in record.media_item.studios if studio.name], + "description": record.media_item.description, + "cover_image": { + "large": record.media_item.cover_image.large if record.media_item.cover_image else None, + "medium": record.media_item.cover_image.medium if record.media_item.cover_image else None, + } if record.media_item.cover_image else None, + }) + + export_data["media"].append(media_data) + + return export_data + + +def _export_json(data: dict, output_path: Path, compress: bool, feedback): + """Export data to JSON format.""" + if compress: + import gzip + with gzip.open(output_path, 'wt', encoding='utf-8') as f: + json.dump(data, f, indent=2, ensure_ascii=False) + else: + with open(output_path, 'w', encoding='utf-8') as f: + json.dump(data, f, indent=2, ensure_ascii=False) + + +def _export_csv(data: dict, output_path: Path, compress: bool, feedback): + """Export data to CSV format.""" + # Flatten media data for CSV + fieldnames = [ + "id", "title_english", "title_romaji", "title_native", + "status", "progress", "score", "last_watched", "notes" + ] + + # Add metadata fields if included + if data["metadata"]["include_metadata"]: + fieldnames.extend([ + "format", "episodes", "duration", "media_status", "start_date", "end_date", + "average_score", "popularity", "genres", "description" + ]) + + def write_csv(file_obj): + writer = csv.DictWriter(file_obj, fieldnames=fieldnames) + writer.writeheader() + + for media in data["media"]: + row = { + "id": media["id"], + "title_english": media["title"]["english"], + "title_romaji": media["title"]["romaji"], + "title_native": media["title"]["native"], + "status": media["user_status"]["status"], + "progress": media["user_status"]["progress"], + "score": media["user_status"]["score"], + "last_watched": media["user_status"]["last_watched"], + "notes": media["user_status"]["notes"], + } + + if data["metadata"]["include_metadata"]: + row.update({ + "format": media.get("format"), + "episodes": media.get("episodes"), + "duration": media.get("duration"), + "media_status": media.get("status"), + "start_date": media.get("start_date"), + "end_date": media.get("end_date"), + "average_score": media.get("average_score"), + "popularity": media.get("popularity"), + "genres": ",".join(media.get("genres", [])), + "description": media.get("description"), + }) + + writer.writerow(row) + + if compress: + import gzip + with gzip.open(output_path, 'wt', encoding='utf-8', newline='') as f: + write_csv(f) + else: + with open(output_path, 'w', encoding='utf-8', newline='') as f: + write_csv(f) + + +def _export_xml(data: dict, output_path: Path, compress: bool, feedback): + """Export data to XML format.""" + try: + import xml.etree.ElementTree as ET + except ImportError: + feedback.error("XML Export Error", "XML export requires Python's xml module") + raise click.Abort() + + root = ET.Element("fastanime_registry") + + # Add metadata + metadata_elem = ET.SubElement(root, "metadata") + for key, value in data["metadata"].items(): + if value is not None: + elem = ET.SubElement(metadata_elem, key) + elem.text = str(value) + + # Add statistics + stats_elem = ET.SubElement(root, "statistics") + for key, value in data["statistics"].items(): + if value is not None: + elem = ET.SubElement(stats_elem, key) + elem.text = str(value) + + # Add media + media_list_elem = ET.SubElement(root, "media_list") + for media in data["media"]: + media_elem = ET.SubElement(media_list_elem, "media") + media_elem.set("id", str(media["id"])) + + # Add titles + titles_elem = ET.SubElement(media_elem, "titles") + for title_type, title_value in media["title"].items(): + if title_value: + title_elem = ET.SubElement(titles_elem, title_type) + title_elem.text = title_value + + # Add user status + status_elem = ET.SubElement(media_elem, "user_status") + for key, value in media["user_status"].items(): + if value is not None: + elem = ET.SubElement(status_elem, key) + elem.text = str(value) + + # Add metadata if included + if data["metadata"]["include_metadata"]: + for key, value in media.items(): + if key not in ["id", "title", "user_status"] and value is not None: + if isinstance(value, list): + list_elem = ET.SubElement(media_elem, key) + for item in value: + item_elem = ET.SubElement(list_elem, "item") + item_elem.text = str(item) + elif isinstance(value, dict): + dict_elem = ET.SubElement(media_elem, key) + for sub_key, sub_value in value.items(): + if sub_value is not None: + sub_elem = ET.SubElement(dict_elem, sub_key) + sub_elem.text = str(sub_value) + else: + elem = ET.SubElement(media_elem, key) + elem.text = str(value) + + # Write XML + tree = ET.ElementTree(root) + if compress: + import gzip + with gzip.open(output_path, 'wb') as f: + tree.write(f, encoding='utf-8', xml_declaration=True) + else: + tree.write(output_path, encoding='utf-8', xml_declaration=True) + + +def _format_file_size(file_path: Path) -> str: + """Format file size in human-readable format.""" + try: + size = file_path.stat().st_size + for unit in ['B', 'KB', 'MB', 'GB']: + if size < 1024.0: + return f"{size:.1f} {unit}" + size /= 1024.0 + return f"{size:.1f} TB" + except: + return "Unknown size" diff --git a/fastanime/cli/commands/registry/commands/import_.py b/fastanime/cli/commands/registry/commands/import_.py new file mode 100644 index 0000000..8484a41 --- /dev/null +++ b/fastanime/cli/commands/registry/commands/import_.py @@ -0,0 +1,425 @@ +""" +Registry import command - import registry data from various formats +""" + +import json +import csv +from pathlib import Path +from datetime import datetime + +import click + +from .....core.config import AppConfig +from .....libs.media_api.types import UserMediaListStatus, MediaItem, MediaTitle +from ....service.registry.service import MediaRegistryService +from ....utils.feedback import create_feedback_manager + + +@click.command(name="import", help="Import registry data from various formats") +@click.argument("input_file", type=click.Path(exists=True, path_type=Path)) +@click.option( + "--format", + "input_format", + type=click.Choice(["json", "csv", "xml", "auto"], case_sensitive=False), + default="auto", + help="Input format (auto-detect if not specified)" +) +@click.option( + "--merge", + is_flag=True, + help="Merge with existing registry (default: replace)" +) +@click.option( + "--dry-run", + is_flag=True, + help="Show what would be imported without making changes" +) +@click.option( + "--force", + "-f", + is_flag=True, + help="Force import even if format version doesn't match" +) +@click.option( + "--backup", + is_flag=True, + help="Create backup before importing" +) +@click.option( + "--api", + default="anilist", + type=click.Choice(["anilist"], case_sensitive=False), + help="Media API registry to import to" +) +@click.pass_obj +def import_( + config: AppConfig, + input_file: Path, + input_format: str, + merge: bool, + dry_run: bool, + force: bool, + backup: bool, + api: str +): + """ + Import media registry data from various formats. + + Supports JSON, CSV, and XML formats exported by the export command + or compatible third-party tools. + """ + feedback = create_feedback_manager(config.general.icons) + + try: + registry_service = MediaRegistryService(api, config.registry) + + # Create backup if requested + if backup and not dry_run: + _create_backup(registry_service, feedback) + + # Auto-detect format if needed + if input_format == "auto": + input_format = _detect_format(input_file) + feedback.info("Format Detection", f"Detected format: {input_format.upper()}") + + # Parse input file + import_data = _parse_input_file(input_file, input_format, feedback) + + # Validate import data + _validate_import_data(import_data, force, feedback) + + # Import data + _import_data( + registry_service, import_data, merge, dry_run, feedback + ) + + if not dry_run: + feedback.success( + "Import Complete", + f"Successfully imported {len(import_data.get('media', []))} media entries" + ) + else: + feedback.info( + "Dry Run Complete", + f"Would import {len(import_data.get('media', []))} media entries" + ) + + except Exception as e: + feedback.error("Import Error", f"Failed to import registry: {e}") + raise click.Abort() + + +def _create_backup(registry_service, feedback): + """Create a backup before importing.""" + from .export import _prepare_export_data, _export_json + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + backup_path = Path(f"fastanime_registry_backup_{timestamp}.json") + + export_data = _prepare_export_data(registry_service, True, ()) + _export_json(export_data, backup_path, False, feedback) + + feedback.info("Backup Created", f"Registry backed up to {backup_path}") + + +def _detect_format(file_path: Path) -> str: + """Auto-detect file format based on extension and content.""" + extension = file_path.suffix.lower() + + if extension in ['.json', '.gz']: + return "json" + elif extension == '.csv': + return "csv" + elif extension == '.xml': + return "xml" + + # Try to detect by content + try: + with open(file_path, 'r', encoding='utf-8') as f: + content = f.read(100).strip() + if content.startswith('{') or content.startswith('['): + return "json" + elif content.startswith(' dict: + """Parse input file based on format.""" + if format_type == "json": + return _parse_json(file_path) + elif format_type == "csv": + return _parse_csv(file_path) + elif format_type == "xml": + return _parse_xml(file_path) + else: + raise click.ClickException(f"Unsupported format: {format_type}") + + +def _parse_json(file_path: Path) -> dict: + """Parse JSON input file.""" + try: + if file_path.suffix.lower() == '.gz': + import gzip + with gzip.open(file_path, 'rt', encoding='utf-8') as f: + return json.load(f) + else: + with open(file_path, 'r', encoding='utf-8') as f: + return json.load(f) + except json.JSONDecodeError as e: + raise click.ClickException(f"Invalid JSON format: {e}") + + +def _parse_csv(file_path: Path) -> dict: + """Parse CSV input file.""" + import_data = { + "metadata": { + "import_timestamp": datetime.now().isoformat(), + "source_format": "csv", + }, + "media": [] + } + + try: + with open(file_path, 'r', encoding='utf-8') as f: + reader = csv.DictReader(f) + for row in reader: + media_data = { + "id": int(row["id"]) if row.get("id") else None, + "title": { + "english": row.get("title_english"), + "romaji": row.get("title_romaji"), + "native": row.get("title_native"), + }, + "user_status": { + "status": row.get("status"), + "progress": int(row["progress"]) if row.get("progress") else None, + "score": float(row["score"]) if row.get("score") else None, + "last_watched": row.get("last_watched"), + "notes": row.get("notes"), + } + } + + # Add metadata fields if present + if "format" in row: + media_data.update({ + "format": row.get("format"), + "episodes": int(row["episodes"]) if row.get("episodes") else None, + "duration": int(row["duration"]) if row.get("duration") else None, + "media_status": row.get("media_status"), + "start_date": row.get("start_date"), + "end_date": row.get("end_date"), + "average_score": float(row["average_score"]) if row.get("average_score") else None, + "popularity": int(row["popularity"]) if row.get("popularity") else None, + "genres": row.get("genres", "").split(",") if row.get("genres") else [], + "description": row.get("description"), + }) + + import_data["media"].append(media_data) + + except (ValueError, KeyError) as e: + raise click.ClickException(f"Invalid CSV format: {e}") + + return import_data + + +def _parse_xml(file_path: Path) -> dict: + """Parse XML input file.""" + try: + import xml.etree.ElementTree as ET + except ImportError: + raise click.ClickException("XML import requires Python's xml module") + + try: + tree = ET.parse(file_path) + root = tree.getroot() + + import_data = { + "metadata": {}, + "media": [] + } + + # Parse metadata + metadata_elem = root.find("metadata") + if metadata_elem is not None: + for child in metadata_elem: + import_data["metadata"][child.tag] = child.text + + # Parse media + media_list_elem = root.find("media_list") + if media_list_elem is not None: + for media_elem in media_list_elem.findall("media"): + media_data = { + "id": int(media_elem.get("id")), + "title": {}, + "user_status": {} + } + + # Parse titles + titles_elem = media_elem.find("titles") + if titles_elem is not None: + for title_elem in titles_elem: + media_data["title"][title_elem.tag] = title_elem.text + + # Parse user status + status_elem = media_elem.find("user_status") + if status_elem is not None: + for child in status_elem: + value = child.text + if child.tag in ["progress", "score"] and value: + try: + value = float(value) if child.tag == "score" else int(value) + except ValueError: + pass + media_data["user_status"][child.tag] = value + + # Parse other metadata + for child in media_elem: + if child.tag not in ["titles", "user_status"]: + if child.tag in ["episodes", "duration", "popularity"]: + try: + media_data[child.tag] = int(child.text) if child.text else None + except ValueError: + media_data[child.tag] = child.text + elif child.tag == "average_score": + try: + media_data[child.tag] = float(child.text) if child.text else None + except ValueError: + media_data[child.tag] = child.text + else: + media_data[child.tag] = child.text + + import_data["media"].append(media_data) + + except ET.ParseError as e: + raise click.ClickException(f"Invalid XML format: {e}") + + return import_data + + +def _validate_import_data(data: dict, force: bool, feedback): + """Validate import data structure and compatibility.""" + if "media" not in data: + raise click.ClickException("Import data missing 'media' section") + + if not isinstance(data["media"], list): + raise click.ClickException("'media' section must be a list") + + # Check if any media entries exist + if not data["media"]: + feedback.warning("No Media", "Import file contains no media entries") + return + + # Validate media entries + required_fields = ["id", "title"] + for i, media in enumerate(data["media"]): + for field in required_fields: + if field not in media: + raise click.ClickException(f"Media entry {i} missing required field: {field}") + + if not isinstance(media.get("title"), dict): + raise click.ClickException(f"Media entry {i} has invalid title format") + + feedback.info("Validation", f"Import data validated - {len(data['media'])} media entries") + + +def _import_data(registry_service, data: dict, merge: bool, dry_run: bool, feedback): + """Import data into the registry.""" + from .....libs.media_api.types import MediaFormat, MediaGenre, MediaStatus, MediaType + + imported_count = 0 + updated_count = 0 + error_count = 0 + + status_map = { + "watching": UserMediaListStatus.WATCHING, + "completed": UserMediaListStatus.COMPLETED, + "planning": UserMediaListStatus.PLANNING, + "dropped": UserMediaListStatus.DROPPED, + "paused": UserMediaListStatus.PAUSED, + "repeating": UserMediaListStatus.REPEATING, + } + + for media_data in data["media"]: + try: + media_id = media_data["id"] + if not media_id: + error_count += 1 + continue + + title_data = media_data.get("title", {}) + title = MediaTitle( + english=title_data.get("english") or "", + romaji=title_data.get("romaji"), + native=title_data.get("native"), + ) + + # Create minimal MediaItem for registry + media_item = MediaItem( + id=media_id, + title=title, + type=MediaType.ANIME, # Default to anime + ) + + # Add additional metadata if available + if "format" in media_data and media_data["format"]: + try: + media_item.format = getattr(MediaFormat, media_data["format"]) + except (AttributeError, TypeError): + pass + + if "episodes" in media_data: + media_item.episodes = media_data["episodes"] + + if "average_score" in media_data: + media_item.average_score = media_data["average_score"] + + if dry_run: + title_str = title.english or title.romaji or f"ID:{media_id}" + feedback.info("Would import", title_str) + imported_count += 1 + continue + + # Check if record exists + existing_record = registry_service.get_media_record(media_id) + if existing_record and not merge: + # Skip if not merging + continue + elif existing_record: + updated_count += 1 + else: + imported_count += 1 + + # Create or update record + record = registry_service.get_or_create_record(media_item) + registry_service.save_media_record(record) + + # Update user status if provided + user_status = media_data.get("user_status", {}) + if user_status.get("status"): + status_enum = status_map.get(user_status["status"].lower()) + if status_enum: + registry_service.update_media_index_entry( + media_id, + media_item=media_item, + status=status_enum, + progress=str(user_status.get("progress", 0)), + score=user_status.get("score"), + notes=user_status.get("notes"), + ) + + except Exception as e: + error_count += 1 + feedback.warning("Import Error", f"Failed to import media {media_data.get('id', 'unknown')}: {e}") + continue + + if not dry_run: + feedback.info( + "Import Summary", + f"Imported: {imported_count}, Updated: {updated_count}, Errors: {error_count}" + ) diff --git a/fastanime/cli/commands/registry/commands/restore.py b/fastanime/cli/commands/registry/commands/restore.py new file mode 100644 index 0000000..40c3c92 --- /dev/null +++ b/fastanime/cli/commands/registry/commands/restore.py @@ -0,0 +1,291 @@ +""" +Registry restore command - restore registry from backup files +""" + +import shutil +import tarfile +from pathlib import Path +from datetime import datetime + +import click + +from .....core.config import AppConfig +from ....service.registry.service import MediaRegistryService +from ....utils.feedback import create_feedback_manager + + +@click.command(help="Restore registry from a backup file") +@click.argument("backup_file", type=click.Path(exists=True, path_type=Path)) +@click.option( + "--force", + "-f", + is_flag=True, + help="Force restore even if current registry exists" +) +@click.option( + "--backup-current", + is_flag=True, + help="Create backup of current registry before restoring" +) +@click.option( + "--verify", + is_flag=True, + help="Verify backup integrity before restoring" +) +@click.option( + "--api", + default="anilist", + type=click.Choice(["anilist"], case_sensitive=False), + help="Media API registry to restore to" +) +@click.pass_obj +def restore( + config: AppConfig, + backup_file: Path, + force: bool, + backup_current: bool, + verify: bool, + api: str +): + """ + Restore your media registry from a backup file. + + Can restore from tar or zip backups created by the backup command. + Optionally creates a backup of the current registry before restoring. + """ + feedback = create_feedback_manager(config.general.icons) + + try: + # Detect backup format + backup_format = _detect_backup_format(backup_file) + feedback.info("Backup Format", f"Detected {backup_format.upper()} format") + + # Verify backup if requested + if verify: + if not _verify_backup(backup_file, backup_format, feedback): + feedback.error("Verification Failed", "Backup file appears to be corrupted") + raise click.Abort() + feedback.success("Verification", "Backup file integrity verified") + + # Check if current registry exists + registry_service = MediaRegistryService(api, config.registry) + registry_exists = _check_registry_exists(registry_service) + + if registry_exists and not force: + if not click.confirm("Current registry exists. Continue with restore?"): + feedback.info("Restore Cancelled", "No changes were made") + return + + # Create backup of current registry if requested + if backup_current and registry_exists: + _backup_current_registry(registry_service, api, feedback) + + # Show restore summary + _show_restore_summary(backup_file, backup_format, feedback) + + # Perform restore + _perform_restore(backup_file, backup_format, config, api, feedback) + + feedback.success("Restore Complete", "Registry has been successfully restored from backup") + + # Verify restored registry + try: + restored_service = MediaRegistryService(api, config.registry) + stats = restored_service.get_registry_stats() + feedback.info("Restored Registry", f"Contains {stats.get('total_media', 0)} media entries") + except Exception as e: + feedback.warning("Verification Warning", f"Could not verify restored registry: {e}") + + except Exception as e: + feedback.error("Restore Error", f"Failed to restore registry: {e}") + raise click.Abort() + + +def _detect_backup_format(backup_file: Path) -> str: + """Detect backup file format.""" + if backup_file.suffix.lower() in ['.tar', '.gz']: + return "tar" + elif backup_file.suffix.lower() == '.zip': + return "zip" + elif backup_file.name.endswith('.tar.gz'): + return "tar" + else: + # Try to detect by content + try: + with tarfile.open(backup_file, 'r:*'): + return "tar" + except: + pass + + try: + import zipfile + with zipfile.ZipFile(backup_file, 'r'): + return "zip" + except: + pass + + raise click.ClickException(f"Could not detect backup format for {backup_file}") + + +def _verify_backup(backup_file: Path, format_type: str, feedback) -> bool: + """Verify backup file integrity.""" + try: + if format_type == "tar": + with tarfile.open(backup_file, 'r:*') as tar: + # Check if essential files exist + names = tar.getnames() + has_registry = any('registry/' in name for name in names) + has_index = any('index/' in name for name in names) + has_metadata = 'backup_metadata.json' in names + + if not (has_registry and has_index): + return False + + # Try to read metadata if it exists + if has_metadata: + try: + metadata_member = tar.getmember('backup_metadata.json') + metadata_file = tar.extractfile(metadata_member) + if metadata_file: + import json + metadata = json.load(metadata_file) + feedback.info("Backup Info", f"Created: {metadata.get('backup_timestamp', 'Unknown')}") + feedback.info("Backup Info", f"Total Media: {metadata.get('total_media', 'Unknown')}") + except: + pass + + else: # zip + import zipfile + with zipfile.ZipFile(backup_file, 'r') as zip_file: + names = zip_file.namelist() + has_registry = any('registry/' in name for name in names) + has_index = any('index/' in name for name in names) + has_metadata = 'backup_metadata.json' in names + + if not (has_registry and has_index): + return False + + # Try to read metadata + if has_metadata: + try: + with zip_file.open('backup_metadata.json') as metadata_file: + import json + metadata = json.load(metadata_file) + feedback.info("Backup Info", f"Created: {metadata.get('backup_timestamp', 'Unknown')}") + feedback.info("Backup Info", f"Total Media: {metadata.get('total_media', 'Unknown')}") + except: + pass + + return True + + except Exception: + return False + + +def _check_registry_exists(registry_service) -> bool: + """Check if a registry already exists.""" + try: + stats = registry_service.get_registry_stats() + return stats.get('total_media', 0) > 0 + except: + return False + + +def _backup_current_registry(registry_service, api: str, feedback): + """Create backup of current registry before restoring.""" + from .backup import _create_tar_backup + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + backup_path = Path(f"fastanime_registry_pre_restore_{api}_{timestamp}.tar.gz") + + try: + _create_tar_backup(registry_service, backup_path, True, False, feedback, api) + feedback.info("Current Registry Backed Up", f"Saved to {backup_path}") + except Exception as e: + feedback.warning("Backup Warning", f"Failed to backup current registry: {e}") + + +def _show_restore_summary(backup_file: Path, format_type: str, feedback): + """Show summary of what will be restored.""" + try: + if format_type == "tar": + with tarfile.open(backup_file, 'r:*') as tar: + members = tar.getmembers() + file_count = len([m for m in members if m.isfile()]) + + # Count media files + media_files = len([m for m in members if m.name.startswith('registry/') and m.name.endswith('.json')]) + + else: # zip + import zipfile + with zipfile.ZipFile(backup_file, 'r') as zip_file: + info_list = zip_file.infolist() + file_count = len([info for info in info_list if not info.is_dir()]) + + # Count media files + media_files = len([info for info in info_list if info.filename.startswith('registry/') and info.filename.endswith('.json')]) + + feedback.info("Restore Preview", f"Will restore {file_count} files") + feedback.info("Media Records", f"Contains {media_files} media entries") + + except Exception as e: + feedback.warning("Preview Error", f"Could not analyze backup: {e}") + + +def _perform_restore(backup_file: Path, format_type: str, config: AppConfig, api: str, feedback): + """Perform the actual restore operation.""" + + # Create temporary extraction directory + temp_dir = Path(config.registry.media_dir.parent / "restore_temp") + temp_dir.mkdir(exist_ok=True) + + try: + # Extract backup + if format_type == "tar": + with tarfile.open(backup_file, 'r:*') as tar: + tar.extractall(temp_dir) + else: # zip + import zipfile + with zipfile.ZipFile(backup_file, 'r') as zip_file: + zip_file.extractall(temp_dir) + + feedback.info("Extraction", "Backup extracted to temporary directory") + + # Remove existing registry if it exists + registry_dir = config.registry.media_dir / api + index_dir = config.registry.index_dir + + if registry_dir.exists(): + shutil.rmtree(registry_dir) + feedback.info("Cleanup", "Removed existing registry data") + + if index_dir.exists(): + shutil.rmtree(index_dir) + feedback.info("Cleanup", "Removed existing index data") + + # Move extracted files to proper locations + extracted_registry = temp_dir / "registry" / api + extracted_index = temp_dir / "index" + + if extracted_registry.exists(): + shutil.move(str(extracted_registry), str(registry_dir)) + feedback.info("Restore", "Registry data restored") + + if extracted_index.exists(): + shutil.move(str(extracted_index), str(index_dir)) + feedback.info("Restore", "Index data restored") + + # Restore cache if it exists + extracted_cache = temp_dir / "cache" + if extracted_cache.exists(): + cache_dir = config.registry.media_dir.parent / "cache" + if cache_dir.exists(): + shutil.rmtree(cache_dir) + shutil.move(str(extracted_cache), str(cache_dir)) + feedback.info("Restore", "Cache data restored") + + finally: + # Clean up temporary directory + if temp_dir.exists(): + shutil.rmtree(temp_dir) + feedback.info("Cleanup", "Temporary files removed") diff --git a/fastanime/cli/commands/registry/commands/search.py b/fastanime/cli/commands/registry/commands/search.py new file mode 100644 index 0000000..4689582 --- /dev/null +++ b/fastanime/cli/commands/registry/commands/search.py @@ -0,0 +1,240 @@ +""" +Registry search command - search through the local media registry +""" + +import click +from rich.console import Console +from rich.table import Table + +from .....core.config import AppConfig +from .....libs.media_api.params import MediaSearchParams +from .....libs.media_api.types import MediaSort, UserMediaListStatus +from ....service.registry.service import MediaRegistryService +from ....utils.feedback import create_feedback_manager + + +@click.command(help="Search through the local media registry") +@click.argument("query", required=False) +@click.option( + "--status", + type=click.Choice([ + "watching", "completed", "planning", "dropped", "paused", "repeating" + ], case_sensitive=False), + help="Filter by watch status" +) +@click.option( + "--genre", + multiple=True, + help="Filter by genre (can be used multiple times)" +) +@click.option( + "--format", + type=click.Choice([ + "TV", "TV_SHORT", "MOVIE", "SPECIAL", "OVA", "ONA", "MUSIC" + ], case_sensitive=False), + help="Filter by format" +) +@click.option( + "--year", + type=int, + help="Filter by release year" +) +@click.option( + "--min-score", + type=float, + help="Minimum average score (0.0 - 10.0)" +) +@click.option( + "--max-score", + type=float, + help="Maximum average score (0.0 - 10.0)" +) +@click.option( + "--sort", + type=click.Choice([ + "title", "score", "popularity", "year", "episodes", "updated" + ], case_sensitive=False), + default="title", + help="Sort results by field" +) +@click.option( + "--limit", + type=int, + default=20, + help="Maximum number of results to show" +) +@click.option( + "--json", + "output_json", + is_flag=True, + help="Output results in JSON format" +) +@click.option( + "--api", + default="anilist", + type=click.Choice(["anilist"], case_sensitive=False), + help="Media API registry to search" +) +@click.pass_obj +def search( + config: AppConfig, + query: str | None, + status: str | None, + genre: tuple[str, ...], + format: str | None, + year: int | None, + min_score: float | None, + max_score: float | None, + sort: str, + limit: int, + output_json: bool, + api: str +): + """ + Search through your local media registry. + + You can search by title and filter by various criteria like status, + genre, format, year, and score range. + """ + feedback = create_feedback_manager(config.general.icons) + console = Console() + + try: + registry_service = MediaRegistryService(api, config.registry) + + # Build search parameters + search_params = _build_search_params( + query, status, genre, format, year, min_score, max_score, sort, limit + ) + + # Perform search + result = registry_service.search_for_media(search_params) + + if not result or not result.media: + feedback.info("No Results", "No media found matching your criteria") + return + + if output_json: + import json + print(json.dumps(result.model_dump(), indent=2, default=str)) + return + + _display_search_results(console, result, config.general.icons) + + except Exception as e: + feedback.error("Search Error", f"Failed to search registry: {e}") + raise click.Abort() + + +def _build_search_params( + query, status, genre, format, year, min_score, max_score, sort, limit +) -> MediaSearchParams: + """Build MediaSearchParams from command options.""" + + # Convert status string to enum + status_enum = None + if status: + status_map = { + "watching": UserMediaListStatus.WATCHING, + "completed": UserMediaListStatus.COMPLETED, + "planning": UserMediaListStatus.PLANNING, + "dropped": UserMediaListStatus.DROPPED, + "paused": UserMediaListStatus.PAUSED, + "repeating": UserMediaListStatus.REPEATING, + } + status_enum = status_map.get(status.lower()) + + # Convert sort string to enum + sort_map = { + "title": MediaSort.TITLE_ROMAJI, + "score": MediaSort.SCORE_DESC, + "popularity": MediaSort.POPULARITY_DESC, + "year": MediaSort.START_DATE_DESC, + "episodes": MediaSort.EPISODES_DESC, + "updated": MediaSort.UPDATED_AT_DESC, + } + sort_enum = sort_map.get(sort.lower(), MediaSort.TITLE_ROMAJI) + + # Convert format string to enum if provided + format_enum = None + if format: + from .....libs.media_api.types import MediaFormat + format_enum = getattr(MediaFormat, format.upper(), None) + + # Convert genre strings to enums + genre_enums = [] + if genre: + from .....libs.media_api.types import MediaGenre + for g in genre: + # Try to find matching genre enum + for genre_enum in MediaGenre: + if genre_enum.value.lower() == g.lower(): + genre_enums.append(genre_enum) + break + + return MediaSearchParams( + query=query, + per_page=limit, + sort=[sort_enum], + averageScore_greater=min_score * 10 if min_score else None, # Convert to AniList scale + averageScore_lesser=max_score * 10 if max_score else None, + genre_in=genre_enums if genre_enums else None, + format_in=[format_enum] if format_enum else None, + seasonYear=year, + # We'll handle status filtering differently since it's user-specific + ) + + +def _display_search_results(console: Console, result, icons: bool): + """Display search results in a formatted table.""" + + table = Table(title=f"{'๐Ÿ” ' if icons else ''}Search Results ({len(result.media)} found)") + table.add_column("Title", style="cyan", min_width=30) + table.add_column("Year", style="dim", justify="center", min_width=6) + table.add_column("Format", style="magenta", justify="center", min_width=8) + table.add_column("Episodes", style="green", justify="center", min_width=8) + table.add_column("Score", style="yellow", justify="center", min_width=6) + table.add_column("Status", style="blue", justify="center", min_width=10) + table.add_column("Progress", style="white", justify="center", min_width=8) + + for media in result.media: + # Get title (prefer English, fallback to Romaji) + title = media.title.english or media.title.romaji or "Unknown" + if len(title) > 40: + title = title[:37] + "..." + + # Get year from start date + year = "" + if media.start_date: + year = str(media.start_date.year) + + # Format episodes + episodes = str(media.episodes) if media.episodes else "?" + + # Format score + score = f"{media.average_score/10:.1f}" if media.average_score else "N/A" + + # Get user status + status = "Not Listed" + progress = "0" + if media.user_status: + status = media.user_status.status.value.title() if media.user_status.status else "Unknown" + progress = f"{media.user_status.progress or 0}/{episodes}" + + table.add_row( + title, + year, + media.format.value if media.format else "Unknown", + episodes, + score, + status, + progress + ) + + console.print(table) + + # Show pagination info if applicable + if result.page_info.total > len(result.media): + console.print( + f"\n[dim]Showing {len(result.media)} of {result.page_info.total} total results[/dim]" + ) diff --git a/fastanime/cli/commands/registry/commands/stats.py b/fastanime/cli/commands/registry/commands/stats.py new file mode 100644 index 0000000..cd96cdf --- /dev/null +++ b/fastanime/cli/commands/registry/commands/stats.py @@ -0,0 +1,180 @@ +""" +Registry stats command - show detailed statistics about the local registry +""" + +import click +from rich.console import Console +from rich.table import Table +from rich.panel import Panel + +from .....core.config import AppConfig +from ....service.registry.service import MediaRegistryService +from ....utils.feedback import create_feedback_manager + + +@click.command(help="Show detailed statistics about the local media registry") +@click.option( + "--detailed", + "-d", + is_flag=True, + help="Show detailed breakdown by genre, format, and year" +) +@click.option( + "--json", + "output_json", + is_flag=True, + help="Output statistics in JSON format" +) +@click.option( + "--api", + default="anilist", + type=click.Choice(["anilist"], case_sensitive=False), + help="Media API to show stats for" +) +@click.pass_obj +def stats(config: AppConfig, detailed: bool, output_json: bool, api: str): + """ + Display comprehensive statistics about your local media registry. + + Shows total counts, status breakdown, and optionally detailed + analysis by genre, format, and release year. + """ + feedback = create_feedback_manager(config.general.icons) + console = Console() + + try: + registry_service = MediaRegistryService(api, config.registry) + stats_data = registry_service.get_registry_stats() + + if output_json: + import json + print(json.dumps(stats_data, indent=2, default=str)) + return + + _display_stats_overview(console, stats_data, api, config.general.icons) + + if detailed: + _display_detailed_stats(console, stats_data, config.general.icons) + + except Exception as e: + feedback.error("Stats Error", f"Failed to generate statistics: {e}") + raise click.Abort() + + +def _display_stats_overview(console: Console, stats: dict, api: str, icons: bool): + """Display basic registry statistics overview.""" + + # Main overview panel + overview_text = f"[bold cyan]Media API:[/bold cyan] {api.title()}\n" + overview_text += f"[bold cyan]Total Media:[/bold cyan] {stats.get('total_media', 0)}\n" + overview_text += f"[bold cyan]Registry Version:[/bold cyan] {stats.get('version', 'Unknown')}\n" + overview_text += f"[bold cyan]Last Updated:[/bold cyan] {stats.get('last_updated', 'Never')}\n" + overview_text += f"[bold cyan]Storage Size:[/bold cyan] {stats.get('storage_size', 'Unknown')}" + + panel = Panel( + overview_text, + title=f"{'๐Ÿ“Š ' if icons else ''}Registry Overview", + border_style="cyan" + ) + console.print(panel) + console.print() + + # Status breakdown table + status_breakdown = stats.get('status_breakdown', {}) + if status_breakdown: + table = Table(title=f"{'๐Ÿ“‹ ' if icons else ''}Status Breakdown") + table.add_column("Status", style="cyan", no_wrap=True) + table.add_column("Count", style="magenta", justify="right") + table.add_column("Percentage", style="green", justify="right") + + total = sum(status_breakdown.values()) + for status, count in sorted(status_breakdown.items()): + percentage = (count / total * 100) if total > 0 else 0 + table.add_row( + status.title(), + str(count), + f"{percentage:.1f}%" + ) + + console.print(table) + console.print() + + # Download status breakdown + download_stats = stats.get('download_stats', {}) + if download_stats: + table = Table(title=f"{'๐Ÿ’พ ' if icons else ''}Download Status") + table.add_column("Status", style="cyan", no_wrap=True) + table.add_column("Count", style="magenta", justify="right") + + for status, count in download_stats.items(): + table.add_row(status.title(), str(count)) + + console.print(table) + console.print() + + +def _display_detailed_stats(console: Console, stats: dict, icons: bool): + """Display detailed breakdown by various categories.""" + + # Genre breakdown + genre_breakdown = stats.get('genre_breakdown', {}) + if genre_breakdown: + table = Table(title=f"{'๐ŸŽญ ' if icons else ''}Top Genres") + table.add_column("Genre", style="cyan") + table.add_column("Count", style="magenta", justify="right") + + # Sort by count and show top 10 + top_genres = sorted(genre_breakdown.items(), key=lambda x: x[1], reverse=True)[:10] + for genre, count in top_genres: + table.add_row(genre, str(count)) + + console.print(table) + console.print() + + # Format breakdown + format_breakdown = stats.get('format_breakdown', {}) + if format_breakdown: + table = Table(title=f"{'๐Ÿ“บ ' if icons else ''}Format Breakdown") + table.add_column("Format", style="cyan") + table.add_column("Count", style="magenta", justify="right") + table.add_column("Percentage", style="green", justify="right") + + total = sum(format_breakdown.values()) + for format_type, count in sorted(format_breakdown.items()): + percentage = (count / total * 100) if total > 0 else 0 + table.add_row( + format_type, + str(count), + f"{percentage:.1f}%" + ) + + console.print(table) + console.print() + + # Year breakdown + year_breakdown = stats.get('year_breakdown', {}) + if year_breakdown: + table = Table(title=f"{'๐Ÿ“… ' if icons else ''}Release Years (Top 10)") + table.add_column("Year", style="cyan", justify="center") + table.add_column("Count", style="magenta", justify="right") + + # Sort by year descending and show top 10 + top_years = sorted(year_breakdown.items(), key=lambda x: x[0], reverse=True)[:10] + for year, count in top_years: + table.add_row(str(year), str(count)) + + console.print(table) + console.print() + + # Rating breakdown + rating_breakdown = stats.get('rating_breakdown', {}) + if rating_breakdown: + table = Table(title=f"{'โญ ' if icons else ''}Score Distribution") + table.add_column("Score Range", style="cyan") + table.add_column("Count", style="magenta", justify="right") + + for score_range, count in sorted(rating_breakdown.items()): + table.add_row(score_range, str(count)) + + console.print(table) + console.print() diff --git a/fastanime/cli/commands/registry/commands/sync.py b/fastanime/cli/commands/registry/commands/sync.py new file mode 100644 index 0000000..997ff3b --- /dev/null +++ b/fastanime/cli/commands/registry/commands/sync.py @@ -0,0 +1,268 @@ +""" +Registry sync command - synchronize local registry with remote media API +""" + +import click +from rich.progress import Progress + +from .....core.config import AppConfig +from .....core.exceptions import FastAnimeError +from .....libs.media_api.api import create_api_client +from .....libs.media_api.params import UserMediaListSearchParams +from .....libs.media_api.types import UserMediaListStatus +from ....service.registry.service import MediaRegistryService +from ....utils.feedback import create_feedback_manager + + +@click.command(help="Synchronize local registry with remote media API") +@click.option( + "--download", + "-d", + is_flag=True, + help="Download remote user list to local registry" +) +@click.option( + "--upload", + "-u", + is_flag=True, + help="Upload local registry changes to remote API" +) +@click.option( + "--force", + "-f", + is_flag=True, + help="Force sync even if there are conflicts" +) +@click.option( + "--dry-run", + is_flag=True, + help="Show what would be synced without making changes" +) +@click.option( + "--status", + multiple=True, + type=click.Choice([ + "watching", "completed", "planning", "dropped", "paused", "repeating" + ], case_sensitive=False), + help="Only sync specific status lists (can be used multiple times)" +) +@click.option( + "--api", + default="anilist", + type=click.Choice(["anilist"], case_sensitive=False), + help="Media API to sync with" +) +@click.pass_obj +def sync( + config: AppConfig, + download: bool, + upload: bool, + force: bool, + dry_run: bool, + status: tuple[str, ...], + api: str +): + """ + Synchronize local registry with remote media API. + + This command can download your remote media list to the local registry, + upload local changes to the remote API, or both. + """ + feedback = create_feedback_manager(config.general.icons) + + # Default to both download and upload if neither specified + if not download and not upload: + download = upload = True + + # Check authentication + try: + api_client = create_api_client(api, config) + if not api_client.is_authenticated(): + feedback.error( + "Authentication Required", + f"You must be logged in to {api} to sync your media list." + ) + feedback.info("Run this command to authenticate:", f"fastanime {api} auth") + raise click.Abort() + + except Exception as e: + feedback.error("API Error", f"Failed to connect to {api}: {e}") + raise click.Abort() + + # Initialize registry service + try: + registry_service = MediaRegistryService(api, config.registry) + except Exception as e: + feedback.error("Registry Error", f"Failed to initialize registry: {e}") + raise click.Abort() + + # Determine which statuses to sync + status_list = list(status) if status else [ + "watching", "completed", "planning", "dropped", "paused", "repeating" + ] + + # Convert to enum values + status_map = { + "watching": UserMediaListStatus.WATCHING, + "completed": UserMediaListStatus.COMPLETED, + "planning": UserMediaListStatus.PLANNING, + "dropped": UserMediaListStatus.DROPPED, + "paused": UserMediaListStatus.PAUSED, + "repeating": UserMediaListStatus.REPEATING, + } + + statuses_to_sync = [status_map[s] for s in status_list] + + with Progress() as progress: + if download: + _sync_download( + api_client, registry_service, statuses_to_sync, + feedback, progress, dry_run, force + ) + + if upload: + _sync_upload( + api_client, registry_service, statuses_to_sync, + feedback, progress, dry_run, force + ) + + feedback.success("Sync Complete", "Registry synchronization finished successfully") + + +def _sync_download( + api_client, registry_service, statuses, feedback, progress, dry_run, force +): + """Download remote media list to local registry.""" + feedback.info("Starting Download", "Fetching remote media lists...") + + download_task = progress.add_task("Downloading media lists...", total=len(statuses)) + + total_downloaded = 0 + total_updated = 0 + + for status in statuses: + try: + # Fetch all pages for this status + page = 1 + while True: + params = UserMediaListSearchParams( + status=status, + page=page, + per_page=50 + ) + + result = api_client.search_media_list(params) + if not result or not result.media: + break + + for media_item in result.media: + if dry_run: + feedback.info( + "Would download", + f"{media_item.title.english or media_item.title.romaji} ({status.value})" + ) + else: + # Get or create record and update with user status + record = registry_service.get_or_create_record(media_item) + + # Update index entry with latest status + if media_item.user_status: + registry_service.update_media_index_entry( + media_item.id, + media_item=media_item, + status=media_item.user_status.status, + progress=str(media_item.user_status.progress or 0), + score=media_item.user_status.score, + repeat=media_item.user_status.repeat, + notes=media_item.user_status.notes, + ) + total_updated += 1 + + registry_service.save_media_record(record) + total_downloaded += 1 + + if not result.page_info.has_next_page: + break + page += 1 + + except Exception as e: + feedback.error(f"Download Error ({status.value})", str(e)) + continue + + progress.advance(download_task) + + if not dry_run: + feedback.success( + "Download Complete", + f"Downloaded {total_downloaded} media entries, updated {total_updated} existing entries" + ) + + +def _sync_upload( + api_client, registry_service, statuses, feedback, progress, dry_run, force +): + """Upload local registry changes to remote API.""" + feedback.info("Starting Upload", "Syncing local changes to remote...") + + upload_task = progress.add_task("Uploading changes...", total=None) + + total_uploaded = 0 + total_errors = 0 + + try: + # Get all media records from registry + all_records = registry_service.get_all_media_records() + + for record in all_records: + try: + # Get the index entry for this media + index_entry = registry_service.get_media_index_entry(record.media_item.id) + if not index_entry or not index_entry.status: + continue + + # Only sync if status is in our target list + if index_entry.status not in statuses: + continue + + if dry_run: + feedback.info( + "Would upload", + f"{record.media_item.title.english or record.media_item.title.romaji} " + f"({index_entry.status.value}, progress: {index_entry.progress or 0})" + ) + else: + # Update remote list entry + from .....libs.media_api.params import UpdateUserMediaListEntryParams + + update_params = UpdateUserMediaListEntryParams( + media_id=record.media_item.id, + status=index_entry.status, + progress=index_entry.progress, + score=index_entry.score, + ) + + if api_client.update_list_entry(update_params): + total_uploaded += 1 + else: + total_errors += 1 + feedback.warning( + "Upload Failed", + f"Failed to upload {record.media_item.title.english or record.media_item.title.romaji}" + ) + + except Exception as e: + total_errors += 1 + feedback.error("Upload Error", f"Failed to upload media {record.media_item.id}: {e}") + continue + + except Exception as e: + feedback.error("Upload Error", f"Failed to get local records: {e}") + return + + progress.remove_task(upload_task) + + if not dry_run: + feedback.success( + "Upload Complete", + f"Uploaded {total_uploaded} entries, {total_errors} errors" + ) diff --git a/fastanime/cli/commands/registry/examples.py b/fastanime/cli/commands/registry/examples.py new file mode 100644 index 0000000..4d17aa3 --- /dev/null +++ b/fastanime/cli/commands/registry/examples.py @@ -0,0 +1,31 @@ +""" +Example usage for the registry command +""" + +main = """ + +Examples: + # Sync with remote AniList + fastanime registry sync --upload --download + + # Show detailed registry statistics + fastanime registry stats --detailed + + # Search local registry + fastanime registry search "attack on titan" + + # Export registry to JSON + fastanime registry export --format json --output backup.json + + # Import from backup + fastanime registry import backup.json + + # Clean up orphaned entries + fastanime registry clean --dry-run + + # Create full backup + fastanime registry backup --compress + + # Restore from backup + fastanime registry restore backup.tar.gz +""" diff --git a/fastanime/cli/service/registry/service.py b/fastanime/cli/service/registry/service.py index f0bcbf5..9584e75 100644 --- a/fastanime/cli/service/registry/service.py +++ b/fastanime/cli/service/registry/service.py @@ -313,10 +313,10 @@ class MediaRegistryService: # Tag filters if params.tag_in: - media_tags = [tag.name for tag in media.tags] + media_tags = [tag.name for media in filtered for tag in media.tags] filtered = [media for media in filtered if any(tag in media_tags for tag in params.tag_in)] if params.tag_not_in: - media_tags = [tag.name for tag in media.tags] + media_tags = [tag.name for media in filtered for tag in media.tags] filtered = [media for media in filtered if not any(tag in media_tags for tag in params.tag_not_in)] # Format filter