Broke API - need to move inserts into transaction & update revisions

This commit is contained in:
asonix 2023-05-24 12:26:13 -05:00
parent 3c5cdaa831
commit 9ed6853c04
5 changed files with 438 additions and 12 deletions

View file

@ -315,6 +315,74 @@ impl<'a> InsertState<'a> {
})
.collect()
}
fn disowned<'i>(
&self,
docs: impl Iterator<Item = &'i crate::schema::Triple>,
) -> Vec<crate::schema::Disowned> {
#[derive(Default)]
struct TripleState {
iris: HashSet<u128>,
strings: HashSet<u128>,
langtags: HashSet<u128>,
}
let triplestate = docs.fold(TripleState::default(), |mut acc, triple| {
match triple.subject {
crate::schema::Subject::Iri(iri) | crate::schema::Subject::Blank { iri, .. } => {
acc.iris.insert(iri);
}
}
acc.iris.insert(triple.predicate);
match triple.object {
crate::schema::Object::Iri(iri) | crate::schema::Object::Blank { iri, .. } => {
acc.iris.insert(iri);
}
crate::schema::Object::TypedString { string, iri } => {
acc.iris.insert(iri);
acc.strings.insert(string);
}
crate::schema::Object::LangString { string, langtag } => {
acc.strings.insert(string);
acc.langtags.insert(langtag);
}
crate::schema::Object::String(string) => {
acc.strings.insert(string);
}
}
acc
});
let mut existing_iris: HashSet<_> = self.subject_iris.values().copied().collect();
existing_iris.extend(self.other_iris.values().copied());
let existing_strings: HashSet<_> = self.strings.values().copied().collect();
let existing_langtags: HashSet<_> = self.langtags.values().copied().collect();
let iris = triplestate
.iris
.difference(&existing_iris)
.copied()
.map(crate::schema::Disowned::iri);
let strings = triplestate
.strings
.difference(&existing_strings)
.copied()
.map(crate::schema::Disowned::string);
let langtags = triplestate
.langtags
.difference(&existing_langtags)
.copied()
.map(crate::schema::Disowned::langtag);
iris.chain(strings).chain(langtags).collect()
}
}
// Deletes existing subject triples
@ -340,12 +408,12 @@ pub(crate) async fn insert_document<S: AsyncStorageConnection<Database = C>, C:
let docs = database
.view::<crate::schema::TriplesBySubjectIri>()
.with_keys(&subject_iri_ids)
.query()
.query_with_collection_docs()
.await?;
let mut tx = Transaction::new();
for doc in docs {
for doc in docs.mappings {
let op = Operation {
collection: crate::schema::Triple::collection_name(),
command: bonsaidb::core::transaction::Command::Delete { header: doc.source },
@ -353,6 +421,10 @@ pub(crate) async fn insert_document<S: AsyncStorageConnection<Database = C>, C:
tx.push(op);
}
for disowned in insert_state.disowned(docs.documents.values().map(|doc| &doc.contents)) {
disowned.push_in_transaction(&mut tx)?;
}
let translated_triples = insert_state.translate_triples(&triples);
for triple in translated_triples {

View file

@ -1,18 +1,22 @@
mod disowned;
mod iri;
mod langtag;
mod object;
mod revisioned;
mod subject;
use bonsaidb::core::{
document::{BorrowedDocument, Emit},
key::Key,
schema::{Collection, CollectionMapReduce, MapReduce, Schema, View, ViewSchema},
schema::{
view::map::Mappings, Collection, CollectionMapReduce, MapReduce, Schema, View, ViewSchema,
},
};
pub(crate) use self::{object::Object, subject::Subject};
pub(crate) use self::{object::Object, revisioned::Revisioned, subject::Subject};
#[derive(Debug, Schema)]
#[schema(name = "TriplestoreSchema", collections = [Triple, Iri, LiteralString, LiteralLanguageTag], authority = "asonix/triplestore")]
#[schema(name = "TriplestoreSchema", collections = [Triple, Iri, LiteralString, LiteralLanguageTag, Disowned], authority = "asonix/triplestore")]
pub(crate) struct TriplestoreSchema;
// Collections
@ -24,6 +28,9 @@ pub(crate) struct TriplestoreSchema;
TriplesBySubjectAndPredicate,
TriplesBySubjectPredicateAndObject,
TriplesBySubjectIri,
TriplesByIri,
TriplesByString,
TriplesByLangtag,
])]
pub(crate) struct Triple {
pub(crate) subject: Subject,
@ -35,21 +42,48 @@ pub(crate) struct Triple {
#[collection(name = "Iri", authority = "asonix/triplestore", primary_key = u128, views = [IriUniqueness], serialization = Key)]
pub(crate) struct Iri {
#[key]
pub(crate) iri: self::iri::OwnedIri,
pub(crate) iri: Revisioned<self::iri::OwnedIri>,
}
#[derive(Clone, Collection, Debug, Key)]
#[collection(name = "LiteralString", authority = "asonix/triplestore", primary_key = u128, views = [LiteralStringUniqueness], serialization = Key)]
pub(crate) struct LiteralString {
#[key]
pub(crate) string: String,
pub(crate) string: Revisioned<String>,
}
#[derive(Clone, Collection, Debug, Key)]
#[collection(name = "LiteralLanguageTag", authority = "asonix/triplestore", primary_key = u128, views = [LiteralLanguageTagUniqueness], serialization = Key)]
pub(crate) struct LiteralLanguageTag {
#[key]
pub(crate) langtag: self::langtag::OwnedLanguageTag,
pub(crate) langtag: Revisioned<self::langtag::OwnedLanguageTag>,
}
#[derive(Clone, Collection, Debug, Key)]
#[collection(name = "Disowned", authority = "asonix/triplestore", views = [], serialization = Key)]
pub(crate) struct Disowned {
#[key]
pub(crate) disowned: self::disowned::Disowned,
}
impl Disowned {
pub(crate) fn iri(iri: u128) -> Self {
Self {
disowned: disowned::Disowned::Iri(iri),
}
}
pub(crate) fn string(string: u128) -> Self {
Self {
disowned: disowned::Disowned::String(string),
}
}
pub(crate) fn langtag(langtag: u128) -> Self {
Self {
disowned: disowned::Disowned::Langtag(langtag),
}
}
}
// Views
@ -73,6 +107,18 @@ pub(crate) struct TriplesBySubjectPredicateAndObject;
#[view(collection = Triple, key = u128)]
pub(crate) struct TriplesBySubjectIri;
#[derive(Clone, Debug, View, ViewSchema)]
#[view(collection = Triple, key = u128)]
pub(crate) struct TriplesByIri;
#[derive(Clone, Debug, View, ViewSchema)]
#[view(collection = Triple, key = u128)]
pub(crate) struct TriplesByString;
#[derive(Clone, Debug, View, ViewSchema)]
#[view(collection = Triple, key = u128)]
pub(crate) struct TriplesByLangtag;
// Iri views
#[derive(Clone, Debug, View, ViewSchema)]
@ -156,6 +202,64 @@ impl CollectionMapReduce for TriplesBySubjectIri {
}
}
impl CollectionMapReduce for TriplesByIri {
fn map<'doc>(
&self,
document: bonsaidb::core::document::CollectionDocument<<Self::View as View>::Collection>,
) -> bonsaidb::core::schema::ViewMapResult<'doc, Self>
where
bonsaidb::core::document::CollectionDocument<<Self::View as View>::Collection>: 'doc,
{
let mappings = match document.contents.subject {
Subject::Iri(iri) | Subject::Blank { iri, .. } => document.header.emit_key(iri)?,
};
let mappings = mappings.and(document.header.emit_key(document.contents.predicate)?);
match document.contents.object {
Object::Iri(iri) | Object::Blank { iri, .. } | Object::TypedString { iri, .. } => {
Ok(mappings.and(document.header.emit_key(iri)?))
}
Object::String(_) | Object::LangString { .. } => Ok(mappings),
}
}
}
impl CollectionMapReduce for TriplesByString {
fn map<'doc>(
&self,
document: bonsaidb::core::document::CollectionDocument<<Self::View as View>::Collection>,
) -> bonsaidb::core::schema::ViewMapResult<'doc, Self>
where
bonsaidb::core::document::CollectionDocument<<Self::View as View>::Collection>: 'doc,
{
match document.contents.object {
Object::TypedString { string, .. }
| Object::String(string)
| Object::LangString { string, .. } => document.header.emit_key(string),
Object::Blank { .. } | Object::Iri(_) => Ok(Mappings::none()),
}
}
}
impl CollectionMapReduce for TriplesByLangtag {
fn map<'doc>(
&self,
document: bonsaidb::core::document::CollectionDocument<<Self::View as View>::Collection>,
) -> bonsaidb::core::schema::ViewMapResult<'doc, Self>
where
bonsaidb::core::document::CollectionDocument<<Self::View as View>::Collection>: 'doc,
{
match document.contents.object {
Object::LangString { langtag, .. } => document.header.emit_key(langtag),
Object::String(_)
| Object::TypedString { .. }
| Object::Blank { .. }
| Object::Iri(_) => Ok(Mappings::none()),
}
}
}
// Iri MapReduces
impl MapReduce for IriUniqueness {
@ -163,7 +267,8 @@ impl MapReduce for IriUniqueness {
&self,
document: &'doc BorrowedDocument<'_>,
) -> bonsaidb::core::schema::ViewMapResult<'doc, Self> {
let bytes = bonsaidb::core::key::ByteSource::Borrowed(&document.contents);
let bytes =
bonsaidb::core::key::ByteSource::Borrowed(revisioned::key_slice(&document.contents));
// This works since Iri is Key and transparent over self::iri::OwnedIri, which is
// transparent over self::iri::Iri
@ -180,7 +285,8 @@ impl MapReduce for LiteralStringUniqueness {
&self,
document: &'doc BorrowedDocument<'doc>,
) -> bonsaidb::core::schema::ViewMapResult<'doc, Self> {
let str = std::str::from_utf8(&document.contents).expect("IRI is valid format");
let str = std::str::from_utf8(revisioned::key_slice(&document.contents))
.expect("IRI is valid format");
document
.header
@ -195,7 +301,8 @@ impl MapReduce for LiteralLanguageTagUniqueness {
&self,
document: &'doc BorrowedDocument<'_>,
) -> bonsaidb::core::schema::ViewMapResult<'doc, Self> {
let bytes = bonsaidb::core::key::ByteSource::Borrowed(&document.contents);
let bytes =
bonsaidb::core::key::ByteSource::Borrowed(revisioned::key_slice(&document.contents));
// This works since LangaugeTag is Key and transparent over self::langtag::OwnedLanguageTag, which is
// transparent over self::langtag::LanguageTag

137
src/schema/disowned.rs Normal file
View file

@ -0,0 +1,137 @@
use bonsaidb::core::key::{Key, KeyEncoding};
#[derive(
Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Deserialize, serde::Serialize,
)]
pub(crate) enum Disowned {
Iri(u128),
String(u128),
Langtag(u128),
}
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum DisownedError {
IncorrectByteLength(usize),
InvalidType,
}
impl Disowned {
const IRI: u8 = 0;
const STRING: u8 = 1;
const LANGTAG: u8 = 2;
const LEN: usize = 17; // std::mem::size_of::<u128>() + 1;
fn to_bytes(&self) -> Vec<u8> {
let mut bytes = Vec::with_capacity(Self::LEN);
match self {
Self::Iri(key) | Self::String(key) | Self::Langtag(key) => {
bytes.extend_from_slice(&key.to_be_bytes()[..])
}
}
bytes.push(self.type_id());
bytes
}
fn from_slice(bytes: &[u8]) -> Result<Self, DisownedError> {
match bytes.len() {
Self::LEN => {
let key = u128::from_be_bytes(
bytes[0..16].try_into().expect("Length is already checked"),
);
match bytes[Self::LEN - 1] {
Self::IRI => Ok(Self::Iri(key)),
Self::STRING => Ok(Self::String(key)),
Self::LANGTAG => Ok(Self::Langtag(key)),
_ => Err(DisownedError::InvalidType),
}
}
other => Err(DisownedError::IncorrectByteLength(other)),
}
}
const fn type_id(&self) -> u8 {
match self {
Self::Iri(_) => Self::IRI,
Self::String(_) => Self::STRING,
Self::Langtag(_) => Self::LANGTAG,
}
}
}
impl KeyEncoding for Disowned {
type Error = DisownedError;
const LENGTH: Option<usize> = Some(Self::LEN);
fn as_ord_bytes(&self) -> Result<std::borrow::Cow<'_, [u8]>, Self::Error> {
Ok(std::borrow::Cow::Owned(self.to_bytes()))
}
fn describe<Visitor>(visitor: &mut Visitor)
where
Visitor: bonsaidb::core::key::KeyVisitor,
{
visitor.visit_type(bonsaidb::core::key::KeyKind::Bytes)
}
}
impl<'k> Key<'k> for Disowned {
const CAN_OWN_BYTES: bool = false;
fn from_ord_bytes<'e>(
bytes: bonsaidb::core::key::ByteSource<'k, 'e>,
) -> Result<Self, Self::Error> {
Self::from_slice(bytes.as_ref())
}
}
impl std::fmt::Display for DisownedError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::IncorrectByteLength(size) => write!(f, "Incorrect byte length: {size}"),
Self::InvalidType => write!(f, "Invalid type byte"),
}
}
}
impl std::error::Error for DisownedError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::IncorrectByteLength(_) => None,
Self::InvalidType => None,
}
}
}
#[cfg(test)]
mod tests {
use super::Disowned;
#[test]
fn round_trip() {
let inputs = [
Disowned::Iri(u128::MAX),
Disowned::Iri(0),
Disowned::Iri(99),
Disowned::String(u128::MAX),
Disowned::String(0),
Disowned::String(99),
Disowned::Langtag(u128::MAX),
Disowned::Langtag(0),
Disowned::Langtag(99),
];
for input in inputs {
let bytes = input.to_bytes();
let object = Disowned::from_slice(&bytes).expect("Deserialize successfully");
assert_eq!(input, object);
}
}
}

