"""Tests for journal entry endpoints (/api/entries/*)."""
|
|
import pytest
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shared helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Well-formed encryption payload shared by the fixtures and tests in this module.
VALID_ENCRYPTION = dict(
    encrypted=True,
    ciphertext="dGVzdF9jaXBoZXJ0ZXh0",  # base64("test_ciphertext")
    nonce="dGVzdF9ub25jZQ==",  # base64("test_nonce")
    algorithm="XSalsa20-Poly1305",
)
|
|
|
|
|
|
@pytest.fixture
def user(client):
    """Register a test user and return the parsed JSON body of the response.

    ``client`` is unpacked as a 2-tuple; only its first element (the HTTP
    client) is used here.
    """
    http, _unused = client
    registration = {"email": "entry_test@example.com"}
    return http.post("/api/users/register", json=registration).json()
|
|
|
|
|
|
@pytest.fixture
def entry(client, user):
    """Create one entry for ``user`` and return the parsed JSON body.

    The fixture itself asserts that creation answered 200, so dependent
    tests never run against a missing entry.
    """
    http, _unused = client
    body = {"encryption": VALID_ENCRYPTION}
    created = http.post(f"/api/entries/{user['id']}", json=body)
    assert created.status_code == 200
    return created.json()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/entries/{user_id}
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCreateEntry:
    """POST /api/entries/{user_id}: creation, payload validation and user lookup."""

    @staticmethod
    def _post(client, user_id, body):
        """POST ``body`` as JSON to the create-entry route for ``user_id``."""
        http, _ = client
        return http.post(f"/api/entries/{user_id}", json=body)

    def test_create_encrypted_entry_returns_200(self, client, user):
        response = self._post(client, user["id"], {"encryption": VALID_ENCRYPTION})
        assert response.status_code == 200

    def test_create_entry_returns_id_and_message(self, client, user):
        response = self._post(client, user["id"], {"encryption": VALID_ENCRYPTION})
        # Check the status first so a rejected request fails here rather than
        # with a confusing KeyError on the body below.
        assert response.status_code == 200
        data = response.json()
        assert "id" in data
        assert data["message"] == "Entry created successfully"

    def test_create_entry_with_mood(self, client, user):
        response = self._post(client, user["id"], {
            "encryption": VALID_ENCRYPTION,
            "mood": "grateful",
        })
        assert response.status_code == 200

    def test_create_entry_with_invalid_mood_returns_422(self, client, user):
        response = self._post(client, user["id"], {
            "encryption": VALID_ENCRYPTION,
            "mood": "ecstatic",  # Not in MoodEnum
        })
        assert response.status_code == 422

    def test_create_entry_with_tags(self, client, user):
        response = self._post(client, user["id"], {
            "encryption": VALID_ENCRYPTION,
            "tags": ["family", "gratitude"],
        })
        assert response.status_code == 200

    def test_create_entry_missing_ciphertext_returns_422(self, client, user):
        """Encryption metadata without ciphertext must be rejected."""
        response = self._post(client, user["id"], {
            "encryption": {
                "encrypted": True,
                "nonce": "bm9uY2U=",
                "algorithm": "XSalsa20-Poly1305",
                # ciphertext intentionally missing
            },
        })
        # Pydantic requires ciphertext field → 422
        assert response.status_code == 422

    def test_create_entry_encryption_missing_nonce_returns_422(self, client, user):
        """Encryption metadata without nonce must be rejected."""
        response = self._post(client, user["id"], {
            "encryption": {
                "encrypted": True,
                "ciphertext": "dGVzdA==",
                "algorithm": "XSalsa20-Poly1305",
                # nonce intentionally missing
            },
        })
        assert response.status_code == 422

    def test_create_entry_for_nonexistent_user_returns_404(self, client):
        # Well-formed id that no test registers.
        response = self._post(
            client, "507f1f77bcf86cd799439011", {"encryption": VALID_ENCRYPTION}
        )
        assert response.status_code == 404
        assert "User not found" in response.json()["detail"]

    def test_create_entry_with_invalid_user_id_returns_400(self, client):
        response = self._post(client, "not-a-valid-id", {"encryption": VALID_ENCRYPTION})
        assert response.status_code == 400

    def test_create_entry_with_specific_entry_date(self, client, user):
        response = self._post(client, user["id"], {
            "encryption": VALID_ENCRYPTION,
            "entryDate": "2024-06-15T00:00:00",
        })
        assert response.status_code == 200
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/entries/{user_id}
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetUserEntries:
    """GET /api/entries/{user_id}: listing, pagination and user-id validation."""

    @staticmethod
    def _seed_entries(http, user_id, count):
        """Create ``count`` entries for ``user_id``, asserting each one succeeds.

        Checking every seed response keeps the pagination tests from passing
        (or failing confusingly) on a data set that was never created.
        """
        for _ in range(count):
            created = http.post(
                f"/api/entries/{user_id}", json={"encryption": VALID_ENCRYPTION}
            )
            assert created.status_code == 200

    def test_returns_entries_and_pagination(self, client, user, entry):
        c, _ = client
        response = c.get(f"/api/entries/{user['id']}")
        assert response.status_code == 200
        data = response.json()
        assert "entries" in data
        assert "pagination" in data

    def test_returns_entry_that_was_created(self, client, user, entry):
        c, _ = client
        response = c.get(f"/api/entries/{user['id']}")
        assert response.status_code == 200
        entries = response.json()["entries"]
        assert len(entries) == 1
        assert entries[0]["id"] == entry["id"]

    def test_entry_includes_encryption_metadata(self, client, user, entry):
        c, _ = client
        response = c.get(f"/api/entries/{user['id']}")
        assert response.status_code == 200
        fetched_entry = response.json()["entries"][0]
        assert fetched_entry["encryption"]["ciphertext"] == VALID_ENCRYPTION["ciphertext"]
        assert fetched_entry["encryption"]["nonce"] == VALID_ENCRYPTION["nonce"]

    def test_empty_list_when_no_entries(self, client, user):
        c, _ = client
        response = c.get(f"/api/entries/{user['id']}")
        assert response.status_code == 200
        data = response.json()
        assert data["entries"] == []
        assert data["pagination"]["total"] == 0

    def test_pagination_limit(self, client, user):
        c, _ = client
        self._seed_entries(c, user["id"], 5)
        response = c.get(f"/api/entries/{user['id']}?limit=2&skip=0")
        assert response.status_code == 200
        data = response.json()
        assert len(data["entries"]) == 2
        assert data["pagination"]["hasMore"] is True
        assert data["pagination"]["total"] == 5

    def test_pagination_skip(self, client, user):
        c, _ = client
        self._seed_entries(c, user["id"], 4)
        response = c.get(f"/api/entries/{user['id']}?limit=10&skip=3")
        assert response.status_code == 200
        # 4 entries, 3 skipped -> exactly one left.
        assert len(response.json()["entries"]) == 1

    def test_pagination_has_more_false_at_end(self, client, user):
        c, _ = client
        # Seeding must be verified: with zero entries hasMore would also be
        # False and the test would pass without exercising pagination.
        self._seed_entries(c, user["id"], 3)
        response = c.get(f"/api/entries/{user['id']}?limit=10&skip=0")
        assert response.status_code == 200
        assert response.json()["pagination"]["hasMore"] is False

    def test_nonexistent_user_returns_404(self, client):
        c, _ = client
        response = c.get("/api/entries/507f1f77bcf86cd799439011")
        assert response.status_code == 404

    def test_invalid_user_id_returns_400(self, client):
        c, _ = client
        response = c.get("/api/entries/bad-id")
        assert response.status_code == 400
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/entries/{user_id}/{entry_id}
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetSingleEntry:
    """GET /api/entries/{user_id}/{entry_id}: lookup, ownership and id validation."""

    def test_returns_entry_by_id(self, client, user, entry):
        http, _ = client
        url = f"/api/entries/{user['id']}/{entry['id']}"
        fetched = http.get(url)
        assert fetched.status_code == 200
        assert fetched.json()["id"] == entry["id"]

    def test_returned_entry_has_encryption_field(self, client, user, entry):
        http, _ = client
        url = f"/api/entries/{user['id']}/{entry['id']}"
        body = http.get(url).json()
        assert "encryption" in body
        assert body["encryption"]["ciphertext"] == VALID_ENCRYPTION["ciphertext"]

    def test_entry_belongs_to_correct_user(self, client, user, entry):
        http, _ = client
        url = f"/api/entries/{user['id']}/{entry['id']}"
        body = http.get(url).json()
        assert body["userId"] == user["id"]

    def test_entry_from_different_user_returns_404(self, client, user, entry):
        """User isolation: another user cannot access this entry."""
        http, _ = client
        registration = http.post("/api/users/register", json={"email": "other@example.com"})
        other = registration.json()
        fetched = http.get(f"/api/entries/{other['id']}/{entry['id']}")
        assert fetched.status_code == 404

    def test_nonexistent_entry_returns_404(self, client, user):
        http, _ = client
        # Well-formed entry id that no test creates.
        url = f"/api/entries/{user['id']}/507f1f77bcf86cd799439099"
        assert http.get(url).status_code == 404

    def test_invalid_entry_id_returns_400(self, client, user):
        http, _ = client
        url = f"/api/entries/{user['id']}/not-valid-id"
        assert http.get(url).status_code == 400

    def test_invalid_user_id_returns_400(self, client, entry):
        http, _ = client
        url = f"/api/entries/bad-user-id/{entry['id']}"
        assert http.get(url).status_code == 400
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PUT /api/entries/{user_id}/{entry_id}
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestUpdateEntry:
    """PUT /api/entries/{user_id}/{entry_id}: field updates, persistence and validation."""

    def test_update_mood(self, client, user, entry):
        http, _ = client
        url = f"/api/entries/{user['id']}/{entry['id']}"
        updated = http.put(url, json={"mood": "happy"})
        assert updated.status_code == 200
        assert updated.json()["mood"] == "happy"

    def test_update_encryption_ciphertext(self, client, user, entry):
        http, _ = client
        # Same payload as the shared constant, with only the ciphertext swapped.
        replacement = dict(VALID_ENCRYPTION)
        replacement["ciphertext"] = "bmV3Y2lwaGVydGV4dA=="
        url = f"/api/entries/{user['id']}/{entry['id']}"
        updated = http.put(url, json={"encryption": replacement})
        assert updated.status_code == 200
        assert updated.json()["encryption"]["ciphertext"] == "bmV3Y2lwaGVydGV4dA=="

    def test_update_persists(self, client, user, entry):
        http, _ = client
        http.put(f"/api/entries/{user['id']}/{entry['id']}", json={"mood": "sad"})
        # Read the entry back through a separate GET to confirm the change was stored.
        reread = http.get(f"/api/entries/{user['id']}/{entry['id']}")
        assert reread.json()["mood"] == "sad"

    def test_update_invalid_mood_returns_422(self, client, user, entry):
        http, _ = client
        url = f"/api/entries/{user['id']}/{entry['id']}"
        rejected = http.put(url, json={"mood": "furious"})
        assert rejected.status_code == 422

    def test_update_nonexistent_entry_returns_404(self, client, user):
        http, _ = client
        url = f"/api/entries/{user['id']}/507f1f77bcf86cd799439099"
        missing = http.put(url, json={"mood": "happy"})
        assert missing.status_code == 404

    def test_update_invalid_entry_id_returns_400(self, client, user):
        http, _ = client
        url = f"/api/entries/{user['id']}/bad-id"
        rejected = http.put(url, json={"mood": "happy"})
        assert rejected.status_code == 400
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DELETE /api/entries/{user_id}/{entry_id}
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDeleteEntry:
    """DELETE /api/entries/{user_id}/{entry_id}: removal, ownership and id validation."""

    def test_delete_entry_returns_200(self, client, user, entry):
        http, _ = client
        outcome = http.delete(f"/api/entries/{user['id']}/{entry['id']}")
        assert outcome.status_code == 200
        assert "deleted" in outcome.json()["message"].lower()

    def test_deleted_entry_is_not_retrievable(self, client, user, entry):
        http, _ = client
        http.delete(f"/api/entries/{user['id']}/{entry['id']}")
        lookup = http.get(f"/api/entries/{user['id']}/{entry['id']}")
        assert lookup.status_code == 404

    def test_deleted_entry_not_in_list(self, client, user, entry):
        http, _ = client
        http.delete(f"/api/entries/{user['id']}/{entry['id']}")
        listing = http.get(f"/api/entries/{user['id']}")
        assert listing.json()["entries"] == []

    def test_delete_entry_wrong_user_returns_404(self, client, user, entry):
        """User isolation: another user cannot delete this entry."""
        http, _ = client
        registration = http.post(
            "/api/users/register", json={"email": "other_del@example.com"}
        )
        other = registration.json()
        outcome = http.delete(f"/api/entries/{other['id']}/{entry['id']}")
        assert outcome.status_code == 404

    def test_delete_nonexistent_entry_returns_404(self, client, user):
        http, _ = client
        # Well-formed entry id that no test creates.
        outcome = http.delete(f"/api/entries/{user['id']}/507f1f77bcf86cd799439099")
        assert outcome.status_code == 404

    def test_delete_invalid_entry_id_returns_400(self, client, user):
        http, _ = client
        outcome = http.delete(f"/api/entries/{user['id']}/bad-id")
        assert outcome.status_code == 400
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/entries/{user_id}/by-date/{date_str}
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetEntriesByDate:
    """GET /api/entries/{user_id}/by-date/{date_str}: date matching and validation."""

    @staticmethod
    def _create_dated_entry(http, user_id, entry_date):
        """Create an entry with ``entryDate`` set and assert the API accepted it.

        Without this check a rejected seed request would let the
        "other dates" test pass vacuously with a count of zero.
        """
        created = http.post(f"/api/entries/{user_id}", json={
            "encryption": VALID_ENCRYPTION,
            "entryDate": entry_date,
        })
        assert created.status_code == 200

    def test_returns_entry_for_matching_date(self, client, user):
        c, _ = client
        self._create_dated_entry(c, user["id"], "2024-06-15T00:00:00")
        response = c.get(f"/api/entries/{user['id']}/by-date/2024-06-15")
        assert response.status_code == 200
        data = response.json()
        assert data["count"] == 1
        assert data["date"] == "2024-06-15"

    def test_returns_empty_for_date_with_no_entries(self, client, user):
        c, _ = client
        response = c.get(f"/api/entries/{user['id']}/by-date/2020-01-01")
        assert response.status_code == 200
        assert response.json()["count"] == 0

    def test_does_not_return_entries_from_other_dates(self, client, user):
        c, _ = client
        self._create_dated_entry(c, user["id"], "2024-06-15T00:00:00")
        response = c.get(f"/api/entries/{user['id']}/by-date/2024-06-16")  # Next day
        assert response.status_code == 200
        assert response.json()["count"] == 0

    def test_invalid_date_format_returns_400(self, client, user):
        c, _ = client
        response = c.get(f"/api/entries/{user['id']}/by-date/not-a-date")
        assert response.status_code == 400

    def test_invalid_date_13th_month_returns_400(self, client, user):
        c, _ = client
        response = c.get(f"/api/entries/{user['id']}/by-date/2024-13-01")
        assert response.status_code == 400

    def test_invalid_user_id_returns_400(self, client):
        c, _ = client
        response = c.get("/api/entries/bad-id/by-date/2024-06-15")
        assert response.status_code == 400
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/entries/{user_id}/by-month/{year}/{month}
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetEntriesByMonth:
    """GET /api/entries/{user_id}/by-month/{year}/{month}: month matching and validation."""

    @staticmethod
    def _create_dated_entry(http, user_id, entry_date):
        """Create an entry with ``entryDate`` set and assert the API accepted it.

        Without this check a rejected seed request would let the
        "other months" test pass vacuously with a count of zero.
        """
        created = http.post(f"/api/entries/{user_id}", json={
            "encryption": VALID_ENCRYPTION,
            "entryDate": entry_date,
        })
        assert created.status_code == 200

    def test_returns_entries_for_matching_month(self, client, user):
        c, _ = client
        self._create_dated_entry(c, user["id"], "2024-06-15T00:00:00")
        response = c.get(f"/api/entries/{user['id']}/by-month/2024/6")
        assert response.status_code == 200
        data = response.json()
        assert data["count"] == 1
        assert data["year"] == 2024
        assert data["month"] == 6

    def test_does_not_return_entries_from_other_months(self, client, user):
        c, _ = client
        self._create_dated_entry(c, user["id"], "2024-05-10T00:00:00")  # May, not June
        response = c.get(f"/api/entries/{user['id']}/by-month/2024/6")
        assert response.status_code == 200
        assert response.json()["count"] == 0

    def test_december_january_rollover_works(self, client, user):
        """Month 12 boundary (year+1 rollover) must not crash."""
        c, _ = client
        response = c.get(f"/api/entries/{user['id']}/by-month/2024/12")
        assert response.status_code == 200

    def test_invalid_month_0_returns_400(self, client, user):
        c, _ = client
        response = c.get(f"/api/entries/{user['id']}/by-month/2024/0")
        assert response.status_code == 400

    def test_invalid_month_13_returns_400(self, client, user):
        c, _ = client
        response = c.get(f"/api/entries/{user['id']}/by-month/2024/13")
        assert response.status_code == 400

    def test_invalid_user_id_returns_400(self, client):
        c, _ = client
        response = c.get("/api/entries/bad-id/by-month/2024/6")
        assert response.status_code == 400
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/entries/convert-timestamp/utc-to-ist
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestConvertTimestamp:
    """POST /api/entries/convert-timestamp/utc-to-ist: conversion output and input validation."""

    # Route exercised by every test in this class.
    ROUTE = "/api/entries/convert-timestamp/utc-to-ist"

    def test_converts_utc_z_suffix_to_ist(self, client):
        http, _ = client
        payload = {"timestamp": "2024-01-01T00:00:00Z"}
        converted = http.post(self.ROUTE, json=payload)
        assert converted.status_code == 200
        body = converted.json()
        assert "utc" in body
        assert "ist" in body
        assert "+05:30" in body["ist"]

    def test_ist_is_5h30m_ahead_of_utc(self, client):
        http, _ = client
        payload = {"timestamp": "2024-01-01T00:00:00Z"}
        converted = http.post(self.ROUTE, json=payload)
        # Midnight UTC is expected to come back as 05:30 with a +05:30 offset.
        assert "05:30:00+05:30" in converted.json()["ist"]

    def test_missing_timestamp_field_returns_400(self, client):
        http, _ = client
        rejected = http.post(self.ROUTE, json={})
        assert rejected.status_code == 400

    def test_invalid_timestamp_string_returns_400(self, client):
        http, _ = client
        payload = {"timestamp": "not-a-date"}
        rejected = http.post(self.ROUTE, json=payload)
        assert rejected.status_code == 400

    def test_returns_original_utc_in_response(self, client):
        http, _ = client
        utc = "2024-06-15T12:00:00Z"
        converted = http.post(self.ROUTE, json={"timestamp": utc})
        assert converted.json()["utc"] == utc
|