openzeppelin_relayer/bootstrap/
initialize_workers.rs

//! # Workers
//! Initialises and starts the workers for the application.
3
4use actix_web::web::ThinData;
5use apalis::{layers::ErrorHandlingLayer, prelude::*};
6use apalis_cron::CronStream;
7use eyre::Result;
8use log::{error, info};
9use std::{str::FromStr, time::Duration};
10use tokio::signal::unix::SignalKind;
11
12use crate::{
13    jobs::{
14        notification_handler, solana_token_swap_cron_handler, solana_token_swap_request_handler,
15        transaction_cleanup_handler, transaction_request_handler, transaction_status_handler,
16        transaction_submission_handler, BackoffRetryPolicy,
17    },
18    models::DefaultAppState,
19    repositories::RelayerRepository,
20};
21
// Worker configuration defaults. TODO: review and fine-tune these values.

/// Concurrency passed to the queue workers that do not set an explicit value of their own.
const DEFAULT_CONCURRENCY: usize = 2;
/// First argument of every worker's `rate_limit` call.
/// NOTE(review): presumably the number of jobs allowed per
/// `DEFAULT_RATE_LIMIT_DURATION` - confirm against the apalis rate-limit layer.
const DEFAULT_RATE_LIMIT: u64 = 20;
/// Second argument of every worker's `rate_limit` call (one second).
const DEFAULT_RATE_LIMIT_DURATION: Duration = Duration::from_secs(1);

