251 lines
7.4 KiB
Rust
251 lines
7.4 KiB
Rust
use actix_http::body::Body;
|
|
use awc::{http::HeaderMap, ClientRequest, SendClientRequest};
|
|
use bytes::Bytes;
|
|
use futures_core::stream::Stream;
|
|
use serde::ser::Serialize;
|
|
use std::{
|
|
future::Future,
|
|
pin::Pin,
|
|
task::{Context, Poll},
|
|
};
|
|
use tracing::Span;
|
|
|
|
#[cfg(feature = "opentelemetry_0_13")]
|
|
use opentelemetry_0_13_pkg as opentelemetry;
|
|
#[cfg(feature = "opentelemetry_0_14")]
|
|
use opentelemetry_0_14_pkg as opentelemetry;
|
|
#[cfg(feature = "opentelemetry_0_15")]
|
|
use opentelemetry_0_15_pkg as opentelemetry;
|
|
#[cfg(feature = "opentelemetry_0_16")]
|
|
use opentelemetry_0_16_pkg as opentelemetry;
|
|
|
|
#[cfg(feature = "opentelemetry_0_13")]
|
|
use tracing_opentelemetry_0_12_pkg as tracing_opentelemetry;
|
|
#[cfg(feature = "opentelemetry_0_14")]
|
|
use tracing_opentelemetry_0_13_pkg as tracing_opentelemetry;
|
|
#[cfg(feature = "opentelemetry_0_15")]
|
|
use tracing_opentelemetry_0_14_pkg as tracing_opentelemetry;
|
|
#[cfg(feature = "opentelemetry_0_16")]
|
|
use tracing_opentelemetry_0_15_pkg as tracing_opentelemetry;
|
|
|
|
#[cfg(any(
|
|
feature = "opentelemetry_0_13",
|
|
feature = "opentelemetry_0_14",
|
|
feature = "opentelemetry_0_15",
|
|
feature = "opentelemetry_0_16"
|
|
))]
|
|
pub fn root_span() -> Span {
|
|
let span = tracing::info_span!("Root span", trace_id = tracing::field::Empty,);
|
|
{
|
|
use opentelemetry::trace::TraceContextExt;
|
|
use tracing_opentelemetry::OpenTelemetrySpanExt;
|
|
let trace_id = span.context().span().span_context().trace_id().to_hex();
|
|
span.record("trace_id", &tracing::field::display(trace_id));
|
|
}
|
|
|
|
span
|
|
}
|
|
|
|
trait InstrumentClient {
|
|
fn instrument(self, span: Span) -> SendRequestFuture;
|
|
}
|
|
|
|
impl InstrumentClient for SendClientRequest {
|
|
fn instrument(self, span: Span) -> SendRequestFuture {
|
|
SendRequestFuture { span, inner: self }
|
|
}
|
|
}
|
|
|
|
pub struct SendRequestFuture {
|
|
span: Span,
|
|
inner: SendClientRequest,
|
|
}
|
|
|
|
impl Future for SendRequestFuture {
|
|
type Output = <SendClientRequest as Future>::Output;
|
|
|
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
let span = self.span.clone();
|
|
span.in_scope(|| {
|
|
Pin::new(&mut self.inner)
|
|
.poll(cx)
|
|
.map_ok(|succ| {
|
|
span.record("http.status_code", &succ.status().as_u16());
|
|
if succ.status().is_client_error() {
|
|
span.record("otel.status_code", &"ERROR");
|
|
}
|
|
succ
|
|
})
|
|
.map_err(|err| {
|
|
span.record("otel.status_code", &"ERROR");
|
|
span.record(
|
|
"exception.message",
|
|
&tracing::field::display(&format!("{}", err)),
|
|
);
|
|
span.record(
|
|
"exception.details",
|
|
&tracing::field::display(&format!("{:?}", err)),
|
|
);
|
|
|
|
#[cfg(feature = "emit_event_on_error")]
|
|
tracing::warn!("Error in request: {}", err);
|
|
|
|
err
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
pub struct PropagatedRequest {
|
|
span: Span,
|
|
request: ClientRequest,
|
|
}
|
|
|
|
impl PropagatedRequest {
|
|
pub fn send_body<B>(self, body: B) -> SendRequestFuture
|
|
where
|
|
B: Into<Body>,
|
|
{
|
|
self.request.send_body(body).instrument(self.span)
|
|
}
|
|
|
|
pub fn send_json<T: Serialize>(self, value: &T) -> SendRequestFuture {
|
|
self.request.send_json(value).instrument(self.span)
|
|
}
|
|
|
|
pub fn send_form<T: Serialize>(self, value: &T) -> SendRequestFuture {
|
|
self.request.send_form(value).instrument(self.span)
|
|
}
|
|
|
|
pub fn send_stream<S, E>(self, stream: S) -> SendRequestFuture
|
|
where
|
|
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
|
|
E: Into<Box<dyn std::error::Error>> + 'static,
|
|
{
|
|
self.request.send_stream(stream).instrument(self.span)
|
|
}
|
|
|
|
pub fn send(self) -> SendRequestFuture {
|
|
self.request.send().instrument(self.span)
|
|
}
|
|
}
|
|
|
|
pub trait Propagate {
|
|
fn propagate(self) -> PropagatedRequest;
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
struct RequestHeaderCarrier<'a> {
|
|
headers: &'a mut HeaderMap,
|
|
}
|
|
|
|
#[cfg(any(
|
|
feature = "opentelemetry_0_13",
|
|
feature = "opentelemetry_0_14",
|
|
feature = "opentelemetry_0_15",
|
|
feature = "opentelemetry_0_16"
|
|
))]
|
|
impl<'a> opentelemetry::propagation::Injector for RequestHeaderCarrier<'a> {
|
|
fn set(&mut self, key: &str, value: String) {
|
|
let f = || {
|
|
use actix_http::http::{HeaderName, HeaderValue};
|
|
use std::convert::TryFrom;
|
|
|
|
let key = HeaderName::from_bytes(key.as_bytes())
|
|
.map_err(|e| {
|
|
tracing::warn!("Failed to inject header {}: {}", key, e);
|
|
})
|
|
.ok()?;
|
|
let value = HeaderValue::try_from(value)
|
|
.map_err(|e| {
|
|
tracing::warn!("Failed to inject header value for {}: {}", key, e);
|
|
})
|
|
.ok()?;
|
|
|
|
self.headers.insert(key, value);
|
|
Some(())
|
|
};
|
|
|
|
let _ = (f)();
|
|
}
|
|
}
|
|
|
|
fn record_otel(request: ClientRequest, _span: &Span) -> ClientRequest {
|
|
#[cfg(any(
|
|
feature = "opentelemetry_0_13",
|
|
feature = "opentelemetry_0_14",
|
|
feature = "opentelemetry_0_15",
|
|
feature = "opentelemetry_0_16"
|
|
))]
|
|
{
|
|
let span = _span;
|
|
let mut request = request;
|
|
use tracing_opentelemetry::OpenTelemetrySpanExt;
|
|
|
|
let mut carrier = RequestHeaderCarrier {
|
|
headers: request.headers_mut(),
|
|
};
|
|
|
|
let context = span.context();
|
|
|
|
opentelemetry::global::get_text_map_propagator(|propagator| {
|
|
propagator.inject_context(&context, &mut carrier);
|
|
});
|
|
|
|
return request;
|
|
};
|
|
|
|
#[cfg(not(any(
|
|
feature = "opentelemetry_0_13",
|
|
feature = "opentelemetry_0_14",
|
|
feature = "opentelemetry_0_15",
|
|
feature = "opentelemetry_0_16"
|
|
)))]
|
|
request
|
|
}
|
|
|
|
impl Propagate for ClientRequest {
|
|
fn propagate(self) -> PropagatedRequest {
|
|
let span = tracing::info_span!(
|
|
"HTTP Client",
|
|
otel.kind = "client",
|
|
otel.status_code = tracing::field::Empty,
|
|
http.method = tracing::field::display(&self.get_method()),
|
|
http.url = tracing::field::display(&self.get_uri()),
|
|
http.flavor = tracing::field::Empty,
|
|
http.status_code = tracing::field::Empty,
|
|
net.peer.ip = tracing::field::Empty,
|
|
exception.message = tracing::field::Empty,
|
|
exception.details = tracing::field::Empty,
|
|
);
|
|
|
|
match self.get_version() {
|
|
&actix_http::Version::HTTP_09 => {
|
|
span.record("http.flavor", &"0.9");
|
|
}
|
|
&actix_http::Version::HTTP_10 => {
|
|
span.record("http.flavor", &"1.0");
|
|
}
|
|
&actix_http::Version::HTTP_11 => {
|
|
span.record("http.flavor", &"1.1");
|
|
}
|
|
&actix_http::Version::HTTP_2 => {
|
|
span.record("http.flavor", &"2.0");
|
|
}
|
|
&actix_http::Version::HTTP_3 => {
|
|
span.record("http.flavor", &"3.0");
|
|
}
|
|
_ => (),
|
|
}
|
|
|
|
if let Some(peer_ip) = self.get_peer_addr() {
|
|
span.record("net.peer.ip", &tracing::field::display(&peer_ip.ip()));
|
|
}
|
|
|
|
PropagatedRequest {
|
|
request: record_otel(self, &span),
|
|
span,
|
|
}
|
|
}
|
|
}
|