openzeppelin_relayer/repositories/relayer/
mod.rs

1//! Relayer Repository Module
2//!
3//! This module provides the relayer repository layer for the OpenZeppelin Relayer service.
4//! It implements the Repository pattern to abstract relayer data persistence operations,
5//! supporting both in-memory and Redis-backed storage implementations.
6//!
7//! ## Features
8//!
9//! - **CRUD Operations**: Create, read, update, and delete relayer configurations
10//! - **Status Management**: Enable/disable relayers and track their state
11//! - **Policy Management**: Update relayer network policies
12//! - **Partial Updates**: Support for partial relayer configuration updates
13//! - **Active Filtering**: Query for active (non-paused) relayers
14//! - **Pagination Support**: Efficient paginated listing of relayers
15//!
16//! ## Repository Implementations
17//!
18//! - [`InMemoryRelayerRepository`]: Fast in-memory storage for testing/development
19//! - [`RedisRelayerRepository`]: Redis-backed storage for production environments
20//!
21
22mod relayer_in_memory;
23mod relayer_redis;
24
25pub use relayer_in_memory::*;
26pub use relayer_redis::*;
27
28use crate::{
29    models::UpdateRelayerRequest,
30    models::{PaginationQuery, RelayerNetworkPolicy, RelayerRepoModel, RepositoryError},
31    repositories::{PaginatedResult, Repository},
32};
33use async_trait::async_trait;
34use redis::aio::ConnectionManager;
35use std::sync::Arc;
36
37#[async_trait]
38pub trait RelayerRepository: Repository<RelayerRepoModel, String> + Send + Sync {
39    async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
40    async fn list_by_signer_id(
41        &self,
42        signer_id: &str,
43    ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
44    async fn list_by_notification_id(
45        &self,
46        notification_id: &str,
47    ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
48    async fn partial_update(
49        &self,
50        id: String,
51        update: UpdateRelayerRequest,
52    ) -> Result<RelayerRepoModel, RepositoryError>;
53    async fn enable_relayer(&self, relayer_id: String)
54        -> Result<RelayerRepoModel, RepositoryError>;
55    async fn disable_relayer(
56        &self,
57        relayer_id: String,
58    ) -> Result<RelayerRepoModel, RepositoryError>;
59    async fn update_policy(
60        &self,
61        id: String,
62        policy: RelayerNetworkPolicy,
63    ) -> Result<RelayerRepoModel, RepositoryError>;
64}
65
66/// Enum wrapper for different relayer repository implementations
67#[derive(Debug, Clone)]
68pub enum RelayerRepositoryStorage {
69    InMemory(InMemoryRelayerRepository),
70    Redis(RedisRelayerRepository),
71}
72
73impl RelayerRepositoryStorage {
74    pub fn new_in_memory() -> Self {
75        Self::InMemory(InMemoryRelayerRepository::new())
76    }
77
78    pub fn new_redis(
79        connection_manager: Arc<ConnectionManager>,
80        key_prefix: String,
81    ) -> Result<Self, RepositoryError> {
82        Ok(Self::Redis(RedisRelayerRepository::new(
83            connection_manager,
84            key_prefix,
85        )?))
86    }
87}
88
89impl Default for RelayerRepositoryStorage {
90    fn default() -> Self {
91        Self::new_in_memory()
92    }
93}
94
95#[async_trait]
96impl Repository<RelayerRepoModel, String> for RelayerRepositoryStorage {
97    async fn create(&self, entity: RelayerRepoModel) -> Result<RelayerRepoModel, RepositoryError> {
98        match self {
99            RelayerRepositoryStorage::InMemory(repo) => repo.create(entity).await,
100            RelayerRepositoryStorage::Redis(repo) => repo.create(entity).await,
101        }
102    }
103
104    async fn get_by_id(&self, id: String) -> Result<RelayerRepoModel, RepositoryError> {
105        match self {
106            RelayerRepositoryStorage::InMemory(repo) => repo.get_by_id(id).await,
107            RelayerRepositoryStorage::Redis(repo) => repo.get_by_id(id).await,
108        }
109    }
110
111    async fn list_all(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
112        match self {
113            RelayerRepositoryStorage::InMemory(repo) => repo.list_all().await,
114            RelayerRepositoryStorage::Redis(repo) => repo.list_all().await,
115        }
116    }
117
118    async fn list_paginated(
119        &self,
120        query: PaginationQuery,
121    ) -> Result<PaginatedResult<RelayerRepoModel>, RepositoryError> {
122        match self {
123            RelayerRepositoryStorage::InMemory(repo) => repo.list_paginated(query).await,
124            RelayerRepositoryStorage::Redis(repo) => repo.list_paginated(query).await,
125        }
126    }
127
128    async fn update(
129        &self,
130        id: String,
131        entity: RelayerRepoModel,
132    ) -> Result<RelayerRepoModel, RepositoryError> {
133        match self {
134            RelayerRepositoryStorage::InMemory(repo) => repo.update(id, entity).await,
135            RelayerRepositoryStorage::Redis(repo) => repo.update(id, entity).await,
136        }
137    }
138
139    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
140        match self {
141            RelayerRepositoryStorage::InMemory(repo) => repo.delete_by_id(id).await,
142            RelayerRepositoryStorage::Redis(repo) => repo.delete_by_id(id).await,
143        }
144    }
145
146    async fn count(&self) -> Result<usize, RepositoryError> {
147        match self {
148            RelayerRepositoryStorage::InMemory(repo) => repo.count().await,
149            RelayerRepositoryStorage::Redis(repo) => repo.count().await,
150        }
151    }
152
153    async fn has_entries(&self) -> Result<bool, RepositoryError> {
154        match self {
155            RelayerRepositoryStorage::InMemory(repo) => repo.has_entries().await,
156            RelayerRepositoryStorage::Redis(repo) => repo.has_entries().await,
157        }
158    }
159
160    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
161        match self {
162            RelayerRepositoryStorage::InMemory(repo) => repo.drop_all_entries().await,
163            RelayerRepositoryStorage::Redis(repo) => repo.drop_all_entries().await,
164        }
165    }
166}
167
168#[async_trait]
169impl RelayerRepository for RelayerRepositoryStorage {
170    async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
171        match self {
172            RelayerRepositoryStorage::InMemory(repo) => repo.list_active().await,
173            RelayerRepositoryStorage::Redis(repo) => repo.list_active().await,
174        }
175    }
176
177    async fn list_by_signer_id(
178        &self,
179        signer_id: &str,
180    ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
181        match self {
182            RelayerRepositoryStorage::InMemory(repo) => repo.list_by_signer_id(signer_id).await,
183            RelayerRepositoryStorage::Redis(repo) => repo.list_by_signer_id(signer_id).await,
184        }
185    }
186
187    async fn list_by_notification_id(
188        &self,
189        notification_id: &str,
190    ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
191        match self {
192            RelayerRepositoryStorage::InMemory(repo) => {
193                repo.list_by_notification_id(notification_id).await
194            }
195            RelayerRepositoryStorage::Redis(repo) => {
196                repo.list_by_notification_id(notification_id).await
197            }
198        }
199    }
200
201    async fn partial_update(
202        &self,
203        id: String,
204        update: UpdateRelayerRequest,
205    ) -> Result<RelayerRepoModel, RepositoryError> {
206        match self {
207            RelayerRepositoryStorage::InMemory(repo) => repo.partial_update(id, update).await,
208            RelayerRepositoryStorage::Redis(repo) => repo.partial_update(id, update).await,
209        }
210    }
211
212    async fn enable_relayer(
213        &self,
214        relayer_id: String,
215    ) -> Result<RelayerRepoModel, RepositoryError> {
216        match self {
217            RelayerRepositoryStorage::InMemory(repo) => repo.enable_relayer(relayer_id).await,
218            RelayerRepositoryStorage::Redis(repo) => repo.enable_relayer(relayer_id).await,
219        }
220    }
221
222    async fn disable_relayer(
223        &self,
224        relayer_id: String,
225    ) -> Result<RelayerRepoModel, RepositoryError> {
226        match self {
227            RelayerRepositoryStorage::InMemory(repo) => repo.disable_relayer(relayer_id).await,
228            RelayerRepositoryStorage::Redis(repo) => repo.disable_relayer(relayer_id).await,
229        }
230    }
231
232    async fn update_policy(
233        &self,
234        id: String,
235        policy: RelayerNetworkPolicy,
236    ) -> Result<RelayerRepoModel, RepositoryError> {
237        match self {
238            RelayerRepositoryStorage::InMemory(repo) => repo.update_policy(id, policy).await,
239            RelayerRepositoryStorage::Redis(repo) => repo.update_policy(id, policy).await,
240        }
241    }
242}
243
/// Tests for [`RelayerRepositoryStorage`] using its in-memory backend.
///
/// Every test goes through the storage wrapper so that the delegation code in
/// this module is what gets exercised.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy};

    /// Builds an unpaused, non-disabled EVM relayer fixture with the given `id`
    /// and a name derived from it.
    fn create_test_relayer(id: String) -> RelayerRepoModel {
        // Derive the name from a borrow first so `id` can be moved into the
        // struct without cloning.
        let name = format!("Relayer {id}");
        RelayerRepoModel {
            id,
            name,
            network: "TestNet".to_string(),
            paused: false,
            network_type: NetworkType::Evm,
            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
                min_balance: Some(0),
                gas_limit_estimation: Some(true),
                gas_price_cap: None,
                whitelist_receivers: None,
                eip1559_pricing: Some(false),
                private_transactions: Some(false),
            }),
            signer_id: "test".to_string(),
            address: "0x".to_string(),
            notification_id: None,
            system_disabled: false,
            custom_rpc_urls: None,
        }
    }

    /// Walks the generic CRUD operations through the wrapper.
    #[actix_web::test]
    async fn test_in_memory_repository_impl() {
        let impl_repo = RelayerRepositoryStorage::new_in_memory();
        let relayer = create_test_relayer("test-relayer".to_string());

        // Test create
        let created = impl_repo.create(relayer.clone()).await.unwrap();
        assert_eq!(created.id, relayer.id);

        // Test get
        let retrieved = impl_repo
            .get_by_id("test-relayer".to_string())
            .await
            .unwrap();
        assert_eq!(retrieved.id, relayer.id);

        // Test list all
        let all_relayers = impl_repo.list_all().await.unwrap();
        assert!(!all_relayers.is_empty());

        // Test count
        let count = impl_repo.count().await.unwrap();
        assert!(count >= 1);

        // Test update
        let mut updated_relayer = relayer.clone();
        updated_relayer.name = "Updated Name".to_string();
        let updated = impl_repo
            .update(relayer.id.clone(), updated_relayer)
            .await
            .unwrap();
        assert_eq!(updated.name, "Updated Name");

        // Test delete; `relayer` is not used afterwards, so its id can be moved.
        impl_repo.delete_by_id(relayer.id).await.unwrap();
        let get_result = impl_repo.get_by_id("test-relayer".to_string()).await;
        assert!(get_result.is_err());
    }

    /// Walks the relayer-specific trait methods through the wrapper.
    #[actix_web::test]
    async fn test_relayer_repository_trait_methods() {
        let impl_repo = RelayerRepositoryStorage::new_in_memory();
        let relayer = create_test_relayer("test-relayer".to_string());
        // Only the id is needed once the relayer has been stored.
        let relayer_id = relayer.id.clone();

        // Create the relayer first
        impl_repo.create(relayer).await.unwrap();

        // Test list_active
        let active_relayers = impl_repo.list_active().await.unwrap();
        assert!(!active_relayers.is_empty());

        // Test partial_update
        let update = UpdateRelayerRequest {
            paused: Some(true),
            ..Default::default()
        };
        let updated = impl_repo
            .partial_update(relayer_id.clone(), update)
            .await
            .unwrap();
        assert!(updated.paused);

        // Test enable/disable
        let disabled = impl_repo.disable_relayer(relayer_id.clone()).await.unwrap();
        assert!(disabled.system_disabled);

        let enabled = impl_repo.enable_relayer(relayer_id.clone()).await.unwrap();
        assert!(!enabled.system_disabled);

        // Test update_policy
        let new_policy = RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
            min_balance: Some(1_000_000_000_000_000_000),
            gas_limit_estimation: Some(true),
            gas_price_cap: Some(50_000_000_000),
            whitelist_receivers: None,
            eip1559_pricing: Some(true),
            private_transactions: Some(false),
        });
        let policy_updated = impl_repo
            .update_policy(relayer_id, new_policy)
            .await
            .unwrap();

        if let RelayerNetworkPolicy::Evm(evm_policy) = policy_updated.policies {
            assert_eq!(evm_policy.gas_price_cap, Some(50_000_000_000));
            assert_eq!(evm_policy.eip1559_pricing, Some(true));
        } else {
            panic!("Expected EVM policy");
        }
    }

    /// `new_in_memory` must select the in-memory variant.
    #[actix_web::test]
    async fn test_create_repository_in_memory() {
        let result = RelayerRepositoryStorage::new_in_memory();

        assert!(matches!(result, RelayerRepositoryStorage::InMemory(_)));
    }

    /// Paginated listing echoes the requested page parameters and counts the
    /// stored relayers.
    #[actix_web::test]
    async fn test_pagination() {
        let impl_repo = RelayerRepositoryStorage::new_in_memory();
        let relayer1 = create_test_relayer("test-relayer-1".to_string());
        let relayer2 = create_test_relayer("test-relayer-2".to_string());

        impl_repo.create(relayer1).await.unwrap();
        impl_repo.create(relayer2).await.unwrap();

        let query = PaginationQuery {
            page: 1,
            per_page: 10,
        };

        let result = impl_repo.list_paginated(query).await.unwrap();
        assert!(result.total >= 2);
        assert_eq!(result.page, 1);
        assert_eq!(result.per_page, 10);
    }

    /// Deleting removes the relayer; deleting an unknown id is an error.
    #[actix_web::test]
    async fn test_delete_relayer() {
        let impl_repo = RelayerRepositoryStorage::new_in_memory();
        let relayer = create_test_relayer("delete-test".to_string());

        // Create relayer
        impl_repo.create(relayer).await.unwrap();

        // Delete relayer
        impl_repo
            .delete_by_id("delete-test".to_string())
            .await
            .unwrap();

        // Verify deletion
        let get_result = impl_repo.get_by_id("delete-test".to_string()).await;
        assert!(get_result.is_err());
        assert!(matches!(
            get_result.unwrap_err(),
            RepositoryError::NotFound(_)
        ));

        // Test deleting non-existent relayer
        let delete_result = impl_repo.delete_by_id("nonexistent".to_string()).await;
        assert!(delete_result.is_err());
    }

    /// `has_entries` tracks creation and deletion when called through the wrapper.
    #[actix_web::test]
    async fn test_has_entries() {
        // Use the wrapper rather than the raw in-memory repository so the
        // `has_entries` delegation in this module is covered.
        let repo = RelayerRepositoryStorage::new_in_memory();
        assert!(!repo.has_entries().await.unwrap());

        let relayer = create_test_relayer("test".to_string());
        let relayer_id = relayer.id.clone();

        repo.create(relayer).await.unwrap();
        assert!(repo.has_entries().await.unwrap());

        repo.delete_by_id(relayer_id).await.unwrap();
        assert!(!repo.has_entries().await.unwrap());
    }

    /// `drop_all_entries` empties the storage when called through the wrapper.
    #[actix_web::test]
    async fn test_drop_all_entries() {
        // Use the wrapper rather than the raw in-memory repository so the
        // `drop_all_entries` delegation in this module is covered.
        let repo = RelayerRepositoryStorage::new_in_memory();
        let relayer = create_test_relayer("test".to_string());

        repo.create(relayer).await.unwrap();
        assert!(repo.has_entries().await.unwrap());

        repo.drop_all_entries().await.unwrap();
        assert!(!repo.has_entries().await.unwrap());
    }
}
443
// Test-only mock of the relayer repository. `mockall::mock!` generates a
// `MockRelayerRepository` type implementing both traits declared below.
#[cfg(test)]
mockall::mock! {
    pub RelayerRepository {}

    // Generic CRUD surface.
    #[async_trait]
    impl Repository<RelayerRepoModel, String> for RelayerRepository {
        async fn create(
            &self,
            entity: RelayerRepoModel,
        ) -> Result<RelayerRepoModel, RepositoryError>;
        async fn get_by_id(&self, id: String) -> Result<RelayerRepoModel, RepositoryError>;
        async fn list_all(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
        async fn list_paginated(
            &self,
            query: PaginationQuery,
        ) -> Result<PaginatedResult<RelayerRepoModel>, RepositoryError>;
        async fn update(
            &self,
            id: String,
            entity: RelayerRepoModel,
        ) -> Result<RelayerRepoModel, RepositoryError>;
        async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError>;
        async fn count(&self) -> Result<usize, RepositoryError>;
        async fn has_entries(&self) -> Result<bool, RepositoryError>;
        async fn drop_all_entries(&self) -> Result<(), RepositoryError>;
    }

    // Relayer-specific surface.
    #[async_trait]
    impl RelayerRepository for RelayerRepository {
        async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
        async fn list_by_signer_id(
            &self,
            signer_id: &str,
        ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
        async fn list_by_notification_id(
            &self,
            notification_id: &str,
        ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
        async fn partial_update(
            &self,
            id: String,
            update: UpdateRelayerRequest,
        ) -> Result<RelayerRepoModel, RepositoryError>;
        async fn enable_relayer(
            &self,
            relayer_id: String,
        ) -> Result<RelayerRepoModel, RepositoryError>;
        async fn disable_relayer(
            &self,
            relayer_id: String,
        ) -> Result<RelayerRepoModel, RepositoryError>;
        async fn update_policy(
            &self,
            id: String,
            policy: RelayerNetworkPolicy,
        ) -> Result<RelayerRepoModel, RepositoryError>;
    }
}