Add trace-level logs to every loop (paranoid)

This commit is contained in:
asonix 2023-12-28 11:58:38 -06:00
parent db0464aa27
commit bfc2410552
15 changed files with 109 additions and 4 deletions

View file

@ -48,6 +48,8 @@ mod tokio_file {
let mut stream = stream.into_streamer();
while let Some(res) = stream.next().await {
tracing::trace!("write_from_stream: looping");
let mut bytes = res?;
self.inner.write_all_buf(&mut bytes).await?;
@ -158,6 +160,8 @@ mod io_uring {
let mut cursor: u64 = 0;
loop {
tracing::trace!("write_from_bytes: looping");
if cursor == len {
break;
}
@ -189,12 +193,16 @@ mod io_uring {
let mut cursor: u64 = 0;
while let Some(res) = stream.next().await {
tracing::trace!("write_from_stream while: looping");
let mut buf = res?;
let len = buf.len();
let mut position = 0;
loop {
tracing::trace!("write_from_stream: looping");
if position == len {
break;
}
@ -234,6 +242,8 @@ mod io_uring {
let mut cursor: u64 = 0;
loop {
tracing::trace!("write_from_async_read: looping");
let max_size = 65_536;
let mut buf = Vec::with_capacity(max_size.try_into().unwrap());
@ -246,6 +256,8 @@ mod io_uring {
let mut position = 0;
loop {
tracing::trace!("write_from_async_read: looping inner");
if position == n {
break;
}
@ -288,6 +300,8 @@ mod io_uring {
let mut cursor: u64 = 0;
loop {
tracing::trace!("read_to_async_write: looping");
if cursor == size {
break;
}
@ -343,6 +357,8 @@ mod io_uring {
let mut bytes = BytesMut::new();
loop {
tracing::trace!("bytes_stream: looping");
let max_size = read_until.saturating_sub(cursor);
if max_size == 0 {

View file

@ -40,6 +40,8 @@ where
let mut stream = stream.into_streamer();
while let Some(res) = stream.next().await {
tracing::trace!("aggregate: looping");
buf.add_bytes(res?);
}
@ -269,6 +271,8 @@ impl Session {
#[tracing::instrument(level = "debug", skip(self, hash))]
async fn create_alias(&mut self, hash: Hash, input_type: InternalFormat) -> Result<(), Error> {
loop {
tracing::trace!("create_alias: looping");
let alias = Alias::generate(input_type.file_extension().to_string());
if self
@ -281,8 +285,6 @@ impl Session {
return Ok(());
}
tracing::trace!("Alias exists, regenerating");
}
}
}

View file

@ -1418,6 +1418,8 @@ where
let mut streamer = stream.into_streamer();
while let Some(res) = streamer.next().await {
tracing::trace!("srv_response: looping");
let item = res.map_err(Error::from)??;
yielder.yield_ok(item).await;
}
@ -1790,6 +1792,8 @@ fn spawn_cleanup(repo: ArcRepo, config: &Configuration) {
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
tracing::trace!("queue_cleanup: looping");
interval.tick().await;
if let Err(e) = queue::cleanup_outdated_variants(&repo).await {

View file

@ -19,21 +19,29 @@ async fn drain(rx: flume::Receiver<actix_web::dev::Payload>) {
let mut set = JoinSet::new();
while let Ok(payload) = rx.recv_async().await {
tracing::trace!("drain: looping");
// draining a payload is a best-effort task - if we can't collect in 2 minutes we bail
set.spawn_local(tokio::time::timeout(Duration::from_secs(120), async move {
let mut streamer = payload.into_streamer();
while streamer.next().await.is_some() {}
while streamer.next().await.is_some() {
tracing::trace!("drain drop bytes: looping");
}
}));
let mut count = 0;
// drain completed tasks
while set.join_next().now_or_never().is_some() {
tracing::trace!("drain join now: looping");
count += 1;
}
// if we're past the limit, wait for completions
while set.len() > LIMIT {
tracing::trace!("drain join await: looping");
if set.join_next().await.is_some() {
count += 1;
}
@ -45,7 +53,9 @@ async fn drain(rx: flume::Receiver<actix_web::dev::Payload>) {
}
// drain set
while set.join_next().await.is_some() {}
while set.join_next().await.is_some() {
tracing::trace!("drain join await cleanup: looping");
}
}
#[derive(Clone)]

View file

@ -136,6 +136,8 @@ where
let mut joinset = tokio::task::JoinSet::new();
while let Some(hash) = stream.next().await {
tracing::trace!("do_migrate_store: looping");
let hash = hash?;
if joinset.len() >= concurrency {
@ -149,6 +151,8 @@ where
}
while let Some(res) = joinset.join_next().await {
tracing::trace!("do_migrate_store: join looping");
res.map_err(|_| UploadError::Canceled)??;
}
@ -383,6 +387,8 @@ where
let mut failure_count = 0;
loop {
tracing::trace!("migrate_file: looping");
match do_migrate_file(tmp_dir, repo, from, to, identifier, timeout).await {
Ok(identifier) => return Ok(identifier),
Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {

View file

@ -207,6 +207,8 @@ async fn process_jobs<S, F>(
let worker_id = uuid::Uuid::new_v4();
loop {
tracing::trace!("process_jobs: looping");
let res = job_loop(repo, store, config, worker_id, queue, callback).await;
if let Err(e) = res {
@ -274,6 +276,8 @@ where
+ Copy,
{
loop {
tracing::trace!("job_loop: looping");
let fut = async {
let (job_id, job) = repo.pop(queue, worker_id).await?;
@ -334,6 +338,8 @@ async fn process_image_jobs<S, F>(
let worker_id = uuid::Uuid::new_v4();
loop {
tracing::trace!("process_image_jobs: looping");
let res = image_job_loop(
tmp_dir,
repo,
@ -388,6 +394,8 @@ where
+ Copy,
{
loop {
tracing::trace!("image_job_loop: looping");
let fut = async {
let (job_id, job) = repo.pop(queue, worker_id).await?;
@ -439,6 +447,8 @@ where
let mut hb = None;
loop {
tracing::trace!("heartbeat: looping");
tokio::select! {
output = &mut fut => {
return output;

View file

@ -144,6 +144,8 @@ async fn all_variants(repo: &ArcRepo) -> Result<(), Error> {
let mut hash_stream = hash_stream.into_streamer();
while let Some(res) = hash_stream.next().await {
tracing::trace!("all_variants: looping");
let hash = res?;
super::cleanup_variants(repo, hash, None).await?;
}
@ -159,6 +161,8 @@ async fn outdated_variants(repo: &ArcRepo, config: &Configuration) -> Result<(),
let mut variant_stream = repo.older_variants(since).await?.into_streamer();
while let Some(res) = variant_stream.next().await {
tracing::trace!("outdated_variants: looping");
let (hash, variant) = res?;
super::cleanup_variants(repo, hash, Some(variant)).await?;
}
@ -174,6 +178,8 @@ async fn outdated_proxies(repo: &ArcRepo, config: &Configuration) -> Result<(),
let mut alias_stream = repo.older_aliases(since).await?.into_streamer();
while let Some(res) = alias_stream.next().await {
tracing::trace!("outdated_proxies: looping");
let alias = res?;
if let Some(token) = repo.delete_token(&alias).await? {
super::cleanup_alias(repo, alias, token).await?;
@ -229,6 +235,8 @@ where
let mut count: u64 = 0;
while let Some(hash) = hash_stream.try_next().await? {
tracing::trace!("prune: looping");
let repo = repo.clone();
let store = store.clone();

View file

@ -558,6 +558,8 @@ impl dyn FullRepo {
let mut slug = None;
loop {
tracing::trace!("hashes_stream: looping");
let page = repo.hash_page(slug, 100).await?;
slug = page.next();

View file

@ -43,6 +43,8 @@ pub(crate) async fn migrate_repo(old_repo: ArcRepo, new_repo: ArcRepo) -> Result
let mut index = 0;
while let Some(res) = hash_stream.next().await {
tracing::trace!("migrate_repo: looping");
if let Ok(hash) = res {
migrate_hash(old_repo.clone(), new_repo.clone(), hash).await;
} else {
@ -108,6 +110,8 @@ pub(crate) async fn migrate_04<S: Store + 'static>(
let mut index = 0;
while let Some(res) = hash_stream.next().await {
tracing::trace!("migrate_04: looping");
if let Ok(hash) = res {
set.spawn_local(migrate_hash_04(
tmp_dir.clone(),
@ -122,6 +126,8 @@ pub(crate) async fn migrate_04<S: Store + 'static>(
}
while set.len() >= config.upgrade.concurrency {
tracing::trace!("migrate_04: join looping");
if set.join_next().await.is_some() {
index += 1;
@ -135,6 +141,8 @@ pub(crate) async fn migrate_04<S: Store + 'static>(
}
while set.join_next().await.is_some() {
tracing::trace!("migrate_04: cleanup join looping");
index += 1;
if index % pct == 0 {
@ -165,6 +173,8 @@ async fn migrate_hash(old_repo: ArcRepo, new_repo: ArcRepo, hash: Hash) {
let mut hash_failures = 0;
while let Err(e) = do_migrate_hash(&old_repo, &new_repo, hash.clone()).await {
tracing::trace!("migrate_hash: looping");
hash_failures += 1;
if hash_failures > 10 {

View file

@ -364,6 +364,7 @@ async fn delegate_notifications(
let upload_notifier_state = UploadNotifierState { inner: &inner };
while let Ok(notification) = receiver.recv_async().await {
tracing::trace!("delegate_notifications: looping");
metrics::counter!("pict-rs.postgres.notification").increment(1);
match notification.channel() {
@ -418,6 +419,8 @@ fn spawn_db_notification_task(
) {
crate::sync::spawn("postgres-notifications", async move {
while let Some(res) = std::future::poll_fn(|cx| conn.poll_message(cx)).await {
tracing::trace!("db_notification_task: looping");
match res {
Err(e) => {
tracing::error!("Database Connection {e:?}");
@ -1138,6 +1141,8 @@ impl QueueRepo for PostgresRepo {
use schema::job_queue::dsl::*;
loop {
tracing::trace!("pop: looping");
let mut conn = self.get_connection().await?;
let notifier: Arc<Notify> = self
@ -1667,6 +1672,8 @@ impl UploadRepo for PostgresRepo {
let interest = self.inner.interest(upload_id);
loop {
tracing::trace!("wait: looping");
let interest_future = interest.notified_timeout(Duration::from_secs(5));
let mut conn = self.get_connection().await?;
@ -1788,6 +1795,8 @@ where
{
streem::try_from_fn(|yielder| async move {
loop {
tracing::trace!("page_stream: looping");
let mut page = (next)(inner.clone(), older_than).await?;
if let Some((last_time, last_item)) = page.pop() {

View file

@ -522,6 +522,8 @@ impl UploadRepo for SledRepo {
}
while let Some(event) = (&mut subscriber).await {
tracing::trace!("wait: looping");
match event {
sled::Event::Remove { .. } => {
return Err(RepoError::AlreadyClaimed);
@ -679,6 +681,8 @@ impl QueueRepo for SledRepo {
let now = time::OffsetDateTime::now_utc();
loop {
tracing::trace!("pop: looping");
let queue = self.queue.clone();
let job_state = self.job_state.clone();

View file

@ -132,6 +132,8 @@ pub(crate) trait Store: Clone + Debug {
.into_streamer();
while let Some(bytes) = streamer.try_next().await.map_err(StoreError::ReadStream)? {
tracing::trace!("to_bytes: looping");
buf.add_bytes(bytes);
}

View file

@ -236,6 +236,8 @@ impl FileStore {
async fn try_remove_parents(&self, mut path: &Path) {
while let Some(parent) = path.parent() {
tracing::trace!("try_remove_parents: looping");
if parent.ends_with(&self.root_dir) {
return;
}

View file

@ -177,6 +177,8 @@ where
let mut stream = stream.into_streamer();
while buf.len() < CHUNK_SIZE {
tracing::trace!("read_chunk: looping");
if let Some(bytes) = stream.try_next().await? {
buf.add_bytes(bytes)
} else {
@ -284,6 +286,8 @@ impl Store for ObjectStore {
let mut futures = Vec::new();
while !complete {
tracing::trace!("save_stream: looping");
part_number += 1;
let buf = if let Some(buf) = first_chunk.take() {
@ -459,6 +463,8 @@ impl Store for ObjectStore {
let mut stream = stream.into_streamer();
while let Some(res) = stream.next().await {
tracing::trace!("read_into: looping");
let mut bytes = res.map_err(payload_to_io_error)?;
writer.write_all_buf(&mut bytes).await?;
}

View file

@ -16,6 +16,8 @@ where
let mut streamer = stream.into_streamer();
while let Some(item) = streamer.next().await {
tracing::trace!("metrics: looping");
yielder.yield_(item).await;
}
}
@ -35,6 +37,8 @@ where
let mut streamer = stream.into_streamer();
while let Some(res) = streamer.next().await {
tracing::trace!("make send tx: looping");
if tx.send_async(res).await.is_err() {
break;
}
@ -45,6 +49,8 @@ where
let mut stream = rx.into_stream().into_streamer();
while let Some(res) = stream.next().await {
tracing::trace!("make send rx: looping");
yiedler.yield_(res).await;
}
@ -71,6 +77,8 @@ where
let mut stream = rx.into_stream().into_streamer();
while let Some(res) = stream.next().await {
tracing::trace!("from_iterator: looping");
yielder.yield_(res).await;
}
@ -89,6 +97,8 @@ where
let mut streamer = stream.into_streamer();
while let Some(res) = streamer.next().await {
tracing::trace!("map: looping");
yielder.yield_((f)(res)).await;
}
})
@ -153,6 +163,8 @@ where
let mut streamer = stream.into_streamer();
while let Some(res) = streamer.next().await {
tracing::trace!("timeout: looping");
yielder.yield_ok(res).await;
}
})
@ -173,6 +185,8 @@ where
let mut count = 0;
while let Some(bytes) = streamer.try_next().await? {
tracing::trace!("limit: looping");
count += bytes.len();
if count > limit {