// openzeppelin_relayer/repositories/notification/notification_in_memory.rs
use crate::{
7 models::{NotificationConfig, NotificationRepoModel, RepositoryError},
8 repositories::*,
9};
10use async_trait::async_trait;
11use std::collections::HashMap;
12use tokio::sync::{Mutex, MutexGuard};
13
14#[derive(Debug)]
15pub struct InMemoryNotificationRepository {
16 store: Mutex<HashMap<String, NotificationRepoModel>>,
17}
18
19impl Clone for InMemoryNotificationRepository {
20 fn clone(&self) -> Self {
21 let data = self
23 .store
24 .try_lock()
25 .map(|guard| guard.clone())
26 .unwrap_or_else(|_| HashMap::new());
27
28 Self {
29 store: Mutex::new(data),
30 }
31 }
32}
33
34#[allow(dead_code)]
35impl InMemoryNotificationRepository {
36 pub fn new() -> Self {
37 Self {
38 store: Mutex::new(HashMap::new()),
39 }
40 }
41
42 async fn acquire_lock<T>(lock: &Mutex<T>) -> Result<MutexGuard<T>, RepositoryError> {
43 Ok(lock.lock().await)
44 }
45}
46
47impl Default for InMemoryNotificationRepository {
48 fn default() -> Self {
49 Self::new()
50 }
51}
52
53#[async_trait]
54impl Repository<NotificationRepoModel, String> for InMemoryNotificationRepository {
55 async fn create(
56 &self,
57 notification: NotificationRepoModel,
58 ) -> Result<NotificationRepoModel, RepositoryError> {
59 let mut store = Self::acquire_lock(&self.store).await?;
60 if store.contains_key(¬ification.id) {
61 return Err(RepositoryError::ConstraintViolation(format!(
62 "Notification with ID '{}' already exists",
63 notification.id
64 )));
65 }
66 store.insert(notification.id.clone(), notification.clone());
67 Ok(notification)
68 }
69
70 async fn get_by_id(&self, id: String) -> Result<NotificationRepoModel, RepositoryError> {
71 let store = Self::acquire_lock(&self.store).await?;
72 match store.get(&id) {
73 Some(entity) => Ok(entity.clone()),
74 None => Err(RepositoryError::NotFound(format!(
75 "Notification with ID '{}' not found",
76 id
77 ))),
78 }
79 }
80
81 #[allow(clippy::map_entry)]
82 async fn update(
83 &self,
84 id: String,
85 notification: NotificationRepoModel,
86 ) -> Result<NotificationRepoModel, RepositoryError> {
87 let mut store = Self::acquire_lock(&self.store).await?;
88
89 if !store.contains_key(&id) {
91 return Err(RepositoryError::NotFound(format!(
92 "Notification with ID '{}' not found",
93 id
94 )));
95 }
96
97 if id != notification.id {
98 return Err(RepositoryError::InvalidData(format!(
99 "ID mismatch: URL parameter '{}' does not match entity ID '{}'",
100 id, notification.id
101 )));
102 }
103
104 store.insert(id, notification.clone());
105 Ok(notification)
106 }
107
108 async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
109 let mut store = Self::acquire_lock(&self.store).await?;
110
111 match store.remove(&id) {
112 Some(_) => Ok(()),
113 None => Err(RepositoryError::NotFound(format!(
114 "Notification with ID {} not found",
115 id
116 ))),
117 }
118 }
119
120 async fn list_all(&self) -> Result<Vec<NotificationRepoModel>, RepositoryError> {
121 let store = Self::acquire_lock(&self.store).await?;
122 let notifications: Vec<NotificationRepoModel> = store.values().cloned().collect();
123 Ok(notifications)
124 }
125
126 async fn list_paginated(
127 &self,
128 query: PaginationQuery,
129 ) -> Result<PaginatedResult<NotificationRepoModel>, RepositoryError> {
130 let total = self.count().await?;
131 let start = ((query.page - 1) * query.per_page) as usize;
132 let items: Vec<NotificationRepoModel> = self
133 .store
134 .lock()
135 .await
136 .values()
137 .skip(start)
138 .take(query.per_page as usize)
139 .cloned()
140 .collect();
141
142 Ok(PaginatedResult {
143 items,
144 total: total as u64,
145 page: query.page,
146 per_page: query.per_page,
147 })
148 }
149
150 async fn count(&self) -> Result<usize, RepositoryError> {
151 let store = Self::acquire_lock(&self.store).await?;
152 let length = store.len();
153 Ok(length)
154 }
155
156 async fn has_entries(&self) -> Result<bool, RepositoryError> {
157 let store = Self::acquire_lock(&self.store).await?;
158 Ok(!store.is_empty())
159 }
160
161 async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
162 let mut store = Self::acquire_lock(&self.store).await?;
163 store.clear();
164 Ok(())
165 }
166}
167
168impl TryFrom<NotificationConfig> for NotificationRepoModel {
169 type Error = ConversionError;
170
171 fn try_from(config: NotificationConfig) -> Result<Self, Self::Error> {
172 let signing_key = config.get_signing_key().map_err(|e| {
173 ConversionError::InvalidConfig(format!("Failed to get signing key: {}", e))
174 })?;
175
176 Ok(NotificationRepoModel {
177 id: config.id.clone(),
178 url: config.url.clone(),
179 notification_type: config.r#type,
180 signing_key,
181 })
182 }
183}
#[cfg(test)]
mod tests {
    use crate::models::NotificationType;

