"""
MongoDB Data Migration Script

Migrates data from the old schema to the new refactored schema.

Changes performed:
1. Deduplicate users by email (keep oldest)
2. Convert entries.userId from string to ObjectId
3. Add entryDate field to entries (defaults to createdAt)
4. Add encryption metadata to entries
5. Create compound indexes
   (NOTE: not implemented below - this script makes no index-creation call;
   indexes have to be created separately.)

Usage:
    python backend/scripts/migrate_data.py

IMPORTANT: Backup your database before running this script!
    mongodump --db grateful_journal_old --out ./backup
"""

import sys
from collections import defaultdict
from datetime import datetime
from typing import Any, Dict, List, Set, Tuple

from bson import ObjectId
from pymongo import MongoClient

from config import get_settings


def _creation_sort_key(user: Dict[str, Any]) -> Tuple[bool, datetime]:
    """Return a sort key that orders users oldest-first by ``createdAt``.

    Users without a ``createdAt`` value sort after every dated user, so a
    dated account is always preferred as the canonical one. The placeholder
    datetime is never compared against a real value because the leading
    boolean already differs in that case.
    """
    created_at = user.get("createdAt")
    if created_at is None:
        return (True, datetime.min)
    return (False, created_at)


def _deduplicate_users(db: Any) -> Tuple[Dict[str, Any], int]:
    """Delete duplicate users (same email), keeping the oldest of each group.

    Args:
        db: The pymongo database handle holding the ``users`` collection.

    Returns:
        A ``(user_mapping, duplicate_count)`` tuple. ``user_mapping`` maps the
        string form of every deleted user's ``_id`` to the ``_id`` of the user
        that was kept; ``duplicate_count`` is the number of deleted users.
    """
    print("=" * 70)
    print("STEP 1: Deduplicating Users (keeping oldest)")
    print("=" * 70)

    # Group users by email. Users with no email are skipped: treating a
    # missing/None email as a shared key would merge unrelated accounts.
    email_groups: Dict[Any, List[Dict[str, Any]]] = defaultdict(list)
    users_without_email = 0
    for user in db.users.find():
        email = user.get("email")
        if email is None:
            users_without_email += 1
            continue
        email_groups[email].append(user)

    if users_without_email:
        print(
            f"\n⚠ Skipped {users_without_email} users without an email "
            "(not deduplicated)")

    user_mapping: Dict[str, Any] = {}  # str(duplicate _id) -> canonical _id
    duplicate_count = 0

    for email, users in email_groups.items():
        if len(users) < 2:
            continue

        # Oldest account first; list.sort is stable, so ties keep find() order.
        users.sort(key=_creation_sort_key)
        canonical_id = users[0]["_id"]

        print(f"\n📧 Email: {email}")
        print(f" Found {len(users)} duplicate users")
        print(f" Keeping (earliest): {canonical_id}")

        # NOTE: duplicates are deleted here, before entries are remapped in
        # step 2. The mapping only lives in memory, so a crash between the
        # two steps cannot be recovered by re-running - restore from backup.
        for dup_user in users[1:]:
            dup_id = dup_user["_id"]
            user_mapping[str(dup_id)] = canonical_id
            duplicate_count += 1
            print(f" Deleting (later): {dup_id}")
            db.users.delete_one({"_id": dup_id})

    if duplicate_count == 0:
        print("\n✓ No duplicate users found")
    else:
        print(f"\n✓ Removed {duplicate_count} duplicate users")

    return user_mapping, duplicate_count


