openzeppelin_relayer/repositories/notification/
notification_in_memory.rs

1//! This module defines an in-memory notification repository for managing
2//! notifications. It provides full CRUD functionality including create, retrieve,
3//! update, delete, and list operations. The repository is implemented using a
4//! `Mutex`-protected `HashMap` to ensure thread safety in asynchronous contexts.
5
6use crate::{
7    models::{NotificationConfig, NotificationRepoModel, RepositoryError},
8    repositories::*,
9};
10use async_trait::async_trait;
11use std::collections::HashMap;
12use tokio::sync::{Mutex, MutexGuard};
13
14#[derive(Debug)]
15pub struct InMemoryNotificationRepository {
16    store: Mutex<HashMap<String, NotificationRepoModel>>,
17}
18
19impl Clone for InMemoryNotificationRepository {
20    fn clone(&self) -> Self {
21        // Try to get the current data, or use empty HashMap if lock fails
22        let data = self
23            .store
24            .try_lock()
25            .map(|guard| guard.clone())
26            .unwrap_or_else(|_| HashMap::new());
27
28        Self {
29            store: Mutex::new(data),
30        }
31    }
32}
33
34#[allow(dead_code)]
35impl InMemoryNotificationRepository {
36    pub fn new() -> Self {
37        Self {
38            store: Mutex::new(HashMap::new()),
39        }
40    }
41
42    async fn acquire_lock<T>(lock: &Mutex<T>) -> Result<MutexGuard<T>, RepositoryError> {
43        Ok(lock.lock().await)
44    }
45}
46
47impl Default for InMemoryNotificationRepository {
48    fn default() -> Self {
49        Self::new()
50    }
51}
52
53#[async_trait]
54impl Repository<NotificationRepoModel, String> for InMemoryNotificationRepository {
55    async fn create(
56        &self,
57        notification: NotificationRepoModel,
58    ) -> Result<NotificationRepoModel, RepositoryError> {
59        let mut store = Self::acquire_lock(&self.store).await?;
60        if store.contains_key(&notification.id) {
61            return Err(RepositoryError::ConstraintViolation(format!(
62                "Notification with ID '{}' already exists",
63                notification.id
64            )));
65        }
66        store.insert(notification.id.clone(), notification.clone());
67        Ok(notification)
68    }
69
70    async fn get_by_id(&self, id: String) -> Result<NotificationRepoModel, RepositoryError> {
71        let store = Self::acquire_lock(&self.store).await?;
72        match store.get(&id) {
73            Some(entity) => Ok(entity.clone()),
74            None => Err(RepositoryError::NotFound(format!(
75                "Notification with ID '{}' not found",
76                id
77            ))),
78        }
79    }
80
81    #[allow(clippy::map_entry)]
82    async fn update(
83        &self,
84        id: String,
85        notification: NotificationRepoModel,
86    ) -> Result<NotificationRepoModel, RepositoryError> {
87        let mut store = Self::acquire_lock(&self.store).await?;
88
89        // Check if notification exists
90        if !store.contains_key(&id) {
91            return Err(RepositoryError::NotFound(format!(
92                "Notification with ID '{}' not found",
93                id
94            )));
95        }
96
97        if id != notification.id {
98            return Err(RepositoryError::InvalidData(format!(
99                "ID mismatch: URL parameter '{}' does not match entity ID '{}'",
100                id, notification.id
101            )));
102        }
103
104        store.insert(id, notification.clone());
105        Ok(notification)
106    }
107
108    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
109        let mut store = Self::acquire_lock(&self.store).await?;
110
111        match store.remove(&id) {
112            Some(_) => Ok(()),
113            None => Err(RepositoryError::NotFound(format!(
114                "Notification with ID {} not found",
115                id
116            ))),
117        }
118    }
119
120    async fn list_all(&self) -> Result<Vec<NotificationRepoModel>, RepositoryError> {
121        let store = Self::acquire_lock(&self.store).await?;
122        let notifications: Vec<NotificationRepoModel> = store.values().cloned().collect();
123        Ok(notifications)
124    }
125
126    async fn list_paginated(
127        &self,
128        query: PaginationQuery,
129    ) -> Result<PaginatedResult<NotificationRepoModel>, RepositoryError> {
130        let total = self.count().await?;
131        let start = ((query.page - 1) * query.per_page) as usize;
132        let items: Vec<NotificationRepoModel> = self
133            .store
134            .lock()
135            .await
136            .values()
137            .skip(start)
138            .take(query.per_page as usize)
139            .cloned()
140            .collect();
141
142        Ok(PaginatedResult {
143            items,
144            total: total as u64,
145            page: query.page,
146            per_page: query.per_page,
147        })
148    }
149
150    async fn count(&self) -> Result<usize, RepositoryError> {
151        let store = Self::acquire_lock(&self.store).await?;
152        let length = store.len();
153        Ok(length)
154    }
155
156    async fn has_entries(&self) -> Result<bool, RepositoryError> {
157        let store = Self::acquire_lock(&self.store).await?;
158        Ok(!store.is_empty())
159    }
160
161    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
162        let mut store = Self::acquire_lock(&self.store).await?;
163        store.clear();
164        Ok(())
165    }
166}
167
168impl TryFrom<NotificationConfig> for NotificationRepoModel {
169    type Error = ConversionError;
170
171    fn try_from(config: NotificationConfig) -> Result<Self, Self::Error> {
172        let signing_key = config.get_signing_key().map_err(|e| {
173            ConversionError::InvalidConfig(format!("Failed to get signing key: {}", e))
174        })?;
175
176        Ok(NotificationRepoModel {
177            id: config.id.clone(),
178            url: config.url.clone(),
179            notification_type: config.r#type,
180            signing_key,
181        })
182    }
183}
#[cfg(test)]
mod tests {
    //! Unit tests for `InMemoryNotificationRepository`.

