openzeppelin_relayer/jobs/
retry_backoff.rs

//! Retry policy implementation with exponential backoff for job processing.
//!
//! This module provides a configurable retry mechanism with exponential backoff
//! for handling transient failures in job processing. Each successive delay grows
//! by a fixed multiplier and is capped at a configurable maximum.
//!
//! # Example
//! ```rust, ignore
//! use crate::jobs::retry_backoff::BackoffRetryPolicy;
//! use std::time::Duration;
//!
//! let policy = BackoffRetryPolicy {
//!     retries: 5,
//!     initial_backoff: Duration::from_secs(1),
//!     multiplier: 2.0,
//!     max_backoff: Duration::from_secs(60),
//! };
//! ```
18
19use apalis::prelude::*;
20use std::time::Duration;
21use tokio::time::{sleep, Sleep};
22use tower::retry::Policy;
23
24type Req<T, Ctx> = Request<T, Ctx>;
25type Err = Error;
26
/// A retry policy that implements exponential backoff.
///
/// Failed operations are retried with growing delays: the delay for attempt `n`
/// is `initial_backoff * multiplier^n`, capped at `max_backoff`.
///
/// # Example
/// ```rust, ignore
/// let policy = BackoffRetryPolicy {
///     retries: 5,
///     initial_backoff: Duration::from_secs(1),
///     multiplier: 2.0,
///     max_backoff: Duration::from_secs(60),
/// };
/// ```
#[derive(Clone, Debug, PartialEq)]
pub struct BackoffRetryPolicy {
    /// Maximum number of retry attempts.
    pub retries: usize,
    /// Base delay, used as-is for attempt 0.
    pub initial_backoff: Duration,
    /// Factor by which the delay is multiplied for each further attempt.
    pub multiplier: f64,
    /// Upper bound on any single delay.
    pub max_backoff: Duration,
}

/// Defaults: 5 retries, 1 s base delay, 1.5x growth per attempt, 60 s cap.
impl Default for BackoffRetryPolicy {
    fn default() -> Self {
        Self {
            retries: 5,
            initial_backoff: Duration::from_millis(1000),
            multiplier: 1.5,
            max_backoff: Duration::from_secs(60),
        }
    }
}

