openzeppelin_relayer/jobs/
queue.rs

1//! Queue management module for job processing.
2//!
3//! This module provides Redis-backed queue implementation for handling different types of jobs:
4//! - Transaction requests
5//! - Transaction submissions
6//! - Transaction status checks
7//! - Notifications
8//! - Solana swap requests
9use std::sync::Arc;
10
11use apalis_redis::{Config, ConnectionManager, RedisStorage};
12use color_eyre::{eyre, Result};
13use log::error;
14use serde::{Deserialize, Serialize};
15use tokio::time::{timeout, Duration};
16
17use crate::config::ServerConfig;
18
19use super::{
20    Job, NotificationSend, SolanaTokenSwapRequest, TransactionRequest, TransactionSend,
21    TransactionStatusCheck,
22};
23
24#[derive(Clone, Debug)]
25pub struct Queue {
26    pub transaction_request_queue: RedisStorage<Job<TransactionRequest>>,
27    pub transaction_submission_queue: RedisStorage<Job<TransactionSend>>,
28    pub transaction_status_queue: RedisStorage<Job<TransactionStatusCheck>>,
29    pub notification_queue: RedisStorage<Job<NotificationSend>>,
30    pub solana_token_swap_request_queue: RedisStorage<Job<SolanaTokenSwapRequest>>,
31}
32
33impl Queue {
34    async fn storage<T: Serialize + for<'de> Deserialize<'de>>(
35        namespace: &str,
36        shared: Arc<ConnectionManager>,
37    ) -> Result<RedisStorage<T>> {
38        let config = Config::default().set_namespace(namespace);
39
40        Ok(RedisStorage::new_with_config((*shared).clone(), config))
41    }
42
43    pub async fn setup() -> Result<Self> {
44        let config = ServerConfig::from_env();
45        let redis_url = config.redis_url.clone();
46        let redis_connection_timeout_ms = config.redis_connection_timeout_ms;
47        let conn = match timeout(Duration::from_millis(redis_connection_timeout_ms), apalis_redis::connect(redis_url.clone())).await {
48            Ok(result) => result.map_err(|e| {
49                error!("Failed to connect to Redis at {}: {}", redis_url, e);
50                eyre::eyre!("Failed to connect to Redis. Please ensure Redis is running and accessible at {}. Error: {}", redis_url, e)
51            })?,
52            Err(_) => {
53                error!("Timeout connecting to Redis at {}", redis_url);
54                return Err(eyre::eyre!("Timed out after {} milliseconds while connecting to Redis at {}", redis_connection_timeout_ms, redis_url));
55            }
56        };
57
58        let shared = Arc::new(conn);
59        Ok(Self {
60            transaction_request_queue: Self::storage("transaction_request_queue", shared.clone())
61                .await?,
62            transaction_submission_queue: Self::storage(
63                "transaction_submission_queue",
64                shared.clone(),
65            )
66            .await?,
67            transaction_status_queue: Self::storage("transaction_status_queue", shared.clone())
68                .await?,
69            notification_queue: Self::storage("notification_queue", shared.clone()).await?,
70            solana_token_swap_request_queue: Self::storage(
71                "solana_token_swap_request_queue",
72                shared.clone(),
73            )
74            .await?,
75        })
76    }
77}
78
79#[cfg(test)]
80mod tests {
81    use super::*;
82
83    #[tokio::test]
84    async fn test_queue_storage_configuration() {
85        // Test the config creation logic without actual Redis connections
86        let namespace = "test_namespace";
87        let config = Config::default().set_namespace(namespace);
88
89        assert_eq!(config.get_namespace(), namespace);
90    }
91
92    // Mock version of Queue for testing
93    #[derive(Clone, Debug)]
94    struct MockQueue {
95        pub namespace_transaction_request: String,
96        pub namespace_transaction_submission: String,
97        pub namespace_transaction_status: String,
98        pub namespace_notification: String,
99        pub namespace_solana_token_swap_request_queue: String,
100    }
101
102    impl MockQueue {
103        fn new() -> Self {
104            Self {
105                namespace_transaction_request: "transaction_request_queue".to_string(),
106                namespace_transaction_submission: "transaction_submission_queue".to_string(),
107                namespace_transaction_status: "transaction_status_queue".to_string(),
108                namespace_notification: "notification_queue".to_string(),
109                namespace_solana_token_swap_request_queue: "solana_token_swap_request_queue"
110                    .to_string(),
111            }
112        }
113    }
114
115    #[test]
116    fn test_queue_namespaces() {
117        let mock_queue = MockQueue::new();
118
119        assert_eq!(
120            mock_queue.namespace_transaction_request,
121            "transaction_request_queue"
122        );
123        assert_eq!(
124            mock_queue.namespace_transaction_submission,
125            "transaction_submission_queue"
126        );
127        assert_eq!(
128            mock_queue.namespace_transaction_status,
129            "transaction_status_queue"
130        );
131        assert_eq!(mock_queue.namespace_notification, "notification_queue");
132        assert_eq!(
133            mock_queue.namespace_solana_token_swap_request_queue,
134            "solana_token_swap_request_queue"
135        );
136    }
137}