/// Name of the worker that consumes the transaction request queue.
const TRANSACTION_REQUEST: &str = "transaction_request";
/// Name of the worker that consumes the transaction submission queue.
const TRANSACTION_SENDER: &str = "transaction_sender";
/// Name of the worker that consumes the transaction status queue.
const TRANSACTION_STATUS_CHECKER: &str = "transaction_status_checker";
/// Name of the worker that consumes the notification queue.
const NOTIFICATION_SENDER: &str = "notification_sender";
/// Name of the worker that consumes the Solana token swap request queue.
const SOLANA_TOKEN_SWAP_REQUEST: &str = "solana_token_swap_request";
/// Name of the cron-driven transaction cleanup worker.
const TRANSACTION_CLEANUP: &str = "transaction_cleanup";
33
34pub async fn initialize_workers(app_state: ThinData<DefaultAppState>) -> Result<()> {
35    let queue = app_state.job_producer.get_queue().await?;
36
37    let transaction_request_queue_worker = WorkerBuilder::new(TRANSACTION_REQUEST)
38        .layer(ErrorHandlingLayer::new())
39        .enable_tracing()
40        .catch_panic()
41        .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
42        .retry(BackoffRetryPolicy::default())
43        .concurrency(DEFAULT_CONCURRENCY)
44        .data(app_state.clone())
45        .backend(queue.transaction_request_queue.clone())
46        .build_fn(transaction_request_handler);
47
48    let transaction_submission_queue_worker = WorkerBuilder::new(TRANSACTION_SENDER)
49        .layer(ErrorHandlingLayer::new())
50        .enable_tracing()
51        .catch_panic()
52        .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
53        .retry(BackoffRetryPolicy::default())
54        .concurrency(DEFAULT_CONCURRENCY)
55        .data(app_state.clone())
56        .backend(queue.transaction_submission_queue.clone())
57        .build_fn(transaction_submission_handler);
58
59    let transaction_status_queue_worker = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER)
60        .layer(ErrorHandlingLayer::new())
61        .catch_panic()
62        .enable_tracing()
63        .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
64        .retry(BackoffRetryPolicy::default())
65        .concurrency(DEFAULT_CONCURRENCY)
66        .data(app_state.clone())
67        .backend(queue.transaction_status_queue.clone())
68        .build_fn(transaction_status_handler);
69
70    let notification_queue_worker = WorkerBuilder::new(NOTIFICATION_SENDER)
71        .layer(ErrorHandlingLayer::new())
72        .enable_tracing()
73        .catch_panic()
74        .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
75        .retry(BackoffRetryPolicy::default())
76        .concurrency(DEFAULT_CONCURRENCY)
77        .data(app_state.clone())
78        .backend(queue.notification_queue.clone())
79        .build_fn(notification_handler);
80
81    let solana_token_swap_request_queue_worker = WorkerBuilder::new(SOLANA_TOKEN_SWAP_REQUEST)
82        .layer(ErrorHandlingLayer::new())
83        .enable_tracing()
84        .catch_panic()
85        .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
86        .retry(BackoffRetryPolicy::default())
87        .concurrency(10)
88        .data(app_state.clone())
89        .backend(queue.solana_token_swap_request_queue.clone())
90        .build_fn(solana_token_swap_request_handler);
91
92    let transaction_cleanup_queue_worker = WorkerBuilder::new(TRANSACTION_CLEANUP)
93        .layer(ErrorHandlingLayer::new())
94        .enable_tracing()
95        .catch_panic()
96        .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
97        .retry(BackoffRetryPolicy::default())
98        .concurrency(1)
99        .data(app_state.clone())
100        .backend(CronStream::new(
101            // every 30 minutes
102            apalis_cron::Schedule::from_str("0 */30 * * * *").unwrap(),
103        ))
104        .build_fn(transaction_cleanup_handler);
105
106    let monitor = Monitor::new()
107        .register(transaction_request_queue_worker)
108        .register(transaction_submission_queue_worker)
109        .register(transaction_status_queue_worker)
110        .register(notification_queue_worker)
111        .register(solana_token_swap_request_queue_worker)
112        .register(transaction_cleanup_queue_worker)
113        .on_event(monitor_handle_event)
114        .shutdown_timeout(Duration::from_millis(5000));
115
116    let monitor_future = monitor.run_with_signal(async {
117        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
118            .expect("Failed to create SIGINT signal");
119        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
120            .expect("Failed to create SIGTERM signal");
121
122        info!("Monitor started");
123
124        tokio::select! {
125            _ = sigint.recv() => info!("Received SIGINT."),
126            _ = sigterm.recv() => info!("Received SIGTERM."),
127        };
128
129        info!("Monitor shutting down");
130
131        Ok(())
132    });
133    tokio::spawn(async move {
134        if let Err(e) = monitor_future.await {
135            error!("Monitor error: {}", e);
136        }
137    });
138    info!("Monitor shutdown complete");
139    Ok(())
140}
141
142/// Initializes the Solana swap workers
143/// This function creates and registers workers for Solana relayers that have swap enabled and cron schedule set.
144pub async fn initialize_solana_swap_workers(app_state: ThinData<DefaultAppState>) -> Result<()> {
145    let solena_relayers_with_swap_enabled = app_state
146        .relayer_repository
147        .list_active()
148        .await?
149        .into_iter()
150        .filter(|relayer| {
151            let policy = relayer.policies.get_solana_policy();
152            let swap_config = match policy.get_swap_config() {
153                Some(config) => config,
154                None => {
155                    info!("No swap configuration specified; skipping validation.");
156                    return false;
157                }
158            };
159
160            if swap_config.cron_schedule.is_none() {
161                return false;
162            }
163            true
164        })
165        .collect::<Vec<_>>();
166
167    if solena_relayers_with_swap_enabled.is_empty() {
168        info!("No solana relayers with swap enabled");
169        return Ok(());
170    }
171    info!(
172        "Found {} solana relayers with swap enabled",
173        solena_relayers_with_swap_enabled.len()
174    );
175
176    let mut workers = Vec::new();
177
178    for relayer in solena_relayers_with_swap_enabled {
179        info!("Found solana relayer with swap enabled: {:?}", relayer);
180
181        let policy = relayer.policies.get_solana_policy();
182        let swap_config = match policy.get_swap_config() {
183            Some(config) => config,
184            None => {
185                info!("No swap configuration specified; skipping validation.");
186                continue;
187            }
188        };
189
190        let calendar_schedule = match swap_config.cron_schedule {
191            Some(schedule) => apalis_cron::Schedule::from_str(&schedule).unwrap(),
192            None => {
193                info!("No swap cron schedule found for relayer: {:?}", relayer);
194                continue;
195            }
196        };
197
198        // Create worker and add to the workers vector
199        let worker = WorkerBuilder::new(format!("solana-swap-schedule-{}", relayer.id.clone()))
200            .layer(ErrorHandlingLayer::new())
201            .enable_tracing()
202            .catch_panic()
203            .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
204            .retry(BackoffRetryPolicy::default())
205            .concurrency(1)
206            .data(relayer.id.clone())
207            .data(app_state.clone())
208            .backend(CronStream::new(calendar_schedule))
209            .build_fn(solana_token_swap_cron_handler);
210
211        workers.push(worker);
212        info!(
213            "Created worker for solana relayer with swap enabled: {:?}",
214            relayer
215        );
216    }
217
218    let mut monitor = Monitor::new()
219        .on_event(monitor_handle_event)
220        .shutdown_timeout(Duration::from_millis(5000));
221
222    // Register all workers with the monitor
223    for worker in workers {
224        monitor = monitor.register(worker);
225    }
226
227    let monitor_future = monitor.run_with_signal(async {
228        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
229            .expect("Failed to create SIGINT signal");
230        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
231            .expect("Failed to create SIGTERM signal");
232
233        info!("Solana Swap Monitor started");
234
235        tokio::select! {
236            _ = sigint.recv() => info!("Received SIGINT."),
237            _ = sigterm.recv() => info!("Received SIGTERM."),
238        };
239
240        info!("Solana Swap Monitor shutting down");
241
242        Ok(())
243    });
244    tokio::spawn(async move {
245        if let Err(e) = monitor_future.await {
246            error!("Monitor error: {}", e);
247        }
248    });
249    Ok(())
250}
251
252fn monitor_handle_event(e: Worker<Event>) {
253    let worker_id = e.id();
254    match e.inner() {
255        Event::Engage(task_id) => {
256            info!("Worker [{worker_id}] got a job with id: {task_id}");
257        }
258        Event::Error(e) => {
259            error!("Worker [{worker_id}] encountered an error: {e}");
260        }
261        Event::Exit => {
262            info!("Worker [{worker_id}] exited");
263        }
264        Event::Idle => {
265            info!("Worker [{worker_id}] is idle");
266        }
267        Event::Start => {
268            info!("Worker [{worker_id}] started");
269        }
270        Event::Stop => {
271            info!("Worker [{worker_id}] stopped");
272        }
273        _ => {}
274    }
275}