Merge pull request 'Add long polling for game state' (#4) from asonix/long-polling into kumu
All checks were successful
continuous-integration/drone/push Build is passing

Reviewed-on: #4
This commit is contained in:
asonix 2022-01-13 23:15:19 +00:00
commit 4160032cba
4 changed files with 145 additions and 71 deletions

View file

@ -28,41 +28,54 @@ pub(crate) struct Coordinates {
pub(crate) rank: Rank,
}
#[derive(serde::Deserialize, serde::Serialize)]
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Move {
pub(crate) piece: Piece,
pub(crate) from: Coordinates,
pub(crate) to: Coordinates,
pub(crate) kind: Option<PieceKind>,
}
#[derive(serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Promote {
pub(crate) location: Coordinates,
pub(crate) kind: PieceKind,
}
#[derive(serde::Deserialize, serde::Serialize)]
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct ApiMessage<T> {
pub(crate) data: T,
pub(crate) game_id: GameId,
}
#[derive(serde::Deserialize, serde::Serialize)]
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Start {
pub(crate) player_color: Color,
}
#[derive(serde::Serialize)]
#[derive(Debug, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct GameStart {
pub(crate) board: Vec<(File, Rank, PieceKind, Color)>,
pub(crate) game_id: GameId,
}
#[derive(Debug, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct PollResponse {
pub(crate) board_state: Vec<(File, Rank, PieceKind, Color)>,
pub(crate) game_state: GameState,
}
#[derive(
Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Deserialize, serde::Serialize,
)]
#[serde(rename_all = "camelCase")]
pub(crate) enum GameState {
White,
Black,
Win,
Lose,
Draw,
}
#[derive(
Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Deserialize, serde::Serialize,
)]
@ -131,6 +144,15 @@ impl GameId {
}
}
impl GameState {
pub(crate) fn from_color(color: Color) -> Self {
match color {
Color::White => GameState::White,
Color::Black => GameState::Black,
}
}
}
impl Color {
pub(crate) fn opposite(&self) -> Self {
match self {
@ -198,6 +220,12 @@ impl File {
}
}
impl Default for Color {
fn default() -> Self {
Color::White
}
}
impl std::fmt::Display for GameId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(&self.inner, f)

View file

@ -1,7 +1,7 @@
use crate::api_types::{
self,
Color::{self, White},
Coordinates, Move, PieceKind,
Coordinates, Move, PieceKind, PollResponse,
};
use std::{
cell::RefCell,
@ -266,6 +266,7 @@ pub(crate) struct GameState {
visible: HashMap<Color, HashSet<Position>>,
en_passant_target: Option<Position>,
castle: HashMap<Color, CastleState>,
allowed_turn: Color,
}
impl GameState {
@ -303,10 +304,9 @@ impl GameState {
this
}
pub(crate) fn to_serializable(
&self,
) -> Vec<(api_types::File, api_types::Rank, PieceKind, Color)> {
self.board
pub(crate) fn to_serializable(&self) -> PollResponse {
let board_state = self
.board
.iter()
.map(|(pos, piece)| {
(
@ -316,7 +316,14 @@ impl GameState {
piece.color.clone(),
)
})
.collect()
.collect();
let game_state = api_types::GameState::from_color(self.allowed_turn.clone());
PollResponse {
board_state,
game_state,
}
}
fn generate_visible(&mut self) {
@ -451,7 +458,8 @@ impl GameState {
.clone();
if piece.get_possible_moves(&from_pos, self).contains(&to_pos) {
self.apply_move(from_pos, to_pos)
self.apply_move(from_pos, to_pos);
self.allowed_turn = self.allowed_turn.opposite();
}
}
}

31
src/long_poller.rs Normal file
View file

@ -0,0 +1,31 @@
use std::sync::Mutex;
use tokio::sync::Notify;
#[derive(Debug)]
pub(crate) struct LongPoller<T> {
inner: Mutex<Option<T>>,
notify: Notify,
}
impl<T> LongPoller<T> {
pub(crate) fn new() -> Self {
Self {
inner: Mutex::new(None),
notify: Notify::new(),
}
}
pub(crate) async fn wait(&self) -> T {
loop {
if let Some(value) = self.inner.lock().unwrap().take() {
return value;
}
self.notify.notified().await;
}
}
pub(crate) fn update(&self, value: T) {
*self.inner.lock().unwrap() = Some(value);
self.notify.notify_one();
}
}

View file

@ -1,6 +1,8 @@
#![allow(clippy::async_yields_async)]
use actix_cors::Cors;
use actix_web::{
web::{Data, Json},
web::{Data, Json, Path},
App, HttpResponse, HttpServer,
};
use std::{
@ -15,26 +17,24 @@ use tracing_actix_web::TracingLogger;
mod api_types;
mod board_state;
mod init_tracing;
mod long_poller;
use api_types::{ApiMessage, GameId, GameStart, Move, Promote, Start};
use api_types::{ApiMessage, GameId, GameStart, Move, Start};
use long_poller::LongPoller;
struct Reply {
board_state: board_state::GameState,
}
enum Action {
Move(Move),
Promote(Promote),
}
struct Session {
action: Action,
reply: tokio::sync::oneshot::Sender<Reply>,
}
#[derive(Clone, Debug)]
#[derive(Clone)]
struct GameData {
sender: Sender<Session>,
sender: Sender<Move>,
poller: Arc<LongPoller<board_state::GameState>>,
}
impl std::fmt::Debug for GameData {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GameData")
.field("sender", &"Sender")
.field("poller", &"LongPoller")
.finish()
}
}
#[derive(Clone, Default)]
@ -42,6 +42,15 @@ struct GameState {
inner: Arc<Mutex<HashMap<GameId, GameData>>>,
}
impl std::fmt::Debug for GameState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GameState")
.field("inner", &"ActiveGames")
.finish()
}
}
#[derive(Debug)]
struct GameDropper(GameState, GameId);
impl GameState {
@ -60,6 +69,7 @@ impl Drop for GameDropper {
}
}
#[tracing::instrument]
async fn start(Json(_start): Json<Start>, game_state: Data<GameState>) -> HttpResponse {
let game_id = GameId::new();
@ -67,7 +77,15 @@ async fn start(Json(_start): Json<Start>, game_state: Data<GameState>) -> HttpRe
let (sender, mut rx) = tokio::sync::mpsc::channel(1);
game_state.save_data(game_id.clone(), GameData { sender });
let poller = Arc::new(LongPoller::new());
game_state.save_data(
game_id.clone(),
GameData {
sender,
poller: Arc::clone(&poller),
},
);
let game_span = tracing::info_span!(
parent: None,
@ -80,17 +98,11 @@ async fn start(Json(_start): Json<Start>, game_state: Data<GameState>) -> HttpRe
actix_web::rt::spawn(
async move {
while let Ok(Some(msg)) =
while let Ok(Some(action)) =
actix_web::rt::time::timeout(Duration::from_secs(60 * 5), rx.recv()).await
{
match msg.action {
Action::Move(action) => board_state.handle(action),
Action::Promote(_) => unimplemented!(),
}
let _ = msg.reply.send(Reply {
board_state: board_state.clone(),
});
board_state.handle(action);
poller.update(board_state.clone());
}
drop(game_dropper);
@ -99,44 +111,39 @@ async fn start(Json(_start): Json<Start>, game_state: Data<GameState>) -> HttpRe
);
HttpResponse::Ok().json(GameStart {
board: serializable,
board: serializable.board_state,
game_id,
})
}
#[tracing::instrument]
async fn make_move(
Json(action): Json<ApiMessage<Move>>,
Json(message): Json<ApiMessage<Move>>,
game_state: Data<GameState>,
) -> HttpResponse {
send_message(action.game_id, Action::Move(action.data), game_state).await
}
async fn promote_pawn(
Json(action): Json<ApiMessage<Promote>>,
game_state: Data<GameState>,
) -> HttpResponse {
send_message(action.game_id, Action::Promote(action.data), game_state).await
}
async fn send_message(
game_id: GameId,
action: Action,
game_state: Data<GameState>,
) -> HttpResponse {
if let Some(data) = game_state.data_for_id(&game_id) {
let (reply, rx) = tokio::sync::oneshot::channel();
let res = data.sender.send(Session { action, reply }).await;
if let Some(data) = game_state.data_for_id(&message.game_id) {
let res = data.sender.send(message.data).await;
if res.is_err() {
return HttpResponse::InternalServerError().finish();
}
if let Ok(reply) = rx.await {
return HttpResponse::Ok().json(reply.board_state.to_serializable());
return HttpResponse::Accepted().finish();
}
HttpResponse::BadRequest().finish()
}
#[tracing::instrument]
async fn long_poll(game_id: Path<GameId>, game_state: Data<GameState>) -> HttpResponse {
if let Some(data) = game_state.data_for_id(game_id.as_ref()) {
if let Ok(board_state) =
actix_web::rt::time::timeout(Duration::from_secs(30), data.poller.wait()).await
{
return HttpResponse::Ok().json(board_state.to_serializable());
}
return HttpResponse::InternalServerError().finish();
return HttpResponse::NoContent().finish();
}
HttpResponse::BadRequest().finish()
@ -160,7 +167,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
)
.route("/start", actix_web::web::post().to(start))
.route("/move", actix_web::web::post().to(make_move))
.route("/promote", actix_web::web::post().to(promote_pawn))
.route("/poll/{id}", actix_web::web::get().to(long_poll))
})
.bind("0.0.0.0:8000")?
.run()