openzeppelin_relayer/jobs/handlers/
notification_handler.rs

1//! Notification handling worker implementation.
2//!
3//! This module implements the notification handling worker that processes
4//! notification jobs from the queue.
5
6use actix_web::web::ThinData;
7use apalis::prelude::{Attempt, Data, *};
8use eyre::Result;
9use log::info;
10
11use crate::{
12    constants::WORKER_DEFAULT_MAXIMUM_RETRIES,
13    jobs::{handle_result, Job, NotificationSend},
14    models::DefaultAppState,
15    repositories::Repository,
16    services::WebhookNotificationService,
17};
18
19/// Handles incoming notification jobs from the queue.
20///
21/// # Arguments
22/// * `job` - The notification job containing recipient and message details
23/// * `context` - Application state containing notification services
24///
25/// # Returns
26/// * `Result<(), Error>` - Success or failure of notification processing
27pub async fn notification_handler(
28    job: Job<NotificationSend>,
29    context: Data<ThinData<DefaultAppState>>,
30    attempt: Attempt,
31) -> Result<(), Error> {
32    info!("handling notification: {:?}", job.data);
33
34    let result = handle_request(job.data, context).await;
35
36    handle_result(
37        result,
38        attempt,
39        "Notification",
40        WORKER_DEFAULT_MAXIMUM_RETRIES,
41    )
42}
43
44async fn handle_request(
45    request: NotificationSend,
46    context: Data<ThinData<DefaultAppState>>,
47) -> Result<()> {
48    info!("sending notification: {:?}", request);
49    let notification = context
50        .notification_repository
51        .get_by_id(request.notification_id)
52        .await?;
53
54    let notification_service =
55        WebhookNotificationService::new(notification.url, notification.signing_key);
56
57    notification_service
58        .send_notification(request.notification)
59        .await?;
60
61    Ok(())
62}
63
64#[cfg(test)]
65mod tests {
66    use super::*;
67    use crate::models::{
68        EvmTransactionResponse, NetworkType, RelayerDisabledPayload, RelayerEvmPolicy,
69        RelayerNetworkPolicyResponse, RelayerResponse, TransactionResponse, TransactionStatus,
70        WebhookNotification, WebhookPayload, U256,
71    };
72
73    #[tokio::test]
74    async fn test_notification_job_creation() {
75        // Create a basic notification webhook payload
76        let payload = WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
77            EvmTransactionResponse {
78                id: "tx123".to_string(),
79                hash: Some("0x123".to_string()),
80                status: TransactionStatus::Confirmed,
81                status_reason: None,
82                created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
83                sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
84                confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
85                gas_price: Some(1000000000),
86                gas_limit: Some(21000),
87                nonce: Some(1),
88                value: U256::from(1000000000000000000_u64),
89                from: "0xabc".to_string(),
90                to: Some("0xdef".to_string()),
91                relayer_id: "relayer-1".to_string(),
92                data: None,
93                max_fee_per_gas: None,
94                max_priority_fee_per_gas: None,
95                signature: None,
96                speed: None,
97            },
98        )));
99
100        // Create a notification
101        let notification = WebhookNotification::new("test_event".to_string(), payload);
102        let notification_job =
103            NotificationSend::new("notification-1".to_string(), notification.clone());
104
105        // Create the job
106        let job = Job::new(crate::jobs::JobType::NotificationSend, notification_job);
107
108        // Test the job structure
109        assert_eq!(job.data.notification_id, "notification-1");
110        assert_eq!(job.data.notification.event, "test_event");
111    }
112
113    #[tokio::test]
114    async fn test_notification_job_with_different_payloads() {
115        // Test with different payload types
116
117        let transaction_payload = WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
118            EvmTransactionResponse {
119                id: "tx123".to_string(),
120                hash: Some("0x123".to_string()),
121                status: TransactionStatus::Confirmed,
122                status_reason: None,
123                created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
124                sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
125                confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
126                gas_price: Some(1000000000),
127                gas_limit: Some(21000),
128                nonce: Some(1),
129                value: U256::from(1000000000000000000_u64),
130                from: "0xabc".to_string(),
131                to: Some("0xdef".to_string()),
132                relayer_id: "relayer-1".to_string(),
133                data: None,
134                max_fee_per_gas: None,
135                max_priority_fee_per_gas: None,
136                signature: None,
137                speed: None,
138            },
139        )));
140
141        let string_notification =
142            WebhookNotification::new("transaction_payload".to_string(), transaction_payload);
143        let job = NotificationSend::new("notification-string".to_string(), string_notification);
144        assert_eq!(job.notification.event, "transaction_payload");
145
146        let relayer_disabled = WebhookPayload::RelayerDisabled(Box::new(RelayerDisabledPayload {
147            relayer: RelayerResponse {
148                id: "relayer-1".to_string(),
149                name: "relayer-1".to_string(),
150                network: "ethereum".to_string(),
151                network_type: NetworkType::Evm,
152                paused: false,
153                policies: Some(RelayerNetworkPolicyResponse::Evm(
154                    RelayerEvmPolicy {
155                        gas_price_cap: None,
156                        whitelist_receivers: None,
157                        eip1559_pricing: None,
158                        private_transactions: Some(false),
159                        min_balance: Some(0),
160                        gas_limit_estimation: None,
161                    }
162                    .into(),
163                )),
164                signer_id: "signer-1".to_string(),
165                notification_id: None,
166                custom_rpc_urls: None,
167                address: Some("0xabc".to_string()),
168                system_disabled: Some(false),
169            },
170            disable_reason: "test".to_string(),
171        }));
172        let object_notification =
173            WebhookNotification::new("object_event".to_string(), relayer_disabled);
174        let job = NotificationSend::new("notification-object".to_string(), object_notification);
175        assert_eq!(job.notification.event, "object_event");
176    }
177}