impl BackoffRetryPolicy {
    /// Computes the delay to wait after attempt number `attempt`.
    ///
    /// The result is `initial_backoff * multiplier^attempt`, capped at `max_backoff`.
    /// It is computed at millisecond granularity: sub-millisecond parts of
    /// `initial_backoff` and `max_backoff` are truncated. A negative product
    /// (negative multiplier, odd exponent) saturates to zero.
    fn backoff_duration(&self, attempt: usize) -> Duration {
        let base_ms = self.initial_backoff.as_millis();

        // A zero base delay must stay zero. Without this guard, a large `attempt` can
        // make `multiplier.powi(..)` overflow to infinity. `0 * inf` is NaN, and
        // `f64::min` returns the non-NaN operand, i.e. the cap instead of zero.
        if base_ms == 0 {
            return Duration::ZERO;
        }

        // Clamp before casting so attempt counts above `i32::MAX` saturate. A plain
        // `as i32` would wrap them into a negative exponent and yield a tiny delay.
        let exponent = attempt.min(i32::MAX as usize) as i32;
        let cap_ms = self.max_backoff.as_millis() as f64;
        let backoff_ms = (base_ms as f64 * self.multiplier.powi(exponent)).min(cap_ms);

        // `f64 as u64` saturates: negative values become 0.
        Duration::from_millis(backoff_ms as u64)
    }
}
79
80impl<T, Res, Ctx> Policy<Req<T, Ctx>, Res, Err> for BackoffRetryPolicy
81where
82    T: Clone,
83    Ctx: Clone,
84{
85    type Future = Sleep;
86
87    fn retry(
88        &mut self,
89        req: &mut Req<T, Ctx>,
90        result: &mut Result<Res, Err>,
91    ) -> Option<Self::Future> {
92        let attempt = req.parts.attempt.current();
93
94        match result {
95            Ok(_) => None,
96            Err(_) if (self.retries - attempt > 0) => Some(sleep(self.backoff_duration(attempt))),
97            Err(_) => None,
98        }
99    }
100
101    fn clone_request(&mut self, req: &Req<T, Ctx>) -> Option<Req<T, Ctx>> {
102        let req = req.clone();
103        req.parts.attempt.increment();
104        Some(req)
105    }
106}
107
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal cloneable job payload used to build requests.
    #[derive(Clone)]
    struct TestJob;

    /// Builds a fresh request with a unit context.
    fn new_request() -> Request<TestJob, ()> {
        Request::new_with_ctx(TestJob, ())
    }

    /// Builds a failed job result wrapping a generic I/O error.
    fn failing_result() -> Result<(), Err> {
        let source: Box<dyn std::error::Error + Send + Sync> =
            Box::new(std::io::Error::other("Test error"));
        Err(Error::from(source))
    }

    #[test]
    fn test_backoff_duration_calculation() {
        let doubling = BackoffRetryPolicy {
            retries: 5,
            initial_backoff: Duration::from_secs(1),
            multiplier: 2.0,
            max_backoff: Duration::from_secs(60),
        };

        // 1s, 2s, 4s, 8s for the first four attempts.
        for (attempt, secs) in [(0, 1), (1, 2), (2, 4), (3, 8)] {
            assert_eq!(doubling.backoff_duration(attempt), Duration::from_secs(secs));
        }

        // 10s * 3^3 = 270s, which must be clamped to the 60s ceiling.
        let tripling = BackoffRetryPolicy {
            retries: 10,
            initial_backoff: Duration::from_secs(10),
            multiplier: 3.0,
            max_backoff: Duration::from_secs(60),
        };
        assert_eq!(tripling.backoff_duration(3), Duration::from_secs(60));
    }

    #[tokio::test]
    async fn test_retry_policy_success() {
        let mut policy = BackoffRetryPolicy::default();
        let mut req = new_request();
        let mut outcome: Result<(), Err> = Ok(());

        // A successful job is never retried.
        assert!(policy.retry(&mut req, &mut outcome).is_none());
    }

    #[tokio::test]
    async fn test_retry_policy_failure_with_retries() {
        let mut policy = BackoffRetryPolicy {
            retries: 3,
            initial_backoff: Duration::from_millis(10),
            multiplier: 2.0,
            max_backoff: Duration::from_secs(1),
        };
        let mut req = new_request();
        let mut outcome = failing_result();

        // Attempts 0, 1 and 2 are below the limit of 3 and must each schedule a retry.
        for _ in 0..3 {
            assert!(policy.retry(&mut req, &mut outcome).is_some());
            req.parts.attempt.increment();
        }

        // Attempt 3 has used up the retry budget.
        assert!(policy.retry(&mut req, &mut outcome).is_none());
    }

    #[test]
    fn test_clone_request() {
        let mut policy = BackoffRetryPolicy::default();
        let original: Request<TestJob, ()> = new_request();

        // A fresh request starts at attempt 0.
        assert_eq!(original.parts.attempt.current(), 0);

        // Cloning through the policy yields a request whose attempt is 1.
        let duplicate =
            <BackoffRetryPolicy as Policy<Request<TestJob, ()>, (), Error>>::clone_request(
                &mut policy,
                &original,
            )
            .expect("policy always clones requests");
        assert_eq!(duplicate.parts.attempt.current(), 1);
    }

    #[test]
    fn test_default_policy() {
        let BackoffRetryPolicy {
            retries,
            initial_backoff,
            multiplier,
            max_backoff,
        } = BackoffRetryPolicy::default();

        assert_eq!(retries, 5);
        assert_eq!(initial_backoff, Duration::from_millis(1000));
        assert_eq!(multiplier, 1.5);
        assert_eq!(max_backoff, Duration::from_secs(60));
    }

    #[test]
    fn test_zero_initial_backoff() {
        let policy = BackoffRetryPolicy {
            retries: 3,
            initial_backoff: Duration::from_millis(0),
            multiplier: 2.0,
            max_backoff: Duration::from_secs(60),
        };

        // Scaling a zero base delay always yields zero.
        for attempt in 0..3 {
            assert_eq!(policy.backoff_duration(attempt), Duration::from_millis(0));
        }
    }

    #[test]
    fn test_multiplier_one() {
        let policy = BackoffRetryPolicy {
            retries: 3,
            initial_backoff: Duration::from_millis(500),
            multiplier: 1.0,
            max_backoff: Duration::from_secs(60),
        };

        // A multiplier of 1.0 keeps every delay equal to the base delay.
        for attempt in 0..3 {
            assert_eq!(policy.backoff_duration(attempt), Duration::from_millis(500));
        }
    }

    #[test]
    fn test_multiplier_less_than_one() {
        let policy = BackoffRetryPolicy {
            retries: 3,
            initial_backoff: Duration::from_millis(1000),
            multiplier: 0.5,
            max_backoff: Duration::from_secs(60),
        };

        // A multiplier below 1.0 shrinks the delay on every attempt.
        for (attempt, millis) in [(0, 1000), (1, 500), (2, 250)] {
            assert_eq!(policy.backoff_duration(attempt), Duration::from_millis(millis));
        }
    }

    #[tokio::test]
    async fn test_retry_policy_exhausted_retries() {
        let mut policy = BackoffRetryPolicy {
            retries: 0,
            initial_backoff: Duration::from_millis(10),
            multiplier: 2.0,
            max_backoff: Duration::from_secs(1),
        };
        let mut req = new_request();
        let mut outcome = failing_result();

        // With a budget of zero retries, even the first failure is final.
        assert!(policy.retry(&mut req, &mut outcome).is_none());
    }

    #[test]
    fn test_retry_policy_large_max_backoff() {
        let one_day = Duration::from_secs(24 * 60 * 60);
        let policy = BackoffRetryPolicy {
            retries: 10,
            initial_backoff: Duration::from_millis(100),
            multiplier: 10.0,
            max_backoff: one_day,
        };

        // Even an aggressive multiplier never pushes the delay past the cap.
        assert!(policy.backoff_duration(10) <= one_day);
    }
}