    use crate::models::NotificationType;

    use super::*;

    /// Builds a webhook notification with the given ID and no signing key.
    fn create_test_notification(id: String) -> NotificationRepoModel {
        NotificationRepoModel {
            id,
            url: "http://localhost".to_string(),
            notification_type: NotificationType::Webhook,
            signing_key: None,
        }
    }

    #[actix_web::test]
    async fn test_new_repository_is_empty() {
        let repo = InMemoryNotificationRepository::new();
        assert_eq!(repo.count().await.unwrap(), 0);
    }

    #[actix_web::test]
    async fn test_add_notification() {
        let repo = InMemoryNotificationRepository::new();
        let notification = create_test_notification("test".to_string());

        repo.create(notification.clone()).await.unwrap();
        assert_eq!(repo.count().await.unwrap(), 1);

        let stored = repo.get_by_id("test".to_string()).await.unwrap();
        assert_eq!(stored.id, notification.id);
    }

    // Creating the same ID twice must be rejected and must not add an entry.
    #[actix_web::test]
    async fn test_create_duplicate_notification() {
        let repo = InMemoryNotificationRepository::new();
        let notification = create_test_notification("test".to_string());

        repo.create(notification.clone()).await.unwrap();

        let result = repo.create(notification).await;
        assert!(matches!(
            result,
            Err(RepositoryError::ConstraintViolation(_))
        ));
        assert_eq!(repo.count().await.unwrap(), 1);
    }

    #[actix_web::test]
    async fn test_update_notification() {
        let repo = InMemoryNotificationRepository::new();
        let notification = create_test_notification("test".to_string());

        // First create the notification
        repo.create(notification.clone()).await.unwrap();

        // Update the notification
        let mut updated_notification = notification.clone();
        updated_notification.url = "http://updated.example.com".to_string();

        let result = repo
            .update("test".to_string(), updated_notification.clone())
            .await;
        assert!(result.is_ok());

        let updated = result.unwrap();
        assert_eq!(updated.id, "test");
        assert_eq!(updated.url, "http://updated.example.com");

        // Verify the update persisted
        let stored = repo.get_by_id("test".to_string()).await.unwrap();
        assert_eq!(stored.url, "http://updated.example.com");
    }

    #[actix_web::test]
    async fn test_list_notifications() {
        let repo = InMemoryNotificationRepository::new();
        let notification1 = create_test_notification("test".to_string());
        let notification2 = create_test_notification("test2".to_string());

        repo.create(notification1.clone()).await.unwrap();
        repo.create(notification2).await.unwrap();

        let notifications = repo.list_all().await.unwrap();
        assert_eq!(notifications.len(), 2);
    }

