openzeppelin_relayer/bootstrap/
initialize_workers.rs1use actix_web::web::ThinData;
5use apalis::{layers::ErrorHandlingLayer, prelude::*};
6use apalis_cron::CronStream;
7use eyre::Result;
8use log::{error, info};
9use std::{str::FromStr, time::Duration};
10use tokio::signal::unix::SignalKind;
11
12use crate::{
13 jobs::{
14 notification_handler, solana_token_swap_cron_handler, solana_token_swap_request_handler,
15 transaction_cleanup_handler, transaction_request_handler, transaction_status_handler,
16 transaction_submission_handler, BackoffRetryPolicy,
17 },
18 models::DefaultAppState,
19 repositories::RelayerRepository,
20};
21
22const DEFAULT_CONCURRENCY: usize = 2;
24const DEFAULT_RATE_LIMIT: u64 = 20;
25const DEFAULT_RATE_LIMIT_DURATION: Duration = Duration::from_secs(1);
26
27const TRANSACTION_REQUEST: &str = "transaction_request";
28const TRANSACTION_SENDER: &str = "transaction_sender";
29const TRANSACTION_STATUS_CHECKER: &str = "transaction_status_checker";
30const NOTIFICATION_SENDER: &str = "notification_sender";
31const SOLANA_TOKEN_SWAP_REQUEST: &str = "solana_token_swap_request";
32const TRANSACTION_CLEANUP: &str = "transaction_cleanup";
33
34pub async fn initialize_workers(app_state: ThinData<DefaultAppState>) -> Result<()> {
35 let queue = app_state.job_producer.get_queue().await?;
36
37 let transaction_request_queue_worker = WorkerBuilder::new(TRANSACTION_REQUEST)
38 .layer(ErrorHandlingLayer::new())
39 .enable_tracing()
40 .catch_panic()
41 .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
42 .retry(BackoffRetryPolicy::default())
43 .concurrency(DEFAULT_CONCURRENCY)
44 .data(app_state.clone())
45 .backend(queue.transaction_request_queue.clone())
46 .build_fn(transaction_request_handler);
47
48 let transaction_submission_queue_worker = WorkerBuilder::new(TRANSACTION_SENDER)
49 .layer(ErrorHandlingLayer::new())
50 .enable_tracing()
51 .catch_panic()
52 .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
53 .retry(BackoffRetryPolicy::default())
54 .concurrency(DEFAULT_CONCURRENCY)
55 .data(app_state.clone())
56 .backend(queue.transaction_submission_queue.clone())
57 .build_fn(transaction_submission_handler);
58
59 let transaction_status_queue_worker = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER)
60 .layer(ErrorHandlingLayer::new())
61 .catch_panic()
62 .enable_tracing()
63 .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
64 .retry(BackoffRetryPolicy::default())
65 .concurrency(DEFAULT_CONCURRENCY)
66 .data(app_state.clone())
67 .backend(queue.transaction_status_queue.clone())
68 .build_fn(transaction_status_handler);
69
70 let notification_queue_worker = WorkerBuilder::new(NOTIFICATION_SENDER)
71 .layer(ErrorHandlingLayer::new())
72 .enable_tracing()
73 .catch_panic()
74 .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
75 .retry(BackoffRetryPolicy::default())
76 .concurrency(DEFAULT_CONCURRENCY)
77 .data(app_state.clone())
78 .backend(queue.notification_queue.clone())
79 .build_fn(notification_handler);
80
81 let solana_token_swap_request_queue_worker = WorkerBuilder::new(SOLANA_TOKEN_SWAP_REQUEST)
82 .layer(ErrorHandlingLayer::new())
83 .enable_tracing()
84 .catch_panic()
85 .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
86 .retry(BackoffRetryPolicy::default())
87 .concurrency(10)
88 .data(app_state.clone())
89 .backend(queue.solana_token_swap_request_queue.clone())
90 .build_fn(solana_token_swap_request_handler);
91
92 let transaction_cleanup_queue_worker = WorkerBuilder::new(TRANSACTION_CLEANUP)
93 .layer(ErrorHandlingLayer::new())
94 .enable_tracing()
95 .catch_panic()
96 .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
97 .retry(BackoffRetryPolicy::default())
98 .concurrency(1)
99 .data(app_state.clone())
100 .backend(CronStream::new(
101 apalis_cron::Schedule::from_str("0 */30 * * * *").unwrap(),
103 ))
104 .build_fn(transaction_cleanup_handler);
105
106 let monitor = Monitor::new()
107 .register(transaction_request_queue_worker)
108 .register(transaction_submission_queue_worker)
109 .register(transaction_status_queue_worker)
110 .register(notification_queue_worker)
111 .register(solana_token_swap_request_queue_worker)
112 .register(transaction_cleanup_queue_worker)
113 .on_event(monitor_handle_event)
114 .shutdown_timeout(Duration::from_millis(5000));
115
116 let monitor_future = monitor.run_with_signal(async {
117 let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
118 .expect("Failed to create SIGINT signal");
119 let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
120 .expect("Failed to create SIGTERM signal");
121
122 info!("Monitor started");
123
124 tokio::select! {
125 _ = sigint.recv() => info!("Received SIGINT."),
126 _ = sigterm.recv() => info!("Received SIGTERM."),
127 };
128
129 info!("Monitor shutting down");
130
131 Ok(())
132 });
133 tokio::spawn(async move {
134 if let Err(e) = monitor_future.await {
135 error!("Monitor error: {}", e);
136 }
137 });
138 info!("Monitor shutdown complete");
139 Ok(())
140}
141
142pub async fn initialize_solana_swap_workers(app_state: ThinData<DefaultAppState>) -> Result<()> {
145 let solena_relayers_with_swap_enabled = app_state
146 .relayer_repository
147 .list_active()
148 .await?
149 .into_iter()
150 .filter(|relayer| {
151 let policy = relayer.policies.get_solana_policy();
152 let swap_config = match policy.get_swap_config() {
153 Some(config) => config,
154 None => {
155 info!("No swap configuration specified; skipping validation.");
156 return false;
157 }
158 };
159
160 if swap_config.cron_schedule.is_none() {
161 return false;
162 }
163 true
164 })
165 .collect::<Vec<_>>();
166
167 if solena_relayers_with_swap_enabled.is_empty() {
168 info!("No solana relayers with swap enabled");
169 return Ok(());
170 }
171 info!(
172 "Found {} solana relayers with swap enabled",
173 solena_relayers_with_swap_enabled.len()
174 );
175
176 let mut workers = Vec::new();
177
178 for relayer in solena_relayers_with_swap_enabled {
179 info!("Found solana relayer with swap enabled: {:?}", relayer);
180
181 let policy = relayer.policies.get_solana_policy();
182 let swap_config = match policy.get_swap_config() {
183 Some(config) => config,
184 None => {
185 info!("No swap configuration specified; skipping validation.");
186 continue;
187 }
188 };
189
190 let calendar_schedule = match swap_config.cron_schedule {
191 Some(schedule) => apalis_cron::Schedule::from_str(&schedule).unwrap(),
192 None => {
193 info!("No swap cron schedule found for relayer: {:?}", relayer);
194 continue;
195 }
196 };
197
198 let worker = WorkerBuilder::new(format!("solana-swap-schedule-{}", relayer.id.clone()))
200 .layer(ErrorHandlingLayer::new())
201 .enable_tracing()
202 .catch_panic()
203 .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
204 .retry(BackoffRetryPolicy::default())
205 .concurrency(1)
206 .data(relayer.id.clone())
207 .data(app_state.clone())
208 .backend(CronStream::new(calendar_schedule))
209 .build_fn(solana_token_swap_cron_handler);
210
211 workers.push(worker);
212 info!(
213 "Created worker for solana relayer with swap enabled: {:?}",
214 relayer
215 );
216 }
217
218 let mut monitor = Monitor::new()
219 .on_event(monitor_handle_event)
220 .shutdown_timeout(Duration::from_millis(5000));
221
222 for worker in workers {
224 monitor = monitor.register(worker);
225 }
226
227 let monitor_future = monitor.run_with_signal(async {
228 let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
229 .expect("Failed to create SIGINT signal");
230 let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
231 .expect("Failed to create SIGTERM signal");
232
233 info!("Solana Swap Monitor started");
234
235 tokio::select! {
236 _ = sigint.recv() => info!("Received SIGINT."),
237 _ = sigterm.recv() => info!("Received SIGTERM."),
238 };
239
240 info!("Solana Swap Monitor shutting down");
241
242 Ok(())
243 });
244 tokio::spawn(async move {
245 if let Err(e) = monitor_future.await {
246 error!("Monitor error: {}", e);
247 }
248 });
249 Ok(())
250}
251
252fn monitor_handle_event(e: Worker<Event>) {
253 let worker_id = e.id();
254 match e.inner() {
255 Event::Engage(task_id) => {
256 info!("Worker [{worker_id}] got a job with id: {task_id}");
257 }
258 Event::Error(e) => {
259 error!("Worker [{worker_id}] encountered an error: {e}");
260 }
261 Event::Exit => {
262 info!("Worker [{worker_id}] exited");
263 }
264 Event::Idle => {
265 info!("Worker [{worker_id}] is idle");
266 }
267 Event::Start => {
268 info!("Worker [{worker_id}] started");
269 }
270 Event::Stop => {
271 info!("Worker [{worker_id}] stopped");
272 }
273 _ => {}
274 }
275}