tracing-awc/src/lib.rs
2021-11-23 12:25:25 -06:00

512 lines
14 KiB
Rust

use actix_http::{
body::AnyBody, error::PayloadError, HttpMessage, Payload, PayloadStream, ResponseHead, StatusCode,
Version,
};
use awc::{
error::{JsonPayloadError, SendRequestError},
http::HeaderMap,
ClientRequest, ClientResponse, JsonBody, MessageBody, SendClientRequest,
};
use bytes::Bytes;
use encoding_rs::Encoding;
use futures_core::stream::Stream;
use mime::Mime;
use serde::{de::DeserializeOwned, ser::Serialize};
use std::{
cell::Ref,
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use tracing::Span;
#[cfg(feature = "opentelemetry_0_13")]
use opentelemetry_0_13_pkg as opentelemetry;
#[cfg(feature = "opentelemetry_0_14")]
use opentelemetry_0_14_pkg as opentelemetry;
#[cfg(feature = "opentelemetry_0_15")]
use opentelemetry_0_15_pkg as opentelemetry;
#[cfg(feature = "opentelemetry_0_16")]
use opentelemetry_0_16_pkg as opentelemetry;
#[cfg(feature = "opentelemetry_0_13")]
use tracing_opentelemetry_0_12_pkg as tracing_opentelemetry;
#[cfg(feature = "opentelemetry_0_14")]
use tracing_opentelemetry_0_13_pkg as tracing_opentelemetry;
#[cfg(feature = "opentelemetry_0_15")]
use tracing_opentelemetry_0_14_pkg as tracing_opentelemetry;
#[cfg(feature = "opentelemetry_0_16")]
use tracing_opentelemetry_0_15_pkg as tracing_opentelemetry;
/// Creates the span named `"Root span"` and stores the hex-encoded OpenTelemetry trace id of
/// that span in its `trace_id` field.
///
/// Only compiled when one of the `opentelemetry_0_*` features is enabled.
#[cfg(any(
    feature = "opentelemetry_0_13",
    feature = "opentelemetry_0_14",
    feature = "opentelemetry_0_15",
    feature = "opentelemetry_0_16"
))]
pub fn root_span() -> Span {
    use opentelemetry::trace::TraceContextExt;
    use tracing_opentelemetry::OpenTelemetrySpanExt;

    // `trace_id` starts out empty: the id is read back from the span itself, so the span has to
    // exist before the field can be filled in.
    let root = tracing::info_span!("Root span", trace_id = tracing::field::Empty);
    let hex_trace_id = root.context().span().span_context().trace_id().to_hex();
    root.record("trace_id", &tracing::field::display(hex_trace_id));
    root
}
/// Crate-private extension trait that pairs a value with a [`Span`], producing a
/// [`SendRequestFuture`].
trait InstrumentClient {
/// Consumes `self` and returns a [`SendRequestFuture`] associated with `span`.
fn instrument(self, span: Span) -> SendRequestFuture;
}
impl InstrumentClient for SendClientRequest {
    /// Wraps this pending request in a [`SendRequestFuture`] that carries `span`.
    fn instrument(self, span: Span) -> SendRequestFuture {
        let inner = self;
        SendRequestFuture { inner, span }
    }
}
/// Future wrapping a [`SendClientRequest`] together with the [`Span`] it is polled inside.
///
/// On success it resolves to an [`InstrumentedResponse`]; on failure it yields the
/// underlying `SendRequestError`.
pub struct SendRequestFuture {
// Span entered for every poll; it also receives the status and exception fields.
span: Span,
// The wrapped awc request future.
inner: SendClientRequest,
}
/// A [`ClientResponse`] paired with the [`Span`] used while its payload is consumed.
///
/// Values of this type are produced by polling a [`SendRequestFuture`].
pub struct InstrumentedResponse<S = PayloadStream> {
// Span entered while the payload stream is polled; cloned into body/json futures.
span: Span,
// The wrapped awc response.
response: ClientResponse<S>,
}
/// Future wrapping a [`MessageBody`] together with the [`Span`] it is polled inside.
pub struct InstrumentedBody<S> {
// Span entered for every poll of `body`.
span: Span,
// The wrapped awc body future.
body: MessageBody<S>,
}
/// Future wrapping a [`JsonBody`] together with the [`Span`] it is polled inside.
pub struct InstrumentedJson<S, T> {
// Span entered for every poll of `body`.
span: Span,
// The wrapped awc JSON body future.
body: JsonBody<S, T>,
}
/// Accessors and adapters that forward to the wrapped [`ClientResponse`] while keeping the
/// response's span attached.
impl<S> InstrumentedResponse<S> {
    /// Returns the HTTP version reported by the wrapped response.
    pub fn version(&self) -> Version {
        let inner = &self.response;
        inner.version()
    }

