openzeppelin_relayer/repositories/plugin/
plugin_redis.rs1use crate::models::{PaginationQuery, PluginModel, RepositoryError};
4use crate::repositories::redis_base::RedisRepository;
5use crate::repositories::{BatchRetrievalResult, PaginatedResult, PluginRepositoryTrait};
6use async_trait::async_trait;
7use log::{debug, error, warn};
8use redis::aio::ConnectionManager;
9use redis::AsyncCommands;
10use std::fmt;
11use std::sync::Arc;
12
/// Key segment placed between the repository prefix and a plugin ID when
/// building the key of a single plugin record.
const PLUGIN_PREFIX: &str = "plugin";
/// Key suffix, appended to the repository prefix, naming the Redis set that
/// holds the IDs of all stored plugins.
const PLUGIN_LIST_KEY: &str = "plugin_list";
15
16#[derive(Clone)]
17pub struct RedisPluginRepository {
18 pub client: Arc<ConnectionManager>,
19 pub key_prefix: String,
20}
21
22impl RedisRepository for RedisPluginRepository {}
23
24impl RedisPluginRepository {
25 pub fn new(
26 connection_manager: Arc<ConnectionManager>,
27 key_prefix: String,
28 ) -> Result<Self, RepositoryError> {
29 if key_prefix.is_empty() {
30 return Err(RepositoryError::InvalidData(
31 "Redis key prefix cannot be empty".to_string(),
32 ));
33 }
34
35 Ok(Self {
36 client: connection_manager,
37 key_prefix,
38 })
39 }
40
41 fn plugin_key(&self, plugin_id: &str) -> String {
43 format!("{}:{}:{}", self.key_prefix, PLUGIN_PREFIX, plugin_id)
44 }
45
46 fn plugin_list_key(&self) -> String {
48 format!("{}:{}", self.key_prefix, PLUGIN_LIST_KEY)
49 }
50
51 async fn get_by_id_with_connection(
60 &self,
61 id: &str,
62 conn: &mut ConnectionManager,
63 ) -> Result<Option<PluginModel>, RepositoryError> {
64 if id.is_empty() {
65 return Err(RepositoryError::InvalidData(
66 "Plugin ID cannot be empty".to_string(),
67 ));
68 }
69 let key = self.plugin_key(id);
70
71 debug!("Fetching plugin data for ID: {}", id);
72
73 let json: Option<String> = conn
74 .get(&key)
75 .await
76 .map_err(|e| self.map_redis_error(e, &format!("get_plugin_by_id_{}", id)))?;
77
78 match json {
79 Some(json) => {
80 debug!("Found plugin data for ID: {}", id);
81 let plugin = self.deserialize_entity::<PluginModel>(&json, id, "plugin")?;
82 Ok(Some(plugin))
83 }
84 None => {
85 debug!("No plugin found for ID: {}", id);
86 Ok(None)
87 }
88 }
89 }
90
91 async fn get_by_ids(
92 &self,
93 ids: &[String],
94 ) -> Result<BatchRetrievalResult<PluginModel>, RepositoryError> {
95 if ids.is_empty() {
96 debug!("No plugin IDs provided for batch fetch");
97 return Ok(BatchRetrievalResult {
98 results: vec![],
99 failed_ids: vec![],
100 });
101 }
102
103 let mut conn = self.client.as_ref().clone();
104 let keys: Vec<String> = ids.iter().map(|id| self.plugin_key(id)).collect();
105
106 let values: Vec<Option<String>> = conn
107 .mget(&keys)
108 .await
109 .map_err(|e| self.map_redis_error(e, "batch_fetch_plugins"))?;
110
111 let mut plugins = Vec::new();
112 let mut failed_count = 0;
113 let mut failed_ids = Vec::new();
114 for (i, value) in values.into_iter().enumerate() {
115 match value {
116 Some(json) => match self.deserialize_entity(&json, &ids[i], "plugin") {
117 Ok(plugin) => plugins.push(plugin),
118 Err(e) => {
119 failed_count += 1;
120 error!("Failed to deserialize plugin {}: {}", ids[i], e);
121 failed_ids.push(ids[i].clone());
122 }
123 },
124 None => {
125 warn!("Plugin {} not found in batch fetch", ids[i]);
126 }
127 }
128 }
129
130 if failed_count > 0 {
131 warn!(
132 "Failed to deserialize {} out of {} plugins in batch",
133 failed_count,
134 ids.len()
135 );
136 }
137
138 Ok(BatchRetrievalResult {
139 results: plugins,
140 failed_ids,
141 })
142 }
143}
144
145impl fmt::Debug for RedisPluginRepository {
146 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147 f.debug_struct("RedisPluginRepository")
148 .field("client", &"<ConnectionManager>")
149 .field("key_prefix", &self.key_prefix)
150 .finish()
151 }
152}
153
154#[async_trait]
155impl PluginRepositoryTrait for RedisPluginRepository {
156 async fn get_by_id(&self, id: &str) -> Result<Option<PluginModel>, RepositoryError> {
157 let mut conn = self.client.as_ref().clone();
158 self.get_by_id_with_connection(id, &mut conn).await
159 }
160
161 async fn add(&self, plugin: PluginModel) -> Result<(), RepositoryError> {
162 if plugin.id.is_empty() {
163 return Err(RepositoryError::InvalidData(
164 "Plugin ID cannot be empty".to_string(),
165 ));
166 }
167
168 if plugin.path.is_empty() {
169 return Err(RepositoryError::InvalidData(
170 "Plugin path cannot be empty".to_string(),
171 ));
172 }
173
174 let mut conn = self.client.as_ref().clone();
175 let key = self.plugin_key(&plugin.id);
176 let list_key = self.plugin_list_key();
177
178 debug!("Adding plugin with ID: {}", plugin.id);
179
180 let exists: bool = conn
182 .exists(&key)
183 .await
184 .map_err(|e| self.map_redis_error(e, &format!("check_plugin_exists_{}", plugin.id)))?;
185
186 if exists {
187 return Err(RepositoryError::ConstraintViolation(format!(
188 "Plugin with ID {} already exists",
189 plugin.id
190 )));
191 }
192
193 let json = self.serialize_entity(&plugin, |p| &p.id, "plugin")?;
195
196 let mut pipe = redis::pipe();
198 pipe.atomic();
199 pipe.set(&key, &json);
200 pipe.sadd(&list_key, &plugin.id);
201
202 pipe.exec_async(&mut conn).await.map_err(|e| {
203 error!("Failed to add plugin {}: {}", plugin.id, e);
204 self.map_redis_error(e, &format!("add_plugin_{}", plugin.id))
205 })?;
206
207 debug!("Successfully added plugin with ID: {}", plugin.id);
208 Ok(())
209 }
210
211 async fn list_paginated(
212 &self,
213 query: PaginationQuery,
214 ) -> Result<PaginatedResult<PluginModel>, RepositoryError> {
215 if query.page == 0 {
216 return Err(RepositoryError::InvalidData(
217 "Page number must be greater than 0".to_string(),
218 ));
219 }
220
221 if query.per_page == 0 {
222 return Err(RepositoryError::InvalidData(
223 "Per page count must be greater than 0".to_string(),
224 ));
225 }
226
227 let mut conn = self.client.as_ref().clone();
228 let plugin_list_key = self.plugin_list_key();
229
230 let total: u64 = conn
232 .scard(&plugin_list_key)
233 .await
234 .map_err(|e| self.map_redis_error(e, "list_paginated_count"))?;
235
236 if total == 0 {
237 return Ok(PaginatedResult {
238 items: vec![],
239 total: 0,
240 page: query.page,
241 per_page: query.per_page,
242 });
243 }
244
245 let all_ids: Vec<String> = conn
247 .smembers(&plugin_list_key)
248 .await
249 .map_err(|e| self.map_redis_error(e, "list_paginated_members"))?;
250
251 let start = ((query.page - 1) * query.per_page) as usize;
252 let end = (start + query.per_page as usize).min(all_ids.len());
253
254 let ids_to_query = &all_ids[start..end];
255 let items = self.get_by_ids(ids_to_query).await?;
256
257 Ok(PaginatedResult {
258 items: items.results.clone(),
259 total,
260 page: query.page,
261 per_page: query.per_page,
262 })
263 }
264
265 async fn count(&self) -> Result<usize, RepositoryError> {
266 let mut conn = self.client.as_ref().clone();
267 let plugin_list_key = self.plugin_list_key();
268
269 let count: u64 = conn
270 .scard(&plugin_list_key)
271 .await
272 .map_err(|e| self.map_redis_error(e, "count_plugins"))?;
273
274 Ok(count as usize)
275 }
276
277 async fn has_entries(&self) -> Result<bool, RepositoryError> {
278 let mut conn = self.client.as_ref().clone();
279 let plugin_list_key = self.plugin_list_key();
280
281 debug!("Checking if plugin entries exist");
282
283 let exists: bool = conn
284 .exists(&plugin_list_key)
285 .await
286 .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
287
288 debug!("Plugin entries exist: {}", exists);
289 Ok(exists)
290 }
291
292 async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
293 let mut conn = self.client.as_ref().clone();
294 let plugin_list_key = self.plugin_list_key();
295
296 debug!("Dropping all plugin entries");
297
298 let plugin_ids: Vec<String> = conn
300 .smembers(&plugin_list_key)
301 .await
302 .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_ids"))?;
303
304 if plugin_ids.is_empty() {
305 debug!("No plugin entries to drop");
306 return Ok(());
307 }
308
309 let mut pipe = redis::pipe();
311 pipe.atomic();
312
313 for plugin_id in &plugin_ids {
315 let plugin_key = self.plugin_key(plugin_id);
316 pipe.del(&plugin_key);
317 }
318
319 pipe.del(&plugin_list_key);
321
322 pipe.exec_async(&mut conn)
323 .await
324 .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;
325
326 debug!("Dropped {} plugin entries", plugin_ids.len());
327 Ok(())
328 }
329}
330
#[cfg(test)]
mod tests {
    use super::*;
    use crate::constants::DEFAULT_PLUGIN_TIMEOUT_SECONDS;
    use crate::models::PluginModel;
    use std::{sync::Arc, time::Duration};