    use super::*;

    /// Builds a webhook notification with the given ID, a localhost URL and no
    /// signing key.
    fn create_test_notification(id: String) -> NotificationRepoModel {
        NotificationRepoModel {
            id,
            url: "http://localhost".to_string(),
            notification_type: NotificationType::Webhook,
            signing_key: None,
        }
    }

    #[actix_web::test]
    async fn test_new_repository_is_empty() {
        let repo = InMemoryNotificationRepository::new();
        assert_eq!(repo.count().await.unwrap(), 0);
    }

    #[actix_web::test]
    async fn test_add_notification() {
        let repo = InMemoryNotificationRepository::new();
        let notification = create_test_notification("test".to_string());

        repo.create(notification.clone()).await.unwrap();
        assert_eq!(repo.count().await.unwrap(), 1);

        let stored = repo.get_by_id("test".to_string()).await.unwrap();
        assert_eq!(stored.id, notification.id);
    }

    /// Creating a second notification with an existing ID must be rejected and
    /// must leave the stored record untouched.
    #[actix_web::test]
    async fn test_add_duplicate_notification() {
        let repo = InMemoryNotificationRepository::new();
        let notification = create_test_notification("test".to_string());

        repo.create(notification.clone()).await.unwrap();

        let mut duplicate = notification;
        duplicate.url = "http://duplicate.example.com".to_string();

        let result = repo.create(duplicate).await;
        assert!(matches!(
            result,
            Err(RepositoryError::ConstraintViolation(_))
        ));

        assert_eq!(repo.count().await.unwrap(), 1);
        let stored = repo.get_by_id("test".to_string()).await.unwrap();
        assert_eq!(stored.url, "http://localhost");
    }

    /// With no lock contention, a clone carries a snapshot of the data and is
    /// independent of the original afterwards.
    #[actix_web::test]
    async fn test_clone_is_independent_snapshot() {
        let repo = InMemoryNotificationRepository::new();
        repo.create(create_test_notification("test".to_string()))
            .await
            .unwrap();

        let copy = repo.clone();
        assert_eq!(copy.count().await.unwrap(), 1);

        repo.delete_by_id("test".to_string()).await.unwrap();
        assert_eq!(repo.count().await.unwrap(), 0);
        assert_eq!(copy.count().await.unwrap(), 1);
    }

    #[actix_web::test]
    async fn test_update_notification() {
        let repo = InMemoryNotificationRepository::new();
        let notification = create_test_notification("test".to_string());

        repo.create(notification.clone()).await.unwrap();

        let mut updated_notification = notification;
        updated_notification.url = "http://updated.example.com".to_string();

        let result = repo
            .update("test".to_string(), updated_notification)
            .await;
        assert!(result.is_ok());

        let updated = result.unwrap();
        assert_eq!(updated.id, "test");
        assert_eq!(updated.url, "http://updated.example.com");

        let stored = repo.get_by_id("test".to_string()).await.unwrap();
        assert_eq!(stored.url, "http://updated.example.com");
    }