    /// Returns the status code of the wrapped response.
    pub fn status(&self) -> StatusCode {
        let inner = &self.response;
        inner.status()
    }

    /// Returns the headers of the wrapped response.
    pub fn headers(&self) -> &HeaderMap {
        let inner = &self.response;
        inner.headers()
    }

    /// Applies `f` to the wrapped response via `ClientResponse::map_body`, carrying the span
    /// over to the resulting response.
    pub fn map_body<F, U>(self, f: F) -> InstrumentedResponse<U>
    where
        F: FnOnce(&mut ResponseHead, Payload<S>) -> Payload<U>,
    {
        let Self { span, response } = self;
        InstrumentedResponse {
            span,
            response: response.map_body(f),
        }
    }

    /// Forwards `dur` to `ClientResponse::timeout`, carrying the span over unchanged.
    pub fn timeout(self, dur: Duration) -> Self {
        let Self { span, response } = self;
        Self {
            span,
            response: response.timeout(dur),
        }
    }

    /// Returns the cookies of the wrapped response, as produced by `ClientResponse::cookies`.
    #[cfg(feature = "cookies")]
    pub fn cookies(&self) -> Result<Ref<'_, Vec<cookie::Cookie<'static>>>, cookie::ParseError> {
        let inner = &self.response;
        inner.cookies()
    }

    /// Looks up the cookie called `name` on the wrapped response.
    #[cfg(feature = "cookies")]
    pub fn cookie(&self, name: &str) -> Option<cookie::Cookie<'static>> {
        let inner = &self.response;
        inner.cookie(name)
    }

    /// Returns the span attached to this response.
    pub fn span(&self) -> &Span {
        let Self { span, .. } = self;
        span
    }
}
/// Body-consuming helpers; each returned future carries a clone of the response's span.
impl<S> InstrumentedResponse<S>
where
    S: Stream<Item = Result<Bytes, PayloadError>>,
{
    /// Returns a future built from `ClientResponse::body`, paired with a clone of this
    /// response's span.
    pub fn body(&mut self) -> InstrumentedBody<S> {
        let span = self.span.clone();
        let body = self.response.body();
        InstrumentedBody { span, body }
    }

    /// Returns a future built from `ClientResponse::json`, paired with a clone of this
    /// response's span.
    pub fn json<T: DeserializeOwned>(&mut self) -> InstrumentedJson<S, T> {
        let span = self.span.clone();
        let body = self.response.json();
        InstrumentedJson { span, body }
    }
}
impl<S> InstrumentedBody<S>
where
    S: Stream<Item = Result<Bytes, PayloadError>>,
{
    /// Builds a [`MessageBody`] from `res` and pairs it with `span`.
    pub fn new(span: Span, res: &mut ClientResponse<S>) -> InstrumentedBody<S> {
        let body = MessageBody::new(res);
        Self { span, body }
    }

    /// Forwards `limit` to `MessageBody::limit`, keeping the same span.
    pub fn limit(self, limit: usize) -> Self {
        let Self { span, body } = self;
        Self {
            span,
            body: body.limit(limit),
        }
    }
}
impl<S, U> InstrumentedJson<S, U>
where
    S: Stream<Item = Result<Bytes, PayloadError>>,
    U: DeserializeOwned,
{
    /// Builds a [`JsonBody`] from `res` and pairs it with `span`.
    pub fn new(span: Span, res: &mut ClientResponse<S>) -> Self {
        let body = JsonBody::new(res);
        Self { span, body }
    }

    /// Forwards `limit` to `JsonBody::limit`, keeping the same span.
    pub fn limit(self, limit: usize) -> Self {
        let Self { span, body } = self;
        Self {
            span,
            body: body.limit(limit),
        }
    }
}
/// Every [`HttpMessage`] method is answered by the wrapped [`ClientResponse`]; the span plays
/// no part in this impl.
impl<S> HttpMessage for InstrumentedResponse<S> {
    type Stream = S;