View file

@ -3,7 +3,6 @@ use bonsaidb::core::key::{Key, KeyEncoding};
#[derive(
Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Deserialize, serde::Serialize,
)]
#[repr(u8)]
pub(crate) enum Object {
Iri(u128),
Blank { iri: u128, discriminant: u64 },

111
src/schema/revisioned.rs Normal file
View file

@ -0,0 +1,111 @@
use bonsaidb::core::key::{Key, KeyEncoding};
pub(super) fn key_slice(revisioned_slice: &[u8]) -> &[u8] {
&revisioned_slice[16..]
}
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct Revisioned<T> {
pub(crate) revision: u128,
pub(crate) key: T,
}
#[derive(Debug)]
pub(crate) enum RevisionedError<E> {
DecodeKeyError(E),
EncodeKeyError(E),
InvalidBytesLength(usize),
}
impl<T, U> KeyEncoding<Revisioned<U>> for Revisioned<T>
where
T: KeyEncoding<U>,
{
type Error = RevisionedError<T::Error>;
const LENGTH: Option<usize> = {
if let Some(len) = T::LENGTH {
Some(len + 16)
} else {
None
}
};
fn as_ord_bytes(&self) -> Result<std::borrow::Cow<'_, [u8]>, Self::Error> {
let bytes = self
.key
.as_ord_bytes()
.map_err(RevisionedError::EncodeKeyError)?;
let mut output = Vec::with_capacity(bytes.len() + 16);
output.extend_from_slice(&self.revision.to_be_bytes()[..]);
output.extend_from_slice(&bytes);
Ok(std::borrow::Cow::Owned(output))
}
fn describe<Visitor>(visitor: &mut Visitor)
where
Visitor: bonsaidb::core::key::KeyVisitor,
{
visitor.visit_type(bonsaidb::core::key::KeyKind::Bytes)
}
}
impl<'k, T> Key<'k> for Revisioned<T>
where
T: Key<'k>,
{
const CAN_OWN_BYTES: bool = false;
fn from_ord_bytes<'e>(
bytes: bonsaidb::core::key::ByteSource<'k, 'e>,
) -> Result<Self, Self::Error> {
if bytes.len() < 16 {
return Err(RevisionedError::InvalidBytesLength(bytes.len()));
}
let revision =
u128::from_be_bytes(bytes[0..16].try_into().expect("Length already checked"));
let key = match bytes {
bonsaidb::core::key::ByteSource::Owned(owned) => {
T::from_ord_bytes(bonsaidb::core::key::ByteSource::Ephemeral(&owned[16..]))
.map_err(RevisionedError::DecodeKeyError)?
}
bonsaidb::core::key::ByteSource::Ephemeral(ephemeral) => {
T::from_ord_bytes(bonsaidb::core::key::ByteSource::Ephemeral(&ephemeral[16..]))
.map_err(RevisionedError::DecodeKeyError)?
}
bonsaidb::core::key::ByteSource::Borrowed(borrowed) => {
T::from_ord_bytes(bonsaidb::core::key::ByteSource::Ephemeral(&borrowed[16..]))
.map_err(RevisionedError::DecodeKeyError)?
}
};
Ok(Revisioned { revision, key })
}
}
impl<T> std::fmt::Display for RevisionedError<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::DecodeKeyError(_) => write!(f, "Failed to decode key type"),
Self::EncodeKeyError(_) => write!(f, "Failed to encode key type"),
Self::InvalidBytesLength(size) => write!(f, "Invalid length for key bytes: {size}"),
}
}
}
impl<T> std::error::Error for RevisionedError<T>
where
T: std::error::Error + 'static,
{
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::DecodeKeyError(e) => Some(e),
Self::EncodeKeyError(e) => Some(e),
Self::InvalidBytesLength(_) => None,
}
}
}