backend/
main.rs

1//! The backend of `PermaplanT`.
2
3#![recursion_limit = "1024"]
4// Enable all lints apart from clippy::restriction by default.
// See https://rust-lang.github.io/rust-clippy/master/index.html#blanket_clippy_restriction_lints for why clippy::restriction is not enabled as a whole.
6#![warn(clippy::pedantic)]
7#![warn(clippy::nursery)]
8#![warn(clippy::cargo)]
9// Lints in clippy::restriction which seem useful.
10#![warn(
11    clippy::clone_on_ref_ptr,
12    clippy::empty_structs_with_brackets,
13    clippy::exit,
14    clippy::expect_used,
15    clippy::format_push_string,
16    clippy::get_unwrap,
17    clippy::if_then_some_else_none,
18    clippy::indexing_slicing,
19    clippy::integer_division,
20    clippy::implicit_clone,
21    clippy::large_include_file,
22    clippy::missing_docs_in_private_items,
23    clippy::mixed_read_write_in_expression,
24    clippy::multiple_inherent_impl,
25    clippy::mutex_atomic,
26    clippy::panic_in_result_fn,
27    clippy::partial_pub_fields,
28    clippy::print_stderr,
29    clippy::print_stdout,
30    clippy::rc_buffer,
31    clippy::rc_mutex,
32    clippy::rest_pat_in_fully_bound_structs,
33    clippy::same_name_method,
34    clippy::shadow_unrelated,
35    clippy::str_to_string,
36    clippy::suspicious_xor_used_as_pow,
37    clippy::todo,
38    clippy::try_err,
39    clippy::unimplemented,
40    clippy::unnecessary_self_imports,
41    clippy::unneeded_field_pattern,
42    clippy::unreachable,
43    clippy::unseparated_literal_suffix,
44    clippy::unwrap_in_result,
45    clippy::unwrap_used,
46    clippy::use_debug,
47    clippy::verbose_file_reads
48)]
49// Cannot fix some errors because dependencies import them.
50#![allow(clippy::multiple_crate_versions)]
// Clippy reports many false positives suggesting "x.get(0)" => "x.first()".
52#![allow(clippy::get_first)]
53// We often want the same name per module (for instance every enum).
54#![allow(clippy::module_name_repetitions)]
55
56use actix_cors::Cors;
57use actix_web::{http, middleware::Logger, App, HttpServer};
58use config::{api_doc, auth::Config, routes};
59use db::{
60    connection::Pool,
61    cronjobs::{cleanup_layers, cleanup_maps},
62};
63use std::sync::Arc;
64
65use opentelemetry::propagation::TextMapCompositePropagator;
66use opentelemetry::{global, KeyValue};
67use opentelemetry_instrumentation_actix_web::{RequestMetrics, RequestTracing};
68use opentelemetry_resource_detectors::{
69    HostResourceDetector, OsResourceDetector, ProcessResourceDetector,
70};
71use opentelemetry_sdk::metrics::SdkMeterProvider;
72use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
73use opentelemetry_sdk::trace::SdkTracerProvider;
74use opentelemetry_sdk::Resource;
75
76use crate::config::app::Mode;
77
78pub mod config;
79pub mod controller;
80pub mod db;
81pub mod error;
82pub mod keycloak_api;
83pub mod model;
84/// Auto generated by diesel.
85#[allow(clippy::wildcard_imports)]
86#[allow(clippy::missing_docs_in_private_items)]
87pub mod schema;
88pub mod service;
89pub mod sse;
90#[cfg(test)]
91pub mod test;
92
93/// OpenTelemetry resource describing this service.
94static RESOURCE: std::sync::LazyLock<Resource> = std::sync::LazyLock::new(|| {
95    const NAME: &str = env!("CARGO_PKG_NAME");
96    const VERSION: &str = env!("CARGO_PKG_VERSION");
97    const BUILD_DATE: &str = env!("BUILD_DATE");
98    Resource::builder()
99        .with_detector(Box::new(OsResourceDetector))
100        .with_detector(Box::new(HostResourceDetector::default()))
101        .with_detector(Box::new(ProcessResourceDetector))
102        .with_attributes(vec![
103            KeyValue::new("service.name", NAME),
104            KeyValue::new("service.version", VERSION),
105            KeyValue::new("service.build_date", BUILD_DATE),
106        ])
107        .build()
108});
109
110/// Main function.
111#[actix_web::main]
112async fn main() -> std::io::Result<()> {
113    env_logger::init();
114
115    log::debug!("Initializing telemetry ...");
116
117    // Initialize OpenTelemetry
118    let tracer_provider = init_trace();
119    let tracer_provider = match tracer_provider {
120        Ok(tp) => tp,
121        Err(e) => {
122            log::error!("Error initializing OpenTelemetry tracing: {e}");
123            std::process::exit(1);
124        }
125    };
126
127    global::set_tracer_provider(tracer_provider.clone());
128
129    let meter_provider = init_metrics();
130    let meter_provider = match meter_provider {
131        Ok(mp) => mp,
132        Err(e) => {
133            log::error!("Error initializing OpenTelemetry metrics: {e}");
134            std::process::exit(1);
135        }
136    };
137
138    global::set_meter_provider(meter_provider.clone());
139
140    opentelemetry_instrumentation_tokio::observe_current_runtime();
141
142    log::debug!("Starting PermaplanT backend...");
143
144    let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown");
145    let build_date = option_env!("BUILD_DATE").unwrap_or("unknown");
146
147    let config = match config::app::Config::from_env() {
148        Ok(config) => config,
149        Err(e) => {
150            return Err(std::io::Error::other(format!(
151                "Error reading configuration: {e}"
152            )));
153        }
154    };
155
156    log::info!("Configuration loaded: {config:#?}");
157
158    Config::init(&config).await;
159
160    log::info!("Version   : {version}");
161    log::info!("Build date: {build_date}");
162
163    let data_init = config::data::init(&config);
164    let pool = data_init.pool.clone().into_inner();
165    start_cronjobs(pool);
166
167    log::info!(
168        "Binding to: {}:{}",
169        config.bind_address.0,
170        config.bind_address.1
171    );
172
173    let broadcaster = data_init.broadcaster.clone();
174
175    let mode = config.mode;
176
177    let server = HttpServer::new(move || {
178        App::new()
179            .wrap(Logger::default())
180            .wrap(RequestTracing::new())
181            .wrap(RequestMetrics::default())
182            .wrap(cors_configuration(mode))
183            .app_data(data_init.pool.clone())
184            .app_data(data_init.broadcaster.clone())
185            .app_data(data_init.http_client.clone())
186            .app_data(data_init.keycloak_api.clone())
187            .app_data(data_init.mode.clone())
188            .configure(routes::config)
189            .configure(api_doc::config)
190    })
191    .disable_signals()
192    .shutdown_timeout(5)
193    .bind(config.bind_address)?
194    .run();
195
196    let srv_handle = server.handle();
197
198    actix_web::rt::spawn(async move {
199        if let Err(err) = shutdown_signal().await {
200            log::error!("Failed to listen for shutdown signals: {err}");
201            return;
202        }
203        log::info!("Received shutdown signal, notifying all connected clients");
204        let action = model::dto::actions::Action::new_shutdown_signal_action(
205            "Server is shutting down".to_owned(),
206        );
207        broadcaster.broadcast_all_maps(action).await;
208        log::info!("Shutdown notification sent to all clients");
209        srv_handle.stop(true).await;
210    });
211
212    server.await?;
213
214    let _ = tracer_provider.shutdown();
215    let _ = meter_provider.shutdown();
216
217    Ok(())
218}
219
220/// Create a CORS configuration for the server.
221fn cors_configuration(mode: Mode) -> Cors {
222    let mut cors = Cors::default()
223        // 1. Add methods and headers
224        .allowed_methods(vec!["GET", "POST", "PUT", "PATCH", "DELETE"])
225        .allowed_headers(vec![
226            http::header::AUTHORIZATION,
227            http::header::ACCEPT,
228            http::header::CONTENT_TYPE,
229        ])
230        .allowed_header("traceparent")
231        .allowed_header("tracestate")
232        .max_age(3600);
233
234    // 2. Define allowed origin
235    match mode {
236        Mode::Testing => {
237            cors = cors.allow_any_origin();
238        }
239        Mode::Production => {
240            cors = cors
241                .allowed_origin("http://localhost:5173")
242                .allowed_origin("https://cloud.perma.health");
243
244            // 3. Programmatically add the repetitive subdomains
245            let subdomains = ["mr", "dev", "master", "www", "experimental", "telemetry"];
246            for sub in subdomains {
247                cors = cors.allowed_origin(&format!("https://{sub}.permaplant.net"));
248                cors = cors.allowed_origin(&format!("https://{sub}.staging.permaplant.net"));
249            }
250        }
251    }
252
253    cors
254}
255
256/// Start all scheduled jobs that get run in the backend.
257fn start_cronjobs(pool: Arc<Pool>) {
258    tokio::spawn(cleanup_maps(Arc::clone(&pool)));
259    tokio::spawn(cleanup_layers(pool));
260}
261
262/// Initialize OpenTelemetry tracing.
263fn init_trace() -> Result<SdkTracerProvider, Box<dyn std::error::Error + Send + Sync>> {
264    let baggage_propagator = BaggagePropagator::new();
265    let trace_context_propagator = TraceContextPropagator::new();
266    let composite_propagator = TextMapCompositePropagator::new(vec![
267        Box::new(baggage_propagator),
268        Box::new(trace_context_propagator),
269    ]);
270
271    global::set_text_map_propagator(composite_propagator);
272
273    let exporter = opentelemetry_otlp::SpanExporter::builder()
274        .with_http()
275        .build()?;
276
277    let tr = SdkTracerProvider::builder()
278        .with_batch_exporter(exporter)
279        .with_resource(RESOURCE.clone())
280        .build();
281    Ok(tr)
282}
283
284/// Initialize OpenTelemetry metrics.
285fn init_metrics() -> Result<SdkMeterProvider, Box<dyn std::error::Error + Send + Sync>> {
286    let exporter = opentelemetry_otlp::MetricExporter::builder()
287        .with_http()
288        .build()?;
289
290    let mt = SdkMeterProvider::builder()
291        .with_periodic_exporter(exporter)
292        .with_resource(RESOURCE.clone())
293        .build();
294    Ok(mt)
295}
296
297/// Wait for shutdown signal.
298async fn shutdown_signal() -> std::io::Result<()> {
299    let ctrl_c = async {
300        tokio::signal::ctrl_c().await?;
301        Ok::<(), std::io::Error>(())
302    };
303
304    let unix_signal = async {
305        use tokio::signal::unix::{signal, SignalKind};
306
307        let mut sigterm = signal(SignalKind::terminate())?;
308        let mut sighup = signal(SignalKind::hangup())?;
309
310        tokio::select! {
311            _ = sigterm.recv() => {},
312            _ = sighup.recv() => {},
313        }
314
315        Ok::<(), std::io::Error>(())
316    };
317
318    tokio::select! {
319        res = ctrl_c => res?,
320        res = unix_signal => res?,
321    }
322
323    Ok(())
324}