Fix deadlock in deck start
This commit is contained in:
parent
6345fc102d
commit
71fedceefa
|
@ -127,11 +127,13 @@ impl Manager {
|
|||
Box::pin(
|
||||
async move {
|
||||
let mut inner = self.inner.write().await;
|
||||
|
||||
if let Some(port_name) = inner.children.remove(&child_id) {
|
||||
inner.connected_decks.remove(&port_name);
|
||||
inner.retrying_ports.insert(port_name.clone());
|
||||
ctx.handle().send(ManagerMessage::Found(port_name)).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
.instrument(Span::current()),
|
||||
|
@ -146,9 +148,11 @@ impl Manager {
|
|||
async fn ignored(&mut self, port_name: String) -> tokio_actors::Result<()> {
|
||||
tracing::debug!("ignored");
|
||||
let mut inner = self.inner.write().await;
|
||||
|
||||
if !inner.retrying_ports.remove(&port_name) {
|
||||
inner.ignored_ports.insert(port_name);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -160,12 +164,12 @@ impl Manager {
|
|||
) -> tokio_actors::Result<()> {
|
||||
tracing::debug!("connected");
|
||||
|
||||
let was_pending = self
|
||||
.inner
|
||||
.write()
|
||||
.await
|
||||
.pending_ports
|
||||
.remove(deck.port_name());
|
||||
let was_pending = {
|
||||
let mut inner = self.inner.write().await;
|
||||
let was_pending = inner.pending_ports.remove(deck.port_name());
|
||||
drop(inner);
|
||||
was_pending
|
||||
};
|
||||
|
||||
if !was_pending {
|
||||
return Ok(());
|
||||
|
@ -192,6 +196,7 @@ impl Manager {
|
|||
|
||||
inner.connected_decks.insert(port_name.clone(), connected);
|
||||
inner.children.insert(actor_id, port_name);
|
||||
|
||||
drop(inner);
|
||||
}
|
||||
|
||||
|
@ -209,7 +214,7 @@ impl Manager {
|
|||
inner.pending_ports.remove(&port_name);
|
||||
inner.ignored_ports.remove(&port_name);
|
||||
if let Some(mut connected) = inner.connected_decks.remove(&port_name) {
|
||||
tracing::info!("Sending stop");
|
||||
tracing::info!("Sending stop to {}", connected.handle.actor_id());
|
||||
connected.handle.stop();
|
||||
}
|
||||
|
||||
|
|
|
@ -183,6 +183,7 @@ impl Deck {
|
|||
Box::pin(
|
||||
async move {
|
||||
let mut inner = self.inner.write().await;
|
||||
|
||||
if let Some(id) = inner.child.take() {
|
||||
if id == child_id {
|
||||
ctx.stop();
|
||||
|
@ -199,11 +200,13 @@ impl Deck {
|
|||
|
||||
#[tracing::instrument(skip(ctx))]
|
||||
async fn start(&mut self, ctx: &'_ mut Context<DeckMessage>) -> tokio_actors::Result<()> {
|
||||
if let Some(serial) = self.inner.write().await.port.take() {
|
||||
let mut inner = self.inner.write().await;
|
||||
|
||||
if let Some(serial) = inner.port.take() {
|
||||
let port = Port::new(ctx.handle(), serial);
|
||||
let port_handle =
|
||||
ctx.spawn_child_with_hooks(Actor::new(port, Port::turn).on_start(Port::on_start));
|
||||
self.inner.write().await.child = Some(port_handle.actor_id());
|
||||
inner.child = Some(port_handle.actor_id());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue