diff --git a/src/file.rs b/src/file.rs index ba7ae6bd..9b958fcf 100644 --- a/src/file.rs +++ b/src/file.rs @@ -48,6 +48,8 @@ mod tokio_file { let mut stream = stream.into_streamer(); while let Some(res) = stream.next().await { + tracing::trace!("write_from_stream: looping"); + let mut bytes = res?; self.inner.write_all_buf(&mut bytes).await?; @@ -158,6 +160,8 @@ mod io_uring { let mut cursor: u64 = 0; loop { + tracing::trace!("write_from_bytes: looping"); + if cursor == len { break; } @@ -189,12 +193,16 @@ mod io_uring { let mut cursor: u64 = 0; while let Some(res) = stream.next().await { + tracing::trace!("write_from_stream while: looping"); + let mut buf = res?; let len = buf.len(); let mut position = 0; loop { + tracing::trace!("write_from_stream: looping"); + if position == len { break; } @@ -234,6 +242,8 @@ mod io_uring { let mut cursor: u64 = 0; loop { + tracing::trace!("write_from_async_read: looping"); + let max_size = 65_536; let mut buf = Vec::with_capacity(max_size.try_into().unwrap()); @@ -246,6 +256,8 @@ mod io_uring { let mut position = 0; loop { + tracing::trace!("write_from_async_read: looping inner"); + if position == n { break; } @@ -288,6 +300,8 @@ mod io_uring { let mut cursor: u64 = 0; loop { + tracing::trace!("read_to_async_write: looping"); + if cursor == size { break; } @@ -343,6 +357,8 @@ mod io_uring { let mut bytes = BytesMut::new(); loop { + tracing::trace!("bytes_stream: looping"); + let max_size = read_until.saturating_sub(cursor); if max_size == 0 { diff --git a/src/ingest.rs b/src/ingest.rs index 4dc2d257..eba10791 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -40,6 +40,8 @@ where let mut stream = stream.into_streamer(); while let Some(res) = stream.next().await { + tracing::trace!("aggregate: looping"); + buf.add_bytes(res?); } @@ -269,6 +271,8 @@ impl Session { #[tracing::instrument(level = "debug", skip(self, hash))] async fn create_alias(&mut self, hash: Hash, input_type: InternalFormat) -> Result<(), Error> { loop { + tracing::trace!("create_alias: looping"); + let alias = Alias::generate(input_type.file_extension().to_string()); if self @@ -281,8 +285,6 @@ impl Session { return Ok(()); } - - tracing::trace!("Alias exists, regenerating"); } } } diff --git a/src/lib.rs b/src/lib.rs index 0365f1ab..4ca65518 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1418,6 +1418,8 @@ where let mut streamer = stream.into_streamer(); while let Some(res) = streamer.next().await { + tracing::trace!("srv_response: looping"); + let item = res.map_err(Error::from)??; yielder.yield_ok(item).await; } @@ -1790,6 +1792,8 @@ fn spawn_cleanup(repo: ArcRepo, config: &Configuration) { let mut interval = tokio::time::interval(Duration::from_secs(30)); loop { + tracing::trace!("queue_cleanup: looping"); + interval.tick().await; if let Err(e) = queue::cleanup_outdated_variants(&repo).await { diff --git a/src/middleware/payload.rs b/src/middleware/payload.rs index 0af71aee..765a7676 100644 --- a/src/middleware/payload.rs +++ b/src/middleware/payload.rs @@ -19,21 +19,29 @@ async fn drain(rx: flume::Receiver) { let mut set = JoinSet::new(); while let Ok(payload) = rx.recv_async().await { + tracing::trace!("drain: looping"); + // draining a payload is a best-effort task - if we can't collect in 2 minutes we bail set.spawn_local(tokio::time::timeout(Duration::from_secs(120), async move { let mut streamer = payload.into_streamer(); - while streamer.next().await.is_some() {} + while streamer.next().await.is_some() { + tracing::trace!("drain drop bytes: looping"); + } })); let mut count = 0; // drain completed tasks while set.join_next().now_or_never().is_some() { + tracing::trace!("drain join now: looping"); + count += 1; } // if we're past the limit, wait for completions while set.len() > LIMIT { + tracing::trace!("drain join await: looping"); + if set.join_next().await.is_some() { count += 1; } @@ -45,7 +53,9 @@ async fn drain(rx: flume::Receiver) { } // drain set - while set.join_next().await.is_some() {} + while set.join_next().await.is_some() { + tracing::trace!("drain join await cleanup: looping"); + } } #[derive(Clone)] diff --git a/src/migrate_store.rs b/src/migrate_store.rs index 661ae61d..4cb04d39 100644 --- a/src/migrate_store.rs +++ b/src/migrate_store.rs @@ -136,6 +136,8 @@ where let mut joinset = tokio::task::JoinSet::new(); while let Some(hash) = stream.next().await { + tracing::trace!("do_migrate_store: looping"); + let hash = hash?; if joinset.len() >= concurrency { @@ -149,6 +151,8 @@ where } while let Some(res) = joinset.join_next().await { + tracing::trace!("do_migrate_store: join looping"); + res.map_err(|_| UploadError::Canceled)??; } @@ -383,6 +387,8 @@ where let mut failure_count = 0; loop { + tracing::trace!("migrate_file: looping"); + match do_migrate_file(tmp_dir, repo, from, to, identifier, timeout).await { Ok(identifier) => return Ok(identifier), Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => { diff --git a/src/queue.rs b/src/queue.rs index d543a429..d976aa39 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -207,6 +207,8 @@ async fn process_jobs( let worker_id = uuid::Uuid::new_v4(); loop { + tracing::trace!("process_jobs: looping"); + let res = job_loop(repo, store, config, worker_id, queue, callback).await; if let Err(e) = res { @@ -274,6 +276,8 @@ where + Copy, { loop { + tracing::trace!("job_loop: looping"); + let fut = async { let (job_id, job) = repo.pop(queue, worker_id).await?; @@ -334,6 +338,8 @@ async fn process_image_jobs( let worker_id = uuid::Uuid::new_v4(); loop { + tracing::trace!("process_image_jobs: looping"); + let res = image_job_loop( tmp_dir, repo, @@ -388,6 +394,8 @@ where + Copy, { loop { + tracing::trace!("image_job_loop: looping"); + let fut = async { let (job_id, job) = repo.pop(queue, worker_id).await?; @@ -439,6 +447,8 @@ where let mut hb = None; loop { + tracing::trace!("heartbeat: looping"); + tokio::select! { output = &mut fut => { return output; diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs index 6c497bcc..07540ed8 100644 --- a/src/queue/cleanup.rs +++ b/src/queue/cleanup.rs @@ -144,6 +144,8 @@ async fn all_variants(repo: &ArcRepo) -> Result<(), Error> { let mut hash_stream = hash_stream.into_streamer(); while let Some(res) = hash_stream.next().await { + tracing::trace!("all_variants: looping"); + let hash = res?; super::cleanup_variants(repo, hash, None).await?; } @@ -159,6 +161,8 @@ async fn outdated_variants(repo: &ArcRepo, config: &Configuration) -> Result<(), let mut variant_stream = repo.older_variants(since).await?.into_streamer(); while let Some(res) = variant_stream.next().await { + tracing::trace!("outdated_variants: looping"); + let (hash, variant) = res?; super::cleanup_variants(repo, hash, Some(variant)).await?; } @@ -174,6 +178,8 @@ async fn outdated_proxies(repo: &ArcRepo, config: &Configuration) -> Result<(), let mut alias_stream = repo.older_aliases(since).await?.into_streamer(); while let Some(res) = alias_stream.next().await { + tracing::trace!("outdated_proxies: looping"); + let alias = res?; if let Some(token) = repo.delete_token(&alias).await? { super::cleanup_alias(repo, alias, token).await?; @@ -229,6 +235,8 @@ where let mut count: u64 = 0; while let Some(hash) = hash_stream.try_next().await? { + tracing::trace!("prune: looping"); + let repo = repo.clone(); let store = store.clone(); diff --git a/src/repo.rs b/src/repo.rs index c226307c..eaada7ae 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -558,6 +558,8 @@ impl dyn FullRepo { let mut slug = None; loop { + tracing::trace!("hashes_stream: looping"); + let page = repo.hash_page(slug, 100).await?; slug = page.next(); diff --git a/src/repo/migrate.rs b/src/repo/migrate.rs index 5f32fbad..8dc39d06 100644 --- a/src/repo/migrate.rs +++ b/src/repo/migrate.rs @@ -43,6 +43,8 @@ pub(crate) async fn migrate_repo(old_repo: ArcRepo, new_repo: ArcRepo) -> Result let mut index = 0; while let Some(res) = hash_stream.next().await { + tracing::trace!("migrate_repo: looping"); + if let Ok(hash) = res { migrate_hash(old_repo.clone(), new_repo.clone(), hash).await; } else { @@ -108,6 +110,8 @@ pub(crate) async fn migrate_04( let mut index = 0; while let Some(res) = hash_stream.next().await { + tracing::trace!("migrate_04: looping"); + if let Ok(hash) = res { set.spawn_local(migrate_hash_04( tmp_dir.clone(), @@ -122,6 +126,8 @@ pub(crate) async fn migrate_04( } while set.len() >= config.upgrade.concurrency { + tracing::trace!("migrate_04: join looping"); + if set.join_next().await.is_some() { index += 1; @@ -135,6 +141,8 @@ pub(crate) async fn migrate_04( } while set.join_next().await.is_some() { + tracing::trace!("migrate_04: cleanup join looping"); + index += 1; if index % pct == 0 { @@ -165,6 +173,8 @@ async fn migrate_hash(old_repo: ArcRepo, new_repo: ArcRepo, hash: Hash) { let mut hash_failures = 0; while let Err(e) = do_migrate_hash(&old_repo, &new_repo, hash.clone()).await { + tracing::trace!("migrate_hash: looping"); + hash_failures += 1; if hash_failures > 10 { diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index 85c20d02..ccb3aff8 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -364,6 +364,7 @@ async fn delegate_notifications( let upload_notifier_state = UploadNotifierState { inner: &inner }; while let Ok(notification) = receiver.recv_async().await { + tracing::trace!("delegate_notifications: looping"); metrics::counter!("pict-rs.postgres.notification").increment(1); match notification.channel() { @@ -418,6 +419,8 @@ fn spawn_db_notification_task( ) { crate::sync::spawn("postgres-notifications", async move { while let Some(res) = std::future::poll_fn(|cx| conn.poll_message(cx)).await { + tracing::trace!("db_notification_task: looping"); + match res { Err(e) => { tracing::error!("Database Connection {e:?}"); @@ -1138,6 +1141,8 @@ impl QueueRepo for PostgresRepo { use schema::job_queue::dsl::*; loop { + tracing::trace!("pop: looping"); + let mut conn = self.get_connection().await?; let notifier: Arc = self @@ -1667,6 +1672,8 @@ impl UploadRepo for PostgresRepo { let interest = self.inner.interest(upload_id); loop { + tracing::trace!("wait: looping"); + let interest_future = interest.notified_timeout(Duration::from_secs(5)); let mut conn = self.get_connection().await?; @@ -1788,6 +1795,8 @@ where { streem::try_from_fn(|yielder| async move { loop { + tracing::trace!("page_stream: looping"); + let mut page = (next)(inner.clone(), older_than).await?; if let Some((last_time, last_item)) = page.pop() { diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 3400f0ae..e8d424f4 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -522,6 +522,8 @@ impl UploadRepo for SledRepo { } while let Some(event) = (&mut subscriber).await { + tracing::trace!("wait: looping"); + match event { sled::Event::Remove { .. } => { return Err(RepoError::AlreadyClaimed); @@ -679,6 +681,8 @@ impl QueueRepo for SledRepo { let now = time::OffsetDateTime::now_utc(); loop { + tracing::trace!("pop: looping"); + let queue = self.queue.clone(); let job_state = self.job_state.clone(); diff --git a/src/store.rs b/src/store.rs index 636f86e7..b9e435f5 100644 --- a/src/store.rs +++ b/src/store.rs @@ -132,6 +132,8 @@ pub(crate) trait Store: Clone + Debug { .into_streamer(); while let Some(bytes) = streamer.try_next().await.map_err(StoreError::ReadStream)? { + tracing::trace!("to_bytes: looping"); + buf.add_bytes(bytes); } diff --git a/src/store/file_store.rs b/src/store/file_store.rs index a38f3533..378473e6 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -236,6 +236,8 @@ impl FileStore { async fn try_remove_parents(&self, mut path: &Path) { while let Some(parent) = path.parent() { + tracing::trace!("try_remove_parents: looping"); + if parent.ends_with(&self.root_dir) { return; } diff --git a/src/store/object_store.rs b/src/store/object_store.rs index 53f3c820..6c817540 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -177,6 +177,8 @@ where let mut stream = stream.into_streamer(); while buf.len() < CHUNK_SIZE { + tracing::trace!("read_chunk: looping"); + if let Some(bytes) = stream.try_next().await? { buf.add_bytes(bytes) } else { @@ -284,6 +286,8 @@ impl Store for ObjectStore { let mut futures = Vec::new(); while !complete { + tracing::trace!("save_stream: looping"); + part_number += 1; let buf = if let Some(buf) = first_chunk.take() { @@ -459,6 +463,8 @@ impl Store for ObjectStore { let mut stream = stream.into_streamer(); while let Some(res) = stream.next().await { + tracing::trace!("read_into: looping"); + let mut bytes = res.map_err(payload_to_io_error)?; writer.write_all_buf(&mut bytes).await?; } diff --git a/src/stream.rs b/src/stream.rs index e5aa7d6f..0b2e57d4 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -16,6 +16,8 @@ where let mut streamer = stream.into_streamer(); while let Some(item) = streamer.next().await { + tracing::trace!("metrics: looping"); + yielder.yield_(item).await; } } @@ -35,6 +37,8 @@ where let mut streamer = stream.into_streamer(); while let Some(res) = streamer.next().await { + tracing::trace!("make send tx: looping"); + if tx.send_async(res).await.is_err() { break; } @@ -45,6 +49,8 @@ where let mut stream = rx.into_stream().into_streamer(); while let Some(res) = stream.next().await { + tracing::trace!("make send rx: looping"); + yiedler.yield_(res).await; } @@ -71,6 +77,8 @@ where let mut stream = rx.into_stream().into_streamer(); while let Some(res) = stream.next().await { + tracing::trace!("from_iterator: looping"); + yielder.yield_(res).await; } @@ -89,6 +97,8 @@ where let mut streamer = stream.into_streamer(); while let Some(res) = streamer.next().await { + tracing::trace!("map: looping"); + yielder.yield_((f)(res)).await; } }) @@ -153,6 +163,8 @@ where let mut streamer = stream.into_streamer(); while let Some(res) = streamer.next().await { + tracing::trace!("timeout: looping"); + yielder.yield_ok(res).await; } }) @@ -173,6 +185,8 @@ where let mut count = 0; while let Some(bytes) = streamer.try_next().await? { + tracing::trace!("limit: looping"); + count += bytes.len(); if count > limit {