2021-12-12 20:36:05 +00:00
|
|
|
use actix_http::{error::PayloadError, header::map::HeaderMap, Payload, RequestHeadType};
|
2021-11-30 22:05:38 +00:00
|
|
|
use actix_service::Service;
|
2021-12-12 20:36:05 +00:00
|
|
|
use awc::{error::SendRequestError, middleware::Transform, ConnectRequest, ConnectResponse};
|
2021-09-19 18:11:58 +00:00
|
|
|
use bytes::Bytes;
|
|
|
|
use futures_core::stream::Stream;
|
|
|
|
use std::{
|
|
|
|
future::Future,
|
|
|
|
pin::Pin,
|
|
|
|
task::{Context, Poll},
|
|
|
|
};
|
2021-12-07 02:30:59 +00:00
|
|
|
use tracing::{instrument::Instrumented, Instrument, Span};
|
2021-09-19 18:11:58 +00:00
|
|
|
|
|
|
|
#[cfg(feature = "opentelemetry_0_13")]
|
|
|
|
use opentelemetry_0_13_pkg as opentelemetry;
|
|
|
|
#[cfg(feature = "opentelemetry_0_14")]
|
|
|
|
use opentelemetry_0_14_pkg as opentelemetry;
|
|
|
|
#[cfg(feature = "opentelemetry_0_15")]
|
|
|
|
use opentelemetry_0_15_pkg as opentelemetry;
|
|
|
|
#[cfg(feature = "opentelemetry_0_16")]
|
|
|
|
use opentelemetry_0_16_pkg as opentelemetry;
|
2022-02-04 01:26:45 +00:00
|
|
|
#[cfg(feature = "opentelemetry_0_17")]
|
|
|
|
use opentelemetry_0_17_pkg as opentelemetry;
|
2021-09-19 18:11:58 +00:00
|
|
|
|
|
|
|
#[cfg(feature = "opentelemetry_0_13")]
|
|
|
|
use tracing_opentelemetry_0_12_pkg as tracing_opentelemetry;
|
|
|
|
#[cfg(feature = "opentelemetry_0_14")]
|
|
|
|
use tracing_opentelemetry_0_13_pkg as tracing_opentelemetry;
|
|
|
|
#[cfg(feature = "opentelemetry_0_15")]
|
|
|
|
use tracing_opentelemetry_0_14_pkg as tracing_opentelemetry;
|
|
|
|
#[cfg(feature = "opentelemetry_0_16")]
|
2021-10-26 15:49:08 +00:00
|
|
|
use tracing_opentelemetry_0_16_pkg as tracing_opentelemetry;
|
2022-02-04 01:26:45 +00:00
|
|
|
#[cfg(feature = "opentelemetry_0_17")]
|
|
|
|
use tracing_opentelemetry_0_17_pkg as tracing_opentelemetry;
|
2021-09-19 18:11:58 +00:00
|
|
|
|
|
|
|
/// Creates an info-level span named `Root span` whose `trace_id` field holds the
/// hex-encoded OpenTelemetry trace id associated with that span.
///
/// Only compiled when one of the `opentelemetry_0_*` features is enabled.
#[cfg(any(
    feature = "opentelemetry_0_13",
    feature = "opentelemetry_0_14",
    feature = "opentelemetry_0_15",
    feature = "opentelemetry_0_16",
    feature = "opentelemetry_0_17"
))]
pub fn root_span() -> Span {
    use opentelemetry::trace::TraceContextExt;
    use tracing_opentelemetry::OpenTelemetrySpanExt;

    // The field is declared empty up front so it can be filled in once the id is known.
    let root = tracing::info_span!("Root span", trace_id = tracing::field::Empty,);

    // Releases before 0.17 provide `to_hex()` on the trace id.
    #[cfg(not(feature = "opentelemetry_0_17"))]
    let hex_id = root.context().span().span_context().trace_id().to_hex();

    // With 0.17 the id is rendered through its `LowerHex` formatting, zero-padded to 32 digits.
    #[cfg(feature = "opentelemetry_0_17")]
    let hex_id = format!(
        "{:032x}",
        root.context().span().span_context().trace_id()
    );

    root.record("trace_id", &tracing::field::display(hex_id));

    root
}
|
|
|
|
|
2021-11-30 22:05:38 +00:00
|
|
|
/// Client middleware marker: its `Transform` implementation wraps a connector
/// service in [`TracingMiddleware`].
pub struct Tracing;
|
|
|
|
/// Service wrapper produced by [`Tracing`]; its only field is the wrapped service.
pub struct TracingMiddleware<S>(S);
|
2021-10-11 17:10:07 +00:00
|
|
|
|
2021-11-30 22:05:38 +00:00
|
|
|
impl<S> Transform<S, ConnectRequest> for Tracing
|
2021-10-11 17:10:07 +00:00
|
|
|
where
|
2021-11-30 22:05:38 +00:00
|
|
|
S: Service<ConnectRequest, Response = ConnectResponse, Error = SendRequestError>,
|
2021-10-11 17:10:07 +00:00
|
|
|
{
|
2021-11-30 22:05:38 +00:00
|
|
|
type Transform = TracingMiddleware<S>;
|
2021-10-11 17:10:07 +00:00
|
|
|
|
2021-11-30 22:05:38 +00:00
|
|
|
fn new_transform(self, service: S) -> Self::Transform {
|
|
|
|
TracingMiddleware(service)
|
2021-10-11 17:10:07 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-11-30 22:05:38 +00:00
|
|
|
impl<S> Service<ConnectRequest> for TracingMiddleware<S>
|
2021-10-11 17:10:07 +00:00
|
|
|
where
|
2021-11-30 22:05:38 +00:00
|
|
|
S: Service<ConnectRequest, Response = ConnectResponse, Error = SendRequestError>,
|
2021-10-11 17:10:07 +00:00
|
|
|
{
|
2021-11-30 22:05:38 +00:00
|
|
|
type Response = ConnectResponse;
|
|
|
|
type Error = SendRequestError;
|
2022-04-06 21:51:20 +00:00
|
|
|
type Future = Instrumented<TracingFuture<S::Future>>;
|
2021-10-11 17:10:07 +00:00
|
|
|
|
2021-11-30 22:05:38 +00:00
|
|
|
fn poll_ready(&self, ctx: &mut core::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
|
|
|
|
self.0.poll_ready(ctx)
|
2021-10-11 17:10:07 +00:00
|
|
|
}
|
|
|
|
|
2021-11-30 22:05:38 +00:00
|
|
|
fn call(&self, mut req: ConnectRequest) -> Self::Future {
|
|
|
|
let request_head = match &req {
|
|
|
|
ConnectRequest::Client(head_type, _, _) => head_type.as_ref(),
|
|
|
|
ConnectRequest::Tunnel(head, _) => head,
|
|
|
|
};
|
2021-10-11 17:10:07 +00:00
|
|
|
|
2021-11-30 22:05:38 +00:00
|
|
|
let span = tracing::info_span!(
|
|
|
|
"HTTP Client",
|
|
|
|
otel.kind = "client",
|
|
|
|
otel.status_code = tracing::field::Empty,
|
|
|
|
http.method = tracing::field::display(&request_head.method),
|
|
|
|
http.url = tracing::field::display(&request_head.uri),
|
|
|
|
http.flavor = tracing::field::Empty,
|
|
|
|
http.status_code = tracing::field::Empty,
|
|
|
|
net.peer.ip = tracing::field::Empty,
|
|
|
|
exception.message = tracing::field::Empty,
|
|
|
|
exception.details = tracing::field::Empty,
|
|
|
|
);
|
2021-10-11 17:10:07 +00:00
|
|
|
|
2021-12-03 19:55:25 +00:00
|
|
|
match request_head.version {
|
|
|
|
actix_http::Version::HTTP_09 => {
|
2021-11-30 22:05:38 +00:00
|
|
|
span.record("http.flavor", &"0.9");
|
|
|
|
}
|
2021-12-03 19:55:25 +00:00
|
|
|
actix_http::Version::HTTP_10 => {
|
2021-11-30 22:05:38 +00:00
|
|
|
span.record("http.flavor", &"1.0");
|
|
|
|
}
|
2021-12-03 19:55:25 +00:00
|
|
|
actix_http::Version::HTTP_11 => {
|
2021-11-30 22:05:38 +00:00
|
|
|
span.record("http.flavor", &"1.1");
|
|
|
|
}
|
2021-12-03 19:55:25 +00:00
|
|
|
actix_http::Version::HTTP_2 => {
|
2021-11-30 22:05:38 +00:00
|
|
|
span.record("http.flavor", &"2.0");
|
|
|
|
}
|
2021-12-03 19:55:25 +00:00
|
|
|
actix_http::Version::HTTP_3 => {
|
2021-11-30 22:05:38 +00:00
|
|
|
span.record("http.flavor", &"3.0");
|
|
|
|
}
|
|
|
|
_ => (),
|
|
|
|
}
|
2021-10-11 17:10:07 +00:00
|
|
|
|
2021-11-30 22:05:38 +00:00
|
|
|
if let Some(peer_ip) = &request_head.peer_addr {
|
|
|
|
span.record("net.peer.ip", &tracing::field::display(&peer_ip.ip()));
|
|
|
|
}
|
2021-10-11 17:10:07 +00:00
|
|
|
|
2021-11-30 22:05:38 +00:00
|
|
|
match &mut req {
|
|
|
|
ConnectRequest::Client(ref mut head_type, _, _) => match head_type {
|
|
|
|
RequestHeadType::Owned(ref mut head) => {
|
|
|
|
record_otel(head.headers_mut(), &span);
|
|
|
|
}
|
|
|
|
RequestHeadType::Rc(_, ref mut extras) => {
|
|
|
|
let mut owned = extras.take().unwrap_or_default();
|
|
|
|
record_otel(&mut owned, &span);
|
|
|
|
*extras = Some(owned);
|
|
|
|
}
|
|
|
|
},
|
|
|
|
ConnectRequest::Tunnel(head, _) => record_otel(head.headers_mut(), &span),
|
|
|
|
}
|
2021-10-11 17:10:07 +00:00
|
|
|
|
2021-11-30 22:05:38 +00:00
|
|
|
TracingFuture {
|
2022-04-06 21:51:20 +00:00
|
|
|
future: span.in_scope(|| self.0.call(req)),
|
2021-11-30 22:05:38 +00:00
|
|
|
}
|
2022-04-06 21:51:20 +00:00
|
|
|
.instrument(span)
|
2021-10-11 17:10:07 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-11-30 22:05:38 +00:00
|
|
|
pin_project_lite::pin_project! {
    /// Future type returned (inside `Instrumented`) by `TracingMiddleware::call`.
    ///
    /// It wraps the response future of the inner service; its `Future`
    /// implementation records the outcome on the current span.
    pub struct TracingFuture<F> {
        // Response future of the wrapped service, polled in place.
        #[pin]
        future: F,
    }
}
|
|
|
|
|
2021-11-30 22:05:38 +00:00
|
|
|
impl<F> Future for TracingFuture<F>
|
2021-10-11 17:10:07 +00:00
|
|
|
where
|
2021-11-30 22:05:38 +00:00
|
|
|
F: Future<Output = Result<ConnectResponse, SendRequestError>>,
|
2021-10-11 17:10:07 +00:00
|
|
|
{
|
2021-11-30 22:05:38 +00:00
|
|
|
type Output = Result<ConnectResponse, SendRequestError>;
|
2021-10-11 17:10:07 +00:00
|
|
|
|
|
|
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
2021-11-30 22:05:38 +00:00
|
|
|
let this = self.as_mut().project();
|
2021-10-11 17:10:07 +00:00
|
|
|
|
2022-04-06 21:51:20 +00:00
|
|
|
let span = Span::current();
|
|
|
|
|
2021-11-30 22:05:38 +00:00
|
|
|
let future = this.future;
|
2022-04-06 21:51:20 +00:00
|
|
|
|
|
|
|
future
|
|
|
|
.poll(cx)
|
|
|
|
.map_ok(|succ| match succ {
|
|
|
|
ConnectResponse::Client(client_response) => {
|
|
|
|
let status: i32 = client_response.status().as_u16().into();
|
|
|
|
span.record("http.status_code", &status);
|
|
|
|
if client_response.status().is_client_error() {
|
|
|
|
span.record("otel.status_code", &"ERROR");
|
2021-10-11 17:10:07 +00:00
|
|
|
}
|
2022-04-06 21:51:20 +00:00
|
|
|
|
|
|
|
ConnectResponse::Client(client_response.map_body(|_, payload| {
|
|
|
|
let payload = InstrumentedBody::new(payload);
|
|
|
|
let payload: Pin<Box<dyn Stream<Item = Result<Bytes, PayloadError>>>> =
|
|
|
|
Box::pin(payload);
|
|
|
|
|
|
|
|
Payload::Stream { payload }
|
|
|
|
}))
|
|
|
|
}
|
|
|
|
ConnectResponse::Tunnel(response_head, etc) => {
|
|
|
|
let status: i32 = response_head.status.as_u16().into();
|
|
|
|
span.record("http.status_code", &status);
|
|
|
|
if response_head.status.is_client_error() {
|
|
|
|
span.record("otel.status_code", &"ERROR");
|
2021-11-30 22:25:34 +00:00
|
|
|
}
|
2022-04-06 21:51:20 +00:00
|
|
|
ConnectResponse::Tunnel(response_head, etc)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.map_err(|err| {
|
|
|
|
span.record("otel.status_code", &"ERROR");
|
|
|
|
span.record(
|
|
|
|
"exception.message",
|
|
|
|
&tracing::field::display(&format!("{}", err)),
|
|
|
|
);
|
|
|
|
span.record(
|
|
|
|
"exception.details",
|
|
|
|
&tracing::field::display(&format!("{:?}", err)),
|
|
|
|
);
|
|
|
|
|
|
|
|
#[cfg(feature = "emit_event_on_error")]
|
|
|
|
tracing::warn!("Error in request: {}", err);
|
|
|
|
|
|
|
|
err
|
|
|
|
})
|
2021-10-11 17:10:07 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-11-30 22:05:38 +00:00
|
|
|
pin_project_lite::pin_project! {
    /// Response body stream paired with an optional span.
    struct InstrumentedBody<S> {
        // Starts as `None`; the `Stream` implementation fills it in on first poll.
        span: Option<Span>,

        // The wrapped body stream.
        #[pin]
        body: S,
    }
}
|
|
|
|
|
2021-11-30 22:05:38 +00:00
|
|
|
impl<S> InstrumentedBody<S>
|
|
|
|
where
|
|
|
|
S: Stream<Item = Result<Bytes, PayloadError>>,
|
|
|
|
{
|
2022-04-06 21:51:20 +00:00
|
|
|
fn new(body: S) -> InstrumentedBody<S> {
|
|
|
|
InstrumentedBody { span: None, body }
|
2021-11-30 22:05:38 +00:00
|
|
|
}
|
2021-09-19 18:11:58 +00:00
|
|
|
}
|
|
|
|
|
2021-11-30 22:05:38 +00:00
|
|
|
impl<S> Stream for InstrumentedBody<S>
|
|
|
|
where
|
|
|
|
S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
|
|
|
|
{
|
|
|
|
type Item = <S as Stream>::Item;
|
2021-09-19 18:11:58 +00:00
|
|
|
|
2021-11-30 22:05:38 +00:00
|
|
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
|
|
let this = self.as_mut().project();
|
2021-09-19 18:11:58 +00:00
|
|
|
|
2022-04-06 21:51:20 +00:00
|
|
|
let span = this
|
|
|
|
.span
|
|
|
|
.get_or_insert_with(|| tracing::info_span!("HTTP Client Response Body"));
|
2021-11-30 22:05:38 +00:00
|
|
|
let body = this.body;
|
2021-09-19 18:11:58 +00:00
|
|
|
|
2021-12-06 18:32:23 +00:00
|
|
|
span.in_scope(|| body.poll_next(cx))
|
2021-09-19 18:11:58 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
struct RequestHeaderCarrier<'a> {
|
2021-12-03 19:55:25 +00:00
|
|
|
#[allow(dead_code)]
|
2021-09-19 18:11:58 +00:00
|
|
|
headers: &'a mut HeaderMap,
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Lets an OpenTelemetry propagator write its key/value pairs into the request headers.
#[cfg(any(
    feature = "opentelemetry_0_13",
    feature = "opentelemetry_0_14",
    feature = "opentelemetry_0_15",
    feature = "opentelemetry_0_16",
    feature = "opentelemetry_0_17"
))]
impl<'a> opentelemetry::propagation::Injector for RequestHeaderCarrier<'a> {
    /// Inserts `key: value` into the header map.
    ///
    /// A key or value that is not a valid HTTP header is skipped with a warning
    /// rather than treated as an error.
    fn set(&mut self, key: &str, value: String) {
        use actix_http::header::{HeaderName, HeaderValue};
        use std::convert::TryFrom;

        let name = match HeaderName::from_bytes(key.as_bytes()) {
            Ok(name) => name,
            Err(e) => {
                tracing::warn!("Failed to inject header {}: {}", key, e);
                return;
            }
        };

        let header_value = match HeaderValue::try_from(value) {
            Ok(header_value) => header_value,
            Err(e) => {
                // The parsed header name (not the raw key) is what gets reported here.
                tracing::warn!("Failed to inject header value for {}: {}", name, e);
                return;
            }
        };

        self.headers.insert(name, header_value);
    }
}
|
|
|
|
|
2021-11-30 22:05:38 +00:00
|
|
|
/// Injects the OpenTelemetry context of `_span` into `_headers` using the globally
/// registered text-map propagator.
///
/// Does nothing when no `opentelemetry_0_*` feature is enabled, which is why both
/// parameters carry a leading underscore.
fn record_otel(_headers: &mut HeaderMap, _span: &Span) {
    #[cfg(any(
        feature = "opentelemetry_0_13",
        feature = "opentelemetry_0_14",
        feature = "opentelemetry_0_15",
        feature = "opentelemetry_0_16",
        feature = "opentelemetry_0_17"
    ))]
    {
        use tracing_opentelemetry::OpenTelemetrySpanExt;

        let otel_context = _span.context();
        let mut injector = RequestHeaderCarrier { headers: _headers };

        opentelemetry::global::get_text_map_propagator(|propagator| {
            propagator.inject_context(&otel_context, &mut injector);
        });
    };
}
|