diff --git a/Cargo.lock b/Cargo.lock index 55b794d..095ef2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -763,18 +763,18 @@ checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" [[package]] name = "encoding_rs" -version = "0.8.30" +version = "0.8.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7896dc8abb250ffdda33912550faa54c88ec8b998dec0b2c55ab224921ce11df" +checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b" dependencies = [ "cfg-if", ] [[package]] name = "eyre" -version = "0.6.7" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9289ed2c0440a6536e65119725cf91fc2c6b5e513bfd2e36e1134d7cca6ca12f" +checksum = "4c2b6b5a29c02cdc822728b7d7b8ae1bab3e3b05d44522770ddd49722eeac7eb" dependencies = [ "indenter", "once_cell", @@ -1261,9 +1261,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.121" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f" +checksum = "ec647867e2bf0772e28c8bcde4f0d19a9216916e890543b5a03ed8ef27b8f259" [[package]] name = "linked-hash-map" @@ -1783,9 +1783,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.36" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7342d5883fbccae1cc37a2353b09c87c9b0f3afd73f5fb9bba687a1f733b029" +checksum = "ec757218438d5fda206afc041538b2f6d889286160d649a86a24d37e1235afd1" dependencies = [ "unicode-xid", ] @@ -2318,9 +2318,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.90" +version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "704df27628939572cd88d33f171cd6f896f4eaca85252c6e0a72d8d8287ee86f" +checksum = "b683b2b825c8eef438b77c36a06dc262294da3d5a5813fac20da149241dcd44d" dependencies = [ "proc-macro2", "quote", @@ -2651,9 +2651,9 @@ dependencies = [ [[package]] name = "tracing-awc" -version = "0.1.2" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b524ca012b208a8b5014b68da09573d6d43de93bb850c9303550ac2f5d896631" +checksum = "48723b87eafb89a0c219d5b2c1478711d69c8860503f79586a7e93231238bbb7" dependencies = [ "actix-http", "actix-service", diff --git a/Cargo.toml b/Cargo.toml index 4ed3baf..c2a9498 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,6 +81,6 @@ default-features = false features = ["opentelemetry_0_17"] [dependencies.tracing-awc] -version = "0.1.0" +version = "0.1.5" default-features = false features = ["opentelemetry_0_17"] diff --git a/README.md b/README.md index 0ab21fc..7b5e1ac 100644 --- a/README.md +++ b/README.md @@ -328,7 +328,7 @@ pict-rs offers the following endpoints: The following endpoints are protected by an API key via the `X-Api-Token` header, and are disabled -unless the `--api-key` option is passed to the binary or the PICTRS_SERVER__API_KEY environment variable is +unless the `--api-key` option is passed to the binary or the PICTRS__SERVER__API_KEY environment variable is set. A secure API key can be generated by any password generator. diff --git a/docker/object-storage/dev.sh b/docker/object-storage/dev.sh index 42261b6..422eb52 100755 --- a/docker/object-storage/dev.sh +++ b/docker/object-storage/dev.sh @@ -8,5 +8,7 @@ export GROUP_ID=$(id -g) sudo docker-compose build --pull sudo docker-compose up -d minio sudo docker-compose up -d pictrs_proxy +sudo docker-compose up -d otel +sudo docker-compose up -d jaeger sudo docker-compose run --service-ports --use-aliases pictrs sudo docker-compose down diff --git a/docker/object-storage/docker-compose.yml b/docker/object-storage/docker-compose.yml index 7ae3693..e8e9470 100644 --- a/docker/object-storage/docker-compose.yml +++ b/docker/object-storage/docker-compose.yml @@ -10,6 +10,8 @@ services: GID: "${GROUP_ID:-1000}" ports: - "8080:8080" + environment: + - PICTRS__TRACING__OPENTELEMETRY__URL=http://otel:4137 links: - "minio:pict-rs.minio" stdin_open: true @@ -33,3 +35,22 @@ services: - "9001:9001" volumes: - ./storage/minio:/mnt + + otel: + image: otel/opentelemetry-collector:latest + command: --config otel-local-config.yaml + volumes: + - type: bind + source: ./otel.yml + target: /otel-local-config.yaml + restart: always + depends_on: + - jaeger + + jaeger: + image: jaegertracing/all-in-one:1 + ports: + - "14250:14250" + # To view traces, visit http://localhost:16686 + - "16686:16686" + restart: always diff --git a/docker/object-storage/otel.yml b/docker/object-storage/otel.yml new file mode 100644 index 0000000..8270b08 --- /dev/null +++ b/docker/object-storage/otel.yml @@ -0,0 +1,25 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4137 + +processors: + batch: + +exporters: + logging: + jaeger: + endpoint: jaeger:14250 + insecure: true + +service: + pipelines: + traces: + receivers: + - otlp + processors: + - batch + exporters: + - logging + - jaeger diff --git a/pict-rs.toml b/pict-rs.toml index b1b9311..5a3a0a1 100644 --- a/pict-rs.toml +++ b/pict-rs.toml @@ -1,12 +1,12 @@ ## Server configuration [server] ## Optional: pict-rs binding address -# environment variable: PICTRS_SERVER__ADDRESS +# environment variable: PICTRS__SERVER__ADDRESS # default: 0.0.0.0:8080 address = '0.0.0.0:8080' ## Optional: pict-rs worker id -# environment variable PICTRS_SERVER__WORKER_ID +# environment variable PICTRS__SERVER__WORKER_ID # default: pict-rs-1 # # This is used for the internal job queue. It will have more meaning once a shared metadata @@ -14,7 +14,7 @@ address = '0.0.0.0:8080' worker_id = 'pict-rs-1' ## Optional: shared secret for internal endpoints -# environment variable: PICTRS_SERVER__API_KEY +# environment variable: PICTRS__SERVER__API_KEY # default: empty # # Not specifying api_key disables internal endpoints @@ -24,14 +24,14 @@ api_key = 'API_KEY' ## Logging configuration [tracing.logging] ## Optional: log format -# environment variable: PICTRS_TRACING__LOGGING__FORMAT +# environment variable: PICTRS__TRACING__LOGGING__FORMAT # default: normal # # available options: compact, json, normal, pretty format = 'normal' ## Optional: log targets -# environment variable: PICTRS_TRACING__LOGGING__TARGETS +# environment variable: PICTRS__TRACING__LOGGING__TARGETS # default: warn,tracing_actix_web=info,actix_server=info,actix_web=info # # Dictates which traces should print to stdout @@ -42,7 +42,7 @@ targets = 'warn,tracing_actix_web=info,actix_server=info,actix_web=info' ## Console configuration [tracing.console] ## Optional: console address -# environment variable: PICTRS_TRACING__CONSOLE__ADDRESS +# environment variable: PICTRS__TRACING__CONSOLE__ADDRESS # default: empty # # Dictacts whether console should be enabled, and what address it should be exposed on. @@ -72,7 +72,7 @@ targets = 'warn,tracing_actix_web=info,actix_server=info,actix_web=info' address = '0.0.0.0:6669' ## Optional: console buffer capacity -# environment variable: PICTRS_TRACING__CONSOLE__BUFFER_CAPACITY +# environment variable: PICTRS__TRACING__CONSOLE__BUFFER_CAPACITY # default: 102400 # # This is the number of _events_ to buffer, not the number of bytes. In reality, the amount of @@ -83,7 +83,7 @@ buffer_capacity = 102400 ## OpenTelemetry configuration [tracing.opentelemetry] ## Optional: url for exporting otlp traces -# environment variable: PICTRS_TRACING__OPENTELEMETRY__URL +# environment variable: PICTRS__TRACING__OPENTELEMETRY__URL # default: empty # # Not specifying opentelemetry_url means no traces will be exported @@ -93,12 +93,12 @@ buffer_capacity = 102400 url = 'http://localhost:4317/' ## Optional: name to relate OpenTelemetry traces -# environment variable: PICTRS_TRACING__OPENTELEMETRY__SERVICE_NAME +# environment variable: PICTRS__TRACING__OPENTELEMETRY__SERVICE_NAME # default: pict-rs service_name = 'pict-rs' ## Optional: trace level to export -# environment variable: PICTRS_TRACING__OPENTELEMETRY__TARGETS +# environment variable: PICTRS__TRACING__OPENTELEMETRY__TARGETS # default: info # # Follows the same format as RUST_LOG @@ -108,7 +108,7 @@ targets = 'info' ## Configuration for migrating from pict-rs 0.2 [old_db] ## Optional: path to old pict-rs directory -# environment variable: PICTRS_OLD_DB__PATH +# environment variable: PICTRS__OLD_DB__PATH # default: /mnt path = '/mnt' @@ -116,46 +116,46 @@ path = '/mnt' ## Media Processing Configuration [media] ## Optional: max media width (in pixels) -# environment variable: PICTRS_MEDIA__MAX_WIDTH +# environment variable: PICTRS__MEDIA__MAX_WIDTH # default: 10,000 max_width = 10000 ## Optional: max media height (in pixels) -# environment variable: PICTRS_MEDIA__MAX_HEIGHT +# environment variable: PICTRS__MEDIA__MAX_HEIGHT # default: 10,000 max_height = 10000 ## Optional: max media area (in pixels) -# environment variable: PICTRS_MEDIA__MAX_AREA +# environment variable: PICTRS__MEDIA__MAX_AREA # default: 40,000,000 max_area = 40000000 ## Optional: max file size (in Megabytes) -# environment variable: PICTRS_MEDIA__MAX_FILE_SIZE +# environment variable: PICTRS__MEDIA__MAX_FILE_SIZE # default: 40 max_file_size = 40 ## Optional: enable GIF and MP4 uploads (without sound) -# environment variable: PICTRS_MEDIA__ENABLE_SILENT_VIDEO +# environment variable: PICTRS__MEDIA__ENABLE_SILENT_VIDEO # default: true # # Set this to false to serve static images only enable_silent_video = true ## Optional: set allowed filters for image processing -# environment variable: PICTRS_MEDIA__FILTERS +# environment variable: PICTRS__MEDIA__FILTERS # default: ['blur', 'crop', 'identity', 'resize', 'thumbnail'] filters = ['blur', 'crop', 'identity', 'resize', 'thumbnail'] ## Optional: whether to validate images uploaded through the `import` endpoint -# environment variable: PICTRS_MEDIA__SKIP_VALIDATE_IMPORTS +# environment variable: PICTRS__MEDIA__SKIP_VALIDATE_IMPORTS # default: false # # Set this to true if you want to avoid processing imported media skip_validate_imports = false ## Optional: The duration, in hours, to keep media ingested through the "cache" endpoint -# environment variable: PICTRS_MEDIA__CACHE_DURATION +# environment variable: PICTRS__MEDIA__CACHE_DURATION # default: 168 (1 week) cache_duration = 168 @@ -163,19 +163,19 @@ cache_duration = 168 ## Database configuration [repo] ## Optional: database backend to use -# environment variable: PICTRS_REPO__TYPE +# environment variable: PICTRS__REPO__TYPE # default: sled # # available options: sled type = 'sled' ## Optional: path to sled repository -# environment variable: PICTRS_REPO__PATH +# environment variable: PICTRS__REPO__PATH # default: /mnt/sled-repo path = '/mnt/sled-repo' ## Optional: in-memory cache capacity for sled data (in bytes) -# environment variable: PICTRS_REPO__CACHE_CAPACITY +# environment variable: PICTRS__REPO__CACHE_CAPACITY # default: 67,108,864 (1024 * 1024 * 64, or 64MB) cache_capacity = 67108864 @@ -183,39 +183,39 @@ cache_capacity = 67108864 ## Media storage configuration [store] ## Optional: type of media storage to use -# environment variable: PICTRS_STORE__TYPE +# environment variable: PICTRS__STORE__TYPE # default: filesystem # # available options: filesystem, object_storage type = 'object_storage' ## Required: object storage bucket name -# environment variable: PICTRS_STORE__BUCKET_NAME +# environment variable: PICTRS__STORE__BUCKET_NAME # default: empty bucket_name = 'BUCKET_NAME' ## Required: object storage region -# environment variable: PICTRS_STORE__REGION +# environment variable: PICTRS__STORE__REGION # default: empty region = 'REGION' ## Required: object storage access key -# environment variable: PICTRS_STORE__ACCESS_KEY +# environment variable: PICTRS__STORE__ACCESS_KEY # default: empty access_key = 'ACCESS_KEY' ## Required: object storage secret key -# environment variable: PICTRS_STORE__SECRET_KEY +# environment variable: PICTRS__STORE__SECRET_KEY # default: empty secret_key = 'SECRET_KEY' ## Optional: object storage security token -# environment variable: PICTRS_STORE__SECURITY_TOKEN +# environment variable: PICTRS__STORE__SECURITY_TOKEN # default: empty security_token = 'SECURITY_TOKEN' ## Optional: object storage session token -# environment variable: PICTRS_STORE__SESSION_TOKEN +# environment variable: PICTRS__STORE__SESSION_TOKEN # default: empty session_token = 'SESSION_TOKEN' @@ -223,13 +223,13 @@ session_token = 'SESSION_TOKEN' # ## Media storage configuration # [store] # ## Optional: type of media storage to use -# # environment variable: PICTRS_STORE__TYPE +# # environment variable: PICTRS__STORE__TYPE # # default: filesystem # # # # available options: filesystem, object_storage # type = 'filesystem' # # ## Optional: path to uploaded media -# # environment variable: PICTRS_STORE__PATH +# # environment variable: PICTRS__STORE__PATH # # default: /mnt/files # path = '/mnt/files' diff --git a/src/exiftool.rs b/src/exiftool.rs index 90ff0ac..b6060b7 100644 --- a/src/exiftool.rs +++ b/src/exiftool.rs @@ -2,8 +2,9 @@ use crate::process::Process; use actix_web::web::Bytes; use tokio::io::AsyncRead; +#[tracing::instrument(name = "Clearing metadata", skip(input))] pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result { let process = Process::run("exiftool", &["-all=", "-", "-out", "-"])?; - Ok(process.bytes_read(input).unwrap()) + Ok(process.bytes_read(input)) } diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs index 3dd9124..101fa29 100644 --- a/src/ffmpeg.rs +++ b/src/ffmpeg.rs @@ -50,6 +50,7 @@ impl ThumbnailFormat { } } +#[tracing::instrument(name = "Convert to Mp4", skip(input))] pub(crate) async fn to_mp4_bytes( input: Bytes, input_format: InputFormat, diff --git a/src/generate.rs b/src/generate.rs index bffac6f..3bd9467 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -11,6 +11,7 @@ use actix_web::web::Bytes; use std::path::PathBuf; use tokio::io::AsyncReadExt; +#[tracing::instrument(skip(hash))] pub(crate) async fn generate( repo: &R, store: &S, @@ -36,6 +37,7 @@ pub(crate) async fn generate( Ok((details, bytes)) } +#[tracing::instrument(skip(hash))] async fn process( repo: &R, store: &S, @@ -45,7 +47,9 @@ async fn process( thumbnail_args: Vec, hash: R::Bytes, ) -> Result<(Details, Bytes), Error> { - let permit = crate::PROCESS_SEMAPHORE.acquire().await?; + let permit = tracing::trace_span!(parent: None, "Aquire semaphore") + .in_scope(|| crate::PROCESS_SEMAPHORE.acquire()) + .await; let identifier = if let Some(identifier) = repo .still_identifier_from_alias::(&alias) diff --git a/src/ingest.rs b/src/ingest.rs index 0bd0a2f..a052653 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -8,11 +8,11 @@ use crate::{ use actix_web::web::{Bytes, BytesMut}; use futures_util::{Stream, StreamExt}; use sha2::{Digest, Sha256}; -use tracing::debug; mod hasher; use hasher::Hasher; +#[derive(Debug)] pub(crate) struct Session where R: FullRepo + 'static, @@ -24,6 +24,33 @@ where identifier: Option, } +#[tracing::instrument(name = "Aggregate", skip(stream))] +async fn aggregate(stream: S) -> Result +where + S: Stream>, +{ + futures_util::pin_mut!(stream); + + let mut buf = Vec::new(); + tracing::debug!("Reading stream to memory"); + while let Some(res) = stream.next().await { + let bytes = res?; + buf.push(bytes); + } + + let total_len = buf.iter().fold(0, |acc, item| acc + item.len()); + + let bytes_mut = buf + .iter() + .fold(BytesMut::with_capacity(total_len), |mut acc, item| { + acc.extend_from_slice(item); + acc + }); + + Ok(bytes_mut.freeze()) +} + +#[tracing::instrument(name = "Ingest", skip(stream))] pub(crate) async fn ingest( repo: &R, store: &S, @@ -35,21 +62,15 @@ where R: FullRepo + 'static, S: Store, { - let permit = crate::PROCESS_SEMAPHORE.acquire().await; + let permit = tracing::trace_span!(parent: None, "Aquire semaphore") + .in_scope(|| crate::PROCESS_SEMAPHORE.acquire()) + .await; - let mut bytes_mut = BytesMut::new(); + let bytes = aggregate(stream).await?; - futures_util::pin_mut!(stream); - - debug!("Reading stream to memory"); - while let Some(res) = stream.next().await { - let bytes = res?; - bytes_mut.extend_from_slice(&bytes); - } - - debug!("Validating bytes"); + tracing::debug!("Validating bytes"); let (input_type, validated_reader) = crate::validate::validate_image_bytes( - bytes_mut.freeze(), + bytes, CONFIG.media.format, CONFIG.media.enable_silent_video, should_validate, @@ -73,12 +94,8 @@ where session.hash = Some(hash.clone()); - debug!("Saving upload"); - save_upload(repo, store, &hash, &identifier).await?; - debug!("Adding alias"); - if let Some(alias) = declared_alias { session.add_existing_alias(&hash, alias).await? } else { @@ -88,6 +105,7 @@ where Ok(session) } +#[tracing::instrument] async fn save_upload( repo: &R, store: &S, @@ -124,25 +142,27 @@ where self.alias.as_ref() } + #[tracing::instrument] pub(crate) async fn delete_token(&self) -> Result { let alias = self.alias.clone().ok_or(UploadError::MissingAlias)?; - debug!("Generating delete token"); + tracing::debug!("Generating delete token"); let delete_token = DeleteToken::generate(); - debug!("Saving delete token"); + tracing::debug!("Saving delete token"); let res = self.repo.relate_delete_token(&alias, &delete_token).await?; if res.is_err() { let delete_token = self.repo.delete_token(&alias).await?; - debug!("Returning existing delete token, {:?}", delete_token); + tracing::debug!("Returning existing delete token, {:?}", delete_token); return Ok(delete_token); } - debug!("Returning new delete token, {:?}", delete_token); + tracing::debug!("Returning new delete token, {:?}", delete_token); Ok(delete_token) } + #[tracing::instrument] async fn add_existing_alias(&mut self, hash: &[u8], alias: Alias) -> Result<(), Error> { AliasRepo::create(&self.repo, &alias) .await? @@ -156,8 +176,9 @@ where Ok(()) } + #[tracing::instrument] async fn create_alias(&mut self, hash: &[u8], input_type: ValidInputType) -> Result<(), Error> { - debug!("Alias gen loop"); + tracing::debug!("Alias gen loop"); loop { let alias = Alias::generate(input_type.as_ext().to_string()); @@ -171,7 +192,7 @@ where return Ok(()); } - debug!("Alias exists, regenerating"); + tracing::debug!("Alias exists, regenerating"); } } } diff --git a/src/init_tracing.rs b/src/init_tracing.rs index c586b41..80cde39 100644 --- a/src/init_tracing.rs +++ b/src/init_tracing.rs @@ -41,8 +41,8 @@ where .with(ErrorLayer::default()); if let Some(address) = tracing.console.address { + println!("Starting console on {}", address); let console_layer = ConsoleLayer::builder() - .with_default_env() .event_buffer_capacity(tracing.console.buffer_capacity) .server_addr(address) .spawn(); diff --git a/src/magick.rs b/src/magick.rs index e0c1c0d..fdeba2a 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -79,12 +79,14 @@ pub(crate) struct Details { pub(crate) height: usize, } +#[tracing::instrument(name = "Clear Metadata", skip(input))] pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result { let process = Process::run("magick", &["convert", "-", "-strip", "-"])?; - Ok(process.bytes_read(input).unwrap()) + Ok(process.bytes_read(input)) } +#[tracing::instrument(name = "Convert", skip(input))] pub(crate) fn convert_bytes_read( input: Bytes, format: ImageFormat, @@ -99,7 +101,7 @@ pub(crate) fn convert_bytes_read( ], )?; - Ok(process.bytes_read(input).unwrap()) + Ok(process.bytes_read(input)) } #[instrument(name = "Getting details from input bytes", skip(input))] @@ -130,7 +132,7 @@ pub(crate) async fn details_bytes( &["identify", "-ping", "-format", "%w %h | %m\n", &last_arg], )?; - let mut reader = process.bytes_read(input).unwrap(); + let mut reader = process.bytes_read(input); let mut bytes = Vec::new(); reader.read_to_end(&mut bytes).await?; @@ -170,7 +172,7 @@ pub(crate) async fn details_store( &["identify", "-ping", "-format", "%w %h | %m\n", &last_arg], )?; - let mut reader = process.store_read(store, identifier).unwrap(); + let mut reader = process.store_read(store, identifier); let mut output = Vec::new(); reader.read_to_end(&mut output).await?; @@ -187,7 +189,7 @@ pub(crate) async fn details_file(path_str: &str) -> Result { &["identify", "-ping", "-format", "%w %h | %m\n", path_str], )?; - let mut reader = process.read().unwrap(); + let mut reader = process.read(); let mut output = Vec::new(); reader.read_to_end(&mut output).await?; @@ -272,7 +274,7 @@ pub(crate) fn process_image_store_read( .arg(last_arg), )?; - Ok(process.store_read(store, identifier).unwrap()) + Ok(process.store_read(store, identifier)) } impl Details { diff --git a/src/process.rs b/src/process.rs index 4d5b628..b6337cd 100644 --- a/src/process.rs +++ b/src/process.rs @@ -12,15 +12,18 @@ use tokio::{ process::{Child, Command}, sync::oneshot::{channel, Receiver}, }; -use tracing::Instrument; -use tracing::Span; #[derive(Debug)] struct StatusError; pub(crate) struct Process { child: Child, - span: Span, +} + +impl std::fmt::Debug for Process { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Process").field("child", &"Child").finish() + } } struct DropHandle { @@ -31,7 +34,6 @@ pin_project_lite::pin_project! { struct ProcessRead { #[pin] inner: I, - span: Span, err_recv: Receiver, err_closed: bool, handle: DropHandle, @@ -39,30 +41,19 @@ pin_project_lite::pin_project! { } impl Process { + #[tracing::instrument] pub(crate) fn run(command: &str, args: &[&str]) -> std::io::Result { Self::spawn(Command::new(command).args(args)) } - fn spawn_span(&self) -> Span { - let span = tracing::info_span!(parent: None, "Spawned command writer",); - - span.follows_from(self.span.clone()); - - span - } - + #[tracing::instrument] pub(crate) fn spawn(cmd: &mut Command) -> std::io::Result { let cmd = cmd.stdin(Stdio::piped()).stdout(Stdio::piped()); - let span = tracing::info_span!( - "Spawning Command", - command = &tracing::field::debug(&cmd), - exception.message = &tracing::field::Empty, - exception.details = &tracing::field::Empty, - ); - cmd.spawn().map(|child| Process { child, span }) + cmd.spawn().map(|child| Process { child }) } + #[tracing::instrument] pub(crate) async fn wait(mut self) -> std::io::Result<()> { let status = self.child.wait().await?; if !status.success() { @@ -71,16 +62,16 @@ impl Process { Ok(()) } - pub(crate) fn bytes_read(mut self, mut input: Bytes) -> Option { - let mut stdin = self.child.stdin.take()?; - let stdout = self.child.stdout.take()?; + #[tracing::instrument(skip(input))] + pub(crate) fn bytes_read(mut self, mut input: Bytes) -> impl AsyncRead + Unpin { + let mut stdin = self.child.stdin.take().expect("stdin exists"); + let stdout = self.child.stdout.take().expect("stdout exists"); let (tx, rx) = channel::(); - let span = self.spawn_span(); let mut child = self.child; - let handle = actix_rt::spawn( - async move { + let handle = tracing::trace_span!(parent: None, "Spawn").in_scope(|| { + actix_rt::spawn(async move { if let Err(e) = stdin.write_all_buf(&mut input).await { let _ = tx.send(e); return; @@ -98,94 +89,84 @@ impl Process { let _ = tx.send(e); } } - } - .instrument(span), - ); + }) + }); - Some(ProcessRead { + ProcessRead { inner: stdout, - span: self.span, err_recv: rx, err_closed: false, handle: DropHandle { inner: handle }, - }) + } } - pub(crate) fn read(mut self) -> Option { - let stdout = self.child.stdout.take()?; + #[tracing::instrument] + pub(crate) fn read(mut self) -> impl AsyncRead + Unpin { + let stdout = self.child.stdout.take().expect("stdout exists"); let (tx, rx) = channel(); - let span = self.spawn_span(); let mut child = self.child; - let handle = actix_rt::spawn( - async move { - match child.wait().await { - Ok(status) => { - if !status.success() { - let _ = tx - .send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError)); - } - } - Err(e) => { - let _ = tx.send(e); + let handle = actix_rt::spawn(async move { + match child.wait().await { + Ok(status) => { + if !status.success() { + let _ = + tx.send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError)); } } + Err(e) => { + let _ = tx.send(e); + } } - .instrument(span), - ); + }); - Some(ProcessRead { + ProcessRead { inner: stdout, - span: self.span, err_recv: rx, err_closed: false, handle: DropHandle { inner: handle }, - }) + } } + #[tracing::instrument] pub(crate) fn store_read( mut self, store: S, identifier: S::Identifier, - ) -> Option { - let mut stdin = self.child.stdin.take()?; - let stdout = self.child.stdout.take()?; + ) -> impl AsyncRead + Unpin { + let mut stdin = self.child.stdin.take().expect("stdin exists"); + let stdout = self.child.stdout.take().expect("stdout exists"); let (tx, rx) = channel(); - let span = self.spawn_span(); let mut child = self.child; - let handle = actix_rt::spawn( - async move { - if let Err(e) = store.read_into(&identifier, &mut stdin).await { - let _ = tx.send(e); - return; - } - drop(stdin); + let handle = actix_rt::spawn(async move { + if let Err(e) = store.read_into(&identifier, &mut stdin).await { + let _ = tx.send(e); + return; + } + drop(stdin); - match child.wait().await { - Ok(status) => { - if !status.success() { - let _ = tx - .send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError)); - } - } - Err(e) => { - let _ = tx.send(e); + match child.wait().await { + Ok(status) => { + if !status.success() { + let _ = + tx.send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError)); } } + Err(e) => { + let _ = tx.send(e); + } } - .instrument(span), - ); + }); - Some(ProcessRead { + ProcessRead { inner: stdout, - span: self.span, err_recv: rx, err_closed: false, handle: DropHandle { inner: handle }, - }) + } } } @@ -200,37 +181,20 @@ where ) -> Poll> { let this = self.as_mut().project(); - let span = this.span; let err_recv = this.err_recv; let err_closed = this.err_closed; let inner = this.inner; - span.in_scope(|| { - if !*err_closed { - if let Poll::Ready(res) = Pin::new(err_recv).poll(cx) { - *err_closed = true; - if let Ok(err) = res { - let display = format!("{}", err); - let debug = format!("{:?}", err); - span.record("exception.message", &display.as_str()); - span.record("exception.details", &debug.as_str()); - return Poll::Ready(Err(err)); - } + if !*err_closed { + if let Poll::Ready(res) = Pin::new(err_recv).poll(cx) { + *err_closed = true; + if let Ok(err) = res { + return Poll::Ready(Err(err)); } } + } - if let Poll::Ready(res) = inner.poll_read(cx, buf) { - if let Err(err) = &res { - let display = format!("{}", err); - let debug = format!("{:?}", err); - span.record("exception.message", &display.as_str()); - span.record("exception.details", &debug.as_str()); - } - return Poll::Ready(res); - } - - Poll::Pending - }) + inner.poll_read(cx, buf) } } diff --git a/src/store/file_store.rs b/src/store/file_store.rs index dede6f3..a93dee8 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -12,7 +12,7 @@ use std::{ }; use storage_path_generator::Generator; use tokio::io::{AsyncRead, AsyncWrite}; -use tracing::{debug, error, instrument}; +use tracing::{debug, error, instrument, Instrument}; mod file_id; pub(crate) use file_id::FileId; @@ -88,9 +88,15 @@ impl Store for FileStore { ) -> Result { let path = self.path_from_file_id(identifier); - let stream = File::open(path) - .await? - .read_to_stream(from_start, len) + let file_span = tracing::trace_span!(parent: None, "File Stream"); + let file = file_span + .in_scope(|| File::open(path)) + .instrument(file_span.clone()) + .await?; + + let stream = file_span + .in_scope(|| file.read_to_stream(from_start, len)) + .instrument(file_span) .await?; Ok(Box::pin(stream)) diff --git a/src/store/object_store.rs b/src/store/object_store.rs index 41bf188..c923db1 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -88,7 +88,7 @@ impl Store for ObjectStore { let start = from_start.unwrap_or(0); let end = len.map(|len| start + len - 1); - let request_span = tracing::info_span!(parent: None, "Get Object"); + let request_span = tracing::trace_span!(parent: None, "Get Object"); // NOTE: isolating reqwest in it's own span is to prevent the request's span from getting // smuggled into a long-lived task. Unfortunately, I am unable to create a minimal