Create body span on first poll
Some checks reported errors
continuous-integration/drone/push Build was killed
Some checks reported errors
continuous-integration/drone/push Build was killed
This commit is contained in:
parent
9d44100e32
commit
859f233ca2
105
src/lib.rs
105
src/lib.rs
|
@ -79,7 +79,7 @@ where
|
|||
{
|
||||
type Response = ConnectResponse;
|
||||
type Error = SendRequestError;
|
||||
type Future = TracingFuture<S::Future>;
|
||||
type Future = Instrumented<TracingFuture<S::Future>>;
|
||||
|
||||
fn poll_ready(&self, ctx: &mut core::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.0.poll_ready(ctx)
|
||||
|
@ -142,18 +142,16 @@ where
|
|||
}
|
||||
|
||||
TracingFuture {
|
||||
future: self.0.call(req).instrument(span.clone()),
|
||||
span,
|
||||
future: span.in_scope(|| self.0.call(req)),
|
||||
}
|
||||
.instrument(span)
|
||||
}
|
||||
}
|
||||
|
||||
pin_project_lite::pin_project! {
|
||||
pub struct TracingFuture<F> {
|
||||
span: Span,
|
||||
|
||||
#[pin]
|
||||
future: Instrumented<F>,
|
||||
future: F,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -166,62 +164,59 @@ where
|
|||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.as_mut().project();
|
||||
|
||||
let span = Span::current();
|
||||
|
||||
let future = this.future;
|
||||
let span = this.span;
|
||||
|
||||
span.in_scope(|| {
|
||||
future
|
||||
.poll(cx)
|
||||
.map_ok(|succ| match succ {
|
||||
ConnectResponse::Client(client_response) => {
|
||||
let status: i32 = client_response.status().as_u16().into();
|
||||
span.record("http.status_code", &status);
|
||||
if client_response.status().is_client_error() {
|
||||
span.record("otel.status_code", &"ERROR");
|
||||
}
|
||||
|
||||
ConnectResponse::Client(client_response.map_body(|_, payload| {
|
||||
let body_span =
|
||||
tracing::info_span!(parent: None, "HTTP Client Response Body");
|
||||
let payload = InstrumentedBody::new(body_span, payload);
|
||||
let payload: Pin<Box<dyn Stream<Item = Result<Bytes, PayloadError>>>> =
|
||||
Box::pin(payload);
|
||||
|
||||
Payload::Stream { payload }
|
||||
}))
|
||||
future
|
||||
.poll(cx)
|
||||
.map_ok(|succ| match succ {
|
||||
ConnectResponse::Client(client_response) => {
|
||||
let status: i32 = client_response.status().as_u16().into();
|
||||
span.record("http.status_code", &status);
|
||||
if client_response.status().is_client_error() {
|
||||
span.record("otel.status_code", &"ERROR");
|
||||
}
|
||||
ConnectResponse::Tunnel(response_head, etc) => {
|
||||
let status: i32 = response_head.status.as_u16().into();
|
||||
span.record("http.status_code", &status);
|
||||
if response_head.status.is_client_error() {
|
||||
span.record("otel.status_code", &"ERROR");
|
||||
}
|
||||
ConnectResponse::Tunnel(response_head, etc)
|
||||
|
||||
ConnectResponse::Client(client_response.map_body(|_, payload| {
|
||||
let payload = InstrumentedBody::new(payload);
|
||||
let payload: Pin<Box<dyn Stream<Item = Result<Bytes, PayloadError>>>> =
|
||||
Box::pin(payload);
|
||||
|
||||
Payload::Stream { payload }
|
||||
}))
|
||||
}
|
||||
ConnectResponse::Tunnel(response_head, etc) => {
|
||||
let status: i32 = response_head.status.as_u16().into();
|
||||
span.record("http.status_code", &status);
|
||||
if response_head.status.is_client_error() {
|
||||
span.record("otel.status_code", &"ERROR");
|
||||
}
|
||||
})
|
||||
.map_err(|err| {
|
||||
span.record("otel.status_code", &"ERROR");
|
||||
span.record(
|
||||
"exception.message",
|
||||
&tracing::field::display(&format!("{}", err)),
|
||||
);
|
||||
span.record(
|
||||
"exception.details",
|
||||
&tracing::field::display(&format!("{:?}", err)),
|
||||
);
|
||||
ConnectResponse::Tunnel(response_head, etc)
|
||||
}
|
||||
})
|
||||
.map_err(|err| {
|
||||
span.record("otel.status_code", &"ERROR");
|
||||
span.record(
|
||||
"exception.message",
|
||||
&tracing::field::display(&format!("{}", err)),
|
||||
);
|
||||
span.record(
|
||||
"exception.details",
|
||||
&tracing::field::display(&format!("{:?}", err)),
|
||||
);
|
||||
|
||||
#[cfg(feature = "emit_event_on_error")]
|
||||
tracing::warn!("Error in request: {}", err);
|
||||
#[cfg(feature = "emit_event_on_error")]
|
||||
tracing::warn!("Error in request: {}", err);
|
||||
|
||||
err
|
||||
})
|
||||
})
|
||||
err
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pin_project_lite::pin_project! {
|
||||
struct InstrumentedBody<S> {
|
||||
span: Span,
|
||||
span: Option<Span>,
|
||||
|
||||
#[pin]
|
||||
body: S,
|
||||
|
@ -232,8 +227,8 @@ impl<S> InstrumentedBody<S>
|
|||
where
|
||||
S: Stream<Item = Result<Bytes, PayloadError>>,
|
||||
{
|
||||
fn new(span: Span, body: S) -> InstrumentedBody<S> {
|
||||
InstrumentedBody { span, body }
|
||||
fn new(body: S) -> InstrumentedBody<S> {
|
||||
InstrumentedBody { span: None, body }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -246,7 +241,9 @@ where
|
|||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = self.as_mut().project();
|
||||
|
||||
let span = this.span;
|
||||
let span = this
|
||||
.span
|
||||
.get_or_insert_with(|| tracing::info_span!("HTTP Client Response Body"));
|
||||
let body = this.body;
|
||||
|
||||
span.in_scope(|| body.poll_next(cx))
|
||||
|
|
Loading…
Reference in a new issue