fix(registry-commands): make it work

This commit is contained in:
Benexl
2025-07-29 18:03:05 +03:00
parent ba4df96dc8
commit b9b0d49530
6 changed files with 566 additions and 911 deletions

View File

@@ -2,15 +2,21 @@
Registry backup command - create full backups of the registry
"""
import json
import tarfile
from pathlib import Path
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING
import click
from .....core.config import AppConfig
from ....service.feedback import FeedbackService
from ....service.registry.service import MediaRegistryService
from ....utils.feedback import create_feedback_manager
if TYPE_CHECKING:
from ....service.registry.service import StatBreakdown
@click.command(help="Create a full backup of the registry")
@@ -50,10 +56,10 @@ def backup(
Includes all media records, index files, and optionally cache data.
Backups can be compressed and are suitable for restoration.
"""
feedback = create_feedback_manager(config.general.icons)
feedback = FeedbackService(config)
try:
registry_service = MediaRegistryService(api, config.registry)
registry_service = MediaRegistryService(api, config.media_registry)
# Generate output filename if not specified
if not output:
@@ -99,15 +105,14 @@ def backup(
def _create_tar_backup(
registry_service,
registry_service: MediaRegistryService,
output_path: Path,
compress: bool,
include_cache: bool,
feedback,
feedback: FeedbackService,
api: str,
):
"""Create a tar-based backup."""
mode = "w:gz" if compress else "w"
with tarfile.open(output_path, mode) as tar:
@@ -130,25 +135,27 @@ def _create_tar_backup(
tar.add(cache_dir, arcname="cache")
feedback.info("Added to backup", "Cache data")
# Add metadata file
metadata = _create_backup_metadata(registry_service, api, include_cache)
metadata_path = output_path.parent / "backup_metadata.json"
# Add metadata file directly into the archive without creating a temp file
try:
import json
metadata = _create_backup_metadata(registry_service, api, include_cache)
metadata_bytes = json.dumps(metadata, indent=2, default=str).encode("utf-8")
with open(metadata_path, "w", encoding="utf-8") as f:
json.dump(metadata, f, indent=2, default=str)
tar.add(metadata_path, arcname="backup_metadata.json")
metadata_path.unlink() # Clean up temp file
tarinfo = tarfile.TarInfo(name="backup_metadata.json")
tarinfo.size = len(metadata_bytes)
tarinfo.mtime = int(datetime.now().timestamp())
with BytesIO(metadata_bytes) as bio:
tar.addfile(tarinfo, bio)
except Exception as e:
feedback.warning("Metadata Error", f"Failed to add metadata: {e}")
def _create_zip_backup(
registry_service, output_path: Path, include_cache: bool, feedback, api: str
registry_service: MediaRegistryService,
output_path: Path,
include_cache: bool,
feedback: FeedbackService,
api: str,
):
"""Create a zip-based backup."""
import zipfile
@@ -183,23 +190,25 @@ def _create_zip_backup(
feedback.info("Added to backup", "Cache data")
# Add metadata
metadata = _create_backup_metadata(registry_service, api, include_cache)
try:
import json
metadata = _create_backup_metadata(registry_service, api, include_cache)
metadata_json = json.dumps(metadata, indent=2, default=str)
zip_file.writestr("backup_metadata.json", metadata_json)
except Exception as e:
feedback.warning("Metadata Error", f"Failed to add metadata: {e}")
def _create_backup_metadata(registry_service, api: str, include_cache: bool) -> dict:
def _create_backup_metadata(
registry_service: MediaRegistryService, api: str, include_cache: bool
) -> dict:
"""Create backup metadata."""
from .....core.constants import __version__
stats = registry_service.get_registry_stats()
return {
"backup_timestamp": datetime.now().isoformat(),
"fastanime_version": "unknown", # You might want to get this from somewhere
"fastanime_version": __version__,
"registry_version": stats.get("version"),
"api": api,
"total_media": stats.get("total_media", 0),
@@ -209,9 +218,10 @@ def _create_backup_metadata(registry_service, api: str, include_cache: bool) ->
}
def _show_backup_summary(backup_path: Path, format_type: str, feedback):
def _show_backup_summary(
backup_path: Path, format_type: str, feedback: FeedbackService
):
"""Show summary of backup contents."""
try:
if format_type.lower() == "tar":
with tarfile.open(backup_path, "r:*") as tar:
@@ -235,11 +245,14 @@ def _show_backup_summary(backup_path: Path, format_type: str, feedback):
def _format_file_size(file_path: Path) -> str:
"""Format file size in human-readable format."""
try:
size = file_path.stat().st_size
for unit in ["B", "KB", "MB", "GB"]:
if size < 1024.0:
return f"{size:.1f} {unit}"
size /= 1024.0
return f"{size:.1f} TB"
except:
size_bytes: float = float(file_path.stat().st_size)
if size_bytes == 0:
return "0 B"
size_name = ("B", "KB", "MB", "GB", "TB")
i = 0
while size_bytes >= 1024.0 and i < len(size_name) - 1:
size_bytes /= 1024.0
i += 1
return f"{size_bytes:.1f} {size_name[i]}"
except FileNotFoundError:
return "Unknown size"

View File

@@ -2,13 +2,16 @@
Registry clean command - clean up orphaned entries and invalid data
"""
import json
from typing import Dict, List
import click
from rich.console import Console
from rich.table import Table
from .....core.config import AppConfig
from ....service.feedback import FeedbackService
from ....service.registry.service import MediaRegistryService
from ....utils.feedback import create_feedback_manager
@click.command(help="Clean up orphaned entries and invalid data from registry")
@@ -51,7 +54,7 @@ def clean(
Can remove orphaned entries, invalid data, duplicates, and entries
from old format versions. Use --dry-run to preview changes.
"""
feedback = create_feedback_manager(config.general.icons)
feedback = FeedbackService(config)
console = Console()
# Default to all cleanup types if none specified
@@ -59,9 +62,9 @@ def clean(
orphaned = invalid = duplicates = old_format = True
try:
registry_service = MediaRegistryService(api, config.registry)
registry_service = MediaRegistryService(api, config.media_registry)
cleanup_results = {
cleanup_results: Dict[str, List] = {
"orphaned": [],
"invalid": [],
"duplicates": [],
@@ -69,33 +72,35 @@ def clean(
}
# Analyze registry for cleanup opportunities
_analyze_registry(
registry_service, cleanup_results, orphaned, invalid, duplicates, old_format
)
with feedback.progress("Analyzing registry..."):
_analyze_registry(
registry_service,
cleanup_results,
orphaned,
invalid,
duplicates,
old_format,
)
# Show cleanup summary
_display_cleanup_summary(console, cleanup_results, config.general.icons)
# Confirm cleanup if not dry run and not forced
total_items = sum(len(items) for items in cleanup_results.values())
if total_items == 0:
feedback.info(
feedback.success(
"Registry Clean", "No cleanup needed - registry is already clean!"
)
return
if not dry_run:
if not force:
if not click.confirm(f"Clean up {total_items} items from registry?"):
feedback.info("Cleanup Cancelled", "No changes were made")
return
if not force and not click.confirm(
f"Clean up {total_items} items from registry?"
):
feedback.info("Cleanup Cancelled", "No changes were made")
return
# Perform cleanup
_perform_cleanup(registry_service, cleanup_results, feedback)
feedback.success(
"Cleanup Complete", f"Cleaned up {total_items} items from registry"
)
else:
feedback.info("Dry Run Complete", f"Would clean up {total_items} items")
@@ -105,172 +110,107 @@ def clean(
def _analyze_registry(
registry_service,
results: dict,
registry_service: MediaRegistryService,
results: Dict[str, List],
check_orphaned: bool,
check_invalid: bool,
check_duplicates: bool,
check_old_format: bool,
):
"""Analyze registry for cleanup opportunities."""
if check_orphaned:
results["orphaned"] = _find_orphaned_entries(registry_service)
if check_invalid:
results["invalid"] = _find_invalid_entries(registry_service)
if check_duplicates:
results["duplicates"] = _find_duplicate_entries(registry_service)
if check_old_format:
results["old_format"] = _find_old_format_entries(registry_service)
def _find_orphaned_entries(registry_service) -> list:
def _find_orphaned_entries(registry_service: MediaRegistryService) -> list:
"""Find index entries that don't have corresponding media files."""
orphaned = []
try:
index = registry_service._load_index()
for entry_key, entry in index.media_index.items():
media_file = registry_service._get_media_file_path(entry.media_id)
if not media_file.exists():
orphaned.append(
{
"type": "orphaned_index",
"id": entry.media_id,
"key": entry_key,
"reason": "Media file missing",
}
)
except Exception:
pass
index = registry_service._load_index()
for entry_key, entry in index.media_index.items():
media_file = registry_service._get_media_file_path(entry.media_id)
if not media_file.exists():
orphaned.append(
{"id": entry.media_id, "key": entry_key, "reason": "Media file missing"}
)
return orphaned
def _find_invalid_entries(registry_service) -> list:
def _find_invalid_entries(registry_service: MediaRegistryService) -> list:
"""Find invalid or corrupted entries."""
invalid = []
try:
# Check all media files
for media_file in registry_service.media_registry_dir.iterdir():
if not media_file.name.endswith(".json"):
continue
try:
media_id = int(media_file.stem)
record = registry_service.get_media_record(media_id)
# Check for invalid record structure
if not record or not record.media_item:
invalid.append(
{
"type": "invalid_record",
"id": media_id,
"file": media_file,
"reason": "Invalid record structure",
}
)
elif (
not record.media_item.title
or not record.media_item.title.english
and not record.media_item.title.romaji
):
invalid.append(
{
"type": "invalid_title",
"id": media_id,
"file": media_file,
"reason": "Missing or invalid title",
}
)
except (ValueError, Exception) as e:
for media_file in registry_service.media_registry_dir.glob("*.json"):
try:
media_id = int(media_file.stem)
record = registry_service.get_media_record(media_id)
if (
not record
or not record.media_item
or not record.media_item.title.english
and not record.media_item.title.romaji
):
invalid.append(
{
"type": "corrupted_file",
"id": media_file.stem,
"id": media_id,
"file": media_file,
"reason": f"File corruption: {e}",
"reason": "Invalid record structure or missing title",
}
)
except Exception:
pass
except (ValueError, json.JSONDecodeError) as e:
invalid.append(
{
"id": media_file.stem,
"file": media_file,
"reason": f"File corruption: {e}",
}
)
return invalid
def _find_duplicate_entries(registry_service) -> list:
def _find_duplicate_entries(registry_service: MediaRegistryService) -> list:
"""Find duplicate entries (same media ID appearing multiple times)."""
duplicates = []
seen_ids = set()
try:
index = registry_service._load_index()
for entry_key, entry in index.media_index.items():
if entry.media_id in seen_ids:
duplicates.append(
{
"type": "duplicate_index",
"id": entry.media_id,
"key": entry_key,
"reason": "Duplicate media ID in index",
}
)
else:
seen_ids.add(entry.media_id)
except Exception:
pass
index = registry_service._load_index()
for entry_key, entry in index.media_index.items():
if entry.media_id in seen_ids:
duplicates.append(
{
"id": entry.media_id,
"key": entry_key,
"reason": "Duplicate media ID in index",
}
)
else:
seen_ids.add(entry.media_id)
return duplicates
def _find_old_format_entries(registry_service) -> list:
def _find_old_format_entries(registry_service: MediaRegistryService) -> list:
"""Find entries from old registry format versions."""
from ....service.registry.service import REGISTRY_VERSION
old_format = []
try:
index = registry_service._load_index()
current_version = registry_service._index.version
# Check for entries that might be from old formats
# This is a placeholder - you'd implement specific checks based on your version history
for media_file in registry_service.media_registry_dir.iterdir():
if not media_file.name.endswith(".json"):
continue
try:
import json
with open(media_file, "r") as f:
data = json.load(f)
# Check for old format indicators
if "version" in data and data["version"] < current_version:
old_format.append(
{
"type": "old_version",
"id": media_file.stem,
"file": media_file,
"reason": f"Old format version {data.get('version')}",
}
)
except Exception:
pass
except Exception:
pass
index = registry_service._load_index()
current_version = index.version
if index.version != REGISTRY_VERSION:
old_format.append(
{
"id": "index",
"file": registry_service._index_file,
"reason": f"Index version mismatch ({index.version})",
}
)
return old_format
def _display_cleanup_summary(console: Console, results: dict, icons: bool):
def _display_cleanup_summary(console: Console, results: Dict[str, List], icons: bool):
"""Display summary of cleanup opportunities."""
table = Table(title=f"{'🧹 ' if icons else ''}Registry Cleanup Summary")
table.add_column("Category", style="cyan", no_wrap=True)
table.add_column("Count", style="magenta", justify="right")
@@ -278,123 +218,69 @@ def _display_cleanup_summary(console: Console, results: dict, icons: bool):
categories = {
"orphaned": "Orphaned Entries",
"invalid": "Invalid Entries",
"invalid": "Invalid/Corrupt Entries",
"duplicates": "Duplicate Entries",
"old_format": "Old Format Entries",
"old_format": "Outdated Format",
}
for category, display_name in categories.items():
count = len(results[category])
description = "None found"
if count > 0:
# Get sample reasons
reasons = set(item["reason"] for item in results[category][:3])
reasons = {item["reason"] for item in results[category][:3]}
description = "; ".join(list(reasons)[:2])
if len(reasons) > 2:
description += "..."
else:
description = "None found"
table.add_row(display_name, str(count), description)
console.print(table)
console.print()
# Show detailed breakdown if there are items to clean
for category, items in results.items():
if items:
_display_category_details(console, category, items, icons)
def _display_category_details(
console: Console, category: str, items: list, icons: bool
def _perform_cleanup(
registry_service: MediaRegistryService,
results: Dict[str, List],
feedback: FeedbackService,
):
"""Display detailed breakdown for a cleanup category."""
category_names = {
"orphaned": "🔗 Orphaned Entries" if icons else "Orphaned Entries",
"invalid": "❌ Invalid Entries" if icons else "Invalid Entries",
"duplicates": "👥 Duplicate Entries" if icons else "Duplicate Entries",
"old_format": "📼 Old Format Entries" if icons else "Old Format Entries",
}
table = Table(title=category_names.get(category, category.title()))
table.add_column("ID", style="cyan", no_wrap=True)
table.add_column("Type", style="magenta")
table.add_column("Reason", style="yellow")
for item in items[:10]: # Show max 10 items
table.add_row(str(item["id"]), item["type"], item["reason"])
if len(items) > 10:
table.add_row("...", "...", f"And {len(items) - 10} more")
console.print(table)
console.print()
def _perform_cleanup(registry_service, results: dict, feedback):
"""Perform the actual cleanup operations."""
cleaned_count = 0
total_to_clean = sum(len(v) for v in results.values())
# Clean orphaned entries
for item in results["orphaned"]:
try:
if item["type"] == "orphaned_index":
index = registry_service._load_index()
if item["key"] in index.media_index:
del index.media_index[item["key"]]
registry_service._save_index(index)
with feedback.progress("Cleaning registry...", total=total_to_clean) as (
task_id,
progress,
):
def _cleanup_item(item_list, cleanup_func):
nonlocal cleaned_count
for item in item_list:
try:
cleanup_func(item)
cleaned_count += 1
except Exception as e:
feedback.warning(
"Cleanup Error", f"Failed to clean orphaned entry {item['id']}: {e}"
)
except Exception as e:
feedback.warning(
"Cleanup Error",
f"Failed to clean item {item.get('id', 'N/A')}: {e}",
)
progress.advance(task_id) # type: ignore
# Clean invalid entries
for item in results["invalid"]:
try:
if "file" in item:
item["file"].unlink() # Delete the file
cleaned_count += 1
index = registry_service._load_index()
# Also remove from index if present
index = registry_service._load_index()
entry_key = f"{registry_service._media_api}_{item['id']}"
if entry_key in index.media_index:
del index.media_index[entry_key]
registry_service._save_index(index)
_cleanup_item(
results["orphaned"], lambda item: index.media_index.pop(item["key"], None)
)
_cleanup_item(results["invalid"], lambda item: item["file"].unlink())
_cleanup_item(
results["duplicates"], lambda item: index.media_index.pop(item["key"], None)
)
except Exception as e:
feedback.warning(
"Cleanup Error", f"Failed to clean invalid entry {item['id']}: {e}"
)
from ....service.registry.service import REGISTRY_VERSION
# Clean duplicates
for item in results["duplicates"]:
try:
if item["type"] == "duplicate_index":
index = registry_service._load_index()
if item["key"] in index.media_index:
del index.media_index[item["key"]]
registry_service._save_index(index)
cleaned_count += 1
except Exception as e:
feedback.warning(
"Cleanup Error", f"Failed to clean duplicate entry {item['id']}: {e}"
)
# For old format, we just re-save the index to update its version
if results["old_format"]:
index.version = REGISTRY_VERSION
progress.advance(task_id, len(results["old_format"])) # type:ignore
# Clean old format entries
for item in results["old_format"]:
try:
if "file" in item:
# You might want to migrate instead of delete
# For now, we'll just remove old format files
item["file"].unlink()
cleaned_count += 1
except Exception as e:
feedback.warning(
"Cleanup Error", f"Failed to clean old format entry {item['id']}: {e}"
)
feedback.info("Cleanup Results", f"Successfully cleaned {cleaned_count} items")
registry_service._save_index(index)
feedback.success(
"Cleanup Complete",
f"Successfully cleaned {cleaned_count} items from the registry.",
)

View File

@@ -2,16 +2,20 @@
Registry export command - export registry data to various formats
"""
import json
import csv
from pathlib import Path
import json
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING
import click
from .....core.config import AppConfig
from ....service.feedback import FeedbackService
from ....service.registry.service import MediaRegistryService
from ....utils.feedback import create_feedback_manager
if TYPE_CHECKING:
from ....service.registry.models import MediaRecord
@click.command(help="Export registry data to various formats")
@@ -25,7 +29,7 @@ from ....utils.feedback import create_feedback_manager
@click.option(
"--output",
"-o",
type=click.Path(),
type=click.Path(path_type=Path),
help="Output file path (auto-generated if not specified)",
)
@click.option(
@@ -51,7 +55,7 @@ from ....utils.feedback import create_feedback_manager
def export(
config: AppConfig,
output_format: str,
output: str | None,
output: Path | None,
include_metadata: bool,
status: tuple[str, ...],
compress: bool,
@@ -63,10 +67,10 @@ def export(
Supports JSON, CSV, and XML formats. Can optionally include
detailed metadata and compress the output.
"""
feedback = create_feedback_manager(config.general.icons)
feedback = FeedbackService(config)
try:
registry_service = MediaRegistryService(api, config.registry)
registry_service = MediaRegistryService(api, config.media_registry)
# Generate output filename if not specified
if not output:
@@ -74,20 +78,30 @@ def export(
extension = output_format.lower()
if compress:
extension += ".gz"
output = f"fastanime_registry_{api}_{timestamp}.{extension}"
output_path = Path(output)
output_path = Path(f"fastanime_registry_{api}_{timestamp}.{extension}")
else:
output_path = output
# Get export data
export_data = _prepare_export_data(registry_service, include_metadata, status)
if not export_data["media"]:
feedback.warning(
"No Data", "No media entries to export based on your criteria."
)
return
# Export based on format
if output_format.lower() == "json":
_export_json(export_data, output_path, compress, feedback)
_export_json(export_data, output_path)
elif output_format.lower() == "csv":
_export_csv(export_data, output_path, compress, feedback)
_export_csv(export_data, output_path)
elif output_format.lower() == "xml":
_export_xml(export_data, output_path, compress, feedback)
_export_xml(export_data, output_path)
if compress:
_compress_file(output_path, feedback)
output_path = output_path.with_suffix(output_path.suffix + ".gz")
feedback.success(
"Export Complete",
@@ -100,11 +114,11 @@ def export(
def _prepare_export_data(
registry_service, include_metadata: bool, status_filter: tuple[str, ...]
registry_service: MediaRegistryService,
include_metadata: bool,
status_filter: tuple[str, ...],
) -> dict:
"""Prepare data for export based on options."""
# Convert status filter to enums
from .....libs.media_api.types import UserMediaListStatus
status_map = {
@@ -115,194 +129,106 @@ def _prepare_export_data(
"paused": UserMediaListStatus.PAUSED,
"repeating": UserMediaListStatus.REPEATING,
}
status_enums = [status_map[s] for s in status_filter] if status_filter else None
status_enums = {status_map[s] for s in status_filter}
export_data = {
"metadata": {
"export_timestamp": datetime.now().isoformat(),
"registry_version": registry_service._load_index().version,
"include_metadata": include_metadata,
"filtered_status": list(status_filter) if status_filter else None,
"filtered_status": list(status_filter) if status_filter else "all",
},
"statistics": registry_service.get_registry_stats(),
"media": [],
}
# Get all records and filter by status if specified
all_records = registry_service.get_all_media_records()
for record in all_records:
index_entry = registry_service.get_media_index_entry(record.media_item.id)
# Skip if status filter is specified and doesn't match
if status_enums and (not index_entry or index_entry.status not in status_enums):
continue
media_data = {
"id": record.media_item.id,
"title": {
"english": record.media_item.title.english,
"romaji": record.media_item.title.romaji,
"native": record.media_item.title.native,
},
"user_status": {
"status": index_entry.status.value
if index_entry and index_entry.status
else None,
"progress": index_entry.progress if index_entry else None,
"score": index_entry.score if index_entry else None,
"last_watched": index_entry.last_watched.isoformat()
if index_entry and index_entry.last_watched
else None,
"notes": index_entry.notes if index_entry else None,
},
}
if include_metadata:
media_data.update(
{
"format": record.media_item.format.value
if record.media_item.format
else None,
"episodes": record.media_item.episodes,
"duration": record.media_item.duration,
"status": record.media_item.status.value
if record.media_item.status
else None,
"start_date": record.media_item.start_date.isoformat()
if record.media_item.start_date
else None,
"end_date": record.media_item.end_date.isoformat()
if record.media_item.end_date
else None,
"average_score": record.media_item.average_score,
"popularity": record.media_item.popularity,
"genres": [genre.value for genre in record.media_item.genres],
"tags": [
{"name": tag.name.value, "rank": tag.rank}
for tag in record.media_item.tags
],
"studios": [
studio.name
for studio in record.media_item.studios
if studio.name
],
"description": record.media_item.description,
"cover_image": {
"large": record.media_item.cover_image.large
if record.media_item.cover_image
else None,
"medium": record.media_item.cover_image.medium
if record.media_item.cover_image
else None,
}
if record.media_item.cover_image
else None,
}
)
media_data = _flatten_record_for_export(record, index_entry, include_metadata)
export_data["media"].append(media_data)
return export_data
def _export_json(data: dict, output_path: Path, compress: bool, feedback):
"""Export data to JSON format."""
if compress:
import gzip
def _flatten_record_for_export(
record: "MediaRecord", index_entry, include_metadata: bool
) -> dict:
"""Helper to convert a MediaRecord into a flat dictionary for exporting."""
media_item = record.media_item
with gzip.open(output_path, "wt", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
else:
with open(output_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
data = {
"id": media_item.id,
"title_english": media_item.title.english,
"title_romaji": media_item.title.romaji,
"title_native": media_item.title.native,
"user_status": index_entry.status.value
if index_entry and index_entry.status
else None,
"user_progress": index_entry.progress if index_entry else None,
"user_score": index_entry.score if index_entry else None,
"user_last_watched": index_entry.last_watched.isoformat()
if index_entry and index_entry.last_watched
else None,
"user_notes": index_entry.notes if index_entry else None,
}
def _export_csv(data: dict, output_path: Path, compress: bool, feedback):
"""Export data to CSV format."""
# Flatten media data for CSV
fieldnames = [
"id",
"title_english",
"title_romaji",
"title_native",
"status",
"progress",
"score",
"last_watched",
"notes",
]
# Add metadata fields if included
if data["metadata"]["include_metadata"]:
fieldnames.extend(
[
"format",
"episodes",
"duration",
"media_status",
"start_date",
"end_date",
"average_score",
"popularity",
"genres",
"description",
]
)
def write_csv(file_obj):
writer = csv.DictWriter(file_obj, fieldnames=fieldnames)
writer.writeheader()
for media in data["media"]:
row = {
"id": media["id"],
"title_english": media["title"]["english"],
"title_romaji": media["title"]["romaji"],
"title_native": media["title"]["native"],
"status": media["user_status"]["status"],
"progress": media["user_status"]["progress"],
"score": media["user_status"]["score"],
"last_watched": media["user_status"]["last_watched"],
"notes": media["user_status"]["notes"],
if include_metadata:
data.update(
{
"format": media_item.format.value if media_item.format else None,
"episodes": media_item.episodes,
"duration_minutes": media_item.duration,
"media_status": media_item.status.value if media_item.status else None,
"start_date": media_item.start_date.isoformat()
if media_item.start_date
else None,
"end_date": media_item.end_date.isoformat()
if media_item.end_date
else None,
"average_score": media_item.average_score,
"popularity": media_item.popularity,
"genres": ", ".join([genre.value for genre in media_item.genres]),
"tags": ", ".join([tag.name.value for tag in media_item.tags]),
"studios": ", ".join(
[studio.name for studio in media_item.studios if studio.name]
),
"description": media_item.description,
"cover_image_large": media_item.cover_image.large
if media_item.cover_image
else None,
}
if data["metadata"]["include_metadata"]:
row.update(
{
"format": media.get("format"),
"episodes": media.get("episodes"),
"duration": media.get("duration"),
"media_status": media.get("status"),
"start_date": media.get("start_date"),
"end_date": media.get("end_date"),
"average_score": media.get("average_score"),
"popularity": media.get("popularity"),
"genres": ",".join(media.get("genres", [])),
"description": media.get("description"),
}
)
writer.writerow(row)
if compress:
import gzip
with gzip.open(output_path, "wt", encoding="utf-8", newline="") as f:
write_csv(f)
else:
with open(output_path, "w", encoding="utf-8", newline="") as f:
write_csv(f)
)
return data
def _export_xml(data: dict, output_path: Path, compress: bool, feedback):
def _export_json(data: dict, output_path: Path):
"""Export data to JSON format."""
with open(output_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def _export_csv(data: dict, output_path: Path):
"""Export data to CSV format."""
if not data["media"]:
return
fieldnames = list(data["media"][0].keys())
with open(output_path, "w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data["media"])
def _export_xml(data: dict, output_path: Path):
"""Export data to XML format."""
try:
import xml.etree.ElementTree as ET
except ImportError:
feedback.error("XML Export Error", "XML export requires Python's xml module")
raise click.Abort()
import xml.etree.ElementTree as ET
root = ET.Element("fastanime_registry")
@@ -313,71 +239,46 @@ def _export_xml(data: dict, output_path: Path, compress: bool, feedback):
elem = ET.SubElement(metadata_elem, key)
elem.text = str(value)
# Add statistics
stats_elem = ET.SubElement(root, "statistics")
for key, value in data["statistics"].items():
if value is not None:
elem = ET.SubElement(stats_elem, key)
elem.text = str(value)
# Add media
media_list_elem = ET.SubElement(root, "media_list")
for media in data["media"]:
media_elem = ET.SubElement(media_list_elem, "media")
media_elem.set("id", str(media["id"]))
# Add titles
titles_elem = ET.SubElement(media_elem, "titles")
for title_type, title_value in media["title"].items():
if title_value:
title_elem = ET.SubElement(titles_elem, title_type)
title_elem.text = title_value
# Add user status
status_elem = ET.SubElement(media_elem, "user_status")
for key, value in media["user_status"].items():
for key, value in media.items():
if value is not None:
elem = ET.SubElement(status_elem, key)
elem.text = str(value)
# Add metadata if included
if data["metadata"]["include_metadata"]:
for key, value in media.items():
if key not in ["id", "title", "user_status"] and value is not None:
if isinstance(value, list):
list_elem = ET.SubElement(media_elem, key)
for item in value:
item_elem = ET.SubElement(list_elem, "item")
item_elem.text = str(item)
elif isinstance(value, dict):
dict_elem = ET.SubElement(media_elem, key)
for sub_key, sub_value in value.items():
if sub_value is not None:
sub_elem = ET.SubElement(dict_elem, sub_key)
sub_elem.text = str(sub_value)
else:
elem = ET.SubElement(media_elem, key)
elem.text = str(value)
field_elem = ET.SubElement(media_elem, key)
field_elem.text = str(value)
# Write XML
tree = ET.ElementTree(root)
if compress:
import gzip
ET.indent(tree, space=" ", level=0) # Pretty print
tree.write(output_path, encoding="utf-8", xml_declaration=True)
with gzip.open(output_path, "wb") as f:
tree.write(f, encoding="utf-8", xml_declaration=True)
else:
tree.write(output_path, encoding="utf-8", xml_declaration=True)
def _compress_file(file_path: Path, feedback: FeedbackService):
"""Compresses a file using gzip and removes the original."""
import gzip
import shutil
compressed_path = file_path.with_suffix(file_path.suffix + ".gz")
try:
with open(file_path, "rb") as f_in:
with gzip.open(compressed_path, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
file_path.unlink() # Remove original file
except Exception as e:
feedback.warning("Compression Failed", f"Could not compress {file_path}: {e}")
def _format_file_size(file_path: Path) -> str:
"""Format file size in human-readable format."""
try:
size = file_path.stat().st_size
for unit in ["B", "KB", "MB", "GB"]:
if size < 1024.0:
return f"{size:.1f} {unit}"
size /= 1024.0
return f"{size:.1f} TB"
except:
size_bytes: float = float(file_path.stat().st_size)
if size_bytes < 1024.0:
return f"{int(size_bytes)} B"
for unit in ["KB", "MB", "GB"]:
if size_bytes < 1024.0:
return f"{size_bytes:.1f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.1f} TB"
except FileNotFoundError:
return "Unknown size"

View File

@@ -2,17 +2,18 @@
Registry import command - import registry data from various formats
"""
import json
import csv
from pathlib import Path
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
import click
from .....core.config import AppConfig
from .....libs.media_api.types import UserMediaListStatus, MediaItem, MediaTitle
from .....libs.media_api.types import MediaItem, MediaTitle, UserMediaListStatus
from ....service.feedback import FeedbackService
from ....service.registry.service import MediaRegistryService
from ....utils.feedback import create_feedback_manager
@click.command(name="import", help="Import registry data from various formats")
@@ -60,14 +61,14 @@ def import_(
Supports JSON, CSV, and XML formats exported by the export command
or compatible third-party tools.
"""
feedback = create_feedback_manager(config.general.icons)
feedback = FeedbackService(config)
try:
registry_service = MediaRegistryService(api, config.registry)
registry_service = MediaRegistryService(api, config.media_registry)
# Create backup if requested
if backup and not dry_run:
_create_backup(registry_service, feedback)
_create_backup(registry_service, feedback, api)
# Auto-detect format if needed
if input_format == "auto":
@@ -77,7 +78,7 @@ def import_(
)
# Parse input file
import_data = _parse_input_file(input_file, input_format, feedback)
import_data = _parse_input_file(input_file, input_format)
# Validate import data
_validate_import_data(import_data, force, feedback)
@@ -101,15 +102,17 @@ def import_(
raise click.Abort()
def _create_backup(registry_service, feedback):
def _create_backup(
registry_service: MediaRegistryService, feedback: FeedbackService, api: str
):
"""Create a backup before importing."""
from .export import _prepare_export_data, _export_json
from .export import _export_json, _prepare_export_data
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = Path(f"fastanime_registry_backup_{timestamp}.json")
backup_path = Path(f"fastanime_registry_pre_import_{api}_{timestamp}.json")
export_data = _prepare_export_data(registry_service, True, ())
_export_json(export_data, backup_path, False, feedback)
_export_json(export_data, backup_path)
feedback.info("Backup Created", f"Registry backed up to {backup_path}")
@@ -117,52 +120,70 @@ def _create_backup(registry_service, feedback):
def _detect_format(file_path: Path) -> str:
"""Auto-detect file format based on extension and content."""
extension = file_path.suffix.lower()
if extension in [".json", ".gz"]:
if ".gz" in file_path.suffixes:
return "json" # Assume gzipped jsons for now
if extension == ".json":
return "json"
elif extension == ".csv":
return "csv"
elif extension == ".xml":
return "xml"
# Try to detect by content
# Fallback to content detection
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read(100).strip()
if content.startswith("{") or content.startswith("["):
if content.startswith(("{", "[")):
return "json"
elif content.startswith("<?xml") or content.startswith("<"):
return "xml"
elif "," in content: # Very basic CSV detection
elif "," in content:
return "csv"
except:
except Exception:
pass
raise click.ClickException(f"Could not detect format for {file_path}")
raise click.ClickException(f"Could not auto-detect format for {file_path}")
def _parse_input_file(file_path: Path, format_type: str, feedback) -> dict:
def _parse_input_file(file_path: Path, format_type: str) -> dict:
"""Parse input file based on format."""
if format_type == "json":
return _parse_json(file_path)
elif format_type == "csv":
if format_type == "csv":
return _parse_csv(file_path)
elif format_type == "xml":
if format_type == "xml":
return _parse_xml(file_path)
else:
raise click.ClickException(f"Unsupported format: {format_type}")
raise click.ClickException(f"Unsupported format: {format_type}")
def _safe_int(value: Optional[str]) -> Optional[int]:
if value is None or value == "":
return None
try:
return int(value)
except (ValueError, TypeError):
return None
def _safe_float(value: Optional[str]) -> Optional[float]:
if value is None or value == "":
return None
try:
return float(value)
except (ValueError, TypeError):
return None
def _parse_json(file_path: Path) -> dict:
"""Parse JSON input file."""
try:
if file_path.suffix.lower() == ".gz":
if ".gz" in file_path.suffixes:
import gzip
with gzip.open(file_path, "rt", encoding="utf-8") as f:
return json.load(f)
else:
with open(file_path, "r", encoding="utf-8") as f:
with file_path.open("r", encoding="utf-8") as f:
return json.load(f)
except json.JSONDecodeError as e:
raise click.ClickException(f"Invalid JSON format: {e}")
@@ -170,20 +191,13 @@ def _parse_json(file_path: Path) -> dict:
def _parse_csv(file_path: Path) -> dict:
"""Parse CSV input file."""
import_data = {
"metadata": {
"import_timestamp": datetime.now().isoformat(),
"source_format": "csv",
},
"media": [],
}
import_data = {"metadata": {"source_format": "csv"}, "media": []}
try:
with open(file_path, "r", encoding="utf-8") as f:
with file_path.open("r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
for row in reader:
media_data = {
"id": int(row["id"]) if row.get("id") else None,
media_data: Dict[str, Any] = {
"id": _safe_int(row.get("id")),
"title": {
"english": row.get("title_english"),
"romaji": row.get("title_romaji"),
@@ -191,235 +205,136 @@ def _parse_csv(file_path: Path) -> dict:
},
"user_status": {
"status": row.get("status"),
"progress": int(row["progress"])
if row.get("progress")
else None,
"score": float(row["score"]) if row.get("score") else None,
"progress": _safe_int(row.get("progress")),
"score": _safe_float(row.get("score")),
"last_watched": row.get("last_watched"),
"notes": row.get("notes"),
},
}
# Add metadata fields if present
if "format" in row:
if "format" in row: # Check if detailed metadata is present
media_data.update(
{
"format": row.get("format"),
"episodes": int(row["episodes"])
if row.get("episodes")
else None,
"duration": int(row["duration"])
if row.get("duration")
else None,
"episodes": _safe_int(row.get("episodes")),
"duration": _safe_int(row.get("duration")),
"media_status": row.get("media_status"),
"start_date": row.get("start_date"),
"end_date": row.get("end_date"),
"average_score": float(row["average_score"])
if row.get("average_score")
else None,
"popularity": int(row["popularity"])
if row.get("popularity")
else None,
"average_score": _safe_float(row.get("average_score")),
"popularity": _safe_int(row.get("popularity")),
"genres": row.get("genres", "").split(",")
if row.get("genres")
else [],
"description": row.get("description"),
}
)
import_data["media"].append(media_data)
except (ValueError, KeyError) as e:
except (ValueError, KeyError, csv.Error) as e:
raise click.ClickException(f"Invalid CSV format: {e}")
return import_data
def _parse_xml(file_path: Path) -> dict:
"""Parse XML input file."""
try:
import xml.etree.ElementTree as ET
except ImportError:
raise click.ClickException("XML import requires Python's xml module")
import xml.etree.ElementTree as ET
try:
tree = ET.parse(file_path)
root = tree.getroot()
import_data: Dict[str, Any] = {"metadata": {}, "media": []}
import_data = {"metadata": {}, "media": []}
# Parse metadata
metadata_elem = root.find("metadata")
if metadata_elem is not None:
for child in metadata_elem:
import_data["metadata"][child.tag] = child.text
# Parse media
media_list_elem = root.find("media_list")
if media_list_elem is not None:
for media_elem in media_list_elem.findall("media"):
media_data = {
"id": int(media_elem.get("id")),
"title": {},
"user_status": {},
}
# Parse titles
titles_elem = media_elem.find("titles")
if titles_elem is not None:
for title_elem in titles_elem:
media_data["title"][title_elem.tag] = title_elem.text
# Parse user status
status_elem = media_elem.find("user_status")
if status_elem is not None:
for child in status_elem:
value = child.text
if child.tag in ["progress", "score"] and value:
try:
value = (
float(value) if child.tag == "score" else int(value)
)
except ValueError:
pass
media_data["user_status"][child.tag] = value
# Parse other metadata
for child in media_elem:
if child.tag not in ["titles", "user_status"]:
if child.tag in ["episodes", "duration", "popularity"]:
try:
media_data[child.tag] = (
int(child.text) if child.text else None
)
except ValueError:
media_data[child.tag] = child.text
elif child.tag == "average_score":
try:
media_data[child.tag] = (
float(child.text) if child.text else None
)
except ValueError:
media_data[child.tag] = child.text
else:
media_data[child.tag] = child.text
import_data["media"].append(media_data)
for child in root.find("metadata") or []:
import_data["metadata"][child.tag] = child.text
for media_elem in root.find("media_list") or []:
media_data = {child.tag: child.text for child in media_elem}
# Reconstruct nested structures for consistency with other parsers
media_data["id"] = _safe_int(media_data.get("id"))
media_data["title"] = {
"english": media_data.pop("title_english", None),
"romaji": media_data.pop("title_romaji", None),
"native": media_data.pop("title_native", None),
}
media_data["user_status"] = {
"status": media_data.pop("user_status", None),
"progress": _safe_int(media_data.pop("user_progress", None)),
"score": _safe_float(media_data.pop("user_score", None)),
"last_watched": media_data.pop("user_last_watched", None),
"notes": media_data.pop("user_notes", None),
}
import_data["media"].append(media_data)
except ET.ParseError as e:
raise click.ClickException(f"Invalid XML format: {e}")
return import_data
def _validate_import_data(data: dict, force: bool, feedback):
def _validate_import_data(data: dict, force: bool, feedback: FeedbackService):
"""Validate import data structure and compatibility."""
if "media" not in data:
raise click.ClickException("Import data missing 'media' section")
if not isinstance(data["media"], list):
raise click.ClickException("'media' section must be a list")
# Check if any media entries exist
if "media" not in data or not isinstance(data["media"], list):
raise click.ClickException(
"Import data missing or has invalid 'media' section."
)
if not data["media"]:
feedback.warning("No Media", "Import file contains no media entries")
feedback.warning("No Media", "Import file contains no media entries.")
return
# Validate media entries
required_fields = ["id", "title"]
for i, media in enumerate(data["media"]):
for field in required_fields:
if field not in media:
raise click.ClickException(
f"Media entry {i} missing required field: {field}"
)
if "id" not in media or "title" not in media:
raise click.ClickException(
f"Media entry {i + 1} missing required 'id' or 'title' field."
)
if not isinstance(media.get("title"), dict):
raise click.ClickException(f"Media entry {i} has invalid title format")
raise click.ClickException(f"Media entry {i + 1} has invalid title format.")
feedback.info(
"Validation", f"Import data validated - {len(data['media'])} media entries"
"Validation",
f"Import data validated - {len(data['media'])} media entries found.",
)
def _import_data(registry_service, data: dict, merge: bool, dry_run: bool, feedback):
def _import_data(
registry_service: MediaRegistryService,
data: dict,
merge: bool,
dry_run: bool,
feedback: FeedbackService,
):
"""Import data into the registry."""
from .....libs.media_api.types import (
MediaFormat,
MediaType,
)
from .....libs.media_api.types import MediaFormat, MediaType
imported_count = 0
updated_count = 0
error_count = 0
status_map = {
"watching": UserMediaListStatus.WATCHING,
"completed": UserMediaListStatus.COMPLETED,
"planning": UserMediaListStatus.PLANNING,
"dropped": UserMediaListStatus.DROPPED,
"paused": UserMediaListStatus.PAUSED,
"repeating": UserMediaListStatus.REPEATING,
}
imported_count, updated_count, error_count = 0, 0, 0
status_map = {status.value: status for status in UserMediaListStatus}
for media_data in data["media"]:
try:
media_id = media_data["id"]
media_id = media_data.get("id")
if not media_id:
error_count += 1
continue
title_data = media_data.get("title", {})
title = MediaTitle(
english=title_data.get("english") or "",
romaji=title_data.get("romaji"),
native=title_data.get("native"),
)
# Create minimal MediaItem for registry
media_item = MediaItem(
id=media_id,
title=title,
type=MediaType.ANIME, # Default to anime
)
# Add additional metadata if available
if "format" in media_data and media_data["format"]:
try:
media_item.format = getattr(MediaFormat, media_data["format"])
except (AttributeError, TypeError):
pass
if "episodes" in media_data:
media_item.episodes = media_data["episodes"]
if "average_score" in media_data:
media_item.average_score = media_data["average_score"]
title = MediaTitle(**media_data.get("title", {}))
media_item = MediaItem(id=media_id, title=title, type=MediaType.ANIME)
if dry_run:
title_str = title.english or title.romaji or f"ID:{media_id}"
feedback.info("Would import", title_str)
feedback.info(
"Would import", title.english or title.romaji or f"ID:{media_id}"
)
imported_count += 1
continue
# Check if record exists
existing_record = registry_service.get_media_record(media_id)
if existing_record and not merge:
# Skip if not merging
continue
elif existing_record:
updated_count += 1
else:
imported_count += 1
# Create or update record
updated_count += 1 if existing_record else 0
imported_count += 1 if not existing_record else 0
record = registry_service.get_or_create_record(media_item)
registry_service.save_media_record(record)
# Update user status if provided
user_status = media_data.get("user_status", {})
if user_status.get("status"):
status_enum = status_map.get(user_status["status"].lower())
status_enum = status_map.get(str(user_status["status"]).lower())
if status_enum:
registry_service.update_media_index_entry(
media_id,
@@ -429,14 +344,12 @@ def _import_data(registry_service, data: dict, merge: bool, dry_run: bool, feedb
score=user_status.get("score"),
notes=user_status.get("notes"),
)
except Exception as e:
error_count += 1
feedback.warning(
"Import Error",
f"Failed to import media {media_data.get('id', 'unknown')}: {e}",
)
continue
if not dry_run:
feedback.info(

View File

@@ -2,16 +2,17 @@
Registry restore command - restore registry from backup files
"""
import json
import shutil
import tarfile
from pathlib import Path
from datetime import datetime
from pathlib import Path
import click
from .....core.config import AppConfig
from ....service.feedback import FeedbackService
from ....service.registry.service import MediaRegistryService
from ....utils.feedback import create_feedback_manager
@click.command(help="Restore registry from a backup file")
@@ -46,7 +47,7 @@ def restore(
Can restore from tar or zip backups created by the backup command.
Optionally creates a backup of the current registry before restoring.
"""
feedback = create_feedback_manager(config.general.icons)
feedback = FeedbackService(config)
try:
# Detect backup format
@@ -57,17 +58,20 @@ def restore(
if verify:
if not _verify_backup(backup_file, backup_format, feedback):
feedback.error(
"Verification Failed", "Backup file appears to be corrupted"
"Verification Failed",
"Backup file appears to be corrupted or invalid",
)
raise click.Abort()
feedback.success("Verification", "Backup file integrity verified")
# Check if current registry exists
registry_service = MediaRegistryService(api, config.registry)
registry_service = MediaRegistryService(api, config.media_registry)
registry_exists = _check_registry_exists(registry_service)
if registry_exists and not force:
if not click.confirm("Current registry exists. Continue with restore?"):
if not click.confirm(
"Current registry exists. This will overwrite it. Continue with restore?"
):
feedback.info("Restore Cancelled", "No changes were made")
return
@@ -87,7 +91,7 @@ def restore(
# Verify restored registry
try:
restored_service = MediaRegistryService(api, config.registry)
restored_service = MediaRegistryService(api, config.media_registry)
stats = restored_service.get_registry_stats()
feedback.info(
"Restored Registry",
@@ -105,65 +109,30 @@ def restore(
def _detect_backup_format(backup_file: Path) -> str:
"""Detect backup file format."""
if backup_file.suffix.lower() in [".tar", ".gz"]:
suffixes = "".join(backup_file.suffixes).lower()
if ".tar" in suffixes or ".gz" in suffixes or ".tgz" in suffixes:
return "tar"
elif backup_file.suffix.lower() == ".zip":
elif ".zip" in suffixes:
return "zip"
elif backup_file.name.endswith(".tar.gz"):
return "tar"
else:
# Try to detect by content
try:
with tarfile.open(backup_file, "r:*"):
return "tar"
except:
pass
try:
import zipfile
with zipfile.ZipFile(backup_file, "r"):
return "zip"
except:
pass
raise click.ClickException(f"Could not detect backup format for {backup_file}")
def _verify_backup(backup_file: Path, format_type: str, feedback) -> bool:
def _verify_backup(
backup_file: Path, format_type: str, feedback: FeedbackService
) -> bool:
"""Verify backup file integrity."""
try:
has_registry = has_index = has_metadata = False
if format_type == "tar":
with tarfile.open(backup_file, "r:*") as tar:
# Check if essential files exist
names = tar.getnames()
has_registry = any("registry/" in name for name in names)
has_index = any("index/" in name for name in names)
has_metadata = "backup_metadata.json" in names
if not (has_registry and has_index):
return False
# Try to read metadata if it exists
if has_metadata:
try:
metadata_member = tar.getmember("backup_metadata.json")
metadata_file = tar.extractfile(metadata_member)
if metadata_file:
import json
metadata = json.load(metadata_file)
feedback.info(
"Backup Info",
f"Created: {metadata.get('backup_timestamp', 'Unknown')}",
)
feedback.info(
"Backup Info",
f"Total Media: {metadata.get('total_media', 'Unknown')}",
)
except:
pass
metadata_member = tar.getmember("backup_metadata.json")
if metadata_file := tar.extractfile(metadata_member):
metadata = json.load(metadata_file)
else: # zip
import zipfile
@@ -172,44 +141,38 @@ def _verify_backup(backup_file: Path, format_type: str, feedback) -> bool:
has_registry = any("registry/" in name for name in names)
has_index = any("index/" in name for name in names)
has_metadata = "backup_metadata.json" in names
if not (has_registry and has_index):
return False
# Try to read metadata
if has_metadata:
try:
with zip_file.open("backup_metadata.json") as metadata_file:
import json
with zip_file.open("backup_metadata.json") as metadata_file:
metadata = json.load(metadata_file)
metadata = json.load(metadata_file)
feedback.info(
"Backup Info",
f"Created: {metadata.get('backup_timestamp', 'Unknown')}",
)
feedback.info(
"Backup Info",
f"Total Media: {metadata.get('total_media', 'Unknown')}",
)
except:
pass
if has_metadata:
feedback.info(
"Backup Info", f"Created: {metadata.get('backup_timestamp', 'Unknown')}"
)
feedback.info(
"Backup Info", f"Total Media: {metadata.get('total_media', 'Unknown')}"
)
return True
except Exception:
return has_registry and has_index
except (tarfile.ReadError, zipfile.BadZipFile, json.JSONDecodeError):
return False
except Exception as e:
feedback.warning("Verification Warning", f"Could not fully verify backup: {e}")
return False
def _check_registry_exists(registry_service) -> bool:
def _check_registry_exists(registry_service: MediaRegistryService) -> bool:
"""Check if a registry already exists."""
try:
stats = registry_service.get_registry_stats()
return stats.get("total_media", 0) > 0
except:
except Exception:
return False
def _backup_current_registry(registry_service, api: str, feedback):
def _backup_current_registry(
registry_service: MediaRegistryService, api: str, feedback: FeedbackService
):
"""Create backup of current registry before restoring."""
from .backup import _create_tar_backup
@@ -218,20 +181,21 @@ def _backup_current_registry(registry_service, api: str, feedback):
try:
_create_tar_backup(registry_service, backup_path, True, False, feedback, api)
feedback.info("Current Registry Backed Up", f"Saved to {backup_path}")
feedback.success("Current Registry Backed Up", f"Saved to {backup_path}")
except Exception as e:
feedback.warning("Backup Warning", f"Failed to backup current registry: {e}")
def _show_restore_summary(backup_file: Path, format_type: str, feedback):
def _show_restore_summary(
backup_file: Path, format_type: str, feedback: FeedbackService
):
"""Show summary of what will be restored."""
try:
file_count = media_files = 0
if format_type == "tar":
with tarfile.open(backup_file, "r:*") as tar:
members = tar.getmembers()
file_count = len([m for m in members if m.isfile()])
# Count media files
media_files = len(
[
m
@@ -239,15 +203,12 @@ def _show_restore_summary(backup_file: Path, format_type: str, feedback):
if m.name.startswith("registry/") and m.name.endswith(".json")
]
)
else: # zip
import zipfile
with zipfile.ZipFile(backup_file, "r") as zip_file:
info_list = zip_file.infolist()
file_count = len([info for info in info_list if not info.is_dir()])
# Count media files
media_files = len(
[
info
@@ -257,70 +218,69 @@ def _show_restore_summary(backup_file: Path, format_type: str, feedback):
]
)
feedback.info("Restore Preview", f"Will restore {file_count} files")
feedback.info("Media Records", f"Contains {media_files} media entries")
feedback.info(
"Restore Preview",
f"Backup contains {file_count} files, including {media_files} media entries.",
)
except Exception as e:
feedback.warning("Preview Error", f"Could not analyze backup: {e}")
def _perform_restore(
backup_file: Path, format_type: str, config: AppConfig, api: str, feedback
backup_file: Path,
format_type: str,
config: AppConfig,
api: str,
feedback: FeedbackService,
):
"""Perform the actual restore operation."""
# Create temporary extraction directory
temp_dir = Path(config.registry.media_dir.parent / "restore_temp")
temp_dir.mkdir(exist_ok=True)
temp_dir = Path(
config.media_registry.media_dir.parent
/ f"restore_temp_{datetime.now().timestamp()}"
)
temp_dir.mkdir(exist_ok=True, parents=True)
try:
# Extract backup
if format_type == "tar":
with tarfile.open(backup_file, "r:*") as tar:
tar.extractall(temp_dir)
else: # zip
import zipfile
with feedback.progress("Restoring from backup...") as (task_id, progress):
# 1. Extract backup
progress.update(task_id, description="Extracting backup...")
if format_type == "tar":
with tarfile.open(backup_file, "r:*") as tar:
tar.extractall(temp_dir)
else:
import zipfile
with zipfile.ZipFile(backup_file, "r") as zip_file:
zip_file.extractall(temp_dir)
with zipfile.ZipFile(backup_file, "r") as zip_file:
zip_file.extractall(temp_dir)
feedback.info("Extraction", "Backup extracted to temporary directory")
feedback.info("Extraction", "Backup extracted to temporary directory")
# 2. Prepare paths
registry_dir = config.media_registry.media_dir / api
index_dir = config.media_registry.index_dir
cache_dir = config.media_registry.media_dir.parent / "cache"
# Remove existing registry if it exists
registry_dir = config.registry.media_dir / api
index_dir = config.registry.index_dir
if registry_dir.exists():
shutil.rmtree(registry_dir)
feedback.info("Cleanup", "Removed existing registry data")
if index_dir.exists():
shutil.rmtree(index_dir)
feedback.info("Cleanup", "Removed existing index data")
# Move extracted files to proper locations
extracted_registry = temp_dir / "registry" / api
extracted_index = temp_dir / "index"
if extracted_registry.exists():
shutil.move(str(extracted_registry), str(registry_dir))
feedback.info("Restore", "Registry data restored")
if extracted_index.exists():
shutil.move(str(extracted_index), str(index_dir))
feedback.info("Restore", "Index data restored")
# Restore cache if it exists
extracted_cache = temp_dir / "cache"
if extracted_cache.exists():
cache_dir = config.registry.media_dir.parent / "cache"
# 3. Clean existing data
progress.update(task_id, description="Cleaning existing registry...")
if registry_dir.exists():
shutil.rmtree(registry_dir)
if index_dir.exists():
shutil.rmtree(index_dir)
if cache_dir.exists():
shutil.rmtree(cache_dir)
shutil.move(str(extracted_cache), str(cache_dir))
feedback.info("Restore", "Cache data restored")
feedback.info("Cleanup", "Removed existing registry, index, and cache data")
# 4. Move extracted files
progress.update(task_id, description="Moving new files into place...")
if (extracted_registry := temp_dir / "registry" / api).exists():
shutil.move(str(extracted_registry), str(registry_dir))
if (extracted_index := temp_dir / "index").exists():
shutil.move(str(extracted_index), str(index_dir))
if (extracted_cache := temp_dir / "cache").exists():
shutil.move(str(extracted_cache), str(cache_dir))
progress.update(task_id, description="Finalizing...")
finally:
# Clean up temporary directory
if temp_dir.exists():
shutil.rmtree(temp_dir)
feedback.info("Cleanup", "Temporary files removed")

View File

@@ -2,15 +2,26 @@
Registry search command - search through the local media registry
"""
import json
from typing import TYPE_CHECKING, List
import click
from rich.console import Console
from rich.table import Table
from .....core.config import AppConfig
from .....libs.media_api.params import MediaSearchParams
from .....libs.media_api.types import MediaSort, UserMediaListStatus
from .....libs.media_api.types import (
MediaFormat,
MediaGenre,
MediaSort,
UserMediaListStatus,
)
from ....service.feedback import FeedbackService
from ....service.registry.service import MediaRegistryService
from ....utils.feedback import create_feedback_manager
if TYPE_CHECKING:
from .....libs.media_api.types import MediaItem, MediaSearchResult
@click.command(help="Search through the local media registry")
@@ -18,7 +29,7 @@ from ....utils.feedback import create_feedback_manager
@click.option(
"--status",
type=click.Choice(
["watching", "completed", "planning", "dropped", "paused", "repeating"],
[s.value for s in UserMediaListStatus],
case_sensitive=False,
),
help="Filter by watch status",
@@ -29,7 +40,11 @@ from ....utils.feedback import create_feedback_manager
@click.option(
"--format",
type=click.Choice(
["TV", "TV_SHORT", "MOVIE", "SPECIAL", "OVA", "ONA", "MUSIC"],
[
f.value
for f in MediaFormat
if f not in [MediaFormat.MANGA, MediaFormat.NOVEL, MediaFormat.ONE_SHOT]
],
case_sensitive=False,
),
help="Filter by format",
@@ -77,28 +92,25 @@ def search(
You can search by title and filter by various criteria like status,
genre, format, year, and score range.
"""
feedback = create_feedback_manager(config.general.icons)
feedback = FeedbackService(config)
console = Console()
try:
registry_service = MediaRegistryService(api, config.registry)
registry_service = MediaRegistryService(api, config.media_registry)
# Build search parameters
search_params = _build_search_params(
query, status, genre, format, year, min_score, max_score, sort, limit
)
# Perform search
result = registry_service.search_for_media(search_params)
with feedback.progress("Searching local registry..."):
result = registry_service.search_for_media(search_params)
if not result or not result.media:
feedback.info("No Results", "No media found matching your criteria")
return
if output_json:
import json
print(json.dumps(result.model_dump(), indent=2, default=str))
print(json.dumps(result.model_dump(mode="json"), indent=2))
return
_display_search_results(console, result, config.general.icons)
@@ -109,24 +121,17 @@ def search(
def _build_search_params(
query, status, genre, format, year, min_score, max_score, sort, limit
query: str | None,
status: str | None,
genre: tuple[str, ...],
format_str: str | None,
year: int | None,
min_score: float | None,
max_score: float | None,
sort: str,
limit: int,
) -> MediaSearchParams:
"""Build MediaSearchParams from command options."""
# Convert status string to enum
status_enum = None
if status:
status_map = {
"watching": UserMediaListStatus.WATCHING,
"completed": UserMediaListStatus.COMPLETED,
"planning": UserMediaListStatus.PLANNING,
"dropped": UserMediaListStatus.DROPPED,
"paused": UserMediaListStatus.PAUSED,
"repeating": UserMediaListStatus.REPEATING,
}
status_enum = status_map.get(status.lower())
# Convert sort string to enum
"""Build MediaSearchParams from command options for local filtering."""
sort_map = {
"title": MediaSort.TITLE_ROMAJI,
"score": MediaSort.SCORE_DESC,
@@ -135,74 +140,52 @@ def _build_search_params(
"episodes": MediaSort.EPISODES_DESC,
"updated": MediaSort.UPDATED_AT_DESC,
}
sort_enum = sort_map.get(sort.lower(), MediaSort.TITLE_ROMAJI)
# Convert format string to enum if provided
format_enum = None
if format:
from .....libs.media_api.types import MediaFormat
# Safely convert strings to enums
format_enum = next(
(f for f in MediaFormat if f.value.lower() == (format_str or "").lower()), None
)
genre_enums = [
g for g_str in genre for g in MediaGenre if g.value.lower() == g_str.lower()
]
format_enum = getattr(MediaFormat, format.upper(), None)
# Convert genre strings to enums
genre_enums = []
if genre:
from .....libs.media_api.types import MediaGenre
for g in genre:
# Try to find matching genre enum
for genre_enum in MediaGenre:
if genre_enum.value.lower() == g.lower():
genre_enums.append(genre_enum)
break
# Note: Local search handles status separately as it's part of the index, not MediaItem
return MediaSearchParams(
query=query,
per_page=limit,
sort=[sort_enum],
averageScore_greater=min_score * 10
if min_score
else None, # Convert to AniList scale
averageScore_lesser=max_score * 10 if max_score else None,
genre_in=genre_enums if genre_enums else None,
sort=[sort_map.get(sort.lower(), MediaSort.TITLE_ROMAJI)],
averageScore_greater=int(min_score * 10) if min_score is not None else None,
averageScore_lesser=int(max_score * 10) if max_score is not None else None,
genre_in=genre_enums or None,
format_in=[format_enum] if format_enum else None,
seasonYear=year,
# We'll handle status filtering differently since it's user-specific
)
def _display_search_results(console: Console, result, icons: bool):
def _display_search_results(console: Console, result: "MediaSearchResult", icons: bool):
"""Display search results in a formatted table."""
table = Table(
title=f"{'🔍 ' if icons else ''}Search Results ({len(result.media)} found)"
)
table.add_column("Title", style="cyan", min_width=30)
table.add_column("Year", style="dim", justify="center", min_width=6)
table.add_column("Format", style="magenta", justify="center", min_width=8)
table.add_column("Episodes", style="green", justify="center", min_width=8)
table.add_column("Score", style="yellow", justify="center", min_width=6)
table.add_column("Status", style="blue", justify="center", min_width=10)
table.add_column("Progress", style="white", justify="center", min_width=8)
table.add_column("Title", style="cyan", min_width=30, overflow="ellipsis")
table.add_column("Year", style="dim", justify="center")
table.add_column("Format", style="magenta", justify="center")
table.add_column("Episodes", style="green", justify="center")
table.add_column("Score", style="yellow", justify="center")
table.add_column("Status", style="blue", justify="center")
table.add_column("Progress", style="white", justify="center")
for media in result.media:
# Get title (prefer English, fallback to Romaji)
title = media.title.english or media.title.romaji or "Unknown"
if len(title) > 40:
title = title[:37] + "..."
year = str(media.start_date.year) if media.start_date else "N/A"
episodes_total = str(media.episodes) if media.episodes else "?"
score = (
f"{media.average_score / 10:.1f}"
if media.average_score is not None
else "N/A"
)
# Get year from start date
year = ""
if media.start_date:
year = str(media.start_date.year)
# Format episodes
episodes = str(media.episodes) if media.episodes else "?"
# Format score
score = f"{media.average_score / 10:.1f}" if media.average_score else "N/A"
# Get user status
status = "Not Listed"
progress = "0"
if media.user_status:
@@ -211,13 +194,13 @@ def _display_search_results(console: Console, result, icons: bool):
if media.user_status.status
else "Unknown"
)
progress = f"{media.user_status.progress or 0}/{episodes}"
progress = f"{media.user_status.progress or 0}/{episodes_total}"
table.add_row(
title,
year,
media.format.value if media.format else "Unknown",
episodes,
media.format.value if media.format else "N/A",
episodes_total,
score,
status,
progress,
@@ -225,8 +208,7 @@ def _display_search_results(console: Console, result, icons: bool):
console.print(table)
# Show pagination info if applicable
if result.page_info.total > len(result.media):
if result.page_info and result.page_info.total > len(result.media):
console.print(
f"\n[dim]Showing {len(result.media)} of {result.page_info.total} total results[/dim]"
)