openzeppelin_relayer/domain/transaction/stellar/
lane_gate.rs1use dashmap::{DashMap, Entry};
14use once_cell::sync::Lazy;
15
/// Identifier of a relayer; a plain `String` alias used as the key of [`BUSY`].
type RelayerId = String;
/// Identifier of a transaction; a plain `String` alias used as the value of [`BUSY`].
type TxId = String;

/// Lazily initialised, process-wide map from a relayer id to the id of the
/// transaction that currently holds that relayer's lane.
///
/// A relayer with no entry has a free lane. Entries are added by `claim`,
/// re-pointed by `pass_to` and removed by `free`.
static BUSY: Lazy<DashMap<RelayerId, TxId>> = Lazy::new(DashMap::new);
20
21pub fn claim(relayer_id: &str, tx_id: &str) -> bool {
26 match BUSY.entry(relayer_id.to_owned()) {
27 Entry::Vacant(entry) => {
28 entry.insert(tx_id.to_owned());
29 true
30 }
31 Entry::Occupied(entry) => {
32 entry.get() == tx_id
34 }
35 }
36}
37
38pub fn pass_to(relayer_id: &str, current_tx_id: &str, next_tx_id: &str) {
42 if let Entry::Occupied(mut entry) = BUSY.entry(relayer_id.to_owned()) {
43 if entry.get() == current_tx_id {
44 entry.insert(next_tx_id.to_owned());
45 }
46 }
47}
48
49pub fn free(relayer_id: &str, tx_id: &str) {
53 if let Entry::Occupied(entry) = BUSY.entry(relayer_id.to_owned()) {
54 if entry.get() == tx_id {
55 entry.remove();
56 }
57 }
58}
59
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc, Barrier,
    };
    use std::thread;
    use std::time::Duration;

    /// Drops any leftover entry for `relayer_id` so the test starts with a free lane.
    ///
    /// `BUSY` is process-wide and the test harness runs tests on parallel
    /// threads, so every test uses relayer ids of its own and resets only
    /// those. Clearing the whole map would release lanes that a concurrently
    /// running test still holds and make the suite flaky.
    fn reset(relayer_id: &str) {
        BUSY.remove(relayer_id);
    }

    #[test]
    fn claim_is_idempotent() {
        const R: &str = "claim_is_idempotent";
        reset(R);
        assert!(claim(R, "tx1"));
        // The owner may claim its own lane again.
        assert!(claim(R, "tx1"));
    }

    #[test]
    fn claim_is_exclusive() {
        const R: &str = "claim_is_exclusive";
        reset(R);
        assert!(claim(R, "tx1"));
        // A second transaction is rejected while tx1 holds the lane.
        assert!(!claim(R, "tx2"));
    }

    #[test]
    fn free_releases_lane() {
        const R: &str = "free_releases_lane";
        reset(R);
        assert!(claim(R, "tx1"));
        free(R, "tx1");
        // Once released, another transaction can take the lane.
        assert!(claim(R, "tx2"));
    }

    #[test]
    fn free_by_non_owner_is_noop() {
        const R: &str = "free_by_non_owner_is_noop";
        reset(R);
        assert!(claim(R, "tx1"));
        // tx2 does not hold the lane, so this must not release it.
        free(R, "tx2");
        assert!(!claim(R, "tx2"));
        assert!(claim(R, "tx1"));
    }

    #[test]
    fn pass_to_transfers_ownership() {
        const R: &str = "pass_to_transfers_ownership";
        reset(R);
        assert!(claim(R, "tx1"));
        pass_to(R, "tx1", "tx2");
        // tx1 gave the lane away; tx2 is the owner now.
        assert!(!claim(R, "tx1"));
        assert!(claim(R, "tx2"));
    }

    #[test]
    fn pass_to_by_non_owner_is_noop() {
        const R: &str = "pass_to_by_non_owner_is_noop";
        reset(R);
        assert!(claim(R, "tx1"));
        // txX is not the owner, so the hand-over must be ignored.
        pass_to(R, "txX", "tx2");
        assert!(!claim(R, "tx2"));
        assert!(claim(R, "tx1"));
    }

    #[test]
    fn exclusivity_holds_under_contention() {
        const R: &str = "exclusivity_holds_under_contention";
        const THREADS: usize = 8;
        const ATTEMPTS: usize = 200;
        reset(R);

        // `active` counts threads currently inside the lane; `max_seen` records its peak.
        let active = Arc::new(AtomicUsize::new(0));
        let max_seen = Arc::new(AtomicUsize::new(0));
        let barrier = Arc::new(Barrier::new(THREADS));

        let handles: Vec<_> = (0..THREADS)
            .map(|idx| {
                let active = Arc::clone(&active);
                let max_seen = Arc::clone(&max_seen);
                let barrier = Arc::clone(&barrier);
                thread::spawn(move || {
                    // Start all threads together to maximise contention.
                    barrier.wait();
                    for attempt in 0..ATTEMPTS {
                        let tx = format!("t{}-{}", idx, attempt);
                        if claim(R, &tx) {
                            let cur = active.fetch_add(1, Ordering::SeqCst) + 1;
                            max_seen.fetch_max(cur, Ordering::SeqCst);
                            // Hold the lane briefly so overlapping owners would be observed.
                            thread::sleep(Duration::from_micros(10));
                            active.fetch_sub(1, Ordering::SeqCst);
                            free(R, &tx);
                        }
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        // At most one transaction may ever hold the lane at a time.
        assert_eq!(max_seen.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn different_relayers_do_not_interfere() {
        const R1: &str = "different_relayers_do_not_interfere_1";
        const R2: &str = "different_relayers_do_not_interfere_2";
        reset(R1);
        reset(R2);
        let barrier = Arc::new(Barrier::new(2));

        let h1 = {
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                barrier.wait();
                assert!(claim(R1, "tx1"));
                thread::sleep(Duration::from_millis(5));
                free(R1, "tx1");
            })
        };
        let h2 = {
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                barrier.wait();
                // Same tx id, different relayer: lanes are keyed per relayer.
                assert!(claim(R2, "tx1"));
                thread::sleep(Duration::from_millis(5));
                free(R2, "tx1");
            })
        };

        h1.join().unwrap();
        h2.join().unwrap();
    }
}