feat: Add registry commands for restore, search, stats, sync, and examples

- Implemented `restore` command to restore the media registry from backup files, with options for verification and backup of current registry.
- Created `search` command to search through the local media registry with various filtering options.
- Added `stats` command to display detailed statistics about the local media registry, including breakdowns by genre, format, and year.
- Developed `sync` command to synchronize the local registry with a remote media API, allowing for both download and upload of media lists.
- Included example usage for the registry commands in `examples.py`.
- Fixed tag filtering logic in `MediaRegistryService` to ensure correct filtering based on tags.
This commit is contained in:
Benexl
2025-07-25 01:33:22 +03:00
parent 9fc66db248
commit 2924fcd077
14 changed files with 2464 additions and 2 deletions

View File

@@ -0,0 +1 @@
# Registry package

View File

@@ -0,0 +1,65 @@
import click
from ...utils.lazyloader import LazyGroup
from . import examples
commands = {
"sync": "sync.sync",
"stats": "stats.stats",
"search": "search.search",
"export": "export.export",
"import": "import_.import_",
"clean": "clean.clean",
"backup": "backup.backup",
"restore": "restore.restore",
}
@click.group(
cls=LazyGroup,
name="registry",
root="fastanime.cli.commands.registry.commands",
invoke_without_command=True,
help="Manage your local media registry - sync, search, backup and maintain your anime database",
short_help="Local media registry management",
lazy_subcommands=commands,
epilog=examples.main,
)
@click.option(
"--api",
default="anilist",
help="Media API to use (default: anilist)",
type=click.Choice(["anilist"], case_sensitive=False)
)
@click.pass_context
def registry(ctx: click.Context, api: str):
"""
The entry point for the 'registry' command. If no subcommand is invoked,
it shows registry information and statistics.
"""
from ...service.registry.service import MediaRegistryService
from ...utils.feedback import create_feedback_manager
config = ctx.obj
feedback = create_feedback_manager(config.general.icons)
if ctx.invoked_subcommand is None:
# Show registry overview and statistics
try:
registry_service = MediaRegistryService(api, config.registry)
stats = registry_service.get_registry_stats()
feedback.info("Registry Overview", f"API: {api}")
feedback.info("Total Media", f"{stats.get('total_media', 0)} entries")
feedback.info("Recently Updated", f"{stats.get('recently_updated', 0)} entries in last 7 days")
feedback.info("Storage Path", str(config.registry.media_dir))
# Show status breakdown if available
status_breakdown = stats.get('status_breakdown', {})
if status_breakdown:
feedback.info("Status Breakdown:")
for status, count in status_breakdown.items():
feedback.info(f" {status.title()}", f"{count} entries")
except Exception as e:
feedback.error("Registry Error", f"Failed to load registry: {e}")

View File

@@ -0,0 +1 @@
# Registry commands package

View File

@@ -0,0 +1,242 @@
"""
Registry backup command - create full backups of the registry
"""
import shutil
import tarfile
from pathlib import Path
from datetime import datetime
import click
from .....core.config import AppConfig
from ....service.registry.service import MediaRegistryService
from ....utils.feedback import create_feedback_manager
@click.command(help="Create a full backup of the registry")
@click.option(
"--output",
"-o",
type=click.Path(),
help="Output backup file path (auto-generated if not specified)"
)
@click.option(
"--compress",
"-c",
is_flag=True,
help="Compress the backup archive"
)
@click.option(
"--include-cache",
is_flag=True,
help="Include cache files in backup"
)
@click.option(
"--format",
"backup_format",
type=click.Choice(["tar", "zip"], case_sensitive=False),
default="tar",
help="Backup archive format"
)
@click.option(
"--api",
default="anilist",
type=click.Choice(["anilist"], case_sensitive=False),
help="Media API registry to backup"
)
@click.pass_obj
def backup(
config: AppConfig,
output: str | None,
compress: bool,
include_cache: bool,
backup_format: str,
api: str
):
"""
Create a complete backup of your media registry.
Includes all media records, index files, and optionally cache data.
Backups can be compressed and are suitable for restoration.
"""
feedback = create_feedback_manager(config.general.icons)
try:
registry_service = MediaRegistryService(api, config.registry)
# Generate output filename if not specified
if not output:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
extension = "tar.gz" if compress and backup_format == "tar" else backup_format
if backup_format == "zip":
extension = "zip"
output = f"fastanime_registry_backup_{api}_{timestamp}.{extension}"
output_path = Path(output)
# Get backup statistics before starting
stats = registry_service.get_registry_stats()
total_media = stats.get('total_media', 0)
feedback.info("Starting Backup", f"Backing up {total_media} media entries...")
# Create backup based on format
if backup_format.lower() == "tar":
_create_tar_backup(
registry_service, output_path, compress, include_cache, feedback, api
)
elif backup_format.lower() == "zip":
_create_zip_backup(
registry_service, output_path, include_cache, feedback, api
)
# Get final backup size
backup_size = _format_file_size(output_path)
feedback.success(
"Backup Complete",
f"Registry backed up to {output_path} ({backup_size})"
)
# Show backup contents summary
_show_backup_summary(output_path, backup_format, feedback)
except Exception as e:
feedback.error("Backup Error", f"Failed to create backup: {e}")
raise click.Abort()
def _create_tar_backup(registry_service, output_path: Path, compress: bool, include_cache: bool, feedback, api: str):
"""Create a tar-based backup."""
mode = "w:gz" if compress else "w"
with tarfile.open(output_path, mode) as tar:
# Add registry directory
registry_dir = registry_service.config.media_dir / api
if registry_dir.exists():
tar.add(registry_dir, arcname=f"registry/{api}")
feedback.info("Added to backup", f"Registry data ({api})")
# Add index directory
index_dir = registry_service.config.index_dir
if index_dir.exists():
tar.add(index_dir, arcname="index")
feedback.info("Added to backup", "Registry index")
# Add cache if requested
if include_cache:
cache_dir = registry_service.config.media_dir.parent / "cache"
if cache_dir.exists():
tar.add(cache_dir, arcname="cache")
feedback.info("Added to backup", "Cache data")
# Add metadata file
metadata = _create_backup_metadata(registry_service, api, include_cache)
metadata_path = output_path.parent / "backup_metadata.json"
try:
import json
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, default=str)
tar.add(metadata_path, arcname="backup_metadata.json")
metadata_path.unlink() # Clean up temp file
except Exception as e:
feedback.warning("Metadata Error", f"Failed to add metadata: {e}")
def _create_zip_backup(registry_service, output_path: Path, include_cache: bool, feedback, api: str):
"""Create a zip-based backup."""
import zipfile
with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
# Add registry directory
registry_dir = registry_service.config.media_dir / api
if registry_dir.exists():
for file_path in registry_dir.rglob('*'):
if file_path.is_file():
arcname = f"registry/{api}/{file_path.relative_to(registry_dir)}"
zip_file.write(file_path, arcname)
feedback.info("Added to backup", f"Registry data ({api})")
# Add index directory
index_dir = registry_service.config.index_dir
if index_dir.exists():
for file_path in index_dir.rglob('*'):
if file_path.is_file():
arcname = f"index/{file_path.relative_to(index_dir)}"
zip_file.write(file_path, arcname)
feedback.info("Added to backup", "Registry index")
# Add cache if requested
if include_cache:
cache_dir = registry_service.config.media_dir.parent / "cache"
if cache_dir.exists():
for file_path in cache_dir.rglob('*'):
if file_path.is_file():
arcname = f"cache/{file_path.relative_to(cache_dir)}"
zip_file.write(file_path, arcname)
feedback.info("Added to backup", "Cache data")
# Add metadata
metadata = _create_backup_metadata(registry_service, api, include_cache)
try:
import json
metadata_json = json.dumps(metadata, indent=2, default=str)
zip_file.writestr("backup_metadata.json", metadata_json)
except Exception as e:
feedback.warning("Metadata Error", f"Failed to add metadata: {e}")
def _create_backup_metadata(registry_service, api: str, include_cache: bool) -> dict:
"""Create backup metadata."""
stats = registry_service.get_registry_stats()
return {
"backup_timestamp": datetime.now().isoformat(),
"fastanime_version": "unknown", # You might want to get this from somewhere
"registry_version": stats.get('version'),
"api": api,
"total_media": stats.get('total_media', 0),
"include_cache": include_cache,
"registry_stats": stats,
"backup_type": "full",
}
def _show_backup_summary(backup_path: Path, format_type: str, feedback):
"""Show summary of backup contents."""
try:
if format_type.lower() == "tar":
with tarfile.open(backup_path, 'r:*') as tar:
members = tar.getmembers()
file_count = len([m for m in members if m.isfile()])
dir_count = len([m for m in members if m.isdir()])
else: # zip
import zipfile
with zipfile.ZipFile(backup_path, 'r') as zip_file:
info_list = zip_file.infolist()
file_count = len([info for info in info_list if not info.is_dir()])
dir_count = len([info for info in info_list if info.is_dir()])
feedback.info("Backup Contents", f"{file_count} files, {dir_count} directories")
except Exception as e:
feedback.warning("Summary Error", f"Could not analyze backup contents: {e}")
def _format_file_size(file_path: Path) -> str:
"""Format file size in human-readable format."""
try:
size = file_path.stat().st_size
for unit in ['B', 'KB', 'MB', 'GB']:
if size < 1024.0:
return f"{size:.1f} {unit}"
size /= 1024.0
return f"{size:.1f} TB"
except:
return "Unknown size"