def _build_entry_update(
        entry: Dict[str, Any], user_mapping: Dict[str, Any]) -> Dict[str, Any]:
    """Compute the ``$set`` payload needed to bring one entry to the new schema.

    Args:
        entry: The entry document as read from the ``entries`` collection.
        user_mapping: Mapping produced by the deduplication step.

    Returns:
        The fields to set; empty when the entry needs no change.

    Raises:
        bson.errors.InvalidId: If a string ``userId`` is not a valid ObjectId.
        AttributeError: If ``createdAt`` is present but has no ``replace``.
    """
    update_data: Dict[str, Any] = {}
    raw_user_id = entry.get("userId", "")

    # Look up the duplicate mapping for BOTH string and ObjectId references;
    # an ObjectId that points at a deleted duplicate must be remapped too.
    canonical_id = None
    if isinstance(raw_user_id, (str, ObjectId)):
        canonical_id = user_mapping.get(str(raw_user_id))

    if canonical_id is not None:
        print(
            f" → Entry {entry['_id']}: userId mapped "
            f"{str(raw_user_id)[:8]}... → {str(canonical_id)[:8]}...")
        update_data["userId"] = canonical_id
    elif isinstance(raw_user_id, str):
        # Convert userId: string → ObjectId
        update_data["userId"] = ObjectId(raw_user_id)

    # Add entryDate if missing (default to createdAt), truncated to start of day
    if "entryDate" not in entry:
        entry_date = entry.get("createdAt", datetime.utcnow())
        update_data["entryDate"] = entry_date.replace(
            hour=0, minute=0, second=0, microsecond=0)

    # Add encryption metadata if missing
    if "encryption" not in entry:
        update_data["encryption"] = {
            "encrypted": False,
            "iv": None,
            "algorithm": None
        }

    return update_data


def _migrate_entries(
        db: Any, user_mapping: Dict[str, Any]) -> Tuple[int, List[Dict[str, str]]]:
    """Apply the schema changes to every document in ``entries``.

    Args:
        db: The pymongo database handle holding the ``entries`` collection.
        user_mapping: Mapping produced by the deduplication step.

    Returns:
        A ``(entries_updated, entries_with_issues)`` tuple; each issue is a
        dict with ``entry_id`` and ``error`` strings.
    """
    print("\n" + "=" * 70)
    print("STEP 2: Migrating Entries (userId string → ObjectId, add entryDate)")
    print("=" * 70)

    total_entries = db.entries.count_documents({})
    entries_updated = 0
    entries_with_issues: List[Dict[str, str]] = []

    print(f"\nTotal entries to process: {total_entries}\n")

    for entry in db.entries.find():
        # Bound before the try so the error handler can always report it.
        entry_id = entry["_id"]
        try:
            update_data = _build_entry_update(entry, user_mapping)

            # Perform update if there are changes
            if update_data:
                update_data["updatedAt"] = datetime.utcnow()
                db.entries.update_one(
                    {"_id": entry_id},
                    {"$set": update_data}
                )
                entries_updated += 1

                if entries_updated % 100 == 0:
                    print(
                        f" ✓ Processed {entries_updated}/{total_entries} entries")
        except Exception as e:
            # Deliberate best-effort boundary: one bad entry is recorded and
            # reported, it must not abort the rest of the migration.
            entries_with_issues.append({
                "entry_id": str(entry_id),
                "error": str(e)
            })
            print(f" ⚠ Error processing entry {entry_id}: {e}")

    print(f"\n✓ Updated {entries_updated}/{total_entries} entries")

    if entries_with_issues:
        print(f"\n⚠ {len(entries_with_issues)} entries had issues:")
        for issue in entries_with_issues[:5]:  # Show first 5
            print(f" - {issue['entry_id']}: {issue['error']}")

    return entries_updated, entries_with_issues


