openzeppelin_relayer/repositories/notification/
mod.rs

//! Notification Repository Module
//!
//! This module provides the notification repository layer for the OpenZeppelin Relayer service.
//! It implements the Repository pattern to abstract notification data persistence operations,
//! supporting both in-memory and Redis-backed storage implementations.
//!
//! ## Features
//!
//! - **CRUD Operations**: Create, read, update, and delete notification configurations
//! - **Webhook Support**: Store webhook notification configurations
//! - **Pagination Support**: Efficient paginated listing of notifications
//! - **Configuration Management**: Handle notification signing keys and URLs
//!
//! ## Repository Implementations
//!
//! - [`InMemoryNotificationRepository`]: Fast in-memory storage for testing/development
//! - [`RedisNotificationRepository`]: Redis-backed storage for production environments
//!
19mod notification_in_memory;
20mod notification_redis;
21
22pub use notification_in_memory::*;
23pub use notification_redis::*;
24use redis::aio::ConnectionManager;
25
26use crate::{
27    models::{NotificationRepoModel, RepositoryError},
28    repositories::{PaginatedResult, PaginationQuery, Repository},
29};
30use async_trait::async_trait;
31use std::sync::Arc;
32
33/// Enum wrapper for different notification repository implementations
34#[derive(Debug, Clone)]
35pub enum NotificationRepositoryStorage {
36    InMemory(InMemoryNotificationRepository),
37    Redis(RedisNotificationRepository),
38}
39
40impl NotificationRepositoryStorage {
41    pub fn new_in_memory() -> Self {
42        Self::InMemory(InMemoryNotificationRepository::new())
43    }
44    pub fn new_redis(
45        connection_manager: Arc<ConnectionManager>,
46        key_prefix: String,
47    ) -> Result<Self, RepositoryError> {
48        Ok(Self::Redis(RedisNotificationRepository::new(
49            connection_manager,
50            key_prefix,
51        )?))
52    }
53}
54
55#[async_trait]
56impl Repository<NotificationRepoModel, String> for NotificationRepositoryStorage {
57    async fn create(
58        &self,
59        entity: NotificationRepoModel,
60    ) -> Result<NotificationRepoModel, RepositoryError> {
61        match self {
62            NotificationRepositoryStorage::InMemory(repo) => repo.create(entity).await,
63            NotificationRepositoryStorage::Redis(repo) => repo.create(entity).await,
64        }
65    }
66
67    async fn get_by_id(&self, id: String) -> Result<NotificationRepoModel, RepositoryError> {
68        match self {
69            NotificationRepositoryStorage::InMemory(repo) => repo.get_by_id(id).await,
70            NotificationRepositoryStorage::Redis(repo) => repo.get_by_id(id).await,
71        }
72    }
73
74    async fn list_all(&self) -> Result<Vec<NotificationRepoModel>, RepositoryError> {
75        match self {
76            NotificationRepositoryStorage::InMemory(repo) => repo.list_all().await,
77            NotificationRepositoryStorage::Redis(repo) => repo.list_all().await,
78        }
79    }
80
81    async fn list_paginated(
82        &self,
83        query: PaginationQuery,
84    ) -> Result<PaginatedResult<NotificationRepoModel>, RepositoryError> {
85        match self {
86            NotificationRepositoryStorage::InMemory(repo) => repo.list_paginated(query).await,
87            NotificationRepositoryStorage::Redis(repo) => repo.list_paginated(query).await,
88        }
89    }
90
91    async fn update(
92        &self,
93        id: String,
94        entity: NotificationRepoModel,
95    ) -> Result<NotificationRepoModel, RepositoryError> {
96        match self {
97            NotificationRepositoryStorage::InMemory(repo) => repo.update(id, entity).await,
98            NotificationRepositoryStorage::Redis(repo) => repo.update(id, entity).await,
99        }
100    }
101
102    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
103        match self {
104            NotificationRepositoryStorage::InMemory(repo) => repo.delete_by_id(id).await,
105            NotificationRepositoryStorage::Redis(repo) => repo.delete_by_id(id).await,
106        }
107    }
108
109    async fn count(&self) -> Result<usize, RepositoryError> {
110        match self {
111            NotificationRepositoryStorage::InMemory(repo) => repo.count().await,
112            NotificationRepositoryStorage::Redis(repo) => repo.count().await,
113        }
114    }
115
116    async fn has_entries(&self) -> Result<bool, RepositoryError> {
117        match self {
118            NotificationRepositoryStorage::InMemory(repo) => repo.has_entries().await,
119            NotificationRepositoryStorage::Redis(repo) => repo.has_entries().await,
120        }
121    }
122
123    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
124        match self {
125            NotificationRepositoryStorage::InMemory(repo) => repo.drop_all_entries().await,
126            NotificationRepositoryStorage::Redis(repo) => repo.drop_all_entries().await,
127        }
128    }
129}
130
/// Tests for [`NotificationRepositoryStorage`] exercised through the
/// in-memory backend. They verify that the enum wrapper forwards each
/// `Repository` operation and that results round-trip as expected.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::RepositoryError;
    use crate::repositories::PaginationQuery;
    use crate::utils::mocks::mockutils::create_mock_notification;
    use color_eyre::Result;

    /// Builds a mock notification for the given `id`.
    fn create_test_notification(id: &str) -> NotificationRepoModel {
        create_mock_notification(id.to_string())
    }

    /// Inserts `count` notifications with ids `notification-1` through
    /// `notification-<count>` into `storage`.
    async fn seed_notifications(
        storage: &NotificationRepositoryStorage,
        count: usize,
    ) -> Result<()> {
        for index in 1..=count {
            let notification = create_test_notification(&format!("notification-{}", index));
            storage.create(notification).await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_new_in_memory() {
        let storage = NotificationRepositoryStorage::new_in_memory();

        assert!(
            matches!(storage, NotificationRepositoryStorage::InMemory(_)),
            "Expected InMemory variant, got Redis"
        );
    }

    #[tokio::test]
    async fn test_create_in_memory() -> Result<()> {
        let storage = NotificationRepositoryStorage::new_in_memory();
        let notification = create_test_notification("test-notification");

        let created = storage.create(notification.clone()).await?;
        assert_eq!(created.id, notification.id);
        assert_eq!(created.url, notification.url);

        Ok(())
    }

    #[tokio::test]
    async fn test_get_by_id_in_memory() -> Result<()> {
        let storage = NotificationRepositoryStorage::new_in_memory();
        let notification = create_test_notification("test-notification");
        storage.create(notification.clone()).await?;

        let retrieved = storage.get_by_id("test-notification".to_string()).await?;
        assert_eq!(retrieved.id, notification.id);
        assert_eq!(retrieved.url, notification.url);

        Ok(())
    }

    #[tokio::test]
    async fn test_get_by_id_not_found_in_memory() -> Result<()> {
        let storage = NotificationRepositoryStorage::new_in_memory();

        // Pin the error variant, matching the post-delete lookups in the
        // delete and workflow tests, instead of accepting any error.
        let result = storage.get_by_id("non-existent".to_string()).await;
        assert!(matches!(result, Err(RepositoryError::NotFound(_))));

        Ok(())
    }

    #[tokio::test]
    async fn test_list_all_in_memory() -> Result<()> {
        let storage = NotificationRepositoryStorage::new_in_memory();

        // Initially empty
        assert!(storage.list_all().await?.is_empty());

        seed_notifications(&storage, 2).await?;

        let all_notifications = storage.list_all().await?;
        assert_eq!(all_notifications.len(), 2);

        // Listing order is not asserted; only membership is checked.
        let ids: Vec<&str> = all_notifications.iter().map(|n| n.id.as_str()).collect();
        assert!(ids.contains(&"notification-1"));
        assert!(ids.contains(&"notification-2"));

        Ok(())
    }

    #[tokio::test]
    async fn test_list_paginated_in_memory() -> Result<()> {
        let storage = NotificationRepositoryStorage::new_in_memory();
        seed_notifications(&storage, 5).await?;

        // First page: full
        let first = storage
            .list_paginated(PaginationQuery {
                page: 1,
                per_page: 2,
            })
            .await?;
        assert_eq!(first.items.len(), 2);
        assert_eq!(first.total, 5);
        assert_eq!(first.page, 1);
        assert_eq!(first.per_page, 2);

        // Second page: full
        let second = storage
            .list_paginated(PaginationQuery {
                page: 2,
                per_page: 2,
            })
            .await?;
        assert_eq!(second.items.len(), 2);
        assert_eq!(second.total, 5);
        assert_eq!(second.page, 2);
        assert_eq!(second.per_page, 2);

        // Final page: holds the single remaining item
        let last = storage
            .list_paginated(PaginationQuery {
                page: 3,
                per_page: 2,
            })
            .await?;
        assert_eq!(last.items.len(), 1);
        assert_eq!(last.total, 5);
        assert_eq!(last.page, 3);
        assert_eq!(last.per_page, 2);

        Ok(())
    }

    #[tokio::test]
    async fn test_update_in_memory() -> Result<()> {
        let storage = NotificationRepositoryStorage::new_in_memory();
        let notification = create_test_notification("test-notification");
        storage.create(notification.clone()).await?;

        let mut updated_notification = notification;
        updated_notification.url = "https://updated.webhook.com".to_string();

        // `?` surfaces the repository error on failure instead of a bare
        // assertion message.
        let updated = storage
            .update("test-notification".to_string(), updated_notification)
            .await?;
        assert_eq!(updated.url, "https://updated.webhook.com");

        // Verify the update persisted
        let retrieved = storage.get_by_id("test-notification".to_string()).await?;
        assert_eq!(retrieved.url, "https://updated.webhook.com");

        Ok(())
    }

    #[tokio::test]
    async fn test_update_not_found_in_memory() -> Result<()> {
        let storage = NotificationRepositoryStorage::new_in_memory();
        let notification = create_test_notification("non-existent");

        let result = storage
            .update("non-existent".to_string(), notification)
            .await;
        assert!(result.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn test_delete_by_id_in_memory() -> Result<()> {
        let storage = NotificationRepositoryStorage::new_in_memory();
        storage
            .create(create_test_notification("test-notification"))
            .await?;

        // Verify it exists
        let retrieved = storage.get_by_id("test-notification".to_string()).await?;
        assert_eq!(retrieved.id, "test-notification");

        storage.delete_by_id("test-notification".to_string()).await?;

        // Verify it's gone
        let get_result = storage.get_by_id("test-notification".to_string()).await;
        assert!(matches!(get_result, Err(RepositoryError::NotFound(_))));

        Ok(())
    }

    #[tokio::test]
    async fn test_delete_by_id_not_found_in_memory() -> Result<()> {
        let storage = NotificationRepositoryStorage::new_in_memory();

        let result = storage.delete_by_id("non-existent".to_string()).await;
        assert!(result.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn test_count_in_memory() -> Result<()> {
        let storage = NotificationRepositoryStorage::new_in_memory();

        // Initially empty
        assert_eq!(storage.count().await?, 0);

        storage
            .create(create_test_notification("notification-1"))
            .await?;
        assert_eq!(storage.count().await?, 1);

        storage
            .create(create_test_notification("notification-2"))
            .await?;
        assert_eq!(storage.count().await?, 2);

        // Count must drop after a successful delete
        storage.delete_by_id("notification-1".to_string()).await?;
        assert_eq!(storage.count().await?, 1);

        Ok(())
    }

    #[tokio::test]
    async fn test_has_entries_in_memory() -> Result<()> {
        let storage = NotificationRepositoryStorage::new_in_memory();

        // Initially empty
        assert!(!storage.has_entries().await?);

        storage
            .create(create_test_notification("test-notification"))
            .await?;
        assert!(storage.has_entries().await?);

        // Removing the only entry leaves the storage empty again
        storage.delete_by_id("test-notification".to_string()).await?;
        assert!(!storage.has_entries().await?);

        Ok(())
    }

    #[tokio::test]
    async fn test_drop_all_entries_in_memory() -> Result<()> {
        let storage = NotificationRepositoryStorage::new_in_memory();
        seed_notifications(&storage, 5).await?;

        // Verify they exist
        assert_eq!(storage.count().await?, 5);
        assert!(storage.has_entries().await?);

        storage.drop_all_entries().await?;

        // Verify they're gone
        assert_eq!(storage.count().await?, 0);
        assert!(!storage.has_entries().await?);
        assert!(storage.list_all().await?.is_empty());

        Ok(())
    }

    #[tokio::test]
    async fn test_create_duplicate_id_in_memory() -> Result<()> {
        let storage = NotificationRepositoryStorage::new_in_memory();
        let notification = create_test_notification("duplicate-id");

        // Create first notification
        storage.create(notification.clone()).await?;

        // A second create with the same ID must fail
        let result = storage.create(notification).await;
        assert!(result.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn test_workflow_in_memory() -> Result<()> {
        let storage = NotificationRepositoryStorage::new_in_memory();

        // 1. Start with empty storage
        assert!(!storage.has_entries().await?);
        assert_eq!(storage.count().await?, 0);

        // 2. Create notification
        let created = storage
            .create(create_test_notification("workflow-test"))
            .await?;
        assert_eq!(created.id, "workflow-test");

        // 3. Verify it exists
        assert!(storage.has_entries().await?);
        assert_eq!(storage.count().await?, 1);

        // 4. Retrieve it
        let retrieved = storage.get_by_id("workflow-test".to_string()).await?;
        assert_eq!(retrieved.id, "workflow-test");

        // 5. Update it
        let mut updated = retrieved;
        updated.url = "https://updated.example.com".to_string();
        let updated_notification = storage
            .update("workflow-test".to_string(), updated)
            .await?;
        assert_eq!(updated_notification.url, "https://updated.example.com");

        // 6. Verify the update persisted
        let after_update = storage.get_by_id("workflow-test".to_string()).await?;
        assert_eq!(after_update.url, "https://updated.example.com");

        // 7. Delete it
        storage.delete_by_id("workflow-test".to_string()).await?;

        // 8. Verify it's gone
        assert!(!storage.has_entries().await?);
        assert_eq!(storage.count().await?, 0);

        let result = storage.get_by_id("workflow-test".to_string()).await;
        assert!(matches!(result, Err(RepositoryError::NotFound(_))));

        Ok(())
    }

    #[tokio::test]
    async fn test_pagination_edge_cases_in_memory() -> Result<()> {
        let storage = NotificationRepositoryStorage::new_in_memory();

        // Pagination over empty storage
        let empty_page = storage
            .list_paginated(PaginationQuery {
                page: 1,
                per_page: 10,
            })
            .await?;
        assert_eq!(empty_page.items.len(), 0);
        assert_eq!(empty_page.total, 0);
        assert_eq!(empty_page.page, 1);
        assert_eq!(empty_page.per_page, 10);

        // Add one notification
        storage
            .create(create_test_notification("single-item"))
            .await?;

        // Pagination with a single item
        let single_page = storage
            .list_paginated(PaginationQuery {
                page: 1,
                per_page: 10,
            })
            .await?;
        assert_eq!(single_page.items.len(), 1);
        assert_eq!(single_page.total, 1);
        assert_eq!(single_page.page, 1);
        assert_eq!(single_page.per_page, 10);

        // A page beyond the total yields no items but still echoes the query
        let beyond_page = storage
            .list_paginated(PaginationQuery {
                page: 3,
                per_page: 10,
            })
            .await?;
        assert_eq!(beyond_page.items.len(), 0);
        assert_eq!(beyond_page.total, 1);
        assert_eq!(beyond_page.page, 3);
        assert_eq!(beyond_page.per_page, 10);

        Ok(())
    }
}
537}