1#![recursion_limit = "1024"]
4#![warn(clippy::pedantic)]
7#![warn(clippy::nursery)]
8#![warn(clippy::cargo)]
9#![warn(
11 clippy::clone_on_ref_ptr,
12 clippy::empty_structs_with_brackets,
13 clippy::exit,
14 clippy::expect_used,
15 clippy::format_push_string,
16 clippy::get_unwrap,
17 clippy::if_then_some_else_none,
18 clippy::indexing_slicing,
19 clippy::integer_division,
20 clippy::implicit_clone,
21 clippy::large_include_file,
22 clippy::missing_docs_in_private_items,
23 clippy::mixed_read_write_in_expression,
24 clippy::multiple_inherent_impl,
25 clippy::mutex_atomic,
26 clippy::panic_in_result_fn,
27 clippy::partial_pub_fields,
28 clippy::print_stderr,
29 clippy::print_stdout,
30 clippy::rc_buffer,
31 clippy::rc_mutex,
32 clippy::rest_pat_in_fully_bound_structs,
33 clippy::same_name_method,
34 clippy::shadow_unrelated,
35 clippy::str_to_string,
36 clippy::suspicious_xor_used_as_pow,
37 clippy::todo,
38 clippy::try_err,
39 clippy::unimplemented,
40 clippy::unnecessary_self_imports,
41 clippy::unneeded_field_pattern,
42 clippy::unreachable,
43 clippy::unseparated_literal_suffix,
44 clippy::unwrap_in_result,
45 clippy::unwrap_used,
46 clippy::use_debug,
47 clippy::verbose_file_reads
48)]
49#![allow(clippy::multiple_crate_versions)]
51#![allow(clippy::get_first)]
53#![allow(clippy::module_name_repetitions)]
55
56use actix_cors::Cors;
57use actix_web::{http, middleware::Logger, App, HttpServer};
58use config::{api_doc, auth::Config, routes};
59use db::{
60 connection::Pool,
61 cronjobs::{cleanup_layers, cleanup_maps},
62};
63use std::sync::Arc;
64
65use opentelemetry::propagation::TextMapCompositePropagator;
66use opentelemetry::{global, KeyValue};
67use opentelemetry_instrumentation_actix_web::{RequestMetrics, RequestTracing};
68use opentelemetry_resource_detectors::{
69 HostResourceDetector, OsResourceDetector, ProcessResourceDetector,
70};
71use opentelemetry_sdk::metrics::SdkMeterProvider;
72use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
73use opentelemetry_sdk::trace::SdkTracerProvider;
74use opentelemetry_sdk::Resource;
75
76use crate::config::app::Mode;
77
78pub mod config;
79pub mod controller;
80pub mod db;
81pub mod error;
82pub mod keycloak_api;
83pub mod model;
84#[allow(clippy::wildcard_imports)]
86#[allow(clippy::missing_docs_in_private_items)]
87pub mod schema;
88pub mod service;
89pub mod sse;
90#[cfg(test)]
91pub mod test;
92
93static RESOURCE: std::sync::LazyLock<Resource> = std::sync::LazyLock::new(|| {
95 const NAME: &str = env!("CARGO_PKG_NAME");
96 const VERSION: &str = env!("CARGO_PKG_VERSION");
97 const BUILD_DATE: &str = env!("BUILD_DATE");
98 Resource::builder()
99 .with_detector(Box::new(OsResourceDetector))
100 .with_detector(Box::new(HostResourceDetector::default()))
101 .with_detector(Box::new(ProcessResourceDetector))
102 .with_attributes(vec![
103 KeyValue::new("service.name", NAME),
104 KeyValue::new("service.version", VERSION),
105 KeyValue::new("service.build_date", BUILD_DATE),
106 ])
107 .build()
108});
109
110#[actix_web::main]
112async fn main() -> std::io::Result<()> {
113 env_logger::init();
114
115 log::debug!("Initializing telemetry ...");
116
117 let tracer_provider = init_trace();
119 let tracer_provider = match tracer_provider {
120 Ok(tp) => tp,
121 Err(e) => {
122 log::error!("Error initializing OpenTelemetry tracing: {e}");
123 std::process::exit(1);
124 }
125 };
126
127 global::set_tracer_provider(tracer_provider.clone());
128
129 let meter_provider = init_metrics();
130 let meter_provider = match meter_provider {
131 Ok(mp) => mp,
132 Err(e) => {
133 log::error!("Error initializing OpenTelemetry metrics: {e}");
134 std::process::exit(1);
135 }
136 };
137
138 global::set_meter_provider(meter_provider.clone());
139
140 opentelemetry_instrumentation_tokio::observe_current_runtime();
141
142 log::debug!("Starting PermaplanT backend...");
143
144 let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown");
145 let build_date = option_env!("BUILD_DATE").unwrap_or("unknown");
146
147 let config = match config::app::Config::from_env() {
148 Ok(config) => config,
149 Err(e) => {
150 return Err(std::io::Error::other(format!(
151 "Error reading configuration: {e}"
152 )));
153 }
154 };
155
156 log::info!("Configuration loaded: {config:#?}");
157
158 Config::init(&config).await;
159
160 log::info!("Version : {version}");
161 log::info!("Build date: {build_date}");
162
163 let data_init = config::data::init(&config);
164 let pool = data_init.pool.clone().into_inner();
165 start_cronjobs(pool);
166
167 log::info!(
168 "Binding to: {}:{}",
169 config.bind_address.0,
170 config.bind_address.1
171 );
172
173 let broadcaster = data_init.broadcaster.clone();
174
175 let mode = config.mode;
176
177 let server = HttpServer::new(move || {
178 App::new()
179 .wrap(Logger::default())
180 .wrap(RequestTracing::new())
181 .wrap(RequestMetrics::default())
182 .wrap(cors_configuration(mode))
183 .app_data(data_init.pool.clone())
184 .app_data(data_init.broadcaster.clone())
185 .app_data(data_init.http_client.clone())
186 .app_data(data_init.keycloak_api.clone())
187 .app_data(data_init.mode.clone())
188 .configure(routes::config)
189 .configure(api_doc::config)
190 })
191 .disable_signals()
192 .shutdown_timeout(5)
193 .bind(config.bind_address)?
194 .run();
195
196 let srv_handle = server.handle();
197
198 actix_web::rt::spawn(async move {
199 if let Err(err) = shutdown_signal().await {
200 log::error!("Failed to listen for shutdown signals: {err}");
201 return;
202 }
203 log::info!("Received shutdown signal, notifying all connected clients");
204 let action = model::dto::actions::Action::new_shutdown_signal_action(
205 "Server is shutting down".to_owned(),
206 );
207 broadcaster.broadcast_all_maps(action).await;
208 log::info!("Shutdown notification sent to all clients");
209 srv_handle.stop(true).await;
210 });
211
212 server.await?;
213
214 let _ = tracer_provider.shutdown();
215 let _ = meter_provider.shutdown();
216
217 Ok(())
218}
219
220fn cors_configuration(mode: Mode) -> Cors {
222 let mut cors = Cors::default()
223 .allowed_methods(vec!["GET", "POST", "PUT", "PATCH", "DELETE"])
225 .allowed_headers(vec![
226 http::header::AUTHORIZATION,
227 http::header::ACCEPT,
228 http::header::CONTENT_TYPE,
229 ])
230 .allowed_header("traceparent")
231 .allowed_header("tracestate")
232 .max_age(3600);
233
234 match mode {
236 Mode::Testing => {
237 cors = cors.allow_any_origin();
238 }
239 Mode::Production => {
240 cors = cors
241 .allowed_origin("http://localhost:5173")
242 .allowed_origin("https://cloud.perma.health");
243
244 let subdomains = ["mr", "dev", "master", "www", "experimental", "telemetry"];
246 for sub in subdomains {
247 cors = cors.allowed_origin(&format!("https://{sub}.permaplant.net"));
248 cors = cors.allowed_origin(&format!("https://{sub}.staging.permaplant.net"));
249 }
250 }
251 }
252
253 cors
254}
255
256fn start_cronjobs(pool: Arc<Pool>) {
258 tokio::spawn(cleanup_maps(Arc::clone(&pool)));
259 tokio::spawn(cleanup_layers(pool));
260}
261
262fn init_trace() -> Result<SdkTracerProvider, Box<dyn std::error::Error + Send + Sync>> {
264 let baggage_propagator = BaggagePropagator::new();
265 let trace_context_propagator = TraceContextPropagator::new();
266 let composite_propagator = TextMapCompositePropagator::new(vec![
267 Box::new(baggage_propagator),
268 Box::new(trace_context_propagator),
269 ]);
270
271 global::set_text_map_propagator(composite_propagator);
272
273 let exporter = opentelemetry_otlp::SpanExporter::builder()
274 .with_http()
275 .build()?;
276
277 let tr = SdkTracerProvider::builder()
278 .with_batch_exporter(exporter)
279 .with_resource(RESOURCE.clone())
280 .build();
281 Ok(tr)
282}
283
284fn init_metrics() -> Result<SdkMeterProvider, Box<dyn std::error::Error + Send + Sync>> {
286 let exporter = opentelemetry_otlp::MetricExporter::builder()
287 .with_http()
288 .build()?;
289
290 let mt = SdkMeterProvider::builder()
291 .with_periodic_exporter(exporter)
292 .with_resource(RESOURCE.clone())
293 .build();
294 Ok(mt)
295}
296
297async fn shutdown_signal() -> std::io::Result<()> {
299 let ctrl_c = async {
300 tokio::signal::ctrl_c().await?;
301 Ok::<(), std::io::Error>(())
302 };
303
304 let unix_signal = async {
305 use tokio::signal::unix::{signal, SignalKind};
306
307 let mut sigterm = signal(SignalKind::terminate())?;
308 let mut sighup = signal(SignalKind::hangup())?;
309
310 tokio::select! {
311 _ = sigterm.recv() => {},
312 _ = sighup.recv() => {},
313 }
314
315 Ok::<(), std::io::Error>(())
316 };
317
318 tokio::select! {
319 res = ctrl_c => res?,
320 res = unix_signal => res?,
321 }
322
323 Ok(())
324}