def _verify_integrity(db: Any) -> int:
    """Count orphaned entries and print a sample of the migrated structure.

    Args:
        db: The pymongo database handle.

    Returns:
        The number of entries whose ``userId`` matches no user ``_id``.
    """
    print("\n" + "=" * 70)
    print("STEP 3: Verifying Data Integrity")
    print("=" * 70)

    # Check for orphaned entries (userId doesn't exist in users). Ids are
    # compared as strings so unconverted string userIds are checked as well.
    users_ids: Set[str] = {
        str(u["_id"]) for u in db.users.find({}, {"_id": 1})}
    orphaned_count = 0
    for entry in db.entries.find({}, {"userId": 1}):
        user_id = entry.get("userId")
        if isinstance(user_id, ObjectId):
            user_id = str(user_id)
        if user_id not in users_ids:
            orphaned_count += 1

    print(f"\nUsers collection: {db.users.count_documents({})}")
    print(f"Entries collection: {db.entries.count_documents({})}")

    if orphaned_count > 0:
        print(
            f"\n⚠ WARNING: Found {orphaned_count} orphaned entries (no corresponding user)")
    else:
        print("✓ All entries have valid user references")

    # Sample entry check
    sample_entry = db.entries.find_one()
    if sample_entry:
        print("\nSample entry structure:")
        print(
            f" _id (entry): {sample_entry['_id']} (ObjectId: {isinstance(sample_entry['_id'], ObjectId)})")
        print(
            f" userId: {sample_entry.get('userId')} (ObjectId: {isinstance(sample_entry.get('userId'), ObjectId)})")
        print(f" entryDate present: {'entryDate' in sample_entry}")
        print(f" encryption present: {'encryption' in sample_entry}")
        if "entryDate" in sample_entry:
            print(f" → entryDate: {sample_entry['entryDate'].isoformat()}")
        if "encryption" in sample_entry:
            print(f" → encryption: {sample_entry['encryption']}")

    return orphaned_count


def migrate_data():
    """Perform complete data migration.

    Runs, in order: user deduplication, entry migration and an integrity
    check, then prints a summary. Connection settings come from
    ``get_settings()`` (``mongodb_uri`` and ``mongodb_db_name``).

    The client is closed even when a step raises; the exception itself is
    propagated to the caller.
    """
    settings = get_settings()
    client = MongoClient(settings.mongodb_uri)
    try:
        db = client[settings.mongodb_db_name]
        print(f"✓ Connected to MongoDB: {settings.mongodb_db_name}\n")

        # ========== STEP 1: DEDUPLICATE USERS ==========
        user_mapping, duplicate_count = _deduplicate_users(db)

        # ========== STEP 2: MIGRATE ENTRIES ==========
        entries_updated, _ = _migrate_entries(db, user_mapping)

        # ========== STEP 3: VERIFY DATA INTEGRITY ==========
        orphaned_count = _verify_integrity(db)

        # ========== SUMMARY ==========
        print(f"\n{'=' * 70}")
        print("✓ Migration Complete")
        print(f"{'=' * 70}")
        print(f"Duplicate users removed: {duplicate_count}")
        print(f"Entries migrated: {entries_updated}")
        print(f"Orphaned entries found: {orphaned_count}")

        if orphaned_count == 0:
            print("\n✓ Data integrity verified successfully!")
        else:
            print(f"\n⚠ Please review {orphaned_count} orphaned entries")
    finally:
        # Always release the connection, including on failure.
        client.close()
        print("\n✓ Disconnected from MongoDB")


def rollback_warning():
    """Display rollback warning."""
    print("\n" + "!" * 70)
    print("⚠ IMPORTANT REMINDERS")
    print("!" * 70)
    print("""
This script modifies your MongoDB database. Before running:

1. BACKUP YOUR DATABASE:
   mongodump --db grateful_journal --out ./backup-$(date +%Y%m%d)

2. TEST IN DEVELOPMENT first

3. This migration includes:
   - Removing duplicate users
   - Converting userId field types
   - Adding new entryDate field
   - Adding encryption metadata

4. All changes are permanent unless you restore from backup

5. This script is idempotent for most operations (safe to run multiple times)
   but the deduplication will only work on the first run.
""")


def main() -> int:
    """Show the warning, ask for confirmation and run the migration.

    Returns:
        The process exit code: 0 when the migration ran or was cancelled,
        1 when it failed.
    """
    rollback_warning()

    response = input(
        "\nDo you want to proceed with migration? (yes/no): ").strip().lower()
    if response != "yes":
        print("Migration cancelled.")
        return 0

    try:
        migrate_data()
    except Exception as e:
        # Top-level boundary: report the failure and signal it via exit code.
        print("\n✗ Migration failed with error:")
        print(f" {e}")
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())