    #[actix_web::test]
    async fn test_list_notifications() {
        let repo = InMemoryNotificationRepository::new();
        let notification1 = create_test_notification("test".to_string());
        let notification2 = create_test_notification("test2".to_string());

        repo.create(notification1).await.unwrap();
        repo.create(notification2).await.unwrap();

        let notifications = repo.list_all().await.unwrap();
        assert_eq!(notifications.len(), 2);
    }

    #[actix_web::test]
    async fn test_update_nonexistent_notification() {
        let repo = InMemoryNotificationRepository::new();
        let notification = create_test_notification("test".to_string());

        let result = repo.update("test2".to_string(), notification).await;
        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
    }

    #[actix_web::test]
    async fn test_get_nonexistent_notification() {
        let repo = InMemoryNotificationRepository::new();

        let result = repo.get_by_id("test".to_string()).await;
        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
    }

    #[actix_web::test]
    async fn test_has_entries() {
        let repo = InMemoryNotificationRepository::new();
        assert!(!repo.has_entries().await.unwrap());

        let notification = create_test_notification("test".to_string());

        repo.create(notification).await.unwrap();
        assert!(repo.has_entries().await.unwrap());
    }

    #[actix_web::test]
    async fn test_drop_all_entries() {
        let repo = InMemoryNotificationRepository::new();
        let notification = create_test_notification("test".to_string());

        repo.create(notification).await.unwrap();
        assert!(repo.has_entries().await.unwrap());

        repo.drop_all_entries().await.unwrap();
        assert!(!repo.has_entries().await.unwrap());
    }

    #[actix_web::test]
    async fn test_delete_notification() {
        let repo = InMemoryNotificationRepository::new();
        let notification = create_test_notification("test".to_string());

        repo.create(notification).await.unwrap();
        assert_eq!(repo.count().await.unwrap(), 1);

        let result = repo.delete_by_id("test".to_string()).await;
        assert!(result.is_ok());

        assert_eq!(repo.count().await.unwrap(), 0);
        let get_result = repo.get_by_id("test".to_string()).await;
        assert!(matches!(get_result, Err(RepositoryError::NotFound(_))));
    }

    #[actix_web::test]
    async fn test_delete_nonexistent_notification() {
        let repo = InMemoryNotificationRepository::new();

        let result = repo.delete_by_id("nonexistent".to_string()).await;
        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
    }

    #[actix_web::test]
    async fn test_update_with_id_mismatch() {
        let repo = InMemoryNotificationRepository::new();
        let notification = create_test_notification("test".to_string());

        repo.create(notification.clone()).await.unwrap();

        let mut updated_notification = notification;
        updated_notification.id = "different-id".to_string();

        let result = repo.update("test".to_string(), updated_notification).await;
        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
    }

    #[actix_web::test]
    async fn test_update_delete_integration() {
        let repo = InMemoryNotificationRepository::new();
        let notification1 = create_test_notification("test1".to_string());
        let notification2 = create_test_notification("test2".to_string());

        repo.create(notification1.clone()).await.unwrap();
        repo.create(notification2).await.unwrap();
        assert_eq!(repo.count().await.unwrap(), 2);

        let mut updated_notification1 = notification1;
        updated_notification1.url = "http://updated.example.com".to_string();

        let update_result = repo
            .update("test1".to_string(), updated_notification1)
            .await;
        assert!(update_result.is_ok());

        let stored = repo.get_by_id("test1".to_string()).await.unwrap();
        assert_eq!(stored.url, "http://updated.example.com");

        let delete_result = repo.delete_by_id("test2".to_string()).await;
        assert!(delete_result.is_ok());

        assert_eq!(repo.count().await.unwrap(), 1);
        let remaining = repo.list_all().await.unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].id, "test1");
        assert_eq!(remaining[0].url, "http://updated.example.com");
    }
}