openzeppelin_relayer/jobs/
retry_backoff.rs1use apalis::prelude::*;
20use std::time::Duration;
21use tokio::time::{sleep, Sleep};
22use tower::retry::Policy;
23
24type Req<T, Ctx> = Request<T, Ctx>;
25type Err = Error;
26
27#[derive(Clone, Debug)]
49pub struct BackoffRetryPolicy {
50 pub retries: usize,
52 pub initial_backoff: Duration,
54 pub multiplier: f64,
56 pub max_backoff: Duration,
58}
59
60impl Default for BackoffRetryPolicy {
62 fn default() -> Self {
63 Self {
64 retries: 5,
65 initial_backoff: Duration::from_millis(1000),
66 multiplier: 1.5,
67 max_backoff: Duration::from_secs(60),
68 }
69 }
70}
71
72impl BackoffRetryPolicy {
73 fn backoff_duration(&self, attempt: usize) -> Duration {
74 let backoff =
75 self.initial_backoff.as_millis() as f64 * self.multiplier.powi(attempt as i32);
76 Duration::from_millis(backoff.min(self.max_backoff.as_millis() as f64) as u64)
77 }
78}
79
80impl<T, Res, Ctx> Policy<Req<T, Ctx>, Res, Err> for BackoffRetryPolicy
81where
82 T: Clone,
83 Ctx: Clone,
84{
85 type Future = Sleep;
86
87 fn retry(
88 &mut self,
89 req: &mut Req<T, Ctx>,
90 result: &mut Result<Res, Err>,
91 ) -> Option<Self::Future> {
92 let attempt = req.parts.attempt.current();
93
94 match result {
95 Ok(_) => None,
96 Err(_) if (self.retries - attempt > 0) => Some(sleep(self.backoff_duration(attempt))),
97 Err(_) => None,
98 }
99 }
100
101 fn clone_request(&mut self, req: &Req<T, Ctx>) -> Option<Req<T, Ctx>> {
102 let req = req.clone();
103 req.parts.attempt.increment();
104 Some(req)
105 }
106}
107
108#[cfg(test)]
109mod tests {
110 use super::*;
111
112 #[derive(Clone)]
113 struct TestJob;
114
115 #[tokio::test]
116 async fn test_backoff_duration_calculation() {
117 let policy = BackoffRetryPolicy {
118 retries: 5,
119 initial_backoff: Duration::from_secs(1),
120 multiplier: 2.0,
121 max_backoff: Duration::from_secs(60),
122 };
123
124 assert_eq!(policy.backoff_duration(0), Duration::from_secs(1));
126 assert_eq!(policy.backoff_duration(1), Duration::from_secs(2));
127 assert_eq!(policy.backoff_duration(2), Duration::from_secs(4));
128 assert_eq!(policy.backoff_duration(3), Duration::from_secs(8));
129
130 let policy = BackoffRetryPolicy {
132 retries: 10,
133 initial_backoff: Duration::from_secs(10),
134 multiplier: 3.0,
135 max_backoff: Duration::from_secs(60),
136 };
137
138 assert_eq!(policy.backoff_duration(3), Duration::from_secs(60));
140 }
141
142 #[tokio::test]
143 async fn test_retry_policy_success() {
144 let mut policy = BackoffRetryPolicy::default();
145 let job = TestJob;
146 let ctx = ();
147 let mut req = Request::new_with_ctx(job, ctx);
148 let mut result: Result<(), Err> = Ok(());
149
150 assert!(policy.retry(&mut req, &mut result).is_none());
152 }
153
154 #[tokio::test]
155 async fn test_retry_policy_failure_with_retries() {
156 let mut policy = BackoffRetryPolicy {
157 retries: 3,
158 initial_backoff: Duration::from_millis(10),
159 multiplier: 2.0,
160 max_backoff: Duration::from_secs(1),
161 };
162
163 let job = TestJob;
164 let ctx = ();
165 let mut req = Request::new_with_ctx(job, ctx);
166 let mut result: Result<(), Err> =
167 Err(Error::from(Box::new(std::io::Error::other("Test error"))
168 as Box<dyn std::error::Error + Send + Sync>));
169
170 assert!(policy.retry(&mut req, &mut result).is_some());
172
173 req.parts.attempt.increment();
175 assert!(policy.retry(&mut req, &mut result).is_some());
176
177 req.parts.attempt.increment();
179 assert!(policy.retry(&mut req, &mut result).is_some());
180
181 req.parts.attempt.increment();
183 assert!(policy.retry(&mut req, &mut result).is_none());
184 }
185
186 #[tokio::test]
187 async fn test_clone_request() {
188 let mut policy: BackoffRetryPolicy = BackoffRetryPolicy::default();
189 let job = TestJob;
190 let ctx = ();
191 let req: Request<TestJob, ()> = Request::new_with_ctx(job, ctx);
192
193 assert_eq!(req.parts.attempt.current(), 0);
195
196 let cloned_req =
198 <BackoffRetryPolicy as Policy<Request<TestJob, ()>, (), Error>>::clone_request(
199 &mut policy,
200 &req,
201 )
202 .unwrap();
203 let cloned_req_attempt = cloned_req.parts.attempt.current();
204 assert_eq!(cloned_req_attempt, 1);
205 }
206
207 #[test]
208 fn test_default_policy() {
209 let policy = BackoffRetryPolicy::default();
210
211 assert_eq!(policy.retries, 5);
212 assert_eq!(policy.initial_backoff, Duration::from_millis(1000));
213 assert_eq!(policy.multiplier, 1.5);
214 assert_eq!(policy.max_backoff, Duration::from_secs(60));
215 }
216
217 #[test]
218 fn test_zero_initial_backoff() {
219 let policy = BackoffRetryPolicy {
220 retries: 3,
221 initial_backoff: Duration::from_millis(0),
222 multiplier: 2.0,
223 max_backoff: Duration::from_secs(60),
224 };
225
226 assert_eq!(policy.backoff_duration(0), Duration::from_millis(0));
228 assert_eq!(policy.backoff_duration(1), Duration::from_millis(0));
229 assert_eq!(policy.backoff_duration(2), Duration::from_millis(0));
230 }
231
232 #[test]
233 fn test_multiplier_one() {
234 let policy = BackoffRetryPolicy {
235 retries: 3,
236 initial_backoff: Duration::from_millis(500),
237 multiplier: 1.0,
238 max_backoff: Duration::from_secs(60),
239 };
240
241 assert_eq!(policy.backoff_duration(0), Duration::from_millis(500));
243 assert_eq!(policy.backoff_duration(1), Duration::from_millis(500));
244 assert_eq!(policy.backoff_duration(2), Duration::from_millis(500));
245 }
246
247 #[test]
248 fn test_multiplier_less_than_one() {
249 let policy = BackoffRetryPolicy {
250 retries: 3,
251 initial_backoff: Duration::from_millis(1000),
252 multiplier: 0.5,
253 max_backoff: Duration::from_secs(60),
254 };
255
256 assert_eq!(policy.backoff_duration(0), Duration::from_millis(1000));
258 assert_eq!(policy.backoff_duration(1), Duration::from_millis(500));
259 assert_eq!(policy.backoff_duration(2), Duration::from_millis(250));
260 }
261
262 #[tokio::test]
263 async fn test_retry_policy_exhausted_retries() {
264 let mut policy = BackoffRetryPolicy {
265 retries: 0, initial_backoff: Duration::from_millis(10),
267 multiplier: 2.0,
268 max_backoff: Duration::from_secs(1),
269 };
270
271 let job = TestJob;
272 let ctx = ();
273 let mut req = Request::new_with_ctx(job, ctx);
274 let mut result: Result<(), Err> =
275 Err(Error::from(Box::new(std::io::Error::other("Test error"))
276 as Box<dyn std::error::Error + Send + Sync>));
277
278 assert!(policy.retry(&mut req, &mut result).is_none());
280 }
281
282 #[tokio::test]
283 async fn test_retry_policy_large_max_backoff() {
284 let policy = BackoffRetryPolicy {
285 retries: 10,
286 initial_backoff: Duration::from_millis(100),
287 multiplier: 10.0, max_backoff: Duration::from_secs(24 * 60 * 60), };
290
291 assert!(policy.backoff_duration(10) <= Duration::from_secs(24 * 60 * 60));
293 }
294}