openzeppelin_relayer/repositories/plugin/
plugin_redis.rs

1//! Redis-backed implementation of the PluginRepository.
2
3use crate::models::{PaginationQuery, PluginModel, RepositoryError};
4use crate::repositories::redis_base::RedisRepository;
5use crate::repositories::{BatchRetrievalResult, PaginatedResult, PluginRepositoryTrait};
6use async_trait::async_trait;
7use log::{debug, error, warn};
8use redis::aio::ConnectionManager;
9use redis::AsyncCommands;
10use std::fmt;
11use std::sync::Arc;
12
/// Key segment placed between the repository key prefix and a plugin ID.
const PLUGIN_PREFIX: &str = "plugin";
/// Key suffix of the Redis set that holds the IDs of all stored plugins.
const PLUGIN_LIST_KEY: &str = "plugin_list";
15
16#[derive(Clone)]
17pub struct RedisPluginRepository {
18    pub client: Arc<ConnectionManager>,
19    pub key_prefix: String,
20}
21
22impl RedisRepository for RedisPluginRepository {}
23
24impl RedisPluginRepository {
25    pub fn new(
26        connection_manager: Arc<ConnectionManager>,
27        key_prefix: String,
28    ) -> Result<Self, RepositoryError> {
29        if key_prefix.is_empty() {
30            return Err(RepositoryError::InvalidData(
31                "Redis key prefix cannot be empty".to_string(),
32            ));
33        }
34
35        Ok(Self {
36            client: connection_manager,
37            key_prefix,
38        })
39    }
40
41    /// Generate key for plugin data: plugin:{plugin_id}
42    fn plugin_key(&self, plugin_id: &str) -> String {
43        format!("{}:{}:{}", self.key_prefix, PLUGIN_PREFIX, plugin_id)
44    }
45
46    /// Generate key for plugin list: plugin_list (paginated list of plugin IDs)
47    fn plugin_list_key(&self) -> String {
48        format!("{}:{}", self.key_prefix, PLUGIN_LIST_KEY)
49    }
50
51    /// Get plugin by ID using an existing connection.
52    /// This method is useful to prevent creating new connections for
53    /// getting individual plugins on list operations.
54    ///
55    /// # Arguments
56    ///
57    /// * `id` - The ID of the plugin to get.
58    /// * `conn` - The connection to use.
59    async fn get_by_id_with_connection(
60        &self,
61        id: &str,
62        conn: &mut ConnectionManager,
63    ) -> Result<Option<PluginModel>, RepositoryError> {
64        if id.is_empty() {
65            return Err(RepositoryError::InvalidData(
66                "Plugin ID cannot be empty".to_string(),
67            ));
68        }
69        let key = self.plugin_key(id);
70
71        debug!("Fetching plugin data for ID: {}", id);
72
73        let json: Option<String> = conn
74            .get(&key)
75            .await
76            .map_err(|e| self.map_redis_error(e, &format!("get_plugin_by_id_{}", id)))?;
77
78        match json {
79            Some(json) => {
80                debug!("Found plugin data for ID: {}", id);
81                let plugin = self.deserialize_entity::<PluginModel>(&json, id, "plugin")?;
82                Ok(Some(plugin))
83            }
84            None => {
85                debug!("No plugin found for ID: {}", id);
86                Ok(None)
87            }
88        }
89    }
90
91    async fn get_by_ids(
92        &self,
93        ids: &[String],
94    ) -> Result<BatchRetrievalResult<PluginModel>, RepositoryError> {
95        if ids.is_empty() {
96            debug!("No plugin IDs provided for batch fetch");
97            return Ok(BatchRetrievalResult {
98                results: vec![],
99                failed_ids: vec![],
100            });
101        }
102
103        let mut conn = self.client.as_ref().clone();
104        let keys: Vec<String> = ids.iter().map(|id| self.plugin_key(id)).collect();
105
106        let values: Vec<Option<String>> = conn
107            .mget(&keys)
108            .await
109            .map_err(|e| self.map_redis_error(e, "batch_fetch_plugins"))?;
110
111        let mut plugins = Vec::new();
112        let mut failed_count = 0;
113        let mut failed_ids = Vec::new();
114        for (i, value) in values.into_iter().enumerate() {
115            match value {
116                Some(json) => match self.deserialize_entity(&json, &ids[i], "plugin") {
117                    Ok(plugin) => plugins.push(plugin),
118                    Err(e) => {
119                        failed_count += 1;
120                        error!("Failed to deserialize plugin {}: {}", ids[i], e);
121                        failed_ids.push(ids[i].clone());
122                    }
123                },
124                None => {
125                    warn!("Plugin {} not found in batch fetch", ids[i]);
126                }
127            }
128        }
129
130        if failed_count > 0 {
131            warn!(
132                "Failed to deserialize {} out of {} plugins in batch",
133                failed_count,
134                ids.len()
135            );
136        }
137
138        Ok(BatchRetrievalResult {
139            results: plugins,
140            failed_ids,
141        })
142    }
143}
144
145impl fmt::Debug for RedisPluginRepository {
146    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147        f.debug_struct("RedisPluginRepository")
148            .field("client", &"<ConnectionManager>")
149            .field("key_prefix", &self.key_prefix)
150            .finish()
151    }
152}
153
154#[async_trait]
155impl PluginRepositoryTrait for RedisPluginRepository {
156    async fn get_by_id(&self, id: &str) -> Result<Option<PluginModel>, RepositoryError> {
157        let mut conn = self.client.as_ref().clone();
158        self.get_by_id_with_connection(id, &mut conn).await
159    }
160
161    async fn add(&self, plugin: PluginModel) -> Result<(), RepositoryError> {
162        if plugin.id.is_empty() {
163            return Err(RepositoryError::InvalidData(
164                "Plugin ID cannot be empty".to_string(),
165            ));
166        }
167
168        if plugin.path.is_empty() {
169            return Err(RepositoryError::InvalidData(
170                "Plugin path cannot be empty".to_string(),
171            ));
172        }
173
174        let mut conn = self.client.as_ref().clone();
175        let key = self.plugin_key(&plugin.id);
176        let list_key = self.plugin_list_key();
177
178        debug!("Adding plugin with ID: {}", plugin.id);
179
180        // Check if plugin already exists
181        let exists: bool = conn
182            .exists(&key)
183            .await
184            .map_err(|e| self.map_redis_error(e, &format!("check_plugin_exists_{}", plugin.id)))?;
185
186        if exists {
187            return Err(RepositoryError::ConstraintViolation(format!(
188                "Plugin with ID {} already exists",
189                plugin.id
190            )));
191        }
192
193        // Serialize plugin
194        let json = self.serialize_entity(&plugin, |p| &p.id, "plugin")?;
195
196        // Use a pipeline to ensure atomicity
197        let mut pipe = redis::pipe();
198        pipe.atomic();
199        pipe.set(&key, &json);
200        pipe.sadd(&list_key, &plugin.id);
201
202        pipe.exec_async(&mut conn).await.map_err(|e| {
203            error!("Failed to add plugin {}: {}", plugin.id, e);
204            self.map_redis_error(e, &format!("add_plugin_{}", plugin.id))
205        })?;
206
207        debug!("Successfully added plugin with ID: {}", plugin.id);
208        Ok(())
209    }
210
211    async fn list_paginated(
212        &self,
213        query: PaginationQuery,
214    ) -> Result<PaginatedResult<PluginModel>, RepositoryError> {
215        if query.page == 0 {
216            return Err(RepositoryError::InvalidData(
217                "Page number must be greater than 0".to_string(),
218            ));
219        }
220
221        if query.per_page == 0 {
222            return Err(RepositoryError::InvalidData(
223                "Per page count must be greater than 0".to_string(),
224            ));
225        }
226
227        let mut conn = self.client.as_ref().clone();
228        let plugin_list_key = self.plugin_list_key();
229
230        // Get total count
231        let total: u64 = conn
232            .scard(&plugin_list_key)
233            .await
234            .map_err(|e| self.map_redis_error(e, "list_paginated_count"))?;
235
236        if total == 0 {
237            return Ok(PaginatedResult {
238                items: vec![],
239                total: 0,
240                page: query.page,
241                per_page: query.per_page,
242            });
243        }
244
245        // Get all IDs and paginate in memory
246        let all_ids: Vec<String> = conn
247            .smembers(&plugin_list_key)
248            .await
249            .map_err(|e| self.map_redis_error(e, "list_paginated_members"))?;
250
251        let start = ((query.page - 1) * query.per_page) as usize;
252        let end = (start + query.per_page as usize).min(all_ids.len());
253
254        let ids_to_query = &all_ids[start..end];
255        let items = self.get_by_ids(ids_to_query).await?;
256
257        Ok(PaginatedResult {
258            items: items.results.clone(),
259            total,
260            page: query.page,
261            per_page: query.per_page,
262        })
263    }
264
265    async fn count(&self) -> Result<usize, RepositoryError> {
266        let mut conn = self.client.as_ref().clone();
267        let plugin_list_key = self.plugin_list_key();
268
269        let count: u64 = conn
270            .scard(&plugin_list_key)
271            .await
272            .map_err(|e| self.map_redis_error(e, "count_plugins"))?;
273
274        Ok(count as usize)
275    }
276
277    async fn has_entries(&self) -> Result<bool, RepositoryError> {
278        let mut conn = self.client.as_ref().clone();
279        let plugin_list_key = self.plugin_list_key();
280
281        debug!("Checking if plugin entries exist");
282
283        let exists: bool = conn
284            .exists(&plugin_list_key)
285            .await
286            .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
287
288        debug!("Plugin entries exist: {}", exists);
289        Ok(exists)
290    }
291
292    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
293        let mut conn = self.client.as_ref().clone();
294        let plugin_list_key = self.plugin_list_key();
295
296        debug!("Dropping all plugin entries");
297
298        // Get all plugin IDs first
299        let plugin_ids: Vec<String> = conn
300            .smembers(&plugin_list_key)
301            .await
302            .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_ids"))?;
303
304        if plugin_ids.is_empty() {
305            debug!("No plugin entries to drop");
306            return Ok(());
307        }
308
309        // Use pipeline for atomic operations
310        let mut pipe = redis::pipe();
311        pipe.atomic();
312
313        // Delete all individual plugin entries
314        for plugin_id in &plugin_ids {
315            let plugin_key = self.plugin_key(plugin_id);
316            pipe.del(&plugin_key);
317        }
318
319        // Delete the plugin list key
320        pipe.del(&plugin_list_key);
321
322        pipe.exec_async(&mut conn)
323            .await
324            .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;
325
326        debug!("Dropped {} plugin entries", plugin_ids.len());
327        Ok(())
328    }
329}
330
#[cfg(test)]
mod tests {
    //! Integration tests for `RedisPluginRepository`.
    //!
    //! All tests need a reachable Redis instance (`REDIS_URL`, defaulting to
    //! `redis://127.0.0.1:6379/`) and are therefore `#[ignore]`d by default.
    //! Tests that write data use a unique key prefix per repository so they
    //! can run in parallel without seeing each other's entries.

