use actix_http::{error::PayloadError, header::map::HeaderMap, Payload, RequestHeadType}; use actix_service::Service; use awc::{error::SendRequestError, middleware::Transform, ConnectRequest, ConnectResponse}; use bytes::Bytes; use futures_core::stream::Stream; use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; use tracing::{instrument::Instrumented, Instrument, Span}; #[cfg(feature = "opentelemetry_0_13")] use opentelemetry_0_13_pkg as opentelemetry; #[cfg(feature = "opentelemetry_0_14")] use opentelemetry_0_14_pkg as opentelemetry; #[cfg(feature = "opentelemetry_0_15")] use opentelemetry_0_15_pkg as opentelemetry; #[cfg(feature = "opentelemetry_0_16")] use opentelemetry_0_16_pkg as opentelemetry; #[cfg(feature = "opentelemetry_0_17")] use opentelemetry_0_17_pkg as opentelemetry; #[cfg(feature = "opentelemetry_0_13")] use tracing_opentelemetry_0_12_pkg as tracing_opentelemetry; #[cfg(feature = "opentelemetry_0_14")] use tracing_opentelemetry_0_13_pkg as tracing_opentelemetry; #[cfg(feature = "opentelemetry_0_15")] use tracing_opentelemetry_0_14_pkg as tracing_opentelemetry; #[cfg(feature = "opentelemetry_0_16")] use tracing_opentelemetry_0_16_pkg as tracing_opentelemetry; #[cfg(feature = "opentelemetry_0_17")] use tracing_opentelemetry_0_17_pkg as tracing_opentelemetry; #[cfg(any( feature = "opentelemetry_0_13", feature = "opentelemetry_0_14", feature = "opentelemetry_0_15", feature = "opentelemetry_0_16", feature = "opentelemetry_0_17" ))] pub fn root_span() -> Span { let span = tracing::info_span!("Root span", trace_id = tracing::field::Empty,); { use opentelemetry::trace::TraceContextExt; use tracing_opentelemetry::OpenTelemetrySpanExt; #[cfg(not(feature = "opentelemetry_0_17"))] let trace_id = span.context().span().span_context().trace_id().to_hex(); #[cfg(feature = "opentelemetry_0_17")] let trace_id = { let id = span.context().span().span_context().trace_id(); format!("{:032x}", id) }; span.record("trace_id", &tracing::field::display(trace_id)); } span } pub struct Tracing; pub struct TracingMiddleware(S); impl Transform for Tracing where S: Service, { type Transform = TracingMiddleware; fn new_transform(self, service: S) -> Self::Transform { TracingMiddleware(service) } } impl Service for TracingMiddleware where S: Service, { type Response = ConnectResponse; type Error = SendRequestError; type Future = TracingFuture; fn poll_ready(&self, ctx: &mut core::task::Context<'_>) -> Poll> { self.0.poll_ready(ctx) } fn call(&self, mut req: ConnectRequest) -> Self::Future { let request_head = match &req { ConnectRequest::Client(head_type, _, _) => head_type.as_ref(), ConnectRequest::Tunnel(head, _) => head, }; let span = tracing::info_span!( "HTTP Client", otel.kind = "client", otel.status_code = tracing::field::Empty, http.method = tracing::field::display(&request_head.method), http.url = tracing::field::display(&request_head.uri), http.flavor = tracing::field::Empty, http.status_code = tracing::field::Empty, net.peer.ip = tracing::field::Empty, exception.message = tracing::field::Empty, exception.details = tracing::field::Empty, ); match request_head.version { actix_http::Version::HTTP_09 => { span.record("http.flavor", &"0.9"); } actix_http::Version::HTTP_10 => { span.record("http.flavor", &"1.0"); } actix_http::Version::HTTP_11 => { span.record("http.flavor", &"1.1"); } actix_http::Version::HTTP_2 => { span.record("http.flavor", &"2.0"); } actix_http::Version::HTTP_3 => { span.record("http.flavor", &"3.0"); } _ => (), } if let Some(peer_ip) = &request_head.peer_addr { span.record("net.peer.ip", &tracing::field::display(&peer_ip.ip())); } match &mut req { ConnectRequest::Client(ref mut head_type, _, _) => match head_type { RequestHeadType::Owned(ref mut head) => { record_otel(head.headers_mut(), &span); } RequestHeadType::Rc(_, ref mut extras) => { let mut owned = extras.take().unwrap_or_default(); record_otel(&mut owned, &span); *extras = Some(owned); } }, ConnectRequest::Tunnel(head, _) => record_otel(head.headers_mut(), &span), } TracingFuture { future: self.0.call(req).instrument(span.clone()), span, } } } pin_project_lite::pin_project! { pub struct TracingFuture { span: Span, #[pin] future: Instrumented, } } impl Future for TracingFuture where F: Future>, { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.as_mut().project(); let future = this.future; let span = this.span; span.in_scope(|| { future .poll(cx) .map_ok(|succ| match succ { ConnectResponse::Client(client_response) => { let status: i32 = client_response.status().as_u16().into(); span.record("http.status_code", &status); if client_response.status().is_client_error() { span.record("otel.status_code", &"ERROR"); } ConnectResponse::Client(client_response.map_body(|_, payload| { let body_span = tracing::info_span!(parent: None, "HTTP Client Response Body"); let payload = InstrumentedBody::new(body_span, payload); let payload: Pin>>> = Box::pin(payload); Payload::Stream { payload } })) } ConnectResponse::Tunnel(response_head, etc) => { let status: i32 = response_head.status.as_u16().into(); span.record("http.status_code", &status); if response_head.status.is_client_error() { span.record("otel.status_code", &"ERROR"); } ConnectResponse::Tunnel(response_head, etc) } }) .map_err(|err| { span.record("otel.status_code", &"ERROR"); span.record( "exception.message", &tracing::field::display(&format!("{}", err)), ); span.record( "exception.details", &tracing::field::display(&format!("{:?}", err)), ); #[cfg(feature = "emit_event_on_error")] tracing::warn!("Error in request: {}", err); err }) }) } } pin_project_lite::pin_project! { struct InstrumentedBody { span: Span, #[pin] body: S, } } impl InstrumentedBody where S: Stream>, { fn new(span: Span, body: S) -> InstrumentedBody { InstrumentedBody { span, body } } } impl Stream for InstrumentedBody where S: Stream> + Unpin, { type Item = ::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.as_mut().project(); let span = this.span; let body = this.body; span.in_scope(|| body.poll_next(cx)) } } #[derive(Debug)] struct RequestHeaderCarrier<'a> { #[allow(dead_code)] headers: &'a mut HeaderMap, } #[cfg(any( feature = "opentelemetry_0_13", feature = "opentelemetry_0_14", feature = "opentelemetry_0_15", feature = "opentelemetry_0_16", feature = "opentelemetry_0_17" ))] impl<'a> opentelemetry::propagation::Injector for RequestHeaderCarrier<'a> { fn set(&mut self, key: &str, value: String) { let f = || { use actix_http::header::{HeaderName, HeaderValue}; use std::convert::TryFrom; let key = HeaderName::from_bytes(key.as_bytes()) .map_err(|e| { tracing::warn!("Failed to inject header {}: {}", key, e); }) .ok()?; let value = HeaderValue::try_from(value) .map_err(|e| { tracing::warn!("Failed to inject header value for {}: {}", key, e); }) .ok()?; self.headers.insert(key, value); Some(()) }; let _ = (f)(); } } fn record_otel(_headers: &mut HeaderMap, _span: &Span) { #[cfg(any( feature = "opentelemetry_0_13", feature = "opentelemetry_0_14", feature = "opentelemetry_0_15", feature = "opentelemetry_0_16", feature = "opentelemetry_0_17" ))] { let span = _span; let headers = _headers; use tracing_opentelemetry::OpenTelemetrySpanExt; let mut carrier = RequestHeaderCarrier { headers }; let context = span.context(); opentelemetry::global::get_text_map_propagator(|propagator| { propagator.inject_context(&context, &mut carrier); }); }; }