    /// Builds a plugin model with the default timeout.
    fn create_test_plugin(id: &str, path: &str) -> PluginModel {
        PluginModel {
            id: id.to_string(),
            path: path.to_string(),
            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
        }
    }

    /// Opens a connection manager against `REDIS_URL`, falling back to a
    /// local default instance when the variable is not set.
    async fn connect() -> ConnectionManager {
        let redis_url =
            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379/".to_string());
        let client = redis::Client::open(redis_url).expect("Failed to create Redis client");
        ConnectionManager::new(client)
            .await
            .expect("Failed to create Redis connection manager")
    }

    /// Repository under the shared `test_plugin` prefix, with the plugin ID
    /// set cleared first.
    async fn setup_test_repo() -> RedisPluginRepository {
        let mut connection_manager = connect().await;

        connection_manager
            .del::<&str, ()>("test_plugin:plugin_list")
            .await
            .unwrap();

        RedisPluginRepository::new(Arc::new(connection_manager), "test_plugin".to_string())
            .expect("Failed to create Redis plugin repository")
    }

    /// Repository under a prefix unique to the calling test.
    ///
    /// Tests that assert on the contents of the plugin ID set use this so
    /// that other tests, which clear and fill the shared `test_plugin` set
    /// concurrently, cannot disturb them.
    async fn setup_isolated_repo() -> RedisPluginRepository {
        let prefix = format!("test_plugin_{}", uuid::Uuid::new_v4());
        RedisPluginRepository::new(Arc::new(connect().await), prefix)
            .expect("Failed to create Redis plugin repository")
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_new_repository_creation() {
        let repo = setup_test_repo().await;
        assert_eq!(repo.key_prefix, "test_plugin");
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_new_repository_empty_prefix_fails() {
        let connection_manager = connect().await;

        let result = RedisPluginRepository::new(Arc::new(connection_manager), "".to_string());
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("key prefix cannot be empty"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_key_generation() {
        let repo = setup_test_repo().await;

        let plugin_key = repo.plugin_key("test-plugin");
        assert_eq!(plugin_key, "test_plugin:plugin:test-plugin");

        let list_key = repo.plugin_list_key();
        assert_eq!(list_key, "test_plugin:plugin_list");
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_serialize_deserialize_plugin() {
        let repo = setup_test_repo().await;
        let plugin = create_test_plugin("test-plugin", "/path/to/plugin");

        let json = repo.serialize_entity(&plugin, |p| &p.id, "plugin").unwrap();
        let deserialized: PluginModel = repo
            .deserialize_entity(&json, &plugin.id, "plugin")
            .unwrap();

        assert_eq!(plugin.id, deserialized.id);
        assert_eq!(plugin.path, deserialized.path);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_add_plugin() {
        let repo = setup_test_repo().await;
        let plugin_name = uuid::Uuid::new_v4().to_string();
        let plugin = create_test_plugin(&plugin_name, "/path/to/plugin");

        let result = repo.add(plugin).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_get_plugin() {
        let repo = setup_test_repo().await;
        let plugin_name = uuid::Uuid::new_v4().to_string();
        let plugin = create_test_plugin(&plugin_name, "/path/to/plugin");

        repo.add(plugin.clone()).await.unwrap();

        let retrieved = repo.get_by_id(&plugin_name).await.unwrap();
        assert!(retrieved.is_some());
        let retrieved = retrieved.unwrap();
        assert_eq!(retrieved.id, plugin.id);
        assert_eq!(retrieved.path, plugin.path);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_get_nonexistent_plugin() {
        let repo = setup_test_repo().await;

        let result = repo.get_by_id("nonexistent-plugin").await;
        assert!(matches!(result, Ok(None)));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_duplicate_plugin_addition() {
        let repo = setup_test_repo().await;
        let plugin_name = uuid::Uuid::new_v4().to_string();
        let plugin = create_test_plugin(&plugin_name, "/path/to/plugin");

        repo.add(plugin.clone()).await.unwrap();

        let result = repo.add(plugin).await;
        assert!(result.is_err());

        if let Err(RepositoryError::ConstraintViolation(msg)) = result {
            assert!(msg.contains("already exists"));
        } else {
            panic!("Expected ConstraintViolation error");
        }
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_debug_implementation() {
        let repo = setup_test_repo().await;
        let debug_str = format!("{:?}", repo);
        assert!(debug_str.contains("RedisPluginRepository"));
        assert!(debug_str.contains("test_plugin"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_error_handling_empty_id() {
        let repo = setup_test_repo().await;

        let result = repo.get_by_id("").await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("ID cannot be empty"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_add_plugin_with_empty_id() {
        let repo = setup_test_repo().await;
        let plugin = create_test_plugin("", "/path/to/plugin");

        let result = repo.add(plugin).await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("ID cannot be empty"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_add_plugin_with_empty_path() {
        let repo = setup_test_repo().await;
        let plugin = create_test_plugin("test-plugin", "");

        let result = repo.add(plugin).await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("path cannot be empty"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_get_by_ids_plugins() {
        let repo = setup_test_repo().await;
        let plugin_name1 = uuid::Uuid::new_v4().to_string();
        let plugin_name2 = uuid::Uuid::new_v4().to_string();
        let plugin1 = create_test_plugin(&plugin_name1, "/path/to/plugin1");
        let plugin2 = create_test_plugin(&plugin_name2, "/path/to/plugin2");

        repo.add(plugin1.clone()).await.unwrap();
        repo.add(plugin2.clone()).await.unwrap();

        let retrieved = repo
            .get_by_ids(&[plugin1.id.clone(), plugin2.id.clone()])
            .await
            .unwrap();
        assert_eq!(retrieved.results.len(), 2);
        // `get_by_ids` sends one MGET with the keys in request order and
        // pushes the replies in that same order.
        assert_eq!(retrieved.results[0].id, plugin1.id);
        assert_eq!(retrieved.results[1].id, plugin2.id);
        assert_eq!(retrieved.failed_ids.len(), 0);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_list_paginated_plugins() {
        // Isolated prefix: the page size and total depend on the ID set
        // holding exactly the three plugins added here.
        let repo = setup_isolated_repo().await;

        let plugin_id1 = uuid::Uuid::new_v4().to_string();
        let plugin_id2 = uuid::Uuid::new_v4().to_string();
        let plugin_id3 = uuid::Uuid::new_v4().to_string();
        let plugin1 = create_test_plugin(&plugin_id1, "/path/to/plugin1");
        let plugin2 = create_test_plugin(&plugin_id2, "/path/to/plugin2");
        let plugin3 = create_test_plugin(&plugin_id3, "/path/to/plugin3");

        repo.add(plugin1.clone()).await.unwrap();
        repo.add(plugin2.clone()).await.unwrap();
        repo.add(plugin3.clone()).await.unwrap();

        let query = PaginationQuery {
            page: 1,
            per_page: 2,
        };

        let result = repo.list_paginated(query).await;
        assert!(result.is_ok());
        let result = result.unwrap();
        assert_eq!(result.items.len(), 2);
        assert_eq!(result.total, 3);

        // Remove the keys created under the unique prefix.
        repo.drop_all_entries().await.unwrap();
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_has_entries() {
        let repo = setup_isolated_repo().await;
        assert!(!repo.has_entries().await.unwrap());
        repo.add(create_test_plugin("test-plugin", "/path/to/plugin"))
            .await
            .unwrap();
        assert!(repo.has_entries().await.unwrap());
        repo.drop_all_entries().await.unwrap();
        assert!(!repo.has_entries().await.unwrap());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_drop_all_entries() {
        let repo = setup_isolated_repo().await;
        repo.add(create_test_plugin("test-plugin", "/path/to/plugin"))
            .await
            .unwrap();
        assert!(repo.has_entries().await.unwrap());
        repo.drop_all_entries().await.unwrap();
        assert!(!repo.has_entries().await.unwrap());
    }
}