openzeppelin_relayer/jobs/
queue.rs1use std::sync::Arc;
10
11use apalis_redis::{Config, ConnectionManager, RedisStorage};
12use color_eyre::{eyre, Result};
13use log::error;
14use serde::{Deserialize, Serialize};
15use tokio::time::{timeout, Duration};
16
17use crate::config::ServerConfig;
18
19use super::{
20 Job, NotificationSend, SolanaTokenSwapRequest, TransactionRequest, TransactionSend,
21 TransactionStatusCheck,
22};
23
24#[derive(Clone, Debug)]
25pub struct Queue {
26 pub transaction_request_queue: RedisStorage<Job<TransactionRequest>>,
27 pub transaction_submission_queue: RedisStorage<Job<TransactionSend>>,
28 pub transaction_status_queue: RedisStorage<Job<TransactionStatusCheck>>,
29 pub notification_queue: RedisStorage<Job<NotificationSend>>,
30 pub solana_token_swap_request_queue: RedisStorage<Job<SolanaTokenSwapRequest>>,
31}
32
33impl Queue {
34 async fn storage<T: Serialize + for<'de> Deserialize<'de>>(
35 namespace: &str,
36 shared: Arc<ConnectionManager>,
37 ) -> Result<RedisStorage<T>> {
38 let config = Config::default().set_namespace(namespace);
39
40 Ok(RedisStorage::new_with_config((*shared).clone(), config))
41 }
42
43 pub async fn setup() -> Result<Self> {
44 let config = ServerConfig::from_env();
45 let redis_url = config.redis_url.clone();
46 let redis_connection_timeout_ms = config.redis_connection_timeout_ms;
47 let conn = match timeout(Duration::from_millis(redis_connection_timeout_ms), apalis_redis::connect(redis_url.clone())).await {
48 Ok(result) => result.map_err(|e| {
49 error!("Failed to connect to Redis at {}: {}", redis_url, e);
50 eyre::eyre!("Failed to connect to Redis. Please ensure Redis is running and accessible at {}. Error: {}", redis_url, e)
51 })?,
52 Err(_) => {
53 error!("Timeout connecting to Redis at {}", redis_url);
54 return Err(eyre::eyre!("Timed out after {} milliseconds while connecting to Redis at {}", redis_connection_timeout_ms, redis_url));
55 }
56 };
57
58 let shared = Arc::new(conn);
59 Ok(Self {
60 transaction_request_queue: Self::storage("transaction_request_queue", shared.clone())
61 .await?,
62 transaction_submission_queue: Self::storage(
63 "transaction_submission_queue",
64 shared.clone(),
65 )
66 .await?,
67 transaction_status_queue: Self::storage("transaction_status_queue", shared.clone())
68 .await?,
69 notification_queue: Self::storage("notification_queue", shared.clone()).await?,
70 solana_token_swap_request_queue: Self::storage(
71 "solana_token_swap_request_queue",
72 shared.clone(),
73 )
74 .await?,
75 })
76 }
77}
78
#[cfg(test)]
mod tests {
    use super::*;

    /// A namespace set on a default `Config` is reported back unchanged.
    #[tokio::test]
    async fn test_queue_storage_configuration() {
        let expected = "test_namespace";
        let redis_config = Config::default().set_namespace(expected);

        assert_eq!(redis_config.get_namespace(), expected);
    }

    /// Stand-in that only carries the namespace string of each queue.
    #[derive(Clone, Debug)]
    struct MockQueue {
        pub namespace_transaction_request: String,
        pub namespace_transaction_submission: String,
        pub namespace_transaction_status: String,
        pub namespace_notification: String,
        pub namespace_solana_token_swap_request_queue: String,
    }

    impl MockQueue {
        /// Creates the mock with the namespace literal of every queue.
        fn new() -> Self {
            Self {
                namespace_transaction_request: String::from("transaction_request_queue"),
                namespace_transaction_submission: String::from("transaction_submission_queue"),
                namespace_transaction_status: String::from("transaction_status_queue"),
                namespace_notification: String::from("notification_queue"),
                namespace_solana_token_swap_request_queue: String::from(
                    "solana_token_swap_request_queue",
                ),
            }
        }
    }

    /// Every namespace held by the mock matches its expected literal.
    #[test]
    fn test_queue_namespaces() {
        let queue = MockQueue::new();

        // (actual, expected) pairs, checked in field order.
        let expectations = [
            (
                queue.namespace_transaction_request.as_str(),
                "transaction_request_queue",
            ),
            (
                queue.namespace_transaction_submission.as_str(),
                "transaction_submission_queue",
            ),
            (
                queue.namespace_transaction_status.as_str(),
                "transaction_status_queue",
            ),
            (queue.namespace_notification.as_str(), "notification_queue"),
            (
                queue.namespace_solana_token_swap_request_queue.as_str(),
                "solana_token_swap_request_queue",
            ),
        ];

        for (actual, expected) in expectations {
            assert_eq!(actual, expected);
        }
    }
}