158 lines
6.0 KiB
Python
158 lines
6.0 KiB
Python
import os
|
|
import re
|
|
import argparse
|
|
from dotenv import load_dotenv
|
|
|
|
import generated.immich.openapi_client as openapi_client
|
|
from generated.immich.openapi_client.rest import ApiException
|
|
|
|
# Parse command line arguments
|
|
parser = argparse.ArgumentParser(description='Export albums from Immich to local storage')
|
|
parser.add_argument('--dry-run', action='store_true', help='Print actions without writing files')
|
|
args = parser.parse_args()
|
|
|
|
load_dotenv()
|
|
|
|
configuration = openapi_client.Configuration(
|
|
host = os.environ['IMMICH_HOST'],
|
|
api_key = { 'api_key': os.environ['IMMICH_API_KEY'] },
|
|
)
|
|
|
|
# Set target directory from env var or use default
|
|
target_dir = os.getenv('TARGET_DIR', './export')
|
|
if not os.path.exists(target_dir) and not args.dry_run:
|
|
os.makedirs(target_dir)
|
|
print(f"Created target directory: {target_dir}")
|
|
elif not os.path.exists(target_dir) and args.dry_run:
|
|
print(f"[DRY RUN] Would create target directory: {target_dir}")
|
|
|
|
# Function to sanitize filenames
|
|
def sanitize_filename(filename):
|
|
# Replace invalid characters with underscore
|
|
sanitized = re.sub(r'[\\/*?:"<>|]', '_', filename)
|
|
# Remove leading/trailing whitespace
|
|
sanitized = sanitized.strip()
|
|
# Replace empty filename with default
|
|
if not sanitized:
|
|
sanitized = "unnamed"
|
|
return sanitized
|
|
|
|
def save_asset(asset, asset_dir):
|
|
# Create filename from original filename or asset ID if not available
|
|
asset_filename = asset.original_file_name if asset.original_file_name else f"{asset.id}.jpg"
|
|
sanitized_asset_filename = sanitize_filename(asset_filename)
|
|
asset_path = os.path.join(asset_dir, sanitized_asset_filename)
|
|
count = 1
|
|
while asset_path in asset_paths:
|
|
# Duplicate filename, add a number to the end
|
|
asset_path = os.path.join(asset_dir, f"{sanitized_asset_filename} ({count})")
|
|
count += 1
|
|
|
|
# Skip if file already exists
|
|
if os.path.exists(asset_path):
|
|
print(f" Skipping existing file: {sanitized_asset_filename}")
|
|
return
|
|
|
|
# Save the asset to the album folder
|
|
if not args.dry_run:
|
|
# Download the asset
|
|
asset_data = assets_api.download_asset(asset.id)
|
|
with open(asset_path, 'wb') as f:
|
|
f.write(asset_data)
|
|
print(f" Saved asset to: {asset_path}")
|
|
else:
|
|
print(f" [DRY RUN] Would save asset to: {asset_path}")
|
|
|
|
return asset_path
|
|
|
|
album_count = 0
|
|
copied_assets = set()
|
|
asset_paths = set()
|
|
|
|
# Enter a context with an instance of the API client
|
|
with openapi_client.ApiClient(configuration) as immich_client:
|
|
# Create instances of the API classes
|
|
albums_api = openapi_client.AlbumsApi(immich_client)
|
|
assets_api = openapi_client.AssetsApi(immich_client)
|
|
|
|
try:
|
|
all_albums = albums_api.get_all_albums()
|
|
print(f"Found {len(all_albums)} albums")
|
|
|
|
# # Limit to 5 albums for debugging
|
|
# albums_to_process = all_albums[:5]
|
|
# print(f"Processing {len(albums_to_process)} albums for debugging")
|
|
albums_to_process = all_albums
|
|
|
|
for album in albums_to_process:
|
|
sanitized_album_name = sanitize_filename(album.album_name)
|
|
album_path = os.path.join(target_dir, sanitized_album_name)
|
|
if not os.path.exists(album_path):
|
|
if not args.dry_run:
|
|
os.makedirs(album_path)
|
|
print(f"Created directory for album: {sanitized_album_name}")
|
|
else:
|
|
print(f"[DRY RUN] Would create directory for album: {sanitized_album_name}")
|
|
|
|
try:
|
|
# Get detailed album info with assets
|
|
album_info = albums_api.get_album_info(album.id)
|
|
print(f"Album '{sanitized_album_name}' has {len(album_info.assets)} assets")
|
|
|
|
# Download each asset in the album
|
|
for asset in album_info.assets:
|
|
try:
|
|
save_asset(asset, album_path)
|
|
|
|
# Add a small delay to avoid overwhelming the server
|
|
# time.sleep(0.5)
|
|
|
|
copied_assets.add(asset.id)
|
|
|
|
except ApiException as e:
|
|
print(f" Error downloading asset {asset.id}: {e}")
|
|
except Exception as e:
|
|
print(f" Unexpected error downloading asset {asset.id}: {e}")
|
|
|
|
album_count += 1
|
|
|
|
except ApiException as e:
|
|
print(f"Error getting album info for {album.id}: {e}")
|
|
except Exception as e:
|
|
print(f"Unexpected error processing album {album.id}: {e}")
|
|
|
|
print(f"Processing assets not in albums...")
|
|
search_api = openapi_client.SearchApi(immich_client)
|
|
search_query = openapi_client.MetadataSearchDto(
|
|
isNotInAlbum=True
|
|
)
|
|
assets_not_in_albums = search_api.search_assets(search_query)
|
|
while assets_not_in_albums:
|
|
for asset in assets_not_in_albums.assets.items:
|
|
if asset.id in copied_assets:
|
|
continue
|
|
# Create a folder for the asset's year (i.e. Misc {{year}}) if it does not exist yet
|
|
year_folder_path = os.path.join(target_dir, f"Misc {asset.local_date_time.year}") if asset.local_date_time else "Misc"
|
|
if not os.path.exists(year_folder_path):
|
|
if not args.dry_run:
|
|
os.makedirs(year_folder_path)
|
|
print(f"Created directory for asset: {asset.id}")
|
|
|
|
save_asset(asset, year_folder_path)
|
|
copied_assets.add(asset.id)
|
|
|
|
if assets_not_in_albums.assets.next_page is not None:
|
|
search_query.page = int(assets_not_in_albums.assets.next_page)
|
|
assets_not_in_albums = search_api.search_assets(search_query)
|
|
else:
|
|
assets_not_in_albums = None
|
|
|
|
except ApiException as e:
|
|
print(f"Exception while retrieving albums: {e}")
|
|
except Exception as e:
|
|
print(f"Unexpected error: {e}")
|
|
|
|
|
|
|
|
print(f"Finished processing {album_count} albums with {len(copied_assets)} assets")
|