// triplestore/src/api.rs

mod object;
mod predicate;
mod subject;

use std::collections::{HashMap, HashSet};

use bonsaidb::{
    core::{
        api::Api,
        connection::{AsyncConnection, AsyncStorageConnection},
        schema::SerializedCollection,
        transaction::Transaction,
    },
    server::api::Handler,
};
use object::Object;
use predicate::Predicate;
use subject::Subject;
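
/// An RDF triple as received over the API, pairing the `Subject`,
/// `Predicate`, and `Object` newtypes defined in the submodules.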
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct Triple {
    pub(crate) subject: Subject,
    pub(crate) predicate: Predicate,
    pub(crate) object: Object,
}

impl From<rdf_types::Triple> for Triple {
    fn from(rdf_types::Triple(subject, predicate, object): rdf_types::Triple) -> Self {
        Triple {
            subject: subject.into(),
            predicate: predicate.into(),
            object: object.into(),
        }
    }
}
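
/// Server-side handler backing the custom `InsertDocument` API.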
#[derive(Debug)]
pub(crate) struct TripleHandler;
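
/// Custom BonsaiDb API request that replaces the stored triples for every
/// subject IRI appearing in `triples`.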
#[derive(Api, Debug, serde::Serialize, serde::Deserialize)]
#[api(response = (), error = bonsaidb::core::Error, name = "InsertDocument")]
pub(crate) struct InsertDocument {
    pub(crate) triples: Vec<Triple>,
}

#[async_trait::async_trait]
impl Handler<InsertDocument> for TripleHandler {
    async fn handle(
        session: bonsaidb::server::api::HandlerSession<'_>,
        InsertDocument { triples }: InsertDocument,
    ) -> bonsaidb::server::api::HandlerResult<InsertDocument> {
        Ok(insert_document(&session.as_client, triples).await?)
    }
}
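
/// Deduplicated references into a batch of triples, collected before any
/// database work happens so each distinct IRI, string, and language tag is
/// handled only once.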
#[derive(Default)]
struct StartInsertState<'a> {
    subject_iris: HashSet<&'a iref::IriBuf>,
    other_iris: HashSet<&'a iref::IriBuf>,
    blank_nodes: HashMap<&'a rdf_types::BlankIdBuf, &'a rdf_types::Subject>,
    strings: HashSet<&'a str>,
    langtags: HashSet<&'a langtag::LanguageTagBuf>,
}
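
/// The same references after their related records have been inserted, now
/// mapped to the database IDs they were assigned.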
struct InsertState<'a> {
    subject_iris: HashMap<&'a iref::IriBuf, u128>,
    other_iris: HashMap<&'a iref::IriBuf, u128>,
    blank_nodes: HashMap<&'a rdf_types::BlankIdBuf, (&'a iref::IriBuf, u64)>,
    strings: HashMap<&'a str, u128>,
    langtags: HashMap<&'a langtag::LanguageTagBuf, u128>,
}

impl<'a> StartInsertState<'a> {
    fn from_triples(triples: &'a [Triple]) -> Self {
        triples.iter().fold(Self::default(), Self::add_triple)
    }
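
    /// Records every IRI, blank node, string, and language tag referenced by a
    /// single triple, deduplicating as it goes.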
    fn add_triple(mut self, triple: &'a Triple) -> Self {
        match &triple.subject.0 {
            rdf_types::Subject::Iri(iri) => {
                self.subject_iris.insert(iri);
            }
            rdf_types::Subject::Blank(_) => {}
        };
        self.other_iris.insert(&triple.predicate.0);
        match &triple.object.0 {
            rdf_types::Object::Id(rdf_types::Id::Iri(iri)) => {
                self.other_iris.insert(iri);
            }
            rdf_types::Object::Id(rdf_types::Id::Blank(blank)) => {
                self.blank_nodes.insert(blank, &triple.subject.0);
            }
            rdf_types::Object::Literal(rdf_types::Literal::String(s)) => {
                self.strings.insert(s);
            }
            rdf_types::Object::Literal(rdf_types::Literal::TypedString(s, iri)) => {
                self.strings.insert(s);
                self.other_iris.insert(iri);
            }
            rdf_types::Object::Literal(rdf_types::Literal::LangString(s, langtag)) => {
                self.strings.insert(s);
                self.langtags.insert(langtag);
            }
        };
        self
    }
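
    /// Inserts or updates the records backing each collected value,
    /// incrementing the ones that already exist, and returns an
    /// [`InsertState`] mapping every value to its database ID.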
    async fn insert_related<C: AsyncConnection>(
        self,
        connection: &C,
    ) -> Result<InsertState<'a>, bonsaidb::core::Error> {
        let subject_iris = Self::insert_iris(connection, &self.subject_iris).await?;
        let other_iris = Self::insert_iris(connection, &self.other_iris).await?;
        let blank_nodes = self.generate_blank_ids();
        let strings = self.insert_strings(connection).await?;
        let langtags = self.insert_langtags(connection).await?;
        Ok(InsertState {
            subject_iris,
            other_iris,
            blank_nodes,
            strings,
            langtags,
        })
    }
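
    /// Upserts one `IriRecord` per distinct IRI: existing records are
    /// incremented, missing ones are pushed, and the batch is retried whenever
    /// a concurrent writer causes a conflict or unique-key violation.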
    async fn insert_iris<C: AsyncConnection>(
        connection: &C,
        iri_set: &HashSet<&'a iref::IriBuf>,
    ) -> Result<HashMap<&'a iref::IriBuf, u128>, bonsaidb::core::Error> {
        let iris: Vec<_> = iri_set
            .iter()
            .map(|iri| crate::schema::OwnedIri::from(*iri))
            .collect();
        let mut ids = HashMap::with_capacity(iris.len());
        loop {
            let mut tx = Transaction::new();
            // Find the IRIs that already have records so they can be
            // incremented rather than re-inserted.
            let docs = connection
                .view::<crate::schema::IriUniqueness>()
                .with_keys(&iris)
                .query_with_collection_docs()
                .await?
                .documents;
            let num_updates = docs.len();
            for mut doc in docs.into_values() {
                let buf: iref::IriBuf = doc.contents.iri.key.clone().into();
                let iribuf = iri_set
                    .get(&buf)
                    .expect("Should always be able to insert ID mapping");
                ids.insert(*iribuf, doc.header.id);
                doc.contents.iri.increment();
                doc.update_in_transaction(&mut tx)?;
            }
            // Any IRI without an ID yet gets a freshly pushed record.
            let ordered_iris: Vec<_> = iri_set
                .iter()
                .copied()
                .filter(|iri| !ids.contains_key(iri))
                .collect();
            for iri in &ordered_iris {
                crate::schema::IriRecord::new((*iri).clone()).push_in_transaction(&mut tx)?;
            }
            match tx.apply_async(connection).await {
                Ok(results) => {
                    // Updates were queued before pushes, so skipping
                    // `num_updates` results leaves the rest aligned with
                    // `ordered_iris`.
                    for (result, iri) in results.into_iter().skip(num_updates).zip(ordered_iris) {
                        match result {
                            bonsaidb::core::transaction::OperationResult::DocumentUpdated {
                                header,
                                ..
                            } => {
                                let iri_id = header
                                    .id
                                    .deserialize::<u128>()
                                    .expect("IDs are always u128");
                                ids.insert(iri, iri_id);
                            }
                            other => {
                                unreachable!("Wrong result type! {other:?}");
                            }
                        }
                    }
                    break;
                }
                // Another writer touched the same records; start over with a
                // fresh view of what already exists.
                Err(
                    bonsaidb::core::Error::DocumentConflict(_, _)
                    | bonsaidb::core::Error::UniqueKeyViolation { .. },
                ) => continue,
                Err(e) => return Err(e),
            }
        }
        Ok(ids)
    }
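
    /// Same upsert-and-retry scheme as [`Self::insert_iris`], applied to
    /// literal string values.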
    async fn insert_strings<C: AsyncConnection>(
        &self,
        connection: &C,
    ) -> Result<HashMap<&'a str, u128>, bonsaidb::core::Error> {
        let strings: Vec<_> = self.strings.iter().map(ToString::to_string).collect();
        let mut ids = HashMap::with_capacity(strings.len());
        loop {
            let mut tx = Transaction::new();
            let docs = connection
                .view::<crate::schema::StringUniqueness>()
                .with_keys(&strings)
                .query_with_collection_docs()
                .await?
                .documents;
            let num_updates = docs.len();
            for mut doc in docs.into_values() {
                let string = self
                    .strings
                    .get(doc.contents.string.key.as_str())
                    .expect("Should always be able to insert ID mapping");
                ids.insert(*string, doc.header.id);
                doc.contents.string.increment();
                doc.update_in_transaction(&mut tx)?;
            }
            let ordered_strings: Vec<_> = self
                .strings
                .iter()
                .copied()
                .filter(|string| !ids.contains_key(string))
                .collect();
            for string in &ordered_strings {
                crate::schema::StringRecord::new(string.to_string())
                    .push_in_transaction(&mut tx)?;
            }
            match tx.apply_async(connection).await {
                Ok(results) => {
                    for (result, string) in
                        results.into_iter().skip(num_updates).zip(ordered_strings)
                    {
                        match result {
                            bonsaidb::core::transaction::OperationResult::DocumentUpdated {
                                header,
                                ..
                            } => {
                                let string_id = header
                                    .id
                                    .deserialize::<u128>()
                                    .expect("IDs are always u128");
                                ids.insert(string, string_id);
                            }
                            other => {
                                unreachable!("Wrong result type! {other:?}");
                            }
                        }
                    }
                    break;
                }
                Err(
                    bonsaidb::core::Error::DocumentConflict(_, _)
                    | bonsaidb::core::Error::UniqueKeyViolation { .. },
                ) => continue,
                Err(e) => return Err(e),
            }
        }
        Ok(ids)
    }
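
    /// Same upsert-and-retry scheme as [`Self::insert_iris`], applied to
    /// language tags.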
    async fn insert_langtags<C: AsyncConnection>(
        &self,
        connection: &C,
    ) -> Result<HashMap<&'a langtag::LanguageTagBuf, u128>, bonsaidb::core::Error> {
        let langtags: Vec<_> = self
            .langtags
            .iter()
            .map(|langtag| crate::schema::OwnedLanguageTag::from(*langtag))
            .collect();
        let mut ids = HashMap::with_capacity(langtags.len());
        loop {
            let mut tx = Transaction::new();
            let docs = connection
                .view::<crate::schema::LanguageTagUniqueness>()
                .with_keys(&langtags)
                .query_with_collection_docs()
                .await?
                .documents;
            let num_updates = docs.len();
            for mut doc in docs.into_values() {
                let buf: langtag::LanguageTagBuf = doc.contents.langtag.key.clone().into();
                let langtag = self
                    .langtags
                    .get(&buf)
                    .expect("Should always be able to insert ID mapping");
                ids.insert(*langtag, doc.header.id);
                doc.contents.langtag.increment();
                doc.update_in_transaction(&mut tx)?;
            }
            let ordered_langtags: Vec<_> = self
                .langtags
                .iter()
                .copied()
                .filter(|langtag| !ids.contains_key(langtag))
                .collect();
            for langtag in &ordered_langtags {
                crate::schema::LanguageTagRecord::new((*langtag).clone())
                    .push_in_transaction(&mut tx)?;
            }
            match tx.apply_async(connection).await {
                Ok(results) => {
                    for (result, langtag) in
                        results.into_iter().skip(num_updates).zip(ordered_langtags)
                    {
                        match result {
                            bonsaidb::core::transaction::OperationResult::DocumentUpdated {
                                header,
                                ..
                            } => {
                                let langtag_id = header
                                    .id
                                    .deserialize::<u128>()
                                    .expect("IDs are always u128");
                                ids.insert(langtag, langtag_id);
                            }
                            other => {
                                unreachable!("Wrong result type! {other:?}");
                            }
                        }
                    }
                    break;
                }
                Err(
                    bonsaidb::core::Error::DocumentConflict(_, _)
                    | bonsaidb::core::Error::UniqueKeyViolation { .. },
                ) => continue,
                Err(e) => return Err(e),
            }
        }
        Ok(ids)
    }
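
    /// Follows a chain of blank-node subjects until it reaches a subject with
    /// a concrete IRI, returning `None` if the chain never does.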
    fn find_iri(&self, mut blank_id: &'a rdf_types::BlankIdBuf) -> Option<&'a iref::IriBuf> {
        while let Some(id) = self.blank_nodes.get(blank_id) {
            match id {
                rdf_types::Id::Iri(iri) => return Some(iri),
                rdf_types::Id::Blank(blank) => blank_id = blank,
            }
        }
        None
    }
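
    /// Assigns each blank node a `(subject IRI, discriminant)` pair; blank
    /// nodes whose chain never resolves to an IRI subject are dropped.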
    fn generate_blank_ids(&self) -> HashMap<&'a rdf_types::BlankIdBuf, (&'a iref::IriBuf, u64)> {
        let mut blank_id_discriminant = 0;
        self.blank_nodes
            .keys()
            .filter_map(|blank_id| {
                let iri = self.find_iri(blank_id)?;
                let id = blank_id_discriminant;
                blank_id_discriminant += 1;
                Some((*blank_id, (iri, id)))
            })
            .collect()
    }
}

impl<'a> InsertState<'a> {
    fn get_iri(&self, iri: &'a iref::IriBuf) -> Option<u128> {
        self.subject_iris
            .get(iri)
            .or_else(|| self.other_iris.get(iri))
            .copied()
    }
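
    /// Converts the incoming triples into storable `TripleRecord`s, replacing
    /// every IRI, string, and language tag with its database ID.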
    fn translate_triples(&self, triples: &[Triple]) -> Vec<crate::schema::TripleRecord> {
        triples
            .iter()
            .filter_map(|triple| {
                let subject = match &triple.subject.0 {
                    rdf_types::Id::Iri(iri) => {
                        crate::schema::Subject::Iri(self.get_iri(iri).expect("Iri exists"))
                    }
                    rdf_types::Id::Blank(blank_id) => {
                        let (iri, id) = self.blank_nodes.get(blank_id)?;
                        crate::schema::Subject::Blank {
                            iri: self.get_iri(iri).expect("Iri exists"),
                            discriminant: *id,
                        }
                    }
                };
                let predicate = self.get_iri(&triple.predicate.0).expect("Iri exists");
                let object = match &triple.object.0 {
                    rdf_types::Term::Id(rdf_types::Id::Iri(iri)) => {
                        crate::schema::Object::Iri(self.get_iri(iri).expect("Iri exists"))
                    }
                    rdf_types::Term::Id(rdf_types::Id::Blank(blank)) => {
                        let (iri, id) = self.blank_nodes.get(blank)?;
                        crate::schema::Object::Blank {
                            iri: self.get_iri(iri).expect("Iri exists"),
                            discriminant: *id,
                        }
                    }
                    rdf_types::Term::Literal(rdf_types::Literal::String(s)) => {
                        crate::schema::Object::String(
                            *self.strings.get(s.as_str()).expect("String exists"),
                        )
                    }
                    rdf_types::Term::Literal(rdf_types::Literal::TypedString(s, iri)) => {
                        crate::schema::Object::TypedString {
                            string: *self.strings.get(s.as_str()).expect("String exists"),
                            iri: self.get_iri(iri).expect("Iri exists"),
                        }
                    }
                    rdf_types::Term::Literal(rdf_types::Literal::LangString(s, langtag)) => {
                        crate::schema::Object::LangString {
                            string: *self.strings.get(s.as_str()).expect("String exists"),
                            langtag: *self.langtags.get(langtag).expect("Langtag exists"),
                        }
                    }
                };
                Some(crate::schema::TripleRecord {
                    subject,
                    predicate,
                    object,
                })
            })
            .collect()
    }
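
    /// For the triple records being deleted, returns a `DisownedRecord` for
    /// each IRI, string, and language tag ID that the replacement triples no
    /// longer reference.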
    fn disowned<'i>(
        &self,
        docs: impl Iterator<Item = &'i crate::schema::TripleRecord>,
    ) -> Vec<crate::schema::DisownedRecord> {
        #[derive(Default)]
        struct TripleState {
            iris: HashSet<u128>,
            strings: HashSet<u128>,
            langtags: HashSet<u128>,
        }
        let triplestate = docs.fold(TripleState::default(), |mut acc, triple| {
            match triple.subject {
                crate::schema::Subject::Iri(iri) | crate::schema::Subject::Blank { iri, .. } => {
                    acc.iris.insert(iri);
                }
            }
            acc.iris.insert(triple.predicate);
            match triple.object {
                crate::schema::Object::Iri(iri) | crate::schema::Object::Blank { iri, .. } => {
                    acc.iris.insert(iri);
                }
                crate::schema::Object::TypedString { string, iri } => {
                    acc.iris.insert(iri);
                    acc.strings.insert(string);
                }
                crate::schema::Object::LangString { string, langtag } => {
                    acc.strings.insert(string);
                    acc.langtags.insert(langtag);
                }
                crate::schema::Object::String(string) => {
                    acc.strings.insert(string);
                }
            }
            acc
        });
        let mut existing_iris: HashSet<_> = self.subject_iris.values().copied().collect();
        existing_iris.extend(self.other_iris.values().copied());
        let existing_strings: HashSet<_> = self.strings.values().copied().collect();
        let existing_langtags: HashSet<_> = self.langtags.values().copied().collect();
        let iris = triplestate
            .iris
            .difference(&existing_iris)
            .copied()
            .map(crate::schema::DisownedRecord::iri);
        let strings = triplestate
            .strings
            .difference(&existing_strings)
            .copied()
            .map(crate::schema::DisownedRecord::string);
        let langtags = triplestate
            .langtags
            .difference(&existing_langtags)
            .copied()
            .map(crate::schema::DisownedRecord::langtag);
        iris.chain(strings).chain(langtags).collect()
    }
}

/// Replaces the stored triples for every subject IRI that appears in
/// `triples`: existing triples for those subjects are deleted, records they
/// no longer reference are marked disowned, and the new triples are inserted,
/// all within a single transaction that retries on document conflicts.
pub(crate) async fn insert_document<S: AsyncStorageConnection<Database = C>, C: AsyncConnection>(
    storage: &S,
    triples: Vec<Triple>,
) -> Result<(), bonsaidb::core::Error> {
    let database = storage
        .database::<crate::schema::TriplestoreSchema>("triplestore")
        .await?;
    let insert_state = StartInsertState::from_triples(&triples)
        .insert_related(&database)
        .await?;
    let subject_iri_ids = insert_state
        .subject_iris
        .values()
        .copied()
        .collect::<Vec<_>>();
    loop {
        let docs = database
            .view::<crate::schema::TriplesBySubjectIri>()
            .with_keys(&subject_iri_ids)
            .query_with_collection_docs()
            .await?;
        let mut tx = Transaction::new();
        for doc in docs.documents.values() {
            doc.delete_in_transaction(&mut tx)?;
        }
        for disowned in insert_state.disowned(docs.documents.values().map(|doc| &doc.contents)) {
            disowned.push_in_transaction(&mut tx)?;
        }
        for triple in insert_state.translate_triples(&triples) {
            triple.push_in_transaction(&mut tx)?;
        }
        match tx.apply_async(&database).await {
            Ok(_) => break,
            Err(bonsaidb::core::Error::DocumentConflict(_, _)) => {
                println!("Document conflict, retrying");
            }
            Err(e) => return Err(e),
        }
    }
    Ok(())
}