Improve tracing
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Aode (lion) 2022-04-06 21:40:49 -05:00
parent 05e2cf5e08
commit c80d207a87
16 changed files with 228 additions and 181 deletions

24
Cargo.lock generated
View file

@ -763,18 +763,18 @@ checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
name = "encoding_rs"
version = "0.8.30"
version = "0.8.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7896dc8abb250ffdda33912550faa54c88ec8b998dec0b2c55ab224921ce11df"
checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b"
dependencies = [
"cfg-if",
]
[[package]]
name = "eyre"
version = "0.6.7"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9289ed2c0440a6536e65119725cf91fc2c6b5e513bfd2e36e1134d7cca6ca12f"
checksum = "4c2b6b5a29c02cdc822728b7d7b8ae1bab3e3b05d44522770ddd49722eeac7eb"
dependencies = [
"indenter",
"once_cell",
@ -1261,9 +1261,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.121"
version = "0.2.122"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f"
checksum = "ec647867e2bf0772e28c8bcde4f0d19a9216916e890543b5a03ed8ef27b8f259"
[[package]]
name = "linked-hash-map"
@ -1783,9 +1783,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.36"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7342d5883fbccae1cc37a2353b09c87c9b0f3afd73f5fb9bba687a1f733b029"
checksum = "ec757218438d5fda206afc041538b2f6d889286160d649a86a24d37e1235afd1"
dependencies = [
"unicode-xid",
]
@ -2318,9 +2318,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]]
name = "syn"
version = "1.0.90"
version = "1.0.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "704df27628939572cd88d33f171cd6f896f4eaca85252c6e0a72d8d8287ee86f"
checksum = "b683b2b825c8eef438b77c36a06dc262294da3d5a5813fac20da149241dcd44d"
dependencies = [
"proc-macro2",
"quote",
@ -2651,9 +2651,9 @@ dependencies = [
[[package]]
name = "tracing-awc"
version = "0.1.2"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b524ca012b208a8b5014b68da09573d6d43de93bb850c9303550ac2f5d896631"
checksum = "48723b87eafb89a0c219d5b2c1478711d69c8860503f79586a7e93231238bbb7"
dependencies = [
"actix-http",
"actix-service",

View file

@ -81,6 +81,6 @@ default-features = false
features = ["opentelemetry_0_17"]
[dependencies.tracing-awc]
version = "0.1.0"
version = "0.1.5"
default-features = false
features = ["opentelemetry_0_17"]

View file

@ -328,7 +328,7 @@ pict-rs offers the following endpoints:
The following endpoints are protected by an API key via the `X-Api-Token` header, and are disabled
unless the `--api-key` option is passed to the binary or the PICTRS_SERVER__API_KEY environment variable is
unless the `--api-key` option is passed to the binary or the PICTRS__SERVER__API_KEY environment variable is
set.
A secure API key can be generated by any password generator.

View file

@ -8,5 +8,7 @@ export GROUP_ID=$(id -g)
sudo docker-compose build --pull
sudo docker-compose up -d minio
sudo docker-compose up -d pictrs_proxy
sudo docker-compose up -d otel
sudo docker-compose up -d jaeger
sudo docker-compose run --service-ports --use-aliases pictrs
sudo docker-compose down

View file

@ -10,6 +10,8 @@ services:
GID: "${GROUP_ID:-1000}"
ports:
- "8080:8080"
environment:
- PICTRS__TRACING__OPENTELEMETRY__URL=http://otel:4137
links:
- "minio:pict-rs.minio"
stdin_open: true
@ -33,3 +35,22 @@ services:
- "9001:9001"
volumes:
- ./storage/minio:/mnt
otel:
image: otel/opentelemetry-collector:latest
command: --config otel-local-config.yaml
volumes:
- type: bind
source: ./otel.yml
target: /otel-local-config.yaml
restart: always
depends_on:
- jaeger
jaeger:
image: jaegertracing/all-in-one:1
ports:
- "14250:14250"
# To view traces, visit http://localhost:16686
- "16686:16686"
restart: always

View file

@ -0,0 +1,25 @@
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4137
processors:
batch:
exporters:
logging:
jaeger:
endpoint: jaeger:14250
insecure: true
service:
pipelines:
traces:
receivers:
- otlp
processors:
- batch
exporters:
- logging
- jaeger

View file

@ -1,12 +1,12 @@
## Server configuration
[server]
## Optional: pict-rs binding address
# environment variable: PICTRS_SERVER__ADDRESS
# environment variable: PICTRS__SERVER__ADDRESS
# default: 0.0.0.0:8080
address = '0.0.0.0:8080'
## Optional: pict-rs worker id
# environment variable PICTRS_SERVER__WORKER_ID
# environment variable PICTRS__SERVER__WORKER_ID
# default: pict-rs-1
#
# This is used for the internal job queue. It will have more meaning once a shared metadata
@ -14,7 +14,7 @@ address = '0.0.0.0:8080'
worker_id = 'pict-rs-1'
## Optional: shared secret for internal endpoints
# environment variable: PICTRS_SERVER__API_KEY
# environment variable: PICTRS__SERVER__API_KEY
# default: empty
#
# Not specifying api_key disables internal endpoints
@ -24,14 +24,14 @@ api_key = 'API_KEY'
## Logging configuration
[tracing.logging]
## Optional: log format
# environment variable: PICTRS_TRACING__LOGGING__FORMAT
# environment variable: PICTRS__TRACING__LOGGING__FORMAT
# default: normal
#
# available options: compact, json, normal, pretty
format = 'normal'
## Optional: log targets
# environment variable: PICTRS_TRACING__LOGGING__TARGETS
# environment variable: PICTRS__TRACING__LOGGING__TARGETS
# default: warn,tracing_actix_web=info,actix_server=info,actix_web=info
#
# Dictates which traces should print to stdout
@ -42,7 +42,7 @@ targets = 'warn,tracing_actix_web=info,actix_server=info,actix_web=info'
## Console configuration
[tracing.console]
## Optional: console address
# environment variable: PICTRS_TRACING__CONSOLE__ADDRESS
# environment variable: PICTRS__TRACING__CONSOLE__ADDRESS
# default: empty
#
# Dictacts whether console should be enabled, and what address it should be exposed on.
@ -72,7 +72,7 @@ targets = 'warn,tracing_actix_web=info,actix_server=info,actix_web=info'
address = '0.0.0.0:6669'
## Optional: console buffer capacity
# environment variable: PICTRS_TRACING__CONSOLE__BUFFER_CAPACITY
# environment variable: PICTRS__TRACING__CONSOLE__BUFFER_CAPACITY
# default: 102400
#
# This is the number of _events_ to buffer, not the number of bytes. In reality, the amount of
@ -83,7 +83,7 @@ buffer_capacity = 102400
## OpenTelemetry configuration
[tracing.opentelemetry]
## Optional: url for exporting otlp traces
# environment variable: PICTRS_TRACING__OPENTELEMETRY__URL
# environment variable: PICTRS__TRACING__OPENTELEMETRY__URL
# default: empty
#
# Not specifying opentelemetry_url means no traces will be exported
@ -93,12 +93,12 @@ buffer_capacity = 102400
url = 'http://localhost:4317/'
## Optional: name to relate OpenTelemetry traces
# environment variable: PICTRS_TRACING__OPENTELEMETRY__SERVICE_NAME
# environment variable: PICTRS__TRACING__OPENTELEMETRY__SERVICE_NAME
# default: pict-rs
service_name = 'pict-rs'
## Optional: trace level to export
# environment variable: PICTRS_TRACING__OPENTELEMETRY__TARGETS
# environment variable: PICTRS__TRACING__OPENTELEMETRY__TARGETS
# default: info
#
# Follows the same format as RUST_LOG
@ -108,7 +108,7 @@ targets = 'info'
## Configuration for migrating from pict-rs 0.2
[old_db]
## Optional: path to old pict-rs directory
# environment variable: PICTRS_OLD_DB__PATH
# environment variable: PICTRS__OLD_DB__PATH
# default: /mnt
path = '/mnt'
@ -116,46 +116,46 @@ path = '/mnt'
## Media Processing Configuration
[media]
## Optional: max media width (in pixels)
# environment variable: PICTRS_MEDIA__MAX_WIDTH
# environment variable: PICTRS__MEDIA__MAX_WIDTH
# default: 10,000
max_width = 10000
## Optional: max media height (in pixels)
# environment variable: PICTRS_MEDIA__MAX_HEIGHT
# environment variable: PICTRS__MEDIA__MAX_HEIGHT
# default: 10,000
max_height = 10000
## Optional: max media area (in pixels)
# environment variable: PICTRS_MEDIA__MAX_AREA
# environment variable: PICTRS__MEDIA__MAX_AREA
# default: 40,000,000
max_area = 40000000
## Optional: max file size (in Megabytes)
# environment variable: PICTRS_MEDIA__MAX_FILE_SIZE
# environment variable: PICTRS__MEDIA__MAX_FILE_SIZE
# default: 40
max_file_size = 40
## Optional: enable GIF and MP4 uploads (without sound)
# environment variable: PICTRS_MEDIA__ENABLE_SILENT_VIDEO
# environment variable: PICTRS__MEDIA__ENABLE_SILENT_VIDEO
# default: true
#
# Set this to false to serve static images only
enable_silent_video = true
## Optional: set allowed filters for image processing
# environment variable: PICTRS_MEDIA__FILTERS
# environment variable: PICTRS__MEDIA__FILTERS
# default: ['blur', 'crop', 'identity', 'resize', 'thumbnail']
filters = ['blur', 'crop', 'identity', 'resize', 'thumbnail']
## Optional: whether to validate images uploaded through the `import` endpoint
# environment variable: PICTRS_MEDIA__SKIP_VALIDATE_IMPORTS
# environment variable: PICTRS__MEDIA__SKIP_VALIDATE_IMPORTS
# default: false
#
# Set this to true if you want to avoid processing imported media
skip_validate_imports = false
## Optional: The duration, in hours, to keep media ingested through the "cache" endpoint
# environment variable: PICTRS_MEDIA__CACHE_DURATION
# environment variable: PICTRS__MEDIA__CACHE_DURATION
# default: 168 (1 week)
cache_duration = 168
@ -163,19 +163,19 @@ cache_duration = 168
## Database configuration
[repo]
## Optional: database backend to use
# environment variable: PICTRS_REPO__TYPE
# environment variable: PICTRS__REPO__TYPE
# default: sled
#
# available options: sled
type = 'sled'
## Optional: path to sled repository
# environment variable: PICTRS_REPO__PATH
# environment variable: PICTRS__REPO__PATH
# default: /mnt/sled-repo
path = '/mnt/sled-repo'
## Optional: in-memory cache capacity for sled data (in bytes)
# environment variable: PICTRS_REPO__CACHE_CAPACITY
# environment variable: PICTRS__REPO__CACHE_CAPACITY
# default: 67,108,864 (1024 * 1024 * 64, or 64MB)
cache_capacity = 67108864
@ -183,39 +183,39 @@ cache_capacity = 67108864
## Media storage configuration
[store]
## Optional: type of media storage to use
# environment variable: PICTRS_STORE__TYPE
# environment variable: PICTRS__STORE__TYPE
# default: filesystem
#
# available options: filesystem, object_storage
type = 'object_storage'
## Required: object storage bucket name
# environment variable: PICTRS_STORE__BUCKET_NAME
# environment variable: PICTRS__STORE__BUCKET_NAME
# default: empty
bucket_name = 'BUCKET_NAME'
## Required: object storage region
# environment variable: PICTRS_STORE__REGION
# environment variable: PICTRS__STORE__REGION
# default: empty
region = 'REGION'
## Required: object storage access key
# environment variable: PICTRS_STORE__ACCESS_KEY
# environment variable: PICTRS__STORE__ACCESS_KEY
# default: empty
access_key = 'ACCESS_KEY'
## Required: object storage secret key
# environment variable: PICTRS_STORE__SECRET_KEY
# environment variable: PICTRS__STORE__SECRET_KEY
# default: empty
secret_key = 'SECRET_KEY'
## Optional: object storage security token
# environment variable: PICTRS_STORE__SECURITY_TOKEN
# environment variable: PICTRS__STORE__SECURITY_TOKEN
# default: empty
security_token = 'SECURITY_TOKEN'
## Optional: object storage session token
# environment variable: PICTRS_STORE__SESSION_TOKEN
# environment variable: PICTRS__STORE__SESSION_TOKEN
# default: empty
session_token = 'SESSION_TOKEN'
@ -223,13 +223,13 @@ session_token = 'SESSION_TOKEN'
# ## Media storage configuration
# [store]
# ## Optional: type of media storage to use
# # environment variable: PICTRS_STORE__TYPE
# # environment variable: PICTRS__STORE__TYPE
# # default: filesystem
# #
# # available options: filesystem, object_storage
# type = 'filesystem'
#
# ## Optional: path to uploaded media
# # environment variable: PICTRS_STORE__PATH
# # environment variable: PICTRS__STORE__PATH
# # default: /mnt/files
# path = '/mnt/files'

View file

@ -2,8 +2,9 @@ use crate::process::Process;
use actix_web::web::Bytes;
use tokio::io::AsyncRead;
#[tracing::instrument(name = "Clearing metadata", skip(input))]
pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result<impl AsyncRead + Unpin> {
let process = Process::run("exiftool", &["-all=", "-", "-out", "-"])?;
Ok(process.bytes_read(input).unwrap())
Ok(process.bytes_read(input))
}

View file

@ -50,6 +50,7 @@ impl ThumbnailFormat {
}
}
#[tracing::instrument(name = "Convert to Mp4", skip(input))]
pub(crate) async fn to_mp4_bytes(
input: Bytes,
input_format: InputFormat,

View file

@ -11,6 +11,7 @@ use actix_web::web::Bytes;
use std::path::PathBuf;
use tokio::io::AsyncReadExt;
#[tracing::instrument(skip(hash))]
pub(crate) async fn generate<R: FullRepo, S: Store + 'static>(
repo: &R,
store: &S,
@ -36,6 +37,7 @@ pub(crate) async fn generate<R: FullRepo, S: Store + 'static>(
Ok((details, bytes))
}
#[tracing::instrument(skip(hash))]
async fn process<R: FullRepo, S: Store + 'static>(
repo: &R,
store: &S,
@ -45,7 +47,9 @@ async fn process<R: FullRepo, S: Store + 'static>(
thumbnail_args: Vec<String>,
hash: R::Bytes,
) -> Result<(Details, Bytes), Error> {
let permit = crate::PROCESS_SEMAPHORE.acquire().await?;
let permit = tracing::trace_span!(parent: None, "Aquire semaphore")
.in_scope(|| crate::PROCESS_SEMAPHORE.acquire())
.await;
let identifier = if let Some(identifier) = repo
.still_identifier_from_alias::<S::Identifier>(&alias)

View file

@ -8,11 +8,11 @@ use crate::{
use actix_web::web::{Bytes, BytesMut};
use futures_util::{Stream, StreamExt};
use sha2::{Digest, Sha256};
use tracing::debug;
mod hasher;
use hasher::Hasher;
#[derive(Debug)]
pub(crate) struct Session<R, S>
where
R: FullRepo + 'static,
@ -24,6 +24,33 @@ where
identifier: Option<S::Identifier>,
}
#[tracing::instrument(name = "Aggregate", skip(stream))]
async fn aggregate<S>(stream: S) -> Result<Bytes, Error>
where
S: Stream<Item = Result<Bytes, Error>>,
{
futures_util::pin_mut!(stream);
let mut buf = Vec::new();
tracing::debug!("Reading stream to memory");
while let Some(res) = stream.next().await {
let bytes = res?;
buf.push(bytes);
}
let total_len = buf.iter().fold(0, |acc, item| acc + item.len());
let bytes_mut = buf
.iter()
.fold(BytesMut::with_capacity(total_len), |mut acc, item| {
acc.extend_from_slice(item);
acc
});
Ok(bytes_mut.freeze())
}
#[tracing::instrument(name = "Ingest", skip(stream))]
pub(crate) async fn ingest<R, S>(
repo: &R,
store: &S,
@ -35,21 +62,15 @@ where
R: FullRepo + 'static,
S: Store,
{
let permit = crate::PROCESS_SEMAPHORE.acquire().await;
let permit = tracing::trace_span!(parent: None, "Aquire semaphore")
.in_scope(|| crate::PROCESS_SEMAPHORE.acquire())
.await;
let mut bytes_mut = BytesMut::new();
let bytes = aggregate(stream).await?;
futures_util::pin_mut!(stream);
debug!("Reading stream to memory");
while let Some(res) = stream.next().await {
let bytes = res?;
bytes_mut.extend_from_slice(&bytes);
}
debug!("Validating bytes");
tracing::debug!("Validating bytes");
let (input_type, validated_reader) = crate::validate::validate_image_bytes(
bytes_mut.freeze(),
bytes,
CONFIG.media.format,
CONFIG.media.enable_silent_video,
should_validate,
@ -73,12 +94,8 @@ where
session.hash = Some(hash.clone());
debug!("Saving upload");
save_upload(repo, store, &hash, &identifier).await?;
debug!("Adding alias");
if let Some(alias) = declared_alias {
session.add_existing_alias(&hash, alias).await?
} else {
@ -88,6 +105,7 @@ where
Ok(session)
}
#[tracing::instrument]
async fn save_upload<R, S>(
repo: &R,
store: &S,
@ -124,25 +142,27 @@ where
self.alias.as_ref()
}
#[tracing::instrument]
pub(crate) async fn delete_token(&self) -> Result<DeleteToken, Error> {
let alias = self.alias.clone().ok_or(UploadError::MissingAlias)?;
debug!("Generating delete token");
tracing::debug!("Generating delete token");
let delete_token = DeleteToken::generate();
debug!("Saving delete token");
tracing::debug!("Saving delete token");
let res = self.repo.relate_delete_token(&alias, &delete_token).await?;
if res.is_err() {
let delete_token = self.repo.delete_token(&alias).await?;
debug!("Returning existing delete token, {:?}", delete_token);
tracing::debug!("Returning existing delete token, {:?}", delete_token);
return Ok(delete_token);
}
debug!("Returning new delete token, {:?}", delete_token);
tracing::debug!("Returning new delete token, {:?}", delete_token);
Ok(delete_token)
}
#[tracing::instrument]
async fn add_existing_alias(&mut self, hash: &[u8], alias: Alias) -> Result<(), Error> {
AliasRepo::create(&self.repo, &alias)
.await?
@ -156,8 +176,9 @@ where
Ok(())
}
#[tracing::instrument]
async fn create_alias(&mut self, hash: &[u8], input_type: ValidInputType) -> Result<(), Error> {
debug!("Alias gen loop");
tracing::debug!("Alias gen loop");
loop {
let alias = Alias::generate(input_type.as_ext().to_string());
@ -171,7 +192,7 @@ where
return Ok(());
}
debug!("Alias exists, regenerating");
tracing::debug!("Alias exists, regenerating");
}
}
}

View file

@ -41,8 +41,8 @@ where
.with(ErrorLayer::default());
if let Some(address) = tracing.console.address {
println!("Starting console on {}", address);
let console_layer = ConsoleLayer::builder()
.with_default_env()
.event_buffer_capacity(tracing.console.buffer_capacity)
.server_addr(address)
.spawn();

View file

@ -79,12 +79,14 @@ pub(crate) struct Details {
pub(crate) height: usize,
}
#[tracing::instrument(name = "Clear Metadata", skip(input))]
pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result<impl AsyncRead + Unpin> {
let process = Process::run("magick", &["convert", "-", "-strip", "-"])?;
Ok(process.bytes_read(input).unwrap())
Ok(process.bytes_read(input))
}
#[tracing::instrument(name = "Convert", skip(input))]
pub(crate) fn convert_bytes_read(
input: Bytes,
format: ImageFormat,
@ -99,7 +101,7 @@ pub(crate) fn convert_bytes_read(
],
)?;
Ok(process.bytes_read(input).unwrap())
Ok(process.bytes_read(input))
}
#[instrument(name = "Getting details from input bytes", skip(input))]
@ -130,7 +132,7 @@ pub(crate) async fn details_bytes(
&["identify", "-ping", "-format", "%w %h | %m\n", &last_arg],
)?;
let mut reader = process.bytes_read(input).unwrap();
let mut reader = process.bytes_read(input);
let mut bytes = Vec::new();
reader.read_to_end(&mut bytes).await?;
@ -170,7 +172,7 @@ pub(crate) async fn details_store<S: Store + 'static>(
&["identify", "-ping", "-format", "%w %h | %m\n", &last_arg],
)?;
let mut reader = process.store_read(store, identifier).unwrap();
let mut reader = process.store_read(store, identifier);
let mut output = Vec::new();
reader.read_to_end(&mut output).await?;
@ -187,7 +189,7 @@ pub(crate) async fn details_file(path_str: &str) -> Result<Details, Error> {
&["identify", "-ping", "-format", "%w %h | %m\n", path_str],
)?;
let mut reader = process.read().unwrap();
let mut reader = process.read();
let mut output = Vec::new();
reader.read_to_end(&mut output).await?;
@ -272,7 +274,7 @@ pub(crate) fn process_image_store_read<S: Store + 'static>(
.arg(last_arg),
)?;
Ok(process.store_read(store, identifier).unwrap())
Ok(process.store_read(store, identifier))
}
impl Details {

View file

@ -12,15 +12,18 @@ use tokio::{
process::{Child, Command},
sync::oneshot::{channel, Receiver},
};
use tracing::Instrument;
use tracing::Span;
#[derive(Debug)]
struct StatusError;
pub(crate) struct Process {
child: Child,
span: Span,
}
impl std::fmt::Debug for Process {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Process").field("child", &"Child").finish()
}
}
struct DropHandle {
@ -31,7 +34,6 @@ pin_project_lite::pin_project! {
struct ProcessRead<I> {
#[pin]
inner: I,
span: Span,
err_recv: Receiver<std::io::Error>,
err_closed: bool,
handle: DropHandle,
@ -39,30 +41,19 @@ pin_project_lite::pin_project! {
}
impl Process {
#[tracing::instrument]
pub(crate) fn run(command: &str, args: &[&str]) -> std::io::Result<Self> {
Self::spawn(Command::new(command).args(args))
}
fn spawn_span(&self) -> Span {
let span = tracing::info_span!(parent: None, "Spawned command writer",);
span.follows_from(self.span.clone());
span
}
#[tracing::instrument]
pub(crate) fn spawn(cmd: &mut Command) -> std::io::Result<Self> {
let cmd = cmd.stdin(Stdio::piped()).stdout(Stdio::piped());
let span = tracing::info_span!(
"Spawning Command",
command = &tracing::field::debug(&cmd),
exception.message = &tracing::field::Empty,
exception.details = &tracing::field::Empty,
);
cmd.spawn().map(|child| Process { child, span })
cmd.spawn().map(|child| Process { child })
}
#[tracing::instrument]
pub(crate) async fn wait(mut self) -> std::io::Result<()> {
let status = self.child.wait().await?;
if !status.success() {
@ -71,16 +62,16 @@ impl Process {
Ok(())
}
pub(crate) fn bytes_read(mut self, mut input: Bytes) -> Option<impl AsyncRead + Unpin> {
let mut stdin = self.child.stdin.take()?;
let stdout = self.child.stdout.take()?;
#[tracing::instrument(skip(input))]
pub(crate) fn bytes_read(mut self, mut input: Bytes) -> impl AsyncRead + Unpin {
let mut stdin = self.child.stdin.take().expect("stdin exists");
let stdout = self.child.stdout.take().expect("stdout exists");
let (tx, rx) = channel::<std::io::Error>();
let span = self.spawn_span();
let mut child = self.child;
let handle = actix_rt::spawn(
async move {
let handle = tracing::trace_span!(parent: None, "Spawn").in_scope(|| {
actix_rt::spawn(async move {
if let Err(e) = stdin.write_all_buf(&mut input).await {
let _ = tx.send(e);
return;
@ -98,94 +89,84 @@ impl Process {
let _ = tx.send(e);
}
}
}
.instrument(span),
);
})
});
Some(ProcessRead {
ProcessRead {
inner: stdout,
span: self.span,
err_recv: rx,
err_closed: false,
handle: DropHandle { inner: handle },
})
}
}
pub(crate) fn read(mut self) -> Option<impl AsyncRead + Unpin> {
let stdout = self.child.stdout.take()?;
#[tracing::instrument]
pub(crate) fn read(mut self) -> impl AsyncRead + Unpin {
let stdout = self.child.stdout.take().expect("stdout exists");
let (tx, rx) = channel();
let span = self.spawn_span();
let mut child = self.child;
let handle = actix_rt::spawn(
async move {
match child.wait().await {
Ok(status) => {
if !status.success() {
let _ = tx
.send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError));
}
}
Err(e) => {
let _ = tx.send(e);
let handle = actix_rt::spawn(async move {
match child.wait().await {
Ok(status) => {
if !status.success() {
let _ =
tx.send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError));
}
}
Err(e) => {
let _ = tx.send(e);
}
}
.instrument(span),
);
});
Some(ProcessRead {
ProcessRead {
inner: stdout,
span: self.span,
err_recv: rx,
err_closed: false,
handle: DropHandle { inner: handle },
})
}
}
#[tracing::instrument]
pub(crate) fn store_read<S: Store + 'static>(
mut self,
store: S,
identifier: S::Identifier,
) -> Option<impl AsyncRead + Unpin> {
let mut stdin = self.child.stdin.take()?;
let stdout = self.child.stdout.take()?;
) -> impl AsyncRead + Unpin {
let mut stdin = self.child.stdin.take().expect("stdin exists");
let stdout = self.child.stdout.take().expect("stdout exists");
let (tx, rx) = channel();
let span = self.spawn_span();
let mut child = self.child;
let handle = actix_rt::spawn(
async move {
if let Err(e) = store.read_into(&identifier, &mut stdin).await {
let _ = tx.send(e);
return;
}
drop(stdin);
let handle = actix_rt::spawn(async move {
if let Err(e) = store.read_into(&identifier, &mut stdin).await {
let _ = tx.send(e);
return;
}
drop(stdin);
match child.wait().await {
Ok(status) => {
if !status.success() {
let _ = tx
.send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError));
}
}
Err(e) => {
let _ = tx.send(e);
match child.wait().await {
Ok(status) => {
if !status.success() {
let _ =
tx.send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError));
}
}
Err(e) => {
let _ = tx.send(e);
}
}
.instrument(span),
);
});
Some(ProcessRead {
ProcessRead {
inner: stdout,
span: self.span,
err_recv: rx,
err_closed: false,
handle: DropHandle { inner: handle },
})
}
}
}
@ -200,37 +181,20 @@ where
) -> Poll<std::io::Result<()>> {
let this = self.as_mut().project();
let span = this.span;
let err_recv = this.err_recv;
let err_closed = this.err_closed;
let inner = this.inner;
span.in_scope(|| {
if !*err_closed {
if let Poll::Ready(res) = Pin::new(err_recv).poll(cx) {
*err_closed = true;
if let Ok(err) = res {
let display = format!("{}", err);
let debug = format!("{:?}", err);
span.record("exception.message", &display.as_str());
span.record("exception.details", &debug.as_str());
return Poll::Ready(Err(err));
}
if !*err_closed {
if let Poll::Ready(res) = Pin::new(err_recv).poll(cx) {
*err_closed = true;
if let Ok(err) = res {
return Poll::Ready(Err(err));
}
}
}
if let Poll::Ready(res) = inner.poll_read(cx, buf) {
if let Err(err) = &res {
let display = format!("{}", err);
let debug = format!("{:?}", err);
span.record("exception.message", &display.as_str());
span.record("exception.details", &debug.as_str());
}
return Poll::Ready(res);
}
Poll::Pending
})
inner.poll_read(cx, buf)
}
}

View file

@ -12,7 +12,7 @@ use std::{
};
use storage_path_generator::Generator;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{debug, error, instrument};
use tracing::{debug, error, instrument, Instrument};
mod file_id;
pub(crate) use file_id::FileId;
@ -88,9 +88,15 @@ impl Store for FileStore {
) -> Result<Self::Stream, Error> {
let path = self.path_from_file_id(identifier);
let stream = File::open(path)
.await?
.read_to_stream(from_start, len)
let file_span = tracing::trace_span!(parent: None, "File Stream");
let file = file_span
.in_scope(|| File::open(path))
.instrument(file_span.clone())
.await?;
let stream = file_span
.in_scope(|| file.read_to_stream(from_start, len))
.instrument(file_span)
.await?;
Ok(Box::pin(stream))

View file

@ -88,7 +88,7 @@ impl Store for ObjectStore {
let start = from_start.unwrap_or(0);
let end = len.map(|len| start + len - 1);
let request_span = tracing::info_span!(parent: None, "Get Object");
let request_span = tracing::trace_span!(parent: None, "Get Object");
// NOTE: isolating reqwest in it's own span is to prevent the request's span from getting
// smuggled into a long-lived task. Unfortunately, I am unable to create a minimal