Files
grateful-journal/backend/scripts/migrate_data.py
2026-03-05 12:43:44 +05:30

249 lines
8.5 KiB
Python

"""
MongoDB Data Migration Script
Migrates data from the old schema to the new refactored schema.
Changes performed:
1. Deduplicate users by email (keep oldest)
2. Convert entries.userId from string to ObjectId
3. Add entryDate field to entries (defaults to createdAt)
4. Add encryption metadata to entries
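5. Create compound indexes
   (NOTE(review): no index creation is visible in this script - confirm it is
   handled elsewhere, or create the indexes separately after migrating)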
Usage:
python backend/scripts/migrate_data.py
IMPORTANT: Backup your database before running this script!
mongodump --db grateful_journal_old --out ./backup
"""
from pymongo import MongoClient
from bson import ObjectId
from datetime import datetime
from config import get_settings
from typing import Dict, List, Set
import sys
def _deduplicate_users(db):
    """Delete user documents that share an email, keeping the oldest one.

    Entries that reference a duplicate user (with ``userId`` stored either as
    a string or as an ObjectId) are re-pointed to the surviving user *before*
    the duplicate is deleted, so an interruption at any point does not leave
    entries referencing a user that was just removed.

    Users without an email (missing, ``None`` or empty) are skipped: they
    cannot be matched reliably and must not be merged into one account.

    Args:
        db: PyMongo database handle exposing ``users`` and ``entries``.

    Returns:
        Number of duplicate user documents deleted.
    """
    print("=" * 70)
    print("STEP 1: Deduplicating Users (keeping oldest)")
    print("=" * 70)

    # Group users by email; only groups with 2+ members are duplicates.
    email_groups = {}
    users_without_email = 0
    for user in db.users.find():
        email = user.get("email")
        if not email:
            users_without_email += 1
            continue
        email_groups.setdefault(email, []).append(user)

    if users_without_email:
        print(f"\n⚠ Skipped {users_without_email} users without an email")

    duplicate_count = 0
    for email, users in email_groups.items():
        if len(users) < 2:
            continue

        # Oldest first. Users lacking createdAt sort last, so a user with a
        # known creation time is preferred as the one to keep. The leading
        # bool keeps None from ever being compared against a datetime.
        users.sort(key=lambda u: (u.get("createdAt") is None, u.get("createdAt")))
        canonical_id = users[0]["_id"]

        print(f"\n📧 Email: {email}")
        print(f" Found {len(users)} duplicate users")
        print(f" Keeping (earliest): {canonical_id}")

        for dup_user in users[1:]:
            dup_id = dup_user["_id"]
            print(f" Deleting (later): {dup_id}")

            # Re-point entries first; match both the legacy string form and
            # the already-migrated ObjectId form of userId.
            result = db.entries.update_many(
                {"userId": {"$in": [dup_id, str(dup_id)]}},
                {"$set": {"userId": canonical_id, "updatedAt": datetime.utcnow()}},
            )
            if result.modified_count:
                print(
                    f" → Re-pointed {result.modified_count} entries: "
                    f"{str(dup_id)[:8]}... → {str(canonical_id)[:8]}..."
                )

            db.users.delete_one({"_id": dup_id})
            duplicate_count += 1

    if duplicate_count == 0:
        print("\n✓ No duplicate users found")
    else:
        print(f"\n✓ Removed {duplicate_count} duplicate users")
    return duplicate_count


def _migrate_entries(db):
    """Bring every entry document up to the new schema.

    For each entry: converts a string ``userId`` to an ObjectId, adds
    ``entryDate`` (start of day of ``createdAt``, or of the current UTC time
    when ``createdAt`` is absent) and adds default ``encryption`` metadata.
    Entries that already have all three are left untouched, so re-running is
    a no-op for them. A failure on one entry is recorded and reported; it
    does not stop the remaining entries from being processed.

    Args:
        db: PyMongo database handle exposing ``entries``.

    Returns:
        Number of entry documents that were updated.
    """
    print("\n" + "=" * 70)
    print("STEP 2: Migrating Entries (userId string → ObjectId, add entryDate)")
    print("=" * 70)

    total_entries = db.entries.count_documents({})
    entries_updated = 0
    entries_with_issues = []
    print(f"\nTotal entries to process: {total_entries}\n")

    for entry in db.entries.find():
        # Bound before the try so the error handler can always report it.
        entry_id = entry["_id"]
        try:
            update_data = {}

            # A missing userId becomes "" and fails ObjectId() below, which
            # surfaces the entry in the issues report instead of hiding it.
            user_id = entry.get("userId", "")
            if isinstance(user_id, str):
                update_data["userId"] = ObjectId(user_id)

            if "entryDate" not in entry:
                created_at = entry.get("createdAt") or datetime.utcnow()
                update_data["entryDate"] = created_at.replace(
                    hour=0, minute=0, second=0, microsecond=0
                )

            if "encryption" not in entry:
                update_data["encryption"] = {
                    "encrypted": False,
                    "iv": None,
                    "algorithm": None,
                }

            if update_data:
                update_data["updatedAt"] = datetime.utcnow()
                db.entries.update_one({"_id": entry_id}, {"$set": update_data})
                entries_updated += 1
                if entries_updated % 100 == 0:
                    print(f" ✓ Processed {entries_updated}/{total_entries} entries")
        except Exception as e:
            # Deliberate best-effort: one malformed entry must not abort the
            # whole migration; it is listed in the report below.
            entries_with_issues.append({"entry_id": str(entry_id), "error": str(e)})
            print(f" ⚠ Error processing entry {entry_id}: {e}")

    print(f"\n✓ Updated {entries_updated}/{total_entries} entries")
    if entries_with_issues:
        print(f"\n{len(entries_with_issues)} entries had issues:")
        for issue in entries_with_issues[:5]:  # Show first 5
            print(f" - {issue['entry_id']}: {issue['error']}")
    return entries_updated


def _verify_integrity(db):
    """Count entries whose ``userId`` matches no user and print a sample entry.

    Args:
        db: PyMongo database handle exposing ``users`` and ``entries``.

    Returns:
        Number of orphaned entries found.
    """
    print("\n" + "=" * 70)
    print("STEP 3: Verifying Data Integrity")
    print("=" * 70)

    # Compare as strings so string and ObjectId userIds are both checked.
    user_ids = {str(u["_id"]) for u in db.users.find({}, {"_id": 1})}
    orphaned_count = 0
    for entry in db.entries.find({}, {"userId": 1}):
        user_id = entry.get("userId")
        if isinstance(user_id, ObjectId):
            user_id = str(user_id)
        if user_id not in user_ids:
            orphaned_count += 1

    print(f"\nUsers collection: {db.users.count_documents({})}")
    print(f"Entries collection: {db.entries.count_documents({})}")
    if orphaned_count > 0:
        print(f"\n⚠ WARNING: Found {orphaned_count} orphaned entries (no corresponding user)")
    else:
        print("✓ All entries have valid user references")

    sample_entry = db.entries.find_one()
    if sample_entry:
        print("\nSample entry structure:")
        print(f" _id (entry): {sample_entry['_id']} (ObjectId: {isinstance(sample_entry['_id'], ObjectId)})")
        print(f" userId: {sample_entry.get('userId')} (ObjectId: {isinstance(sample_entry.get('userId'), ObjectId)})")
        print(f" entryDate present: {'entryDate' in sample_entry}")
        print(f" encryption present: {'encryption' in sample_entry}")
        if "entryDate" in sample_entry:
            print(f" → entryDate: {sample_entry['entryDate'].isoformat()}")
        if "encryption" in sample_entry:
            print(f" → encryption: {sample_entry['encryption']}")
    return orphaned_count


def migrate_data():
    """Perform complete data migration.

    Connects to the database named in the project settings and runs three
    steps in order: deduplicate users, migrate entries to the new schema,
    and verify that every entry references an existing user. A summary is
    printed at the end.

    The MongoDB connection is closed even when a step raises; the exception
    itself is propagated to the caller unchanged.
    """
    settings = get_settings()
    client = MongoClient(settings.mongodb_uri)
    try:
        db = client[settings.mongodb_db_name]
        print(f"✓ Connected to MongoDB: {settings.mongodb_db_name}\n")

        duplicate_count = _deduplicate_users(db)
        entries_updated = _migrate_entries(db)
        orphaned_count = _verify_integrity(db)

        # ========== SUMMARY ==========
        print(f"\n{'='*70}")
        print("✓ Migration Complete")
        print(f"{'='*70}")
        print(f"Duplicate users removed: {duplicate_count}")
        print(f"Entries migrated: {entries_updated}")
        print(f"Orphaned entries found: {orphaned_count}")
        if orphaned_count == 0:
            print("\n✓ Data integrity verified successfully!")
        else:
            print(f"\n⚠ Please review {orphaned_count} orphaned entries")
    finally:
        # Always release the connection, including on failure.
        client.close()
        print("\n✓ Disconnected from MongoDB")
def rollback_warning():
    """Print the pre-migration reminder banner and safety checklist."""
    banner = "!" * 70
    reminders = """
This script modifies your MongoDB database. Before running:
1. BACKUP YOUR DATABASE:
mongodump --db grateful_journal --out ./backup-$(date +%Y%m%d)
2. TEST IN DEVELOPMENT first
3. This migration includes:
- Removing duplicate users
- Converting userId field types
- Adding new entryDate field
- Adding encryption metadata
4. All changes are permanent unless you restore from backup
5. This script is idempotent for most operations (safe to run multiple times)
but the deduplication will only work on the first run.
"""
    print("\n" + banner)
    print("⚠ IMPORTANT REMINDERS")
    print(banner)
    print(reminders)
if __name__ == "__main__":
    # Show the safety checklist, then require an explicit "yes" to continue.
    rollback_warning()
    answer = input("\nDo you want to proceed with migration? (yes/no): ")
    confirmed = answer.strip().lower() == "yes"
    if not confirmed:
        print("Migration cancelled.")
        sys.exit(0)

    # Report any failure and exit non-zero so callers can detect it.
    try:
        migrate_data()
    except Exception as exc:
        print("\n✗ Migration failed with error:")
        print(f" {exc}")
        sys.exit(1)