1use std::env;
2use std::fmt::{Debug, Formatter};
3use std::str::FromStr;
4
5use http::{HeaderMap, HeaderName, HeaderValue};
6use opentelemetry::otel_debug;
7use tonic::codec::CompressionEncoding;
8use tonic::metadata::{KeyAndValueRef, MetadataMap};
9use tonic::service::Interceptor;
10use tonic::transport::Channel;
11#[cfg(any(
12 feature = "tls",
13 feature = "tls-ring",
14 feature = "tls-aws-lc",
15 feature = "tls-provider-agnostic"
16))]
17use tonic::transport::ClientTlsConfig;
18
19use super::{default_headers, parse_header_string, OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT};
20use super::{resolve_timeout, ExporterBuildError};
21use crate::exporter::Compression;
22use crate::{ExportConfig, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS};
23
24#[cfg(feature = "logs")]
25pub(crate) mod logs;
26
27#[cfg(feature = "metrics")]
28pub(crate) mod metrics;
29
30#[cfg(feature = "trace")]
31pub(crate) mod trace;
32
33#[derive(Debug, Default)]
37#[non_exhaustive]
38pub struct TonicConfig {
39 pub(crate) metadata: Option<MetadataMap>,
41 #[cfg(any(
43 feature = "tls",
44 feature = "tls-ring",
45 feature = "tls-aws-lc",
46 feature = "tls-provider-agnostic"
47 ))]
48 pub(crate) tls_config: Option<ClientTlsConfig>,
49 pub(crate) compression: Option<Compression>,
51 pub(crate) channel: Option<tonic::transport::Channel>,
52 pub(crate) interceptor: Option<BoxInterceptor>,
53}
54
55impl TryFrom<Compression> for tonic::codec::CompressionEncoding {
56 type Error = ExporterBuildError;
57
58 fn try_from(value: Compression) -> Result<Self, ExporterBuildError> {
59 match value {
60 #[cfg(feature = "gzip-tonic")]
61 Compression::Gzip => Ok(tonic::codec::CompressionEncoding::Gzip),
62 #[cfg(not(feature = "gzip-tonic"))]
63 Compression::Gzip => Err(ExporterBuildError::FeatureRequiredForCompressionAlgorithm(
64 "gzip-tonic",
65 Compression::Gzip,
66 )),
67 #[cfg(feature = "zstd-tonic")]
68 Compression::Zstd => Ok(tonic::codec::CompressionEncoding::Zstd),
69 #[cfg(not(feature = "zstd-tonic"))]
70 Compression::Zstd => Err(ExporterBuildError::FeatureRequiredForCompressionAlgorithm(
71 "zstd-tonic",
72 Compression::Zstd,
73 )),
74 }
75 }
76}
77
78#[derive(Debug)]
113pub struct TonicExporterBuilder {
114 pub(crate) tonic_config: TonicConfig,
115 pub(crate) exporter_config: ExportConfig,
116}
117
118pub(crate) struct BoxInterceptor(Box<dyn Interceptor + Send + Sync>);
119impl tonic::service::Interceptor for BoxInterceptor {
120 fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
121 self.0.call(request)
122 }
123}
124
125impl Debug for BoxInterceptor {
126 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
127 write!(f, "BoxInterceptor(..)")
128 }
129}
130
131impl Default for TonicExporterBuilder {
132 fn default() -> Self {
133 TonicExporterBuilder {
134 tonic_config: TonicConfig {
135 metadata: Some(MetadataMap::from_headers(
136 (&default_headers())
137 .try_into()
138 .expect("Invalid tonic headers"),
139 )),
140 #[cfg(any(
141 feature = "tls",
142 feature = "tls-ring",
143 feature = "tls-aws-lc",
144 feature = "tls-provider-agnostic"
145 ))]
146 tls_config: None,
147 compression: None,
148 channel: Option::default(),
149 interceptor: Option::default(),
150 },
151 exporter_config: ExportConfig {
152 protocol: crate::Protocol::Grpc,
153 ..Default::default()
154 },
155 }
156 }
157}
158
159impl TonicExporterBuilder {
160 #[allow(unused)]
162 fn build_channel(
163 self,
164 signal_endpoint_var: &str,
165 signal_timeout_var: &str,
166 signal_compression_var: &str,
167 signal_headers_var: &str,
168 ) -> Result<(Channel, BoxInterceptor, Option<CompressionEncoding>), ExporterBuildError> {
169 let compression = self.resolve_compression(signal_compression_var)?;
170
171 let (headers_from_env, headers_for_logging) = parse_headers_from_env(signal_headers_var);
172 let metadata = merge_metadata_with_headers_from_env(
173 self.tonic_config.metadata.unwrap_or_default(),
174 headers_from_env,
175 );
176
177 let add_metadata = move |mut req: tonic::Request<()>| {
178 for key_and_value in metadata.iter() {
179 match key_and_value {
180 KeyAndValueRef::Ascii(key, value) => {
181 req.metadata_mut().append(key, value.to_owned())
182 }
183 KeyAndValueRef::Binary(key, value) => {
184 req.metadata_mut().append_bin(key, value.to_owned())
185 }
186 };
187 }
188
189 Ok(req)
190 };
191
192 let interceptor = match self.tonic_config.interceptor {
193 Some(mut interceptor) => {
194 BoxInterceptor(Box::new(move |req| interceptor.call(add_metadata(req)?)))
195 }
196 None => BoxInterceptor(Box::new(add_metadata)),
197 };
198
199 if let Some(channel) = self.tonic_config.channel {
201 return Ok((channel, interceptor, compression));
202 }
203
204 let config = self.exporter_config;
205
206 let endpoint = Self::resolve_endpoint(signal_endpoint_var, config.endpoint);
207
208 let endpoint_clone = endpoint.clone();
210
211 let endpoint = Channel::from_shared(endpoint)
212 .map_err(|op| ExporterBuildError::InvalidUri(endpoint_clone.clone(), op.to_string()))?;
213 let timeout = resolve_timeout(signal_timeout_var, config.timeout.as_ref());
214
215 #[cfg(any(
216 feature = "tls",
217 feature = "tls-ring",
218 feature = "tls-aws-lc",
219 feature = "tls-provider-agnostic"
220 ))]
221 let channel = match self.tonic_config.tls_config {
222 Some(tls_config) => endpoint
223 .tls_config(tls_config)
224 .map_err(|er| ExporterBuildError::InternalFailure(er.to_string()))?,
225 None => endpoint,
226 }
227 .timeout(timeout)
228 .connect_lazy();
229
230 #[cfg(not(any(
231 feature = "tls",
232 feature = "tls-ring",
233 feature = "tls-aws-lc",
234 feature = "tls-provider-agnostic"
235 )))]
236 let channel = endpoint.timeout(timeout).connect_lazy();
237
238 otel_debug!(name: "TonicChannelBuilt", endpoint = endpoint_clone, timeout_in_millisecs = timeout.as_millis(), compression = format!("{:?}", compression), headers = format!("{:?}", headers_for_logging));
239 Ok((channel, interceptor, compression))
240 }
241
242 fn resolve_endpoint(default_endpoint_var: &str, provided_endpoint: Option<String>) -> String {
243 if let Some(endpoint) = provided_endpoint.filter(|s| !s.is_empty()) {
251 endpoint
252 } else if let Ok(endpoint) = env::var(default_endpoint_var) {
253 endpoint
254 } else if let Ok(endpoint) = env::var(OTEL_EXPORTER_OTLP_ENDPOINT) {
255 endpoint
256 } else {
257 OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT.to_string()
258 }
259 }
260
261 fn resolve_compression(
262 &self,
263 env_override: &str,
264 ) -> Result<Option<CompressionEncoding>, ExporterBuildError> {
265 super::resolve_compression_from_env(self.tonic_config.compression, env_override)?
266 .map(|c| c.try_into())
267 .transpose()
268 }
269
270 #[cfg(feature = "logs")]
272 pub(crate) fn build_log_exporter(self) -> Result<crate::logs::LogExporter, ExporterBuildError> {
273 use crate::exporter::tonic::logs::TonicLogsClient;
274
275 otel_debug!(name: "LogsTonicChannelBuilding");
276
277 let (channel, interceptor, compression) = self.build_channel(
278 crate::logs::OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
279 crate::logs::OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
280 crate::logs::OTEL_EXPORTER_OTLP_LOGS_COMPRESSION,
281 crate::logs::OTEL_EXPORTER_OTLP_LOGS_HEADERS,
282 )?;
283
284 let client = TonicLogsClient::new(channel, interceptor, compression);
285
286 Ok(crate::logs::LogExporter::from_tonic(client))
287 }
288
289 #[cfg(feature = "metrics")]
291 pub(crate) fn build_metrics_exporter(
292 self,
293 temporality: opentelemetry_sdk::metrics::Temporality,
294 ) -> Result<crate::MetricExporter, ExporterBuildError> {
295 use crate::MetricExporter;
296 use metrics::TonicMetricsClient;
297
298 otel_debug!(name: "MetricsTonicChannelBuilding");
299
300 let (channel, interceptor, compression) = self.build_channel(
301 crate::metric::OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
302 crate::metric::OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
303 crate::metric::OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
304 crate::metric::OTEL_EXPORTER_OTLP_METRICS_HEADERS,
305 )?;
306
307 let client = TonicMetricsClient::new(channel, interceptor, compression);
308
309 Ok(MetricExporter::from_tonic(client, temporality))
310 }
311
312 #[cfg(feature = "trace")]
314 pub(crate) fn build_span_exporter(self) -> Result<crate::SpanExporter, ExporterBuildError> {
315 use crate::exporter::tonic::trace::TonicTracesClient;
316
317 otel_debug!(name: "TracesTonicChannelBuilding");
318
319 let (channel, interceptor, compression) = self.build_channel(
320 crate::span::OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
321 crate::span::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
322 crate::span::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION,
323 crate::span::OTEL_EXPORTER_OTLP_TRACES_HEADERS,
324 )?;
325
326 let client = TonicTracesClient::new(channel, interceptor, compression);
327
328 Ok(crate::SpanExporter::from_tonic(client))
329 }
330}
331
332fn merge_metadata_with_headers_from_env(
333 metadata: MetadataMap,
334 headers_from_env: HeaderMap,
335) -> MetadataMap {
336 if headers_from_env.is_empty() {
337 metadata
338 } else {
339 let mut existing_headers: HeaderMap = metadata.into_headers();
340 existing_headers.extend(headers_from_env);
341
342 MetadataMap::from_headers(existing_headers)
343 }
344}
345
346fn parse_headers_from_env(signal_headers_var: &str) -> (HeaderMap, Vec<(String, String)>) {
347 let mut headers = Vec::new();
348
349 (
350 env::var(signal_headers_var)
351 .or_else(|_| env::var(OTEL_EXPORTER_OTLP_HEADERS))
352 .map(|input| {
353 parse_header_string(&input)
354 .filter_map(|(key, value)| {
355 headers.push((key.to_owned(), value.clone()));
356 Some((
357 HeaderName::from_str(key).ok()?,
358 HeaderValue::from_str(&value).ok()?,
359 ))
360 })
361 .collect::<HeaderMap>()
362 })
363 .unwrap_or_default(),
364 headers,
365 )
366}
367
368pub trait HasTonicConfig {
370 fn tonic_config(&mut self) -> &mut TonicConfig;
372}
373
374impl HasTonicConfig for TonicExporterBuilder {
376 fn tonic_config(&mut self) -> &mut TonicConfig {
377 &mut self.tonic_config
378 }
379}
380
381pub trait WithTonicConfig {
396 #[cfg(any(
398 feature = "tls",
399 feature = "tls-ring",
400 feature = "tls-aws-lc",
401 feature = "tls-provider-agnostic"
402 ))]
403 fn with_tls_config(self, tls_config: ClientTlsConfig) -> Self;
404
405 fn with_metadata(self, metadata: MetadataMap) -> Self;
433
434 fn with_compression(self, compression: Compression) -> Self;
436
437 fn with_channel(self, channel: tonic::transport::Channel) -> Self;
444
445 fn with_interceptor<I>(self, interceptor: I) -> Self
506 where
507 I: tonic::service::Interceptor + Clone + Send + Sync + 'static;
508}
509
510impl<B: HasTonicConfig> WithTonicConfig for B {
511 #[cfg(any(
512 feature = "tls",
513 feature = "tls-ring",
514 feature = "tls-aws-lc",
515 feature = "tls-provider-agnostic"
516 ))]
517 fn with_tls_config(mut self, tls_config: ClientTlsConfig) -> Self {
518 self.tonic_config().tls_config = Some(tls_config);
519 self
520 }
521
522 fn with_metadata(mut self, metadata: MetadataMap) -> Self {
524 let mut existing_headers = self
526 .tonic_config()
527 .metadata
528 .clone()
529 .unwrap_or_default()
530 .into_headers();
531 existing_headers.extend(metadata.into_headers());
532
533 self.tonic_config().metadata = Some(MetadataMap::from_headers(existing_headers));
534 self
535 }
536
537 fn with_compression(mut self, compression: Compression) -> Self {
538 self.tonic_config().compression = Some(compression);
539 self
540 }
541
542 fn with_channel(mut self, channel: tonic::transport::Channel) -> Self {
543 self.tonic_config().channel = Some(channel);
544 self
545 }
546
547 fn with_interceptor<I>(mut self, interceptor: I) -> Self
548 where
549 I: tonic::service::Interceptor + Clone + Send + Sync + 'static,
550 {
551 self.tonic_config().interceptor = Some(BoxInterceptor(Box::new(interceptor)));
552 self
553 }
554}
555
#[cfg(test)]
mod tests {
    use crate::exporter::tests::run_env_test;
    use crate::exporter::tonic::WithTonicConfig;
    #[cfg(feature = "grpc-tonic")]
    use crate::exporter::Compression;
    use crate::{TonicExporterBuilder, OTEL_EXPORTER_OTLP_TRACES_ENDPOINT};
    use crate::{OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TRACES_HEADERS};
    use http::{HeaderMap, HeaderName, HeaderValue};
    use tonic::metadata::{MetadataMap, MetadataValue};