    /// Headers of the wrapped response.
    fn headers(&self) -> &HeaderMap {
        let Self { response, .. } = self;
        response.headers()
    }

    /// Takes the payload out of the wrapped response.
    fn take_payload(&mut self) -> Payload<Self::Stream> {
        let Self { response, .. } = self;
        response.take_payload()
    }

    /// Shared borrow of the wrapped response's extensions.
    fn extensions(&self) -> Ref<'_, actix_http::Extensions> {
        let Self { response, .. } = self;
        response.extensions()
    }

    /// Mutable borrow of the wrapped response's extensions.
    fn extensions_mut(&self) -> std::cell::RefMut<'_, actix_http::Extensions> {
        let Self { response, .. } = self;
        response.extensions_mut()
    }

    /// Content type as reported by the wrapped response.
    fn content_type(&self) -> &str {
        let Self { response, .. } = self;
        response.content_type()
    }

    /// Encoding as reported by the wrapped response.
    fn encoding(&self) -> Result<&'static Encoding, actix_http::error::ContentTypeError> {
        let Self { response, .. } = self;
        response.encoding()
    }

    /// Mime type as reported by the wrapped response.
    fn mime_type(&self) -> Result<Option<Mime>, actix_http::error::ContentTypeError> {
        let Self { response, .. } = self;
        response.mime_type()
    }

    /// Chunked-transfer flag as reported by the wrapped response.
    fn chunked(&self) -> Result<bool, actix_http::error::ParseError> {
        let Self { response, .. } = self;
        response.chunked()
    }
}
/// Streams the wrapped response's payload, entering the response's span around every poll.
impl<S> Stream for InstrumentedResponse<S>
where
    S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
{
    type Item = <S as Stream>::Item;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // A clone is entered rather than `self.span` itself so that `self` stays free to be
        // borrowed mutably for the inner poll.
        let scope = self.span.clone();
        let _entered = scope.enter();
        Pin::new(&mut self.response).poll_next(cx)
    }
}
/// Drives the wrapped [`MessageBody`], entering the attached span around every poll.
impl<S> Future for InstrumentedBody<S>
where
    S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
{
    type Output = Result<Bytes, PayloadError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // A clone is entered rather than `self.span` itself so that `self` stays free to be
        // borrowed mutably for the inner poll.
        let scope = self.span.clone();
        let _entered = scope.enter();
        Pin::new(&mut self.body).poll(cx)
    }
}
/// Drives the wrapped [`JsonBody`], entering the attached span around every poll.
impl<T, U> Future for InstrumentedJson<T, U>
where
    T: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
    U: DeserializeOwned,
{
    type Output = Result<U, JsonPayloadError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // A clone is entered rather than `self.span` itself so that `self` stays free to be
        // borrowed mutably for the inner poll.
        let scope = self.span.clone();
        let _entered = scope.enter();
        Pin::new(&mut self.body).poll(cx)
    }
}
/// Polls the wrapped request inside its span and records the outcome on that span
/// (`compress` feature enabled, so the response payload is wrapped in a `Decoder`).
///
/// * On success, `http.status_code` is recorded. Any 4xx **or 5xx** status additionally sets
///   `otel.status_code` to `"ERROR"`, since a client span should be flagged for both classes.
///   The returned [`InstrumentedResponse`] carries a fresh `"HTTP Client Response Body"` span
///   that follows from the request span.
/// * On failure, `otel.status_code`, `exception.message` (the error's `Display` output) and
///   `exception.details` (its `Debug` output) are recorded, and the error is returned unchanged.
///   With the `emit_event_on_error` feature a warning event is emitted as well.
#[cfg(feature = "compress")]
impl Future for SendRequestFuture {
    type Output = Result<
        InstrumentedResponse<actix_http::encoding::Decoder<Payload<PayloadStream>>>,
        SendRequestError,
    >;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Cloned so the span can be entered while `self` is mutably borrowed for the poll.
        let span = self.span.clone();
        span.in_scope(|| {
            Pin::new(&mut self.inner)
                .poll(cx)
                .map_ok(|succ| {
                    let code = succ.status();
                    let status: i32 = code.as_u16().into();
                    span.record("http.status_code", &status);
                    // Server errors count as failures for a client span just like 4xx do.
                    if code.is_client_error() || code.is_server_error() {
                        span.record("otel.status_code", &"ERROR");
                    }
                    let body_span = tracing::info_span!("HTTP Client Response Body");
                    body_span.follows_from(span.clone());
                    InstrumentedResponse {
                        span: body_span,
                        response: succ,
                    }
                })
                .map_err(|err| {
                    span.record("otel.status_code", &"ERROR");
                    span.record("exception.message", &tracing::field::display(&err));
                    span.record("exception.details", &tracing::field::debug(&err));
                    #[cfg(feature = "emit_event_on_error")]
                    tracing::warn!("Error in request: {}", err);
                    err
                })
        })
    }
}
/// Polls the wrapped request inside its span and records the outcome on that span
/// (`compress` feature disabled, so the response uses the default payload stream).
///
/// * On success, `http.status_code` is recorded. Any 4xx **or 5xx** status additionally sets
///   `otel.status_code` to `"ERROR"`, since a client span should be flagged for both classes.
///   The returned [`InstrumentedResponse`] carries a fresh `"HTTP Client Response Body"` span
///   that follows from the request span.
/// * On failure, `otel.status_code`, `exception.message` (the error's `Display` output) and
///   `exception.details` (its `Debug` output) are recorded, and the error is returned unchanged.
///   With the `emit_event_on_error` feature a warning event is emitted as well.
#[cfg(not(feature = "compress"))]
impl Future for SendRequestFuture {
    type Output = Result<InstrumentedResponse, SendRequestError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Cloned so the span can be entered while `self` is mutably borrowed for the poll.
        let span = self.span.clone();
        span.in_scope(|| {
            Pin::new(&mut self.inner)
                .poll(cx)
                .map_ok(|succ| {
                    let code = succ.status();
                    let status: i32 = code.as_u16().into();
                    span.record("http.status_code", &status);
                    // Server errors count as failures for a client span just like 4xx do.
                    if code.is_client_error() || code.is_server_error() {
                        span.record("otel.status_code", &"ERROR");
                    }
                    let body_span = tracing::info_span!("HTTP Client Response Body");
                    body_span.follows_from(span.clone());
                    InstrumentedResponse {
                        span: body_span,
                        response: succ,
                    }
                })
                .map_err(|err| {
                    span.record("otel.status_code", &"ERROR");
                    span.record("exception.message", &tracing::field::display(&err));
                    span.record("exception.details", &tracing::field::debug(&err));
                    #[cfg(feature = "emit_event_on_error")]
                    tracing::warn!("Error in request: {}", err);
                    err
                })
        })
    }
}
/// A [`ClientRequest`] paired with the [`Span`] created for it by [`Propagate::propagate`].
///
/// The `send*` methods consume it and return a [`SendRequestFuture`] tied to that span.
pub struct PropagatedRequest {
// Span describing the outgoing request; handed to the send future.
span: Span,
// The wrapped awc request.
request: ClientRequest,
}
/// Sending methods: each forwards to the `ClientRequest` method of the same name and
/// instruments the resulting future with this request's span.
impl PropagatedRequest {
    /// Sends the request with `body`, via `ClientRequest::send_body`.
    pub fn send_body<B>(self, body: B) -> SendRequestFuture
    where
        B: Into<AnyBody>,
    {
        let Self { span, request } = self;
        request.send_body(body).instrument(span)
    }

