This commit is contained in:
Aode (Lion) 2021-10-03 20:24:57 -05:00
parent 25855fb97a
commit 14ac47e805
6 changed files with 94 additions and 135 deletions

2
Cargo.lock generated
View file

@ -1004,7 +1004,7 @@ dependencies = [
[[package]]
name = "tokio-actors"
version = "0.1.0"
source = "git+https://git.asonix.dog/asonix/tokio-actors?branch=main#9eb61841dc75551f3614bb83ea3f8202d94bb69a"
source = "git+https://git.asonix.dog/asonix/tokio-actors?branch=main#708f94263f769a92add9312deae71d26242dc594"
dependencies = [
"once_cell",
"tokio",

View file

@ -4,8 +4,7 @@ use std::{
time::Duration,
};
use tokio::sync::RwLock;
use tokio_actors::{Actor, BoxFuture, Context, Handle, SendHandle};
use tracing::{Instrument, Span};
use tokio_actors::{Actor, Context, Handle, SendHandle};
use zbus::SignalContext;
mod connector;
@ -100,44 +99,34 @@ impl Manager {
}
#[tracing::instrument(skip(ctx))]
fn turn<'a>(
async fn turn<'a>(
&'a mut self,
msg: ManagerMessage,
ctx: &'a mut Context<ManagerMessage>,
) -> BoxFuture<'a> {
Box::pin(
async move {
match msg {
ManagerMessage::Found(port_name) => self.found(port_name, ctx).await,
ManagerMessage::Removed(port_name) => self.removed(port_name).await,
ManagerMessage::Connected(deck) => self.connected(deck, ctx).await,
ManagerMessage::Ignored(port_name) => self.ignored(port_name).await,
}
}
.instrument(Span::current()),
)
) -> tokio_actors::Result<()> {
match msg {
ManagerMessage::Found(port_name) => self.found(port_name, ctx).await,
ManagerMessage::Removed(port_name) => self.removed(port_name).await,
ManagerMessage::Connected(deck) => self.connected(deck, ctx).await,
ManagerMessage::Ignored(port_name) => self.ignored(port_name).await,
}
}
#[tracing::instrument(skip(ctx))]
fn on_remove<'a>(
async fn on_remove<'a>(
&'a mut self,
child_id: usize,
ctx: &'a mut Context<ManagerMessage>,
) -> BoxFuture<'a> {
Box::pin(
async move {
let mut inner = self.inner.write().await;
) -> tokio_actors::Result<()> {
let mut inner = self.inner.write().await;
if let Some(port_name) = inner.children.remove(&child_id) {
inner.connected_decks.remove(&port_name);
inner.retrying_ports.insert(port_name.clone());
ctx.handle().send(ManagerMessage::Found(port_name)).await?;
}
if let Some(port_name) = inner.children.remove(&child_id) {
inner.connected_decks.remove(&port_name);
inner.retrying_ports.insert(port_name.clone());
ctx.handle().send(ManagerMessage::Found(port_name)).await?;
}
Ok(())
}
.instrument(Span::current()),
)
Ok(())
}
fn object_name(&self) -> String {

View file

@ -2,8 +2,7 @@ use crate::manager::{
deck::{Deck, DeckInfo},
ManagerMessage,
};
use tokio_actors::{BoxFuture, Context, SendHandle};
use tracing::{Instrument, Span};
use tokio_actors::{Context, SendHandle};
use zbus::Connection;
#[derive(Clone)]
@ -25,31 +24,26 @@ impl Connector {
}
#[tracing::instrument(skip(ctx))]
pub(super) fn turn<'a>(
pub(super) async fn turn<'a>(
&'a mut self,
port_name: String,
ctx: &'a mut Context<String>,
) -> BoxFuture<'a> {
) -> tokio_actors::Result<()> {
tracing::debug!("Connector");
Box::pin(
async move {
ctx.stop();
ctx.stop();
match self.connect(port_name.clone()).await {
Ok(deck) => {
self.parent.send(ManagerMessage::Connected(deck)).await?;
Ok(())
}
Err(e) => {
tracing::info!("Error: {}", e);
tracing::info!("Details: {:?}", e);
self.parent.send(ManagerMessage::Ignored(port_name)).await?;
Err(e)
}
}
match self.connect(port_name.clone()).await {
Ok(deck) => {
self.parent.send(ManagerMessage::Connected(deck)).await?;
Ok(())
}
.instrument(Span::current()),
)
Err(e) => {
tracing::info!("Error: {}", e);
tracing::info!("Details: {:?}", e);
self.parent.send(ManagerMessage::Ignored(port_name)).await?;
Err(e)
}
}
}
#[tracing::instrument]

View file

@ -1,9 +1,8 @@
use crate::manager::ManagerMessage;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_actors::{Actor, BoxFuture, Context, SendHandle};
use tokio_actors::{Actor, Context, SendHandle};
use tokio_serial::SerialStream;
use tracing::{Instrument, Span};
use zbus::{Connection, SignalContext};
mod port;
@ -139,67 +138,21 @@ impl Deck {
}
#[tracing::instrument(skip(_ctx))]
pub(super) fn turn<'a>(
pub(super) async fn turn<'a>(
&'a mut self,
msg: DeckMessage,
_ctx: &'a mut Context<DeckMessage>,
) -> BoxFuture<'a> {
Box::pin(
async move {
match msg {
DeckMessage::Press(key) => self.press(key).await,
}
}
.instrument(Span::current()),
)
) -> tokio_actors::Result<()> {
match msg {
DeckMessage::Press(key) => self.press(key).await,
}
}
#[tracing::instrument(skip(ctx))]
pub(super) fn on_start<'a>(&'a mut self, ctx: &'a mut Context<DeckMessage>) -> BoxFuture<'a> {
Box::pin(self.start(ctx).instrument(Span::current()))
}
#[tracing::instrument(skip(_ctx))]
pub(super) fn on_stop<'a>(&'a mut self, _ctx: &'a mut Context<DeckMessage>) -> BoxFuture<'a> {
Box::pin(
async move {
self.connection
.object_server_mut()
.await
.remove::<Deck, _>(self.object_name())?;
Ok(())
}
.instrument(Span::current()),
)
}
#[tracing::instrument(skip(ctx))]
pub(super) fn on_remove<'a>(
pub(super) async fn on_start<'a>(
&'a mut self,
child_id: usize,
ctx: &'a mut Context<DeckMessage>,
) -> BoxFuture<'a> {
Box::pin(
async move {
let mut inner = self.inner.write().await;
if let Some(id) = inner.child.take() {
if id == child_id {
ctx.stop();
} else {
inner.child = Some(id);
}
}
Ok(())
}
.instrument(Span::current()),
)
}
#[tracing::instrument(skip(ctx))]
async fn start(&mut self, ctx: &'_ mut Context<DeckMessage>) -> tokio_actors::Result<()> {
) -> tokio_actors::Result<()> {
let mut inner = self.inner.write().await;
if let Some(serial) = inner.port.take() {
@ -208,6 +161,39 @@ impl Deck {
ctx.spawn_child_with_hooks(Actor::new(port, Port::turn).on_start(Port::on_start));
inner.child = Some(port_handle.actor_id());
}
Ok(())
}
#[tracing::instrument(skip(_ctx))]
pub(super) async fn on_stop<'a>(
&'a mut self,
_ctx: &'a mut Context<DeckMessage>,
) -> tokio_actors::Result<()> {
self.connection
.object_server_mut()
.await
.remove::<Deck, _>(self.object_name())?;
Ok(())
}
#[tracing::instrument(skip(ctx))]
pub(super) async fn on_remove<'a>(
&'a mut self,
child_id: usize,
ctx: &'a mut Context<DeckMessage>,
) -> tokio_actors::Result<()> {
let mut inner = self.inner.write().await;
if let Some(id) = inner.child.take() {
if id == child_id {
ctx.stop();
} else {
inner.child = Some(id);
}
}
Ok(())
}