    #[test]
    fn test_with_metadata() {
        // New keys are added alongside the default metadata rather than replacing it.
        let mut metadata = MetadataMap::new();
        metadata.insert("foo", "bar".parse().unwrap());
        let builder = TonicExporterBuilder::default().with_metadata(metadata);
        let result = builder.tonic_config.metadata.unwrap();
        let foo = result
            .get("foo")
            .expect("there to always be an entry for foo");
        assert_eq!(foo, &MetadataValue::try_from("bar").unwrap());
        assert!(result.get("User-Agent").is_some());

        // A key that already exists in the defaults is overridden, not duplicated.
        let mut metadata = MetadataMap::new();
        metadata.insert("user-agent", "baz".parse().unwrap());
        let builder = TonicExporterBuilder::default().with_metadata(metadata);
        let result = builder.tonic_config.metadata.unwrap();
        assert_eq!(
            result.get("User-Agent").unwrap(),
            &MetadataValue::try_from("baz").unwrap()
        );
        assert_eq!(
            result.len(),
            TonicExporterBuilder::default()
                .tonic_config
                .metadata
                .unwrap()
                .len()
        );
    }

    #[test]
    #[cfg(feature = "gzip-tonic")]
    fn test_with_gzip_compression() {
        let builder = TonicExporterBuilder::default().with_compression(Compression::Gzip);
        assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Gzip);
    }

    #[test]
    #[cfg(feature = "zstd-tonic")]
    fn test_with_zstd_compression() {
        let builder = TonicExporterBuilder::default().with_compression(Compression::Zstd);
        assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Zstd);
    }

    #[test]
    fn test_convert_compression() {
        // Conversion succeeds only for algorithms whose cargo feature is enabled.
        #[cfg(feature = "gzip-tonic")]
        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Gzip).is_ok());
        #[cfg(not(feature = "gzip-tonic"))]
        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Gzip).is_err());
        #[cfg(feature = "zstd-tonic")]
        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Zstd).is_ok());
        #[cfg(not(feature = "zstd-tonic"))]
        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Zstd).is_err());
    }

    #[cfg(feature = "zstd-tonic")]
    #[test]
    fn test_priority_of_signal_env_over_generic_env_for_compression() {
        run_env_test(
            vec![
                (crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, "zstd"),
                (crate::OTEL_EXPORTER_OTLP_COMPRESSION, "gzip"),
            ],
            || {
                let builder = TonicExporterBuilder::default();

                let compression = builder
                    .resolve_compression(crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION)
                    .unwrap();
                assert_eq!(compression, Some(tonic::codec::CompressionEncoding::Zstd));
            },
        );
    }

    #[cfg(feature = "zstd-tonic")]
    #[test]
    fn test_priority_of_code_based_config_over_envs_for_compression() {
        run_env_test(
            vec![
                (crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, "gzip"),
                (crate::OTEL_EXPORTER_OTLP_COMPRESSION, "gzip"),
            ],
            || {
                let builder = TonicExporterBuilder::default().with_compression(Compression::Zstd);

                let compression = builder
                    .resolve_compression(crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION)
                    .unwrap();
                assert_eq!(compression, Some(tonic::codec::CompressionEncoding::Zstd));
            },
        );
    }

    #[test]
    fn test_use_default_when_others_missing_for_compression() {
        run_env_test(vec![], || {
            let builder = TonicExporterBuilder::default();

            let compression = builder
                .resolve_compression(crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION)
                .unwrap();
            assert!(compression.is_none());
        });
    }

    #[test]
    fn test_parse_headers_from_env() {
        run_env_test(
            vec![
                (OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2"),
                (OTEL_EXPORTER_OTLP_HEADERS, "k3=v3"),
            ],
            || {
                // The signal-specific variable takes precedence over the generic one.
                assert_eq!(
                    super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS).0,
                    HeaderMap::from_iter([
                        (
                            HeaderName::from_static("k1"),
                            HeaderValue::from_static("v1")
                        ),
                        (
                            HeaderName::from_static("k2"),
                            HeaderValue::from_static("v2")
                        ),
                    ])
                );

                // An unset signal-specific variable falls back to the generic one.
                assert_eq!(
                    super::parse_headers_from_env("EMPTY_ENV").0,
                    HeaderMap::from_iter([(
                        HeaderName::from_static("k3"),
                        HeaderValue::from_static("v3")
                    )])
                );
            },
        )
    }

    #[test]
    fn test_merge_metadata_with_headers_from_env() {
        run_env_test(
            vec![(OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2")],
            || {
                let headers_from_env =
                    super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS);

                let mut metadata = MetadataMap::new();
                metadata.insert("foo", "bar".parse().unwrap());
                metadata.insert("k1", "v0".parse().unwrap());

                let result =
                    super::merge_metadata_with_headers_from_env(metadata, headers_from_env.0);

                // Keys only in the metadata survive; keys in both take the environment's value.
                assert_eq!(
                    result.get("foo").unwrap(),
                    MetadataValue::from_static("bar")
                );
                assert_eq!(result.get("k1").unwrap(), MetadataValue::from_static("v1"));
                assert_eq!(result.get("k2").unwrap(), MetadataValue::from_static("v2"));
            },
        );
    }

    #[test]
    fn test_priority_of_signal_env_over_generic_env_for_endpoint() {
        run_env_test(
            vec![
                (OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "http://localhost:1234"),
                (super::OTEL_EXPORTER_OTLP_ENDPOINT, "http://localhost:2345"),
            ],
            || {
                let url = TonicExporterBuilder::resolve_endpoint(
                    OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
                    None,
                );
                assert_eq!(url, "http://localhost:1234");
            },
        );
    }

    #[test]
    fn test_priority_of_code_based_config_over_envs_for_endpoint() {
        run_env_test(
            vec![
                (OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "http://localhost:1234"),
                (super::OTEL_EXPORTER_OTLP_ENDPOINT, "http://localhost:2345"),
            ],
            || {
                let url = TonicExporterBuilder::resolve_endpoint(
                    OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
                    Some("http://localhost:3456".to_string()),
                );
                assert_eq!(url, "http://localhost:3456");
            },
        );
    }

    #[test]
    fn test_use_default_when_others_missing_for_endpoint() {
        run_env_test(vec![], || {
            let url =
                TonicExporterBuilder::resolve_endpoint(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, None);
            assert_eq!(url, "http://localhost:4317");
        });
    }

    #[test]
    fn test_use_default_when_empty_string_for_option() {
        // An empty programmatic endpoint is treated as "not provided".
        run_env_test(vec![], || {
            let url = TonicExporterBuilder::resolve_endpoint(
                OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
                Some(String::new()),
            );
            assert_eq!(url, "http://localhost:4317");
        });
    }
}