View File

@@ -0,0 +1,379 @@
"""
Registry clean command - clean up orphaned entries and invalid data
"""
import click
from rich.console import Console
from rich.table import Table
from .....core.config import AppConfig
from ....service.registry.service import MediaRegistryService
from ....utils.feedback import create_feedback_manager
@click.command(help="Clean up orphaned entries and invalid data from registry")
@click.option(
"--dry-run",
is_flag=True,
help="Show what would be cleaned without making changes"
)
@click.option(
"--orphaned",
is_flag=True,
help="Remove orphaned media records (index entries without files)"
)
@click.option(
"--invalid",
is_flag=True,
help="Remove invalid or corrupted entries"
)
@click.option(
"--duplicates",
is_flag=True,
help="Remove duplicate entries"
)
@click.option(
"--old-format",
is_flag=True,
help="Clean entries from old registry format versions"
)
@click.option(
"--force",
"-f",
is_flag=True,
help="Force cleanup without confirmation prompts"
)
@click.option(
"--api",
default="anilist",
type=click.Choice(["anilist"], case_sensitive=False),
help="Media API registry to clean"
)
@click.pass_obj
def clean(
config: AppConfig,
dry_run: bool,
orphaned: bool,
invalid: bool,
duplicates: bool,
old_format: bool,
force: bool,
api: str
):
"""
Clean up your local media registry.
Can remove orphaned entries, invalid data, duplicates, and entries
from old format versions. Use --dry-run to preview changes.
"""
feedback = create_feedback_manager(config.general.icons)
console = Console()
# Default to all cleanup types if none specified
if not any([orphaned, invalid, duplicates, old_format]):
orphaned = invalid = duplicates = old_format = True
try:
registry_service = MediaRegistryService(api, config.registry)
cleanup_results = {
"orphaned": [],
"invalid": [],
"duplicates": [],
"old_format": []
}
# Analyze registry for cleanup opportunities
_analyze_registry(registry_service, cleanup_results, orphaned, invalid, duplicates, old_format)
# Show cleanup summary
_display_cleanup_summary(console, cleanup_results, config.general.icons)
# Confirm cleanup if not dry run and not forced
total_items = sum(len(items) for items in cleanup_results.values())
if total_items == 0:
feedback.info("Registry Clean", "No cleanup needed - registry is already clean!")
return
if not dry_run:
if not force:
if not click.confirm(f"Clean up {total_items} items from registry?"):
feedback.info("Cleanup Cancelled", "No changes were made")
return
# Perform cleanup
_perform_cleanup(registry_service, cleanup_results, feedback)
feedback.success("Cleanup Complete", f"Cleaned up {total_items} items from registry")
else:
feedback.info("Dry Run Complete", f"Would clean up {total_items} items")
except Exception as e:
feedback.error("Cleanup Error", f"Failed to clean registry: {e}")
raise click.Abort()
def _analyze_registry(registry_service, results: dict, check_orphaned: bool, check_invalid: bool, check_duplicates: bool, check_old_format: bool):
"""Analyze registry for cleanup opportunities."""
if check_orphaned:
results["orphaned"] = _find_orphaned_entries(registry_service)
if check_invalid:
results["invalid"] = _find_invalid_entries(registry_service)
if check_duplicates:
results["duplicates"] = _find_duplicate_entries(registry_service)
if check_old_format:
results["old_format"] = _find_old_format_entries(registry_service)
def _find_orphaned_entries(registry_service) -> list:
"""Find index entries that don't have corresponding media files."""
orphaned = []
try:
index = registry_service._load_index()
for entry_key, entry in index.media_index.items():
media_file = registry_service._get_media_file_path(entry.media_id)
if not media_file.exists():
orphaned.append({
"type": "orphaned_index",
"id": entry.media_id,
"key": entry_key,
"reason": "Media file missing"
})
except Exception:
pass
return orphaned
def _find_invalid_entries(registry_service) -> list:
"""Find invalid or corrupted entries."""
invalid = []
try:
# Check all media files
for media_file in registry_service.media_registry_dir.iterdir():
if not media_file.name.endswith('.json'):
continue
try:
media_id = int(media_file.stem)
record = registry_service.get_media_record(media_id)
# Check for invalid record structure
if not record or not record.media_item:
invalid.append({
"type": "invalid_record",
"id": media_id,
"file": media_file,
"reason": "Invalid record structure"
})
elif not record.media_item.title or not record.media_item.title.english and not record.media_item.title.romaji:
invalid.append({
"type": "invalid_title",
"id": media_id,
"file": media_file,
"reason": "Missing or invalid title"
})
except (ValueError, Exception) as e:
invalid.append({
"type": "corrupted_file",
"id": media_file.stem,
"file": media_file,
"reason": f"File corruption: {e}"
})
except Exception:
pass
return invalid
def _find_duplicate_entries(registry_service) -> list:
"""Find duplicate entries (same media ID appearing multiple times)."""
duplicates = []
seen_ids = set()
try:
index = registry_service._load_index()
for entry_key, entry in index.media_index.items():
if entry.media_id in seen_ids:
duplicates.append({
"type": "duplicate_index",
"id": entry.media_id,
"key": entry_key,
"reason": "Duplicate media ID in index"
})
else:
seen_ids.add(entry.media_id)
except Exception:
pass
return duplicates
def _find_old_format_entries(registry_service) -> list:
"""Find entries from old registry format versions."""
old_format = []
try:
index = registry_service._load_index()
current_version = registry_service._index.version
# Check for entries that might be from old formats
# This is a placeholder - you'd implement specific checks based on your version history
for media_file in registry_service.media_registry_dir.iterdir():
if not media_file.name.endswith('.json'):
continue
try:
import json
with open(media_file, 'r') as f:
data = json.load(f)
# Check for old format indicators
if 'version' in data and data['version'] < current_version:
old_format.append({
"type": "old_version",
"id": media_file.stem,
"file": media_file,
"reason": f"Old format version {data.get('version')}"
})
except Exception:
pass
except Exception:
pass
return old_format
def _display_cleanup_summary(console: Console, results: dict, icons: bool):
"""Display summary of cleanup opportunities."""
table = Table(title=f"{'🧹 ' if icons else ''}Registry Cleanup Summary")
table.add_column("Category", style="cyan", no_wrap=True)
table.add_column("Count", style="magenta", justify="right")
table.add_column("Description", style="white")
categories = {
"orphaned": "Orphaned Entries",
"invalid": "Invalid Entries",
"duplicates": "Duplicate Entries",
"old_format": "Old Format Entries"
}
for category, display_name in categories.items():
count = len(results[category])
if count > 0:
# Get sample reasons
reasons = set(item["reason"] for item in results[category][:3])
description = "; ".join(list(reasons)[:2])
if len(reasons) > 2:
description += "..."
else:
description = "None found"
table.add_row(display_name, str(count), description)
console.print(table)
console.print()
# Show detailed breakdown if there are items to clean
for category, items in results.items():
if items:
_display_category_details(console, category, items, icons)
def _display_category_details(console: Console, category: str, items: list, icons: bool):
"""Display detailed breakdown for a cleanup category."""
category_names = {
"orphaned": "🔗 Orphaned Entries" if icons else "Orphaned Entries",
"invalid": "❌ Invalid Entries" if icons else "Invalid Entries",
"duplicates": "👥 Duplicate Entries" if icons else "Duplicate Entries",
"old_format": "📼 Old Format Entries" if icons else "Old Format Entries"
}
table = Table(title=category_names.get(category, category.title()))
table.add_column("ID", style="cyan", no_wrap=True)
table.add_column("Type", style="magenta")
table.add_column("Reason", style="yellow")
for item in items[:10]: # Show max 10 items
table.add_row(
str(item["id"]),
item["type"],
item["reason"]
)
if len(items) > 10:
table.add_row("...", "...", f"And {len(items) - 10} more")
console.print(table)
console.print()
def _perform_cleanup(registry_service, results: dict, feedback):
"""Perform the actual cleanup operations."""
cleaned_count = 0
# Clean orphaned entries
for item in results["orphaned"]:
try:
if item["type"] == "orphaned_index":
index = registry_service._load_index()
if item["key"] in index.media_index:
del index.media_index[item["key"]]
registry_service._save_index(index)
cleaned_count += 1
except Exception as e:
feedback.warning("Cleanup Error", f"Failed to clean orphaned entry {item['id']}: {e}")
# Clean invalid entries
for item in results["invalid"]:
try:
if "file" in item:
item["file"].unlink() # Delete the file
cleaned_count += 1
# Also remove from index if present
index = registry_service._load_index()
entry_key = f"{registry_service._media_api}_{item['id']}"
if entry_key in index.media_index:
del index.media_index[entry_key]
registry_service._save_index(index)
except Exception as e:
feedback.warning("Cleanup Error", f"Failed to clean invalid entry {item['id']}: {e}")
# Clean duplicates
for item in results["duplicates"]:
try:
if item["type"] == "duplicate_index":
index = registry_service._load_index()
if item["key"] in index.media_index:
del index.media_index[item["key"]]
registry_service._save_index(index)
cleaned_count += 1
except Exception as e:
feedback.warning("Cleanup Error", f"Failed to clean duplicate entry {item['id']}: {e}")
# Clean old format entries
for item in results["old_format"]:
try:
if "file" in item:
# You might want to migrate instead of delete
# For now, we'll just remove old format files
item["file"].unlink()
cleaned_count += 1
except Exception as e:
feedback.warning("Cleanup Error", f"Failed to clean old format entry {item['id']}: {e}")
feedback.info("Cleanup Results", f"Successfully cleaned {cleaned_count} items")

