openzeppelin_relayer/domain/transaction/stellar/
lane_gate.rs

1//! Lane gating mechanism for transaction relayers.
2//!
3//! This module provides a lock-free, atomic gating system that ensures only one transaction
4//! can hold a "lane" for a given relayer at any time. This prevents race conditions and
5//! conflicting operations when multiple transactions attempt to use the same relayer concurrently.
6//!
7//! ## Key Features
8//!
9//! - **Lock-free operations**: Uses DashMap for high-performance concurrent access
10//! - **Per-relayer lanes**: Different relayers can operate concurrently without blocking each other
11//! - **Atomic ownership transfer**: Supports atomic handoff of lane ownership between transactions
12//! - **Idempotent operations**: Safe to call multiple times with the same parameters
13use dashmap::{DashMap, Entry};
14use once_cell::sync::Lazy;
15
16type RelayerId = String;
17type TxId = String;
18
19static BUSY: Lazy<DashMap<RelayerId, TxId>> = Lazy::new(DashMap::new);
20
21/// Try to claim the lane for relayer_id:tx_id.
22/// Returns true if it becomes owner.
23/// Returns true if it already owns the lane.
24/// Returns false if another tx owns it.
25pub fn claim(relayer_id: &str, tx_id: &str) -> bool {
26    match BUSY.entry(relayer_id.to_owned()) {
27        Entry::Vacant(entry) => {
28            entry.insert(tx_id.to_owned());
29            true
30        }
31        Entry::Occupied(entry) => {
32            // Already owns the lane if same tx_id
33            entry.get() == tx_id
34        }
35    }
36}
37
38/// Pass the lane from current_tx_id to next_tx_id
39///
40/// This operation is atomic and lock-free per relayer.
41pub fn pass_to(relayer_id: &str, current_tx_id: &str, next_tx_id: &str) {
42    if let Entry::Occupied(mut entry) = BUSY.entry(relayer_id.to_owned()) {
43        if entry.get() == current_tx_id {
44            entry.insert(next_tx_id.to_owned());
45        }
46    }
47}
48
49/// Free the lane if we still own it.
50///
51/// This operation is atomic and lock-free per relayer.
52pub fn free(relayer_id: &str, tx_id: &str) {
53    if let Entry::Occupied(entry) = BUSY.entry(relayer_id.to_owned()) {
54        if entry.get() == tx_id {
55            entry.remove();
56        }
57    }
58}
59
60#[cfg(test)]
61mod tests {
62    use super::*;
63    use std::sync::{
64        atomic::{AtomicUsize, Ordering},
65        Arc, Barrier,
66    };
67    use std::thread;
68    use std::time::Duration;
69
70    /// Helper to restore a clean state for every test.
71    fn reset() {
72        BUSY.clear();
73    }
74
75    #[test]
76    fn claim_is_idempotent() {
77        reset();
78        assert!(claim("r", "tx1"));
79        assert!(claim("r", "tx1")); // same owner
80    }
81
82    #[test]
83    fn claim_is_exclusive() {
84        reset();
85        assert!(claim("r", "tx1"));
86        assert!(!claim("r", "tx2")); // different tx blocked
87    }
88
89    #[test]
90    fn free_releases_lane() {
91        reset();
92        assert!(claim("r", "tx1"));
93        free("r", "tx1");
94        assert!(claim("r", "tx2")); // now succeeds
95    }
96
97    #[test]
98    fn free_by_non_owner_is_noop() {
99        reset();
100        assert!(claim("r", "tx1"));
101        free("r", "tx2"); // wrong tx
102        assert!(!claim("r", "tx2")); // still owned by tx1
103        assert!(claim("r", "tx1")); // owner unchanged
104    }
105
106    #[test]
107    fn pass_to_transfers_ownership() {
108        reset();
109        assert!(claim("r", "tx1"));
110        pass_to("r", "tx1", "tx2");
111        assert!(!claim("r", "tx1")); // old owner lost
112        assert!(claim("r", "tx2")); // new owner
113    }
114
115    #[test]
116    fn pass_to_by_non_owner_is_noop() {
117        reset();
118        assert!(claim("r", "tx1"));
119        pass_to("r", "txX", "tx2"); // wrong current owner
120        assert!(!claim("r", "tx2")); // transfer failed
121        assert!(claim("r", "tx1"));
122    }
123
124    #[test]
125    fn exclusivity_holds_under_contention() {
126        reset();
127        const THREADS: usize = 8;
128        const ATTEMPTS: usize = 200;
129        let active = Arc::new(AtomicUsize::new(0));
130        let max_seen = Arc::new(AtomicUsize::new(0));
131        let barrier = Arc::new(Barrier::new(THREADS));
132
133        let handles: Vec<_> = (0..THREADS)
134            .map(|idx| {
135                let active = Arc::clone(&active);
136                let max_seen = Arc::clone(&max_seen);
137                let barrier = Arc::clone(&barrier);
138                thread::spawn(move || {
139                    barrier.wait(); // start together
140                    for a in 0..ATTEMPTS {
141                        let tx = format!("t{}-{}", idx, a);
142                        if claim("relayer", &tx) {
143                            let cur = active.fetch_add(1, Ordering::SeqCst) + 1;
144                            // record maximum concurrent owners
145                            max_seen.fetch_max(cur, Ordering::SeqCst);
146                            thread::sleep(Duration::from_micros(10));
147                            active.fetch_sub(1, Ordering::SeqCst);
148                            free("relayer", &tx);
149                        }
150                    }
151                })
152            })
153            .collect();
154
155        for h in handles {
156            h.join().unwrap();
157        }
158
159        assert_eq!(max_seen.load(Ordering::SeqCst), 1); // never more than one owner
160    }
161
162    #[test]
163    fn different_relayers_do_not_interfere() {
164        reset();
165        let barrier = Arc::new(Barrier::new(2));
166
167        let h1 = {
168            let barrier = Arc::clone(&barrier);
169            thread::spawn(move || {
170                barrier.wait();
171                assert!(claim("r1", "tx1"));
172                thread::sleep(Duration::from_millis(5));
173                free("r1", "tx1");
174            })
175        };
176        let h2 = {
177            let barrier = Arc::clone(&barrier);
178            thread::spawn(move || {
179                barrier.wait();
180                assert!(claim("r2", "tx1"));
181                thread::sleep(Duration::from_millis(5));
182                free("r2", "tx1");
183            })
184        };
185
186        h1.join().unwrap();
187        h2.join().unwrap();
188    }
189}