    use super::*;
    use crate::constants::DEFAULT_PLUGIN_TIMEOUT_SECONDS;
    use crate::models::PluginModel;
    use std::{sync::Arc, time::Duration};

    /// Prefix used verbatim by tests that only inspect key or struct formatting.
    const FIXED_TEST_PREFIX: &str = "test_plugin";

    /// Builds a plugin model with the default timeout.
    fn create_test_plugin(id: &str, path: &str) -> PluginModel {
        PluginModel {
            id: id.to_string(),
            path: path.to_string(),
            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
        }
    }

    /// Opens a connection manager against `REDIS_URL` (or the local default).
    async fn create_connection_manager() -> ConnectionManager {
        let redis_url =
            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379/".to_string());
        let client = redis::Client::open(redis_url).expect("Failed to create Redis client");
        ConnectionManager::new(client)
            .await
            .expect("Failed to create Redis connection manager")
    }

    /// Repository with the fixed prefix; only for tests that never write to Redis.
    async fn setup_fixed_prefix_repo() -> RedisPluginRepository {
        RedisPluginRepository::new(
            Arc::new(create_connection_manager().await),
            FIXED_TEST_PREFIX.to_string(),
        )
        .expect("Failed to create Redis plugin repository")
    }

    /// Repository with a unique prefix, so concurrently running tests cannot
    /// observe or delete each other's keys.
    async fn setup_test_repo() -> RedisPluginRepository {
        let key_prefix = format!("{}:{}", FIXED_TEST_PREFIX, uuid::Uuid::new_v4());
        RedisPluginRepository::new(Arc::new(create_connection_manager().await), key_prefix)
            .expect("Failed to create Redis plugin repository")
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_new_repository_creation() {
        let repo = setup_fixed_prefix_repo().await;
        assert_eq!(repo.key_prefix, "test_plugin");
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_new_repository_empty_prefix_fails() {
        let connection_manager = create_connection_manager().await;

        let result = RedisPluginRepository::new(Arc::new(connection_manager), "".to_string());
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("key prefix cannot be empty"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_key_generation() {
        let repo = setup_fixed_prefix_repo().await;

        let plugin_key = repo.plugin_key("test-plugin");
        assert_eq!(plugin_key, "test_plugin:plugin:test-plugin");

        let list_key = repo.plugin_list_key();
        assert_eq!(list_key, "test_plugin:plugin_list");
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_serialize_deserialize_plugin() {
        let repo = setup_fixed_prefix_repo().await;
        let plugin = create_test_plugin("test-plugin", "/path/to/plugin");

        let json = repo.serialize_entity(&plugin, |p| &p.id, "plugin").unwrap();
        let deserialized: PluginModel = repo
            .deserialize_entity(&json, &plugin.id, "plugin")
            .unwrap();

        assert_eq!(plugin.id, deserialized.id);
        assert_eq!(plugin.path, deserialized.path);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_add_plugin() {
        let repo = setup_test_repo().await;
        let plugin_name = uuid::Uuid::new_v4().to_string();
        let plugin = create_test_plugin(&plugin_name, "/path/to/plugin");

        let result = repo.add(plugin).await;
        assert!(result.is_ok());

        repo.drop_all_entries().await.unwrap();
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_get_plugin() {
        let repo = setup_test_repo().await;
        let plugin_name = uuid::Uuid::new_v4().to_string();
        let plugin = create_test_plugin(&plugin_name, "/path/to/plugin");

        // Add the plugin first
        repo.add(plugin.clone()).await.unwrap();

        // Get the plugin
        let retrieved = repo.get_by_id(&plugin_name).await.unwrap();
        assert!(retrieved.is_some());
        let retrieved = retrieved.unwrap();
        assert_eq!(retrieved.id, plugin.id);
        assert_eq!(retrieved.path, plugin.path);

        repo.drop_all_entries().await.unwrap();
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_get_nonexistent_plugin() {
        let repo = setup_test_repo().await;

        let result = repo.get_by_id("nonexistent-plugin").await;
        assert!(matches!(result, Ok(None)));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_duplicate_plugin_addition() {
        let repo = setup_test_repo().await;
        let plugin_name = uuid::Uuid::new_v4().to_string();
        let plugin = create_test_plugin(&plugin_name, "/path/to/plugin");

        // Add the plugin first time
        repo.add(plugin.clone()).await.unwrap();

        // Try to add the same plugin again
        let result = repo.add(plugin).await;
        assert!(result.is_err());

        if let Err(RepositoryError::ConstraintViolation(msg)) = result {
            assert!(msg.contains("already exists"));
        } else {
            panic!("Expected ConstraintViolation error");
        }

        repo.drop_all_entries().await.unwrap();
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_debug_implementation() {
        let repo = setup_fixed_prefix_repo().await;
        let debug_str = format!("{:?}", repo);
        assert!(debug_str.contains("RedisPluginRepository"));
        assert!(debug_str.contains("test_plugin"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_error_handling_empty_id() {
        let repo = setup_test_repo().await;

        let result = repo.get_by_id("").await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("ID cannot be empty"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_add_plugin_with_empty_id() {
        let repo = setup_test_repo().await;
        let plugin = create_test_plugin("", "/path/to/plugin");

        let result = repo.add(plugin).await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("ID cannot be empty"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_add_plugin_with_empty_path() {
        let repo = setup_test_repo().await;
        let plugin = create_test_plugin("test-plugin", "");

        let result = repo.add(plugin).await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("path cannot be empty"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_get_by_ids_plugins() {
        let repo = setup_test_repo().await;
        let plugin_name1 = uuid::Uuid::new_v4().to_string();
        let plugin_name2 = uuid::Uuid::new_v4().to_string();
        let plugin1 = create_test_plugin(&plugin_name1, "/path/to/plugin1");
        let plugin2 = create_test_plugin(&plugin_name2, "/path/to/plugin2");

        repo.add(plugin1.clone()).await.unwrap();
        repo.add(plugin2.clone()).await.unwrap();

        let retrieved = repo
            .get_by_ids(&[plugin1.id.clone(), plugin2.id.clone()])
            .await
            .unwrap();
        assert_eq!(retrieved.results.len(), 2);
        // `get_by_ids` walks the MGET reply in request order, so results come
        // back in the same order as the requested IDs.
        assert_eq!(retrieved.results[0].id, plugin1.id);
        assert_eq!(retrieved.results[1].id, plugin2.id);
        assert_eq!(retrieved.failed_ids.len(), 0);

        repo.drop_all_entries().await.unwrap();
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_list_paginated_plugins() {
        let repo = setup_test_repo().await;

        let plugin_id1 = uuid::Uuid::new_v4().to_string();
        let plugin_id2 = uuid::Uuid::new_v4().to_string();
        let plugin_id3 = uuid::Uuid::new_v4().to_string();
        let plugin1 = create_test_plugin(&plugin_id1, "/path/to/plugin1");
        let plugin2 = create_test_plugin(&plugin_id2, "/path/to/plugin2");
        let plugin3 = create_test_plugin(&plugin_id3, "/path/to/plugin3");

        repo.add(plugin1.clone()).await.unwrap();
        repo.add(plugin2.clone()).await.unwrap();
        repo.add(plugin3.clone()).await.unwrap();

        let query = PaginationQuery {
            page: 1,
            per_page: 2,
        };

        let result = repo.list_paginated(query).await;
        assert!(result.is_ok());
        let result = result.unwrap();
        assert_eq!(result.items.len(), 2);
        // The unique prefix guarantees only this test's three plugins are counted.
        assert_eq!(result.total, 3);

        repo.drop_all_entries().await.unwrap();
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_has_entries() {
        let repo = setup_test_repo().await;
        assert!(!repo.has_entries().await.unwrap());
        repo.add(create_test_plugin("test-plugin", "/path/to/plugin"))
            .await
            .unwrap();
        assert!(repo.has_entries().await.unwrap());
        repo.drop_all_entries().await.unwrap();
        assert!(!repo.has_entries().await.unwrap());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_drop_all_entries() {
        let repo = setup_test_repo().await;
        repo.add(create_test_plugin("test-plugin", "/path/to/plugin"))
            .await
            .unwrap();
        assert!(repo.has_entries().await.unwrap());
        repo.drop_all_entries().await.unwrap();
        assert!(!repo.has_entries().await.unwrap());
        // The plugin's own data key must be gone too, not just the ID set.
        assert!(matches!(repo.get_by_id("test-plugin").await, Ok(None)));
    }
}