View File

@@ -0,0 +1,338 @@
"""
Registry export command - export registry data to various formats
"""
import json
import csv
from pathlib import Path
from datetime import datetime
import click
from .....core.config import AppConfig
from ....service.registry.service import MediaRegistryService
from ....utils.feedback import create_feedback_manager
@click.command(help="Export registry data to various formats")
@click.option(
"--format",
"output_format",
type=click.Choice(["json", "csv", "xml"], case_sensitive=False),
default="json",
help="Export format"
)
@click.option(
"--output",
"-o",
type=click.Path(),
help="Output file path (auto-generated if not specified)"
)
@click.option(
"--include-metadata",
is_flag=True,
help="Include detailed media metadata in export"
)
@click.option(
"--status",
multiple=True,
type=click.Choice([
"watching", "completed", "planning", "dropped", "paused", "repeating"
], case_sensitive=False),
help="Only export specific status lists"
)
@click.option(
"--compress",
is_flag=True,
help="Compress the output file"
)
@click.option(
"--api",
default="anilist",
type=click.Choice(["anilist"], case_sensitive=False),
help="Media API registry to export"
)
@click.pass_obj
def export(
config: AppConfig,
output_format: str,
output: str | None,
include_metadata: bool,
status: tuple[str, ...],
compress: bool,
api: str
):
"""
Export your local media registry to various formats.
Supports JSON, CSV, and XML formats. Can optionally include
detailed metadata and compress the output.
"""
feedback = create_feedback_manager(config.general.icons)
try:
registry_service = MediaRegistryService(api, config.registry)
# Generate output filename if not specified
if not output:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
extension = output_format.lower()
if compress:
extension += ".gz"
output = f"fastanime_registry_{api}_{timestamp}.{extension}"
output_path = Path(output)
# Get export data
export_data = _prepare_export_data(
registry_service, include_metadata, status
)
# Export based on format
if output_format.lower() == "json":
_export_json(export_data, output_path, compress, feedback)
elif output_format.lower() == "csv":
_export_csv(export_data, output_path, compress, feedback)
elif output_format.lower() == "xml":
_export_xml(export_data, output_path, compress, feedback)
feedback.success(
"Export Complete",
f"Registry exported to {output_path} ({_format_file_size(output_path)})"
)
except Exception as e:
feedback.error("Export Error", f"Failed to export registry: {e}")
raise click.Abort()
def _prepare_export_data(registry_service, include_metadata: bool, status_filter: tuple[str, ...]) -> dict:
"""Prepare data for export based on options."""
# Convert status filter to enums
from .....libs.media_api.types import UserMediaListStatus
status_map = {
"watching": UserMediaListStatus.WATCHING,
"completed": UserMediaListStatus.COMPLETED,
"planning": UserMediaListStatus.PLANNING,
"dropped": UserMediaListStatus.DROPPED,
"paused": UserMediaListStatus.PAUSED,
"repeating": UserMediaListStatus.REPEATING,
}
status_enums = [status_map[s] for s in status_filter] if status_filter else None
export_data = {
"metadata": {
"export_timestamp": datetime.now().isoformat(),
"registry_version": registry_service._load_index().version,
"include_metadata": include_metadata,
"filtered_status": list(status_filter) if status_filter else None,
},
"statistics": registry_service.get_registry_stats(),
"media": []
}
# Get all records and filter by status if specified
all_records = registry_service.get_all_media_records()
for record in all_records:
index_entry = registry_service.get_media_index_entry(record.media_item.id)
# Skip if status filter is specified and doesn't match
if status_enums and (not index_entry or index_entry.status not in status_enums):
continue
media_data = {
"id": record.media_item.id,
"title": {
"english": record.media_item.title.english,
"romaji": record.media_item.title.romaji,
"native": record.media_item.title.native,
},
"user_status": {
"status": index_entry.status.value if index_entry and index_entry.status else None,
"progress": index_entry.progress if index_entry else None,
"score": index_entry.score if index_entry else None,
"last_watched": index_entry.last_watched.isoformat() if index_entry and index_entry.last_watched else None,
"notes": index_entry.notes if index_entry else None,
}
}
if include_metadata:
media_data.update({
"format": record.media_item.format.value if record.media_item.format else None,
"episodes": record.media_item.episodes,
"duration": record.media_item.duration,
"status": record.media_item.status.value if record.media_item.status else None,
"start_date": record.media_item.start_date.isoformat() if record.media_item.start_date else None,
"end_date": record.media_item.end_date.isoformat() if record.media_item.end_date else None,
"average_score": record.media_item.average_score,
"popularity": record.media_item.popularity,
"genres": [genre.value for genre in record.media_item.genres],
"tags": [{"name": tag.name.value, "rank": tag.rank} for tag in record.media_item.tags],
"studios": [studio.name for studio in record.media_item.studios if studio.name],
"description": record.media_item.description,
"cover_image": {
"large": record.media_item.cover_image.large if record.media_item.cover_image else None,
"medium": record.media_item.cover_image.medium if record.media_item.cover_image else None,
} if record.media_item.cover_image else None,
})
export_data["media"].append(media_data)
return export_data
def _export_json(data: dict, output_path: Path, compress: bool, feedback):
"""Export data to JSON format."""
if compress:
import gzip
with gzip.open(output_path, 'wt', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
else:
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def _export_csv(data: dict, output_path: Path, compress: bool, feedback):
"""Export data to CSV format."""
# Flatten media data for CSV
fieldnames = [
"id", "title_english", "title_romaji", "title_native",
"status", "progress", "score", "last_watched", "notes"
]
# Add metadata fields if included
if data["metadata"]["include_metadata"]:
fieldnames.extend([
"format", "episodes", "duration", "media_status", "start_date", "end_date",
"average_score", "popularity", "genres", "description"
])
def write_csv(file_obj):
writer = csv.DictWriter(file_obj, fieldnames=fieldnames)
writer.writeheader()
for media in data["media"]:
row = {
"id": media["id"],
"title_english": media["title"]["english"],
"title_romaji": media["title"]["romaji"],
"title_native": media["title"]["native"],
"status": media["user_status"]["status"],
"progress": media["user_status"]["progress"],
"score": media["user_status"]["score"],
"last_watched": media["user_status"]["last_watched"],
"notes": media["user_status"]["notes"],
}
if data["metadata"]["include_metadata"]:
row.update({
"format": media.get("format"),
"episodes": media.get("episodes"),
"duration": media.get("duration"),
"media_status": media.get("status"),
"start_date": media.get("start_date"),
"end_date": media.get("end_date"),
"average_score": media.get("average_score"),
"popularity": media.get("popularity"),
"genres": ",".join(media.get("genres", [])),
"description": media.get("description"),
})
writer.writerow(row)
if compress:
import gzip
with gzip.open(output_path, 'wt', encoding='utf-8', newline='') as f:
write_csv(f)
else:
with open(output_path, 'w', encoding='utf-8', newline='') as f:
write_csv(f)
def _export_xml(data: dict, output_path: Path, compress: bool, feedback):
"""Export data to XML format."""
try:
import xml.etree.ElementTree as ET
except ImportError:
feedback.error("XML Export Error", "XML export requires Python's xml module")
raise click.Abort()
root = ET.Element("fastanime_registry")
# Add metadata
metadata_elem = ET.SubElement(root, "metadata")
for key, value in data["metadata"].items():
if value is not None:
elem = ET.SubElement(metadata_elem, key)
elem.text = str(value)
# Add statistics
stats_elem = ET.SubElement(root, "statistics")
for key, value in data["statistics"].items():
if value is not None:
elem = ET.SubElement(stats_elem, key)
elem.text = str(value)
# Add media
media_list_elem = ET.SubElement(root, "media_list")
for media in data["media"]:
media_elem = ET.SubElement(media_list_elem, "media")
media_elem.set("id", str(media["id"]))
# Add titles
titles_elem = ET.SubElement(media_elem, "titles")
for title_type, title_value in media["title"].items():
if title_value:
title_elem = ET.SubElement(titles_elem, title_type)
title_elem.text = title_value
# Add user status
status_elem = ET.SubElement(media_elem, "user_status")
for key, value in media["user_status"].items():
if value is not None:
elem = ET.SubElement(status_elem, key)
elem.text = str(value)
# Add metadata if included
if data["metadata"]["include_metadata"]:
for key, value in media.items():
if key not in ["id", "title", "user_status"] and value is not None:
if isinstance(value, list):
list_elem = ET.SubElement(media_elem, key)
for item in value:
item_elem = ET.SubElement(list_elem, "item")
item_elem.text = str(item)
elif isinstance(value, dict):
dict_elem = ET.SubElement(media_elem, key)
for sub_key, sub_value in value.items():
if sub_value is not None:
sub_elem = ET.SubElement(dict_elem, sub_key)
sub_elem.text = str(sub_value)
else:
elem = ET.SubElement(media_elem, key)
elem.text = str(value)
# Write XML
tree = ET.ElementTree(root)
if compress:
import gzip
with gzip.open(output_path, 'wb') as f:
tree.write(f, encoding='utf-8', xml_declaration=True)
else:
tree.write(output_path, encoding='utf-8', xml_declaration=True)
def _format_file_size(file_path: Path) -> str:
"""Format file size in human-readable format."""
try:
size = file_path.stat().st_size
for unit in ['B', 'KB', 'MB', 'GB']:
if size < 1024.0:
return f"{size:.1f} {unit}"
size /= 1024.0
return f"{size:.1f} TB"
except:
return "Unknown size"

View File

@@ -0,0 +1,425 @@
"""
Registry import command - import registry data from various formats
"""
import json
import csv
from pathlib import Path
from datetime import datetime
import click
from .....core.config import AppConfig
from .....libs.media_api.types import UserMediaListStatus, MediaItem, MediaTitle
from ....service.registry.service import MediaRegistryService
from ....utils.feedback import create_feedback_manager
@click.command(name="import", help="Import registry data from various formats")
@click.argument("input_file", type=click.Path(exists=True, path_type=Path))
@click.option(
"--format",
"input_format",
type=click.Choice(["json", "csv", "xml", "auto"], case_sensitive=False),
default="auto",
help="Input format (auto-detect if not specified)"
)
@click.option(
"--merge",
is_flag=True,
help="Merge with existing registry (default: replace)"
)
@click.option(
"--dry-run",
is_flag=True,
help="Show what would be imported without making changes"
)
@click.option(
"--force",
"-f",
is_flag=True,
help="Force import even if format version doesn't match"
)
@click.option(
"--backup",
is_flag=True,
help="Create backup before importing"
)
@click.option(
"--api",
default="anilist",
type=click.Choice(["anilist"], case_sensitive=False),
help="Media API registry to import to"
)
@click.pass_obj
def import_(
config: AppConfig,
input_file: Path,
input_format: str,
merge: bool,
dry_run: bool,
force: bool,
backup: bool,
api: str
):
"""
Import media registry data from various formats.
Supports JSON, CSV, and XML formats exported by the export command
or compatible third-party tools.
"""
feedback = create_feedback_manager(config.general.icons)
try:
registry_service = MediaRegistryService(api, config.registry)
# Create backup if requested
if backup and not dry_run:
_create_backup(registry_service, feedback)
# Auto-detect format if needed
if input_format == "auto":
input_format = _detect_format(input_file)
feedback.info("Format Detection", f"Detected format: {input_format.upper()}")
# Parse input file
import_data = _parse_input_file(input_file, input_format, feedback)
# Validate import data
_validate_import_data(import_data, force, feedback)
# Import data
_import_data(
registry_service, import_data, merge, dry_run, feedback
)
if not dry_run:
feedback.success(
"Import Complete",
f"Successfully imported {len(import_data.get('media', []))} media entries"
)
else:
feedback.info(
"Dry Run Complete",
f"Would import {len(import_data.get('media', []))} media entries"
)
except Exception as e:
feedback.error("Import Error", f"Failed to import registry: {e}")
raise click.Abort()
def _create_backup(registry_service, feedback):
"""Create a backup before importing."""
from .export import _prepare_export_data, _export_json
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = Path(f"fastanime_registry_backup_{timestamp}.json")
export_data = _prepare_export_data(registry_service, True, ())
_export_json(export_data, backup_path, False, feedback)
feedback.info("Backup Created", f"Registry backed up to {backup_path}")
def _detect_format(file_path: Path) -> str:
"""Auto-detect file format based on extension and content."""
extension = file_path.suffix.lower()
if extension in ['.json', '.gz']:
return "json"
elif extension == '.csv':
return "csv"
elif extension == '.xml':
return "xml"
# Try to detect by content
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read(100).strip()
if content.startswith('{') or content.startswith('['):
return "json"
elif content.startswith('<?xml') or content.startswith('<'):
return "xml"
elif ',' in content: # Very basic CSV detection
return "csv"
except:
pass
raise click.ClickException(f"Could not detect format for {file_path}")
def _parse_input_file(file_path: Path, format_type: str, feedback) -> dict:
"""Parse input file based on format."""
if format_type == "json":
return _parse_json(file_path)
elif format_type == "csv":
return _parse_csv(file_path)
elif format_type == "xml":
return _parse_xml(file_path)
else:
raise click.ClickException(f"Unsupported format: {format_type}")
def _parse_json(file_path: Path) -> dict:
"""Parse JSON input file."""
try:
if file_path.suffix.lower() == '.gz':
import gzip
with gzip.open(file_path, 'rt', encoding='utf-8') as f:
return json.load(f)
else:
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
except json.JSONDecodeError as e:
raise click.ClickException(f"Invalid JSON format: {e}")
def _parse_csv(file_path: Path) -> dict:
"""Parse CSV input file."""
import_data = {
"metadata": {
"import_timestamp": datetime.now().isoformat(),
"source_format": "csv",
},
"media": []
}
try:
with open(file_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
media_data = {
"id": int(row["id"]) if row.get("id") else None,
"title": {
"english": row.get("title_english"),
"romaji": row.get("title_romaji"),
"native": row.get("title_native"),
},
"user_status": {
"status": row.get("status"),
"progress": int(row["progress"]) if row.get("progress") else None,
"score": float(row["score"]) if row.get("score") else None,
"last_watched": row.get("last_watched"),
"notes": row.get("notes"),
}
}
# Add metadata fields if present
if "format" in row:
media_data.update({
"format": row.get("format"),
"episodes": int(row["episodes"]) if row.get("episodes") else None,
"duration": int(row["duration"]) if row.get("duration") else None,
"media_status": row.get("media_status"),
"start_date": row.get("start_date"),
"end_date": row.get("end_date"),
"average_score": float(row["average_score"]) if row.get("average_score") else None,
"popularity": int(row["popularity"]) if row.get("popularity") else None,
"genres": row.get("genres", "").split(",") if row.get("genres") else [],
"description": row.get("description"),
})
import_data["media"].append(media_data)
except (ValueError, KeyError) as e:
raise click.ClickException(f"Invalid CSV format: {e}")
return import_data
def _parse_xml(file_path: Path) -> dict:
"""Parse XML input file."""
try:
import xml.etree.ElementTree as ET
except ImportError:
raise click.ClickException("XML import requires Python's xml module")
try:
tree = ET.parse(file_path)
root = tree.getroot()
import_data = {
"metadata": {},
"media": []
}
# Parse metadata
metadata_elem = root.find("metadata")
if metadata_elem is not None:
for child in metadata_elem:
import_data["metadata"][child.tag] = child.text
# Parse media
media_list_elem = root.find("media_list")
if media_list_elem is not None:
for media_elem in media_list_elem.findall("media"):
media_data = {
"id": int(media_elem.get("id")),
"title": {},
"user_status": {}
}
# Parse titles
titles_elem = media_elem.find("titles")
if titles_elem is not None:
for title_elem in titles_elem:
media_data["title"][title_elem.tag] = title_elem.text
# Parse user status
status_elem = media_elem.find("user_status")
if status_elem is not None:
for child in status_elem:
value = child.text
if child.tag in ["progress", "score"] and value:
try:
value = float(value) if child.tag == "score" else int(value)
except ValueError:
pass
media_data["user_status"][child.tag] = value
# Parse other metadata
for child in media_elem:
if child.tag not in ["titles", "user_status"]:
if child.tag in ["episodes", "duration", "popularity"]:
try:
media_data[child.tag] = int(child.text) if child.text else None
except ValueError:
media_data[child.tag] = child.text
elif child.tag == "average_score":
try:
media_data[child.tag] = float(child.text) if child.text else None
except ValueError:
media_data[child.tag] = child.text
else:
media_data[child.tag] = child.text
import_data["media"].append(media_data)
except ET.ParseError as e:
raise click.ClickException(f"Invalid XML format: {e}")
return import_data
def _validate_import_data(data: dict, force: bool, feedback):
"""Validate import data structure and compatibility."""
if "media" not in data:
raise click.ClickException("Import data missing 'media' section")
if not isinstance(data["media"], list):
raise click.ClickException("'media' section must be a list")
# Check if any media entries exist
if not data["media"]:
feedback.warning("No Media", "Import file contains no media entries")
return
# Validate media entries
required_fields = ["id", "title"]
for i, media in enumerate(data["media"]):
for field in required_fields:
if field not in media:
raise click.ClickException(f"Media entry {i} missing required field: {field}")
if not isinstance(media.get("title"), dict):
raise click.ClickException(f"Media entry {i} has invalid title format")
feedback.info("Validation", f"Import data validated - {len(data['media'])} media entries")
def _import_data(registry_service, data: dict, merge: bool, dry_run: bool, feedback):
"""Import data into the registry."""
from .....libs.media_api.types import MediaFormat, MediaGenre, MediaStatus, MediaType
imported_count = 0
updated_count = 0
error_count = 0
status_map = {
"watching": UserMediaListStatus.WATCHING,
"completed": UserMediaListStatus.COMPLETED,
"planning": UserMediaListStatus.PLANNING,
"dropped": UserMediaListStatus.DROPPED,
"paused": UserMediaListStatus.PAUSED,
"repeating": UserMediaListStatus.REPEATING,
}
for media_data in data["media"]:
try:
media_id = media_data["id"]
if not media_id:
error_count += 1
continue
title_data = media_data.get("title", {})
title = MediaTitle(
english=title_data.get("english") or "",
romaji=title_data.get("romaji"),
native=title_data.get("native"),
)
# Create minimal MediaItem for registry
media_item = MediaItem(
id=media_id,
title=title,
type=MediaType.ANIME, # Default to anime
)
# Add additional metadata if available
if "format" in media_data and media_data["format"]:
try:
media_item.format = getattr(MediaFormat, media_data["format"])
except (AttributeError, TypeError):
pass
if "episodes" in media_data:
media_item.episodes = media_data["episodes"]
if "average_score" in media_data:
media_item.average_score = media_data["average_score"]
if dry_run:
title_str = title.english or title.romaji or f"ID:{media_id}"
feedback.info("Would import", title_str)
imported_count += 1
continue
# Check if record exists
existing_record = registry_service.get_media_record(media_id)
if existing_record and not merge:
# Skip if not merging
continue
elif existing_record:
updated_count += 1
else:
imported_count += 1
# Create or update record
record = registry_service.get_or_create_record(media_item)
registry_service.save_media_record(record)
# Update user status if provided
user_status = media_data.get("user_status", {})
if user_status.get("status"):
status_enum = status_map.get(user_status["status"].lower())
if status_enum:
registry_service.update_media_index_entry(
media_id,
media_item=media_item,
status=status_enum,
progress=str(user_status.get("progress", 0)),
score=user_status.get("score"),
notes=user_status.get("notes"),
)
except Exception as e:
error_count += 1
feedback.warning("Import Error", f"Failed to import media {media_data.get('id', 'unknown')}: {e}")
continue
if not dry_run:
feedback.info(
"Import Summary",
f"Imported: {imported_count}, Updated: {updated_count}, Errors: {error_count}"
)

View File

@@ -0,0 +1,291 @@
"""
Registry restore command - restore registry from backup files
"""
import shutil
import tarfile
from pathlib import Path
from datetime import datetime
import click
from .....core.config import AppConfig
from ....service.registry.service import MediaRegistryService
from ....utils.feedback import create_feedback_manager
@click.command(help="Restore registry from a backup file")
@click.argument("backup_file", type=click.Path(exists=True, path_type=Path))
@click.option(
"--force",
"-f",
is_flag=True,
help="Force restore even if current registry exists"
)
@click.option(
"--backup-current",
is_flag=True,
help="Create backup of current registry before restoring"
)
@click.option(
"--verify",
is_flag=True,
help="Verify backup integrity before restoring"
)
@click.option(
"--api",
default="anilist",
type=click.Choice(["anilist"], case_sensitive=False),
help="Media API registry to restore to"
)
@click.pass_obj
def restore(
config: AppConfig,
backup_file: Path,
force: bool,
backup_current: bool,
verify: bool,
api: str
):
"""
Restore your media registry from a backup file.
Can restore from tar or zip backups created by the backup command.
Optionally creates a backup of the current registry before restoring.
"""
feedback = create_feedback_manager(config.general.icons)
try:
# Detect backup format
backup_format = _detect_backup_format(backup_file)
feedback.info("Backup Format", f"Detected {backup_format.upper()} format")
# Verify backup if requested
if verify:
if not _verify_backup(backup_file, backup_format, feedback):
feedback.error("Verification Failed", "Backup file appears to be corrupted")
raise click.Abort()
feedback.success("Verification", "Backup file integrity verified")
# Check if current registry exists
registry_service = MediaRegistryService(api, config.registry)
registry_exists = _check_registry_exists(registry_service)
if registry_exists and not force:
if not click.confirm("Current registry exists. Continue with restore?"):
feedback.info("Restore Cancelled", "No changes were made")
return
# Create backup of current registry if requested
if backup_current and registry_exists:
_backup_current_registry(registry_service, api, feedback)
# Show restore summary
_show_restore_summary(backup_file, backup_format, feedback)
# Perform restore
_perform_restore(backup_file, backup_format, config, api, feedback)
feedback.success("Restore Complete", "Registry has been successfully restored from backup")
# Verify restored registry
try:
restored_service = MediaRegistryService(api, config.registry)
stats = restored_service.get_registry_stats()
feedback.info("Restored Registry", f"Contains {stats.get('total_media', 0)} media entries")
except Exception as e:
feedback.warning("Verification Warning", f"Could not verify restored registry: {e}")
except Exception as e:
feedback.error("Restore Error", f"Failed to restore registry: {e}")
raise click.Abort()
def _detect_backup_format(backup_file: Path) -> str:
"""Detect backup file format."""
if backup_file.suffix.lower() in ['.tar', '.gz']:
return "tar"
elif backup_file.suffix.lower() == '.zip':
return "zip"
elif backup_file.name.endswith('.tar.gz'):
return "tar"
else:
# Try to detect by content
try:
with tarfile.open(backup_file, 'r:*'):
return "tar"
except:
pass
try:
import zipfile
with zipfile.ZipFile(backup_file, 'r'):
return "zip"
except:
pass
raise click.ClickException(f"Could not detect backup format for {backup_file}")
def _verify_backup(backup_file: Path, format_type: str, feedback) -> bool:
"""Verify backup file integrity."""
try:
if format_type == "tar":
with tarfile.open(backup_file, 'r:*') as tar:
# Check if essential files exist
names = tar.getnames()
has_registry = any('registry/' in name for name in names)
has_index = any('index/' in name for name in names)
has_metadata = 'backup_metadata.json' in names
if not (has_registry and has_index):
return False
# Try to read metadata if it exists
if has_metadata:
try:
metadata_member = tar.getmember('backup_metadata.json')
metadata_file = tar.extractfile(metadata_member)
if metadata_file:
import json
metadata = json.load(metadata_file)
feedback.info("Backup Info", f"Created: {metadata.get('backup_timestamp', 'Unknown')}")
feedback.info("Backup Info", f"Total Media: {metadata.get('total_media', 'Unknown')}")
except:
pass
else: # zip
import zipfile
with zipfile.ZipFile(backup_file, 'r') as zip_file:
names = zip_file.namelist()
has_registry = any('registry/' in name for name in names)
has_index = any('index/' in name for name in names)
has_metadata = 'backup_metadata.json' in names
if not (has_registry and has_index):
return False
# Try to read metadata
if has_metadata:
try:
with zip_file.open('backup_metadata.json') as metadata_file:
import json
metadata = json.load(metadata_file)
feedback.info("Backup Info", f"Created: {metadata.get('backup_timestamp', 'Unknown')}")
feedback.info("Backup Info", f"Total Media: {metadata.get('total_media', 'Unknown')}")
except:
pass
return True
except Exception:
return False
def _check_registry_exists(registry_service) -> bool:
"""Check if a registry already exists."""
try:
stats = registry_service.get_registry_stats()
return stats.get('total_media', 0) > 0
except:
return False
def _backup_current_registry(registry_service, api: str, feedback):
"""Create backup of current registry before restoring."""
from .backup import _create_tar_backup
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = Path(f"fastanime_registry_pre_restore_{api}_{timestamp}.tar.gz")
try:
_create_tar_backup(registry_service, backup_path, True, False, feedback, api)
feedback.info("Current Registry Backed Up", f"Saved to {backup_path}")
except Exception as e:
feedback.warning("Backup Warning", f"Failed to backup current registry: {e}")
def _show_restore_summary(backup_file: Path, format_type: str, feedback):
"""Show summary of what will be restored."""
try:
if format_type == "tar":
with tarfile.open(backup_file, 'r:*') as tar:
members = tar.getmembers()
file_count = len([m for m in members if m.isfile()])
# Count media files
media_files = len([m for m in members if m.name.startswith('registry/') and m.name.endswith('.json')])
else: # zip
import zipfile
with zipfile.ZipFile(backup_file, 'r') as zip_file:
info_list = zip_file.infolist()
file_count = len([info for info in info_list if not info.is_dir()])
# Count media files
media_files = len([info for info in info_list if info.filename.startswith('registry/') and info.filename.endswith('.json')])
feedback.info("Restore Preview", f"Will restore {file_count} files")
feedback.info("Media Records", f"Contains {media_files} media entries")
except Exception as e:
feedback.warning("Preview Error", f"Could not analyze backup: {e}")
def _perform_restore(backup_file: Path, format_type: str, config: AppConfig, api: str, feedback):
"""Perform the actual restore operation."""
# Create temporary extraction directory
temp_dir = Path(config.registry.media_dir.parent / "restore_temp")
temp_dir.mkdir(exist_ok=True)
try:
# Extract backup
if format_type == "tar":
with tarfile.open(backup_file, 'r:*') as tar:
tar.extractall(temp_dir)
else: # zip
import zipfile
with zipfile.ZipFile(backup_file, 'r') as zip_file:
zip_file.extractall(temp_dir)
feedback.info("Extraction", "Backup extracted to temporary directory")
# Remove existing registry if it exists
registry_dir = config.registry.media_dir / api
index_dir = config.registry.index_dir
if registry_dir.exists():
shutil.rmtree(registry_dir)
feedback.info("Cleanup", "Removed existing registry data")
if index_dir.exists():
shutil.rmtree(index_dir)
feedback.info("Cleanup", "Removed existing index data")
# Move extracted files to proper locations
extracted_registry = temp_dir / "registry" / api
extracted_index = temp_dir / "index"
if extracted_registry.exists():
shutil.move(str(extracted_registry), str(registry_dir))
feedback.info("Restore", "Registry data restored")
if extracted_index.exists():
shutil.move(str(extracted_index), str(index_dir))
feedback.info("Restore", "Index data restored")
# Restore cache if it exists
extracted_cache = temp_dir / "cache"
if extracted_cache.exists():
cache_dir = config.registry.media_dir.parent / "cache"
if cache_dir.exists():
shutil.rmtree(cache_dir)
shutil.move(str(extracted_cache), str(cache_dir))
feedback.info("Restore", "Cache data restored")
finally:
# Clean up temporary directory
if temp_dir.exists():
shutil.rmtree(temp_dir)
feedback.info("Cleanup", "Temporary files removed")

View File

@@ -0,0 +1,240 @@
"""
Registry search command - search through the local media registry
"""
import click
from rich.console import Console
from rich.table import Table
from .....core.config import AppConfig
from .....libs.media_api.params import MediaSearchParams
from .....libs.media_api.types import MediaSort, UserMediaListStatus
from ....service.registry.service import MediaRegistryService
from ....utils.feedback import create_feedback_manager
@click.command(help="Search through the local media registry")
@click.argument("query", required=False)
@click.option(
"--status",
type=click.Choice([
"watching", "completed", "planning", "dropped", "paused", "repeating"
], case_sensitive=False),
help="Filter by watch status"
)
@click.option(
"--genre",
multiple=True,
help="Filter by genre (can be used multiple times)"
)
@click.option(
"--format",
type=click.Choice([
"TV", "TV_SHORT", "MOVIE", "SPECIAL", "OVA", "ONA", "MUSIC"
], case_sensitive=False),
help="Filter by format"
)
@click.option(
"--year",
type=int,
help="Filter by release year"
)
@click.option(
"--min-score",
type=float,
help="Minimum average score (0.0 - 10.0)"
)
@click.option(
"--max-score",
type=float,
help="Maximum average score (0.0 - 10.0)"
)
@click.option(
"--sort",
type=click.Choice([
"title", "score", "popularity", "year", "episodes", "updated"
], case_sensitive=False),
default="title",
help="Sort results by field"
)
@click.option(
"--limit",
type=int,
default=20,
help="Maximum number of results to show"
)
@click.option(
"--json",
"output_json",
is_flag=True,
help="Output results in JSON format"
)
@click.option(
"--api",
default="anilist",
type=click.Choice(["anilist"], case_sensitive=False),
help="Media API registry to search"
)
@click.pass_obj
def search(
config: AppConfig,
query: str | None,
status: str | None,
genre: tuple[str, ...],
format: str | None,
year: int | None,
min_score: float | None,
max_score: float | None,
sort: str,
limit: int,
output_json: bool,
api: str
):
"""
Search through your local media registry.
You can search by title and filter by various criteria like status,
genre, format, year, and score range.
"""
feedback = create_feedback_manager(config.general.icons)
console = Console()
try:
registry_service = MediaRegistryService(api, config.registry)
# Build search parameters
search_params = _build_search_params(
query, status, genre, format, year, min_score, max_score, sort, limit
)
# Perform search
result = registry_service.search_for_media(search_params)
if not result or not result.media:
feedback.info("No Results", "No media found matching your criteria")
return
if output_json:
import json
print(json.dumps(result.model_dump(), indent=2, default=str))
return
_display_search_results(console, result, config.general.icons)
except Exception as e:
feedback.error("Search Error", f"Failed to search registry: {e}")
raise click.Abort()
def _build_search_params(
query, status, genre, format, year, min_score, max_score, sort, limit
) -> MediaSearchParams:
"""Build MediaSearchParams from command options."""
# Convert status string to enum
status_enum = None
if status:
status_map = {
"watching": UserMediaListStatus.WATCHING,
"completed": UserMediaListStatus.COMPLETED,
"planning": UserMediaListStatus.PLANNING,
"dropped": UserMediaListStatus.DROPPED,
"paused": UserMediaListStatus.PAUSED,
"repeating": UserMediaListStatus.REPEATING,
}
status_enum = status_map.get(status.lower())
# Convert sort string to enum
sort_map = {
"title": MediaSort.TITLE_ROMAJI,
"score": MediaSort.SCORE_DESC,
"popularity": MediaSort.POPULARITY_DESC,
"year": MediaSort.START_DATE_DESC,
"episodes": MediaSort.EPISODES_DESC,
"updated": MediaSort.UPDATED_AT_DESC,
}
sort_enum = sort_map.get(sort.lower(), MediaSort.TITLE_ROMAJI)
# Convert format string to enum if provided
format_enum = None
if format:
from .....libs.media_api.types import MediaFormat
format_enum = getattr(MediaFormat, format.upper(), None)
# Convert genre strings to enums
genre_enums = []
if genre:
from .....libs.media_api.types import MediaGenre
for g in genre:
# Try to find matching genre enum
for genre_enum in MediaGenre:
if genre_enum.value.lower() == g.lower():
genre_enums.append(genre_enum)
break
return MediaSearchParams(
query=query,
per_page=limit,
sort=[sort_enum],
averageScore_greater=min_score * 10 if min_score else None, # Convert to AniList scale
averageScore_lesser=max_score * 10 if max_score else None,
genre_in=genre_enums if genre_enums else None,
format_in=[format_enum] if format_enum else None,
seasonYear=year,
# We'll handle status filtering differently since it's user-specific
)
def _display_search_results(console: Console, result, icons: bool):
"""Display search results in a formatted table."""
table = Table(title=f"{'🔍 ' if icons else ''}Search Results ({len(result.media)} found)")
table.add_column("Title", style="cyan", min_width=30)
table.add_column("Year", style="dim", justify="center", min_width=6)
table.add_column("Format", style="magenta", justify="center", min_width=8)
table.add_column("Episodes", style="green", justify="center", min_width=8)
table.add_column("Score", style="yellow", justify="center", min_width=6)
table.add_column("Status", style="blue", justify="center", min_width=10)
table.add_column("Progress", style="white", justify="center", min_width=8)
for media in result.media:
# Get title (prefer English, fallback to Romaji)
title = media.title.english or media.title.romaji or "Unknown"
if len(title) > 40:
title = title[:37] + "..."
# Get year from start date
year = ""
if media.start_date:
year = str(media.start_date.year)
# Format episodes
episodes = str(media.episodes) if media.episodes else "?"
# Format score
score = f"{media.average_score/10:.1f}" if media.average_score else "N/A"
# Get user status
status = "Not Listed"
progress = "0"
if media.user_status:
status = media.user_status.status.value.title() if media.user_status.status else "Unknown"
progress = f"{media.user_status.progress or 0}/{episodes}"
table.add_row(
title,
year,
media.format.value if media.format else "Unknown",
episodes,
score,
status,
progress
)
console.print(table)
# Show pagination info if applicable
if result.page_info.total > len(result.media):
console.print(
f"\n[dim]Showing {len(result.media)} of {result.page_info.total} total results[/dim]"
)

View File

@@ -0,0 +1,180 @@
"""
Registry stats command - show detailed statistics about the local registry
"""
import click
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from .....core.config import AppConfig
from ....service.registry.service import MediaRegistryService
from ....utils.feedback import create_feedback_manager
@click.command(help="Show detailed statistics about the local media registry")
@click.option(
"--detailed",
"-d",
is_flag=True,
help="Show detailed breakdown by genre, format, and year"
)
@click.option(
"--json",
"output_json",
is_flag=True,
help="Output statistics in JSON format"
)
@click.option(
"--api",
default="anilist",
type=click.Choice(["anilist"], case_sensitive=False),
help="Media API to show stats for"
)
@click.pass_obj
def stats(config: AppConfig, detailed: bool, output_json: bool, api: str):
"""
Display comprehensive statistics about your local media registry.
Shows total counts, status breakdown, and optionally detailed
analysis by genre, format, and release year.
"""
feedback = create_feedback_manager(config.general.icons)
console = Console()
try:
registry_service = MediaRegistryService(api, config.registry)
stats_data = registry_service.get_registry_stats()
if output_json:
import json
print(json.dumps(stats_data, indent=2, default=str))
return
_display_stats_overview(console, stats_data, api, config.general.icons)
if detailed:
_display_detailed_stats(console, stats_data, config.general.icons)
except Exception as e:
feedback.error("Stats Error", f"Failed to generate statistics: {e}")
raise click.Abort()
def _display_stats_overview(console: Console, stats: dict, api: str, icons: bool):
"""Display basic registry statistics overview."""
# Main overview panel
overview_text = f"[bold cyan]Media API:[/bold cyan] {api.title()}\n"
overview_text += f"[bold cyan]Total Media:[/bold cyan] {stats.get('total_media', 0)}\n"
overview_text += f"[bold cyan]Registry Version:[/bold cyan] {stats.get('version', 'Unknown')}\n"
overview_text += f"[bold cyan]Last Updated:[/bold cyan] {stats.get('last_updated', 'Never')}\n"
overview_text += f"[bold cyan]Storage Size:[/bold cyan] {stats.get('storage_size', 'Unknown')}"
panel = Panel(
overview_text,
title=f"{'📊 ' if icons else ''}Registry Overview",
border_style="cyan"
)
console.print(panel)
console.print()
# Status breakdown table
status_breakdown = stats.get('status_breakdown', {})
if status_breakdown:
table = Table(title=f"{'📋 ' if icons else ''}Status Breakdown")
table.add_column("Status", style="cyan", no_wrap=True)
table.add_column("Count", style="magenta", justify="right")
table.add_column("Percentage", style="green", justify="right")
total = sum(status_breakdown.values())
for status, count in sorted(status_breakdown.items()):
percentage = (count / total * 100) if total > 0 else 0
table.add_row(
status.title(),
str(count),
f"{percentage:.1f}%"
)
console.print(table)
console.print()
# Download status breakdown
download_stats = stats.get('download_stats', {})
if download_stats:
table = Table(title=f"{'💾 ' if icons else ''}Download Status")
table.add_column("Status", style="cyan", no_wrap=True)
table.add_column("Count", style="magenta", justify="right")
for status, count in download_stats.items():
table.add_row(status.title(), str(count))
console.print(table)
console.print()
def _display_detailed_stats(console: Console, stats: dict, icons: bool):
"""Display detailed breakdown by various categories."""
# Genre breakdown
genre_breakdown = stats.get('genre_breakdown', {})
if genre_breakdown:
table = Table(title=f"{'🎭 ' if icons else ''}Top Genres")
table.add_column("Genre", style="cyan")
table.add_column("Count", style="magenta", justify="right")
# Sort by count and show top 10
top_genres = sorted(genre_breakdown.items(), key=lambda x: x[1], reverse=True)[:10]
for genre, count in top_genres:
table.add_row(genre, str(count))
console.print(table)
console.print()
# Format breakdown
format_breakdown = stats.get('format_breakdown', {})
if format_breakdown:
table = Table(title=f"{'📺 ' if icons else ''}Format Breakdown")
table.add_column("Format", style="cyan")
table.add_column("Count", style="magenta", justify="right")
table.add_column("Percentage", style="green", justify="right")
total = sum(format_breakdown.values())
for format_type, count in sorted(format_breakdown.items()):
percentage = (count / total * 100) if total > 0 else 0
table.add_row(
format_type,
str(count),
f"{percentage:.1f}%"
)
console.print(table)
console.print()
# Year breakdown
year_breakdown = stats.get('year_breakdown', {})
if year_breakdown:
table = Table(title=f"{'📅 ' if icons else ''}Release Years (Top 10)")
table.add_column("Year", style="cyan", justify="center")
table.add_column("Count", style="magenta", justify="right")
# Sort by year descending and show top 10
top_years = sorted(year_breakdown.items(), key=lambda x: x[0], reverse=True)[:10]
for year, count in top_years:
table.add_row(str(year), str(count))
console.print(table)
console.print()
# Rating breakdown
rating_breakdown = stats.get('rating_breakdown', {})
if rating_breakdown:
table = Table(title=f"{'' if icons else ''}Score Distribution")
table.add_column("Score Range", style="cyan")
table.add_column("Count", style="magenta", justify="right")
for score_range, count in sorted(rating_breakdown.items()):
table.add_row(score_range, str(count))
console.print(table)
console.print()

View File

@@ -0,0 +1,268 @@
"""
Registry sync command - synchronize local registry with remote media API
"""
import click
from rich.progress import Progress
from .....core.config import AppConfig
from .....core.exceptions import FastAnimeError
from .....libs.media_api.api import create_api_client
from .....libs.media_api.params import UserMediaListSearchParams
from .....libs.media_api.types import UserMediaListStatus
from ....service.registry.service import MediaRegistryService
from ....utils.feedback import create_feedback_manager
@click.command(help="Synchronize local registry with remote media API")
@click.option(
"--download",
"-d",
is_flag=True,
help="Download remote user list to local registry"
)
@click.option(
"--upload",
"-u",
is_flag=True,
help="Upload local registry changes to remote API"
)
@click.option(
"--force",
"-f",
is_flag=True,
help="Force sync even if there are conflicts"
)
@click.option(
"--dry-run",
is_flag=True,
help="Show what would be synced without making changes"
)
@click.option(
"--status",
multiple=True,
type=click.Choice([
"watching", "completed", "planning", "dropped", "paused", "repeating"
], case_sensitive=False),
help="Only sync specific status lists (can be used multiple times)"
)
@click.option(
"--api",
default="anilist",
type=click.Choice(["anilist"], case_sensitive=False),
help="Media API to sync with"
)
@click.pass_obj
def sync(
config: AppConfig,
download: bool,
upload: bool,
force: bool,
dry_run: bool,
status: tuple[str, ...],
api: str
):
"""
Synchronize local registry with remote media API.
This command can download your remote media list to the local registry,
upload local changes to the remote API, or both.
"""
feedback = create_feedback_manager(config.general.icons)
# Default to both download and upload if neither specified
if not download and not upload:
download = upload = True
# Check authentication
try:
api_client = create_api_client(api, config)
if not api_client.is_authenticated():
feedback.error(
"Authentication Required",
f"You must be logged in to {api} to sync your media list."
)
feedback.info("Run this command to authenticate:", f"fastanime {api} auth")
raise click.Abort()
except Exception as e:
feedback.error("API Error", f"Failed to connect to {api}: {e}")
raise click.Abort()
# Initialize registry service
try:
registry_service = MediaRegistryService(api, config.registry)
except Exception as e:
feedback.error("Registry Error", f"Failed to initialize registry: {e}")
raise click.Abort()
# Determine which statuses to sync
status_list = list(status) if status else [
"watching", "completed", "planning", "dropped", "paused", "repeating"
]
# Convert to enum values
status_map = {
"watching": UserMediaListStatus.WATCHING,
"completed": UserMediaListStatus.COMPLETED,
"planning": UserMediaListStatus.PLANNING,
"dropped": UserMediaListStatus.DROPPED,
"paused": UserMediaListStatus.PAUSED,
"repeating": UserMediaListStatus.REPEATING,
}
statuses_to_sync = [status_map[s] for s in status_list]
with Progress() as progress:
if download:
_sync_download(
api_client, registry_service, statuses_to_sync,
feedback, progress, dry_run, force
)
if upload:
_sync_upload(
api_client, registry_service, statuses_to_sync,
feedback, progress, dry_run, force
)
feedback.success("Sync Complete", "Registry synchronization finished successfully")
def _sync_download(
api_client, registry_service, statuses, feedback, progress, dry_run, force
):
"""Download remote media list to local registry."""
feedback.info("Starting Download", "Fetching remote media lists...")
download_task = progress.add_task("Downloading media lists...", total=len(statuses))
total_downloaded = 0
total_updated = 0
for status in statuses:
try:
# Fetch all pages for this status
page = 1
while True:
params = UserMediaListSearchParams(
status=status,
page=page,
per_page=50
)
result = api_client.search_media_list(params)
if not result or not result.media:
break
for media_item in result.media:
if dry_run:
feedback.info(
"Would download",
f"{media_item.title.english or media_item.title.romaji} ({status.value})"
)
else:
# Get or create record and update with user status
record = registry_service.get_or_create_record(media_item)
# Update index entry with latest status
if media_item.user_status:
registry_service.update_media_index_entry(
media_item.id,
media_item=media_item,
status=media_item.user_status.status,
progress=str(media_item.user_status.progress or 0),
score=media_item.user_status.score,
repeat=media_item.user_status.repeat,
notes=media_item.user_status.notes,
)
total_updated += 1
registry_service.save_media_record(record)
total_downloaded += 1
if not result.page_info.has_next_page:
break
page += 1
except Exception as e:
feedback.error(f"Download Error ({status.value})", str(e))
continue
progress.advance(download_task)
if not dry_run:
feedback.success(
"Download Complete",
f"Downloaded {total_downloaded} media entries, updated {total_updated} existing entries"
)
def _sync_upload(
api_client, registry_service, statuses, feedback, progress, dry_run, force
):
"""Upload local registry changes to remote API."""
feedback.info("Starting Upload", "Syncing local changes to remote...")
upload_task = progress.add_task("Uploading changes...", total=None)
total_uploaded = 0
total_errors = 0
try:
# Get all media records from registry
all_records = registry_service.get_all_media_records()
for record in all_records:
try:
# Get the index entry for this media
index_entry = registry_service.get_media_index_entry(record.media_item.id)
if not index_entry or not index_entry.status:
continue
# Only sync if status is in our target list
if index_entry.status not in statuses:
continue
if dry_run:
feedback.info(
"Would upload",
f"{record.media_item.title.english or record.media_item.title.romaji} "
f"({index_entry.status.value}, progress: {index_entry.progress or 0})"
)
else:
# Update remote list entry
from .....libs.media_api.params import UpdateUserMediaListEntryParams
update_params = UpdateUserMediaListEntryParams(
media_id=record.media_item.id,
status=index_entry.status,
progress=index_entry.progress,
score=index_entry.score,
)
if api_client.update_list_entry(update_params):
total_uploaded += 1
else:
total_errors += 1
feedback.warning(
"Upload Failed",
f"Failed to upload {record.media_item.title.english or record.media_item.title.romaji}"
)
except Exception as e:
total_errors += 1
feedback.error("Upload Error", f"Failed to upload media {record.media_item.id}: {e}")
continue
except Exception as e:
feedback.error("Upload Error", f"Failed to get local records: {e}")
return
progress.remove_task(upload_task)
if not dry_run:
feedback.success(
"Upload Complete",
f"Uploaded {total_uploaded} entries, {total_errors} errors"
)

View File

@@ -0,0 +1,31 @@
"""
Example usage for the registry command
"""
main = """
Examples:
# Sync with remote AniList
fastanime registry sync --upload --download
# Show detailed registry statistics
fastanime registry stats --detailed
# Search local registry
fastanime registry search "attack on titan"
# Export registry to JSON
fastanime registry export --format json --output backup.json
# Import from backup
fastanime registry import backup.json
# Clean up orphaned entries
fastanime registry clean --dry-run
# Create full backup
fastanime registry backup --compress
# Restore from backup
fastanime registry restore backup.tar.gz
"""