View file

@ -1,8 +1,7 @@
use super::DeckMessage;
use tokio::io::AsyncReadExt;
use tokio_actors::{BoxFuture, Context, SendHandle};
use tokio_actors::{Context, SendHandle};
use tokio_serial::SerialStream;
use tracing::{Instrument, Span};
#[derive(Clone, Debug)]
pub(super) enum PortMessage {
@ -26,32 +25,25 @@ impl Port {
}
#[tracing::instrument(skip(ctx))]
pub(super) fn on_start<'a>(&'a mut self, ctx: &'a mut Context<PortMessage>) -> BoxFuture<'a> {
Box::pin(
async move {
ctx.stop_on_error();
ctx.handle().send(PortMessage::Start).await?;
pub(super) async fn on_start<'a>(
&'a mut self,
ctx: &'a mut Context<PortMessage>,
) -> tokio_actors::Result<()> {
ctx.stop_on_error();
ctx.handle().send(PortMessage::Start).await?;
Ok(())
}
.instrument(Span::current()),
)
Ok(())
}
#[tracing::instrument(skip(ctx))]
pub(super) fn turn<'a>(
pub(super) async fn turn<'a>(
&'a mut self,
msg: PortMessage,
ctx: &'a mut Context<PortMessage>,
) -> BoxFuture<'a> {
Box::pin(
async move {
match msg {
PortMessage::Start => self.start(ctx).await,
}
}
.instrument(Span::current()),
)
) -> tokio_actors::Result<()> {
match msg {
PortMessage::Start => self.start(ctx).await,
}
}
#[tracing::instrument(skip(ctx))]

View file

@ -3,8 +3,7 @@ use std::{
collections::HashSet,
path::{Path, PathBuf},
};
use tokio_actors::{BoxFuture, Context, SendHandle};
use tracing::{Instrument, Span};
use tokio_actors::{Context, SendHandle};
#[derive(Clone)]
pub(super) struct Searcher {
@ -30,12 +29,11 @@ impl Searcher {
}
#[tracing::instrument(skip(_ctx))]
pub(super) fn turn<'a>(&'a mut self, _: (), _ctx: &'a mut Context<()>) -> BoxFuture<'a> {
Box::pin(self.search().instrument(Span::current()))
}
#[tracing::instrument]
async fn search(&mut self) -> tokio_actors::Result<()> {
pub(super) async fn turn<'a>(
&'a mut self,
_: (),
_ctx: &'a mut Context<()>,
) -> tokio_actors::Result<()> {
let ports = tokio::task::block_in_place(|| tokio_serial::available_ports())?;
let port_names = ports