    /// Sends the request with `value`, via `ClientRequest::send_json`.
    pub fn send_json<T: Serialize>(self, value: &T) -> SendRequestFuture {
        let Self { span, request } = self;
        request.send_json(value).instrument(span)
    }

    /// Sends the request with `value`, via `ClientRequest::send_form`.
    pub fn send_form<T: Serialize>(self, value: &T) -> SendRequestFuture {
        let Self { span, request } = self;
        request.send_form(value).instrument(span)
    }

    /// Sends the request with `stream` as its body, via `ClientRequest::send_stream`.
    pub fn send_stream<S, E>(self, stream: S) -> SendRequestFuture
    where
        S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
        E: Into<Box<dyn std::error::Error>> + 'static,
    {
        let Self { span, request } = self;
        request.send_stream(stream).instrument(span)
    }

    /// Sends the request via `ClientRequest::send`.
    pub fn send(self) -> SendRequestFuture {
        let Self { span, request } = self;
        request.send().instrument(span)
    }
}
/// Extension trait that turns a request into a [`PropagatedRequest`].
pub trait Propagate {
/// Consumes the request and returns it wrapped in a [`PropagatedRequest`].
fn propagate(self) -> PropagatedRequest;
}
/// Mutable view over a request's [`HeaderMap`].
///
/// It implements the OpenTelemetry `Injector` trait when one of the `opentelemetry_0_*`
/// features is enabled.
#[derive(Debug)]
struct RequestHeaderCarrier<'a> {
// Header map that injected key/value pairs are written into.
headers: &'a mut HeaderMap,
}
#[cfg(any(
    feature = "opentelemetry_0_13",
    feature = "opentelemetry_0_14",
    feature = "opentelemetry_0_15",
    feature = "opentelemetry_0_16"
))]
impl<'a> opentelemetry::propagation::Injector for RequestHeaderCarrier<'a> {
    /// Inserts `key: value` into the wrapped header map.
    ///
    /// Injection is best-effort: if `key` is not a valid header name or `value` is not a valid
    /// header value, a warning event is emitted and the header map is left untouched.
    fn set(&mut self, key: &str, value: String) {
        use actix_http::http::{HeaderName, HeaderValue};
        use std::convert::TryFrom;

        let name = match HeaderName::from_bytes(key.as_bytes()) {
            Ok(name) => name,
            Err(e) => {
                tracing::warn!("Failed to inject header {}: {}", key, e);
                return;
            }
        };

        let header_value = match HeaderValue::try_from(value) {
            Ok(header_value) => header_value,
            Err(e) => {
                // Logs the parsed header name rather than the raw `key` string.
                tracing::warn!("Failed to inject header value for {}: {}", name, e);
                return;
            }
        };

        self.headers.insert(name, header_value);
    }
}
/// Injects the OpenTelemetry context of `_span` into the headers of `request`, using the
/// globally registered text-map propagator, and returns the request.
///
/// When none of the `opentelemetry_0_*` features is enabled, the request is returned unchanged
/// and `_span` is not used.
fn record_otel(request: ClientRequest, _span: &Span) -> ClientRequest {
    #[cfg(any(
        feature = "opentelemetry_0_13",
        feature = "opentelemetry_0_14",
        feature = "opentelemetry_0_15",
        feature = "opentelemetry_0_16"
    ))]
    let request = {
        use tracing_opentelemetry::OpenTelemetrySpanExt;

        let mut request = request;
        let context = _span.context();
        let mut carrier = RequestHeaderCarrier {
            headers: request.headers_mut(),
        };
        opentelemetry::global::get_text_map_propagator(|propagator| {
            propagator.inject_context(&context, &mut carrier);
        });
        request
    };

    request
}
impl Propagate for ClientRequest {
    /// Creates the `"HTTP Client"` span for this request, fills in the fields known before
    /// sending (method, URL, HTTP flavor and, if set, the peer IP), passes the request through
    /// `record_otel`, and returns both as a [`PropagatedRequest`].
    ///
    /// `otel.status_code`, `http.status_code` and the `exception.*` fields are declared empty
    /// here; nothing in this method records them.
    fn propagate(self) -> PropagatedRequest {
        let span = tracing::info_span!(
            "HTTP Client",
            otel.kind = "client",
            otel.status_code = tracing::field::Empty,
            http.method = tracing::field::display(&self.get_method()),
            http.url = tracing::field::display(&self.get_uri()),
            http.flavor = tracing::field::Empty,
            http.status_code = tracing::field::Empty,
            net.peer.ip = tracing::field::Empty,
            exception.message = tracing::field::Empty,
            exception.details = tracing::field::Empty,
        );

        // Versions outside this table leave `http.flavor` unrecorded.
        let flavor = match *self.get_version() {
            actix_http::Version::HTTP_09 => Some("0.9"),
            actix_http::Version::HTTP_10 => Some("1.0"),
            actix_http::Version::HTTP_11 => Some("1.1"),
            actix_http::Version::HTTP_2 => Some("2.0"),
            actix_http::Version::HTTP_3 => Some("3.0"),
            _ => None,
        };
        if let Some(flavor) = flavor {
            span.record("http.flavor", &flavor);
        }

        if let Some(peer) = self.get_peer_addr() {
            span.record("net.peer.ip", &tracing::field::display(&peer.ip()));
        }

        let request = record_otel(self, &span);
        PropagatedRequest { span, request }
    }
}