    #[actix_web::test]
    async fn test_update_nonexistent_notification() {
        let repo = InMemoryNotificationRepository::new();
        let notification = create_test_notification("test".to_string());

        let result = repo.update("test2".to_string(), notification).await;
        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
    }

    #[actix_web::test]
    async fn test_get_nonexistent_notification() {
        let repo = InMemoryNotificationRepository::new();

        let result = repo.get_by_id("test".to_string()).await;
        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
    }

    // test has_entries
    #[actix_web::test]
    async fn test_has_entries() {
        let repo = InMemoryNotificationRepository::new();
        assert!(!repo.has_entries().await.unwrap());

        let notification = create_test_notification("test".to_string());

        repo.create(notification.clone()).await.unwrap();
        assert!(repo.has_entries().await.unwrap());
    }

    #[actix_web::test]
    async fn test_drop_all_entries() {
        let repo = InMemoryNotificationRepository::new();
        let notification = create_test_notification("test".to_string());

        repo.create(notification.clone()).await.unwrap();
        assert!(repo.has_entries().await.unwrap());

        repo.drop_all_entries().await.unwrap();
        assert!(!repo.has_entries().await.unwrap());
    }

    #[actix_web::test]
    async fn test_delete_notification() {
        let repo = InMemoryNotificationRepository::new();
        let notification = create_test_notification("test".to_string());

        // Create the notification first
        repo.create(notification.clone()).await.unwrap();
        assert_eq!(repo.count().await.unwrap(), 1);

        // Delete the notification
        let result = repo.delete_by_id("test".to_string()).await;
        assert!(result.is_ok());

        // Verify it's gone
        assert_eq!(repo.count().await.unwrap(), 0);
        let get_result = repo.get_by_id("test".to_string()).await;
        assert!(matches!(get_result, Err(RepositoryError::NotFound(_))));
    }

    #[actix_web::test]
    async fn test_delete_nonexistent_notification() {
        let repo = InMemoryNotificationRepository::new();

        let result = repo.delete_by_id("nonexistent".to_string()).await;
        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
    }

    #[actix_web::test]
    async fn test_update_with_id_mismatch() {
        let repo = InMemoryNotificationRepository::new();
        let notification = create_test_notification("test".to_string());

        // Create the notification first
        repo.create(notification.clone()).await.unwrap();

        // Try to update with mismatched ID
        let mut updated_notification = notification.clone();
        updated_notification.id = "different-id".to_string();

        let result = repo.update("test".to_string(), updated_notification).await;
        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
    }

    #[actix_web::test]
    async fn test_update_delete_integration() {
        let repo = InMemoryNotificationRepository::new();
        let notification1 = create_test_notification("test1".to_string());
        let notification2 = create_test_notification("test2".to_string());

        // Create two notifications
        repo.create(notification1.clone()).await.unwrap();
        repo.create(notification2.clone()).await.unwrap();
        assert_eq!(repo.count().await.unwrap(), 2);

        // Update the first notification
        let mut updated_notification1 = notification1.clone();
        updated_notification1.url = "http://updated.example.com".to_string();

        let update_result = repo
            .update("test1".to_string(), updated_notification1)
            .await;
        assert!(update_result.is_ok());

        // Verify the update
        let stored = repo.get_by_id("test1".to_string()).await.unwrap();
        assert_eq!(stored.url, "http://updated.example.com");

        // Delete the second notification
        let delete_result = repo.delete_by_id("test2".to_string()).await;
        assert!(delete_result.is_ok());

        // Verify final state
        assert_eq!(repo.count().await.unwrap(), 1);
        let remaining = repo.list_all().await.unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].id, "test1");
        assert_eq!(remaining[0].url, "http://updated.example.com");
    }

    // A clone taken while the lock is free carries the entries over and is
    // independent of the original afterwards.
    #[actix_web::test]
    async fn test_clone_copies_entries() {
        let repo = InMemoryNotificationRepository::new();
        repo.create(create_test_notification("test".to_string()))
            .await
            .unwrap();

        let cloned = repo.clone();
        assert_eq!(cloned.count().await.unwrap(), 1);

        cloned
            .create(create_test_notification("test2".to_string()))
            .await
            .unwrap();
        assert_eq!(cloned.count().await.unwrap(), 2);
        assert_eq!(repo.count().await.unwrap(), 1);
    }
}