2 Commits

Author SHA1 Message Date
Skipper
f074a4f00b refactor(server::db): introduced newtype wrappers for entity id's in database
Some checks failed
ci/woodpecker/pr/server-lint Pipeline failed
ci/woodpecker/pr/server-audit Pipeline failed
ci/woodpecker/pr/server-vet Pipeline failed
ci/woodpecker/pr/server-test Pipeline was successful
2026-05-01 10:20:06 +02:00
Skipper
5f239c426d feat(server): introducle table separation in preparation of shamir secret sharing vault 2026-04-19 14:04:53 +02:00
21 changed files with 559 additions and 784 deletions

View File

@@ -5,8 +5,7 @@ package arbiter.shared;
enum VaultState {
VAULT_STATE_UNSPECIFIED = 0;
VAULT_STATE_UNBOOTSTRAPPED = 1;
VAULT_STATE_BOOSTRAPPING = 2;
VAULT_STATE_SEALED = 3;
VAULT_STATE_UNSEALED = 4;
VAULT_STATE_ERROR = 5;
VAULT_STATE_SEALED = 2;
VAULT_STATE_UNSEALED = 3;
VAULT_STATE_ERROR = 4;
}

656
server/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -6,7 +6,7 @@ resolver = "3"
[workspace.dependencies]
alloy = "2.0.4"
alloy = "2.0.0"
async-trait = "0.1.89"
base64 = "0.22.1"
chrono = { version = "0.4.44", features = ["serde"] }
@@ -16,15 +16,15 @@ kameo = {git = "https://github.com/hdbg/kameo.git", rev = "805b417"}
kameo_actors = {git = "https://github.com/hdbg/kameo.git", rev = "805b417"}
hmac = "0.13.0"
miette = { version = "7.6.0", features = ["fancy", "serde"] }
ml-dsa = { version = "0.1.0-rc.9", features = ["zeroize"] }
ml-dsa = { version = "0.1.0-rc.8", features = ["zeroize"] }
mutants = "0.0.4"
prost = "0.14.3"
prost-types = { version = "0.14.3", features = ["chrono"] }
rand = "0.10.1"
rcgen = { version = "0.14.7", features = [ "aws_lc_rs", "pem", "x509-parser", "zeroize" ], default-features = false }
rstest = "0.26.1"
rustls = { version = "0.23.40", features = ["aws-lc-rs", "logging", "prefer-post-quantum", "std"], default-features = false }
rustls-pki-types = "1.14.1"
rustls = { version = "0.23.38", features = ["aws-lc-rs", "logging", "prefer-post-quantum", "std"], default-features = false }
rustls-pki-types = "1.14.0"
sha2 = "0.11"
smlang = "0.8.0"
thiserror = "2.0.18"

View File

@@ -21,7 +21,7 @@ tokio.workspace = true
tokio-stream.workspace = true
thiserror.workspace = true
http = "1.4.0"
rustls-webpki = { version = "0.103.13", features = ["aws-lc-rs"] }
rustls-webpki = { version = "0.103.12", features = ["aws-lc-rs"] }
async-trait.workspace = true
chrono.workspace = true

View File

@@ -22,7 +22,7 @@ pub trait SafeCellHandle<T> {
fn read(&mut self) -> Self::CellRead<'_>;
fn write(&mut self) -> Self::CellWrite<'_>;
fn new_inline_default<F>(f: F) -> Self
fn new_inline<F>(f: F) -> Self
where
Self: Sized,
T: Default,
@@ -36,14 +36,6 @@ pub trait SafeCellHandle<T> {
cell
}
fn new_inline<F>(f: Box<F>) -> Self
where
Self: Sized,
F: for<'a> FnOnce() -> T,
{
Self::new(f())
}
#[inline(always)]
fn read_inline<F, R>(&mut self, f: F) -> R
where

View File

@@ -9,8 +9,8 @@ license = "Apache-2.0"
workspace = true
[dependencies]
diesel = { version = "2.3.9", features = ["chrono", "returning_clauses_for_sqlite_3_35", "serde_json", "time", "uuid"] }
diesel-async = { version = "0.9.0", features = [
diesel = { version = "2.3.7", features = ["chrono", "returning_clauses_for_sqlite_3_35", "serde_json", "time", "uuid"] }
diesel-async = { version = "0.8.0", features = [
"bb8",
"migrations",
"sqlite",
@@ -27,7 +27,7 @@ tokio.workspace = true
rustls.workspace = true
smlang.workspace = true
thiserror.workspace = true
diesel_migrations = { version = "2.3.2", features = ["sqlite"] }
diesel_migrations = { version = "2.3.1", features = ["sqlite"] }
async-trait.workspace = true
tokio-stream.workspace = true
rand.workspace = true
@@ -50,7 +50,7 @@ subtle = "2.6.1"
x25519-dalek.workspace = true
k256.workspace = true
kameo_actors.workspace = true
vsss-rs = "5.4.0"
blahaj = "0.6.0"
[dev-dependencies]
proptest = "1.11.0"

View File

@@ -49,7 +49,7 @@ create table if not exists operator_identity (
created_at integer not null default(unixepoch ('now')),
updated_at integer not null default(unixepoch ('now'))
) STRICT;
create unique index if not exists uniq_operator_identity_public_key on operator_identity (public_key);
create unique index if not exists uniq_operator_client_public_key on operator_identity (public_key);
create table if not exists operator (
id integer primary key references operator_identity(id) on delete restrict, -- same id as operator_identity

View File

@@ -48,7 +48,7 @@ impl Bootstrapper {
let row_count: i64 = {
let mut conn = db.get().await?;
schema::operator::table
schema::operator_identity::table
.count()
.get_result(&mut conn)
.await?

View File

@@ -1,5 +1,3 @@
use std::collections::HashMap;
use crate::{
crypto::{
KeyCell, derive_key,
@@ -8,7 +6,7 @@ use crate::{
},
db::{
self,
models::{self, OperatorId, OperatorIdentityId, RootKeyHistory, RootKeyHistoryId},
models::{self, RootKeyHistory, RootKeyHistoryId},
schema::{self},
},
};
@@ -17,11 +15,10 @@ use arbiter_crypto::safecell::{SafeCell, SafeCellHandle as _};
use chrono::Utc;
use diesel::{
ExpressionMethods as _, OptionalExtension, QueryDsl, SelectableHelper,
dsl::{count, insert_into, update},
select,
dsl::{insert_into, update},
};
use diesel_async::{AsyncConnection, RunQueryDsl};
use hmac::{KeyInit as _, Mac as _, digest::common};
use hmac::{KeyInit as _, Mac as _};
use kameo::{Actor, Reply, actor::ActorRef, messages};
use kameo_actors::message_bus::{MessageBus, Publish};
use strum::{EnumDiscriminants, IntoDiscriminant};
@@ -65,15 +62,6 @@ pub enum Error {
BrokenDatabase,
}
#[derive(Debug, thiserror::Error)]
pub enum UnsealError {}
#[derive(Debug, thiserror::Error)]
pub enum BootstrapError {
#[error("That operator already contributed his share")]
AlreadyContributed,
}
struct Unsealed {
root_key_history_id: RootKeyHistoryId,
root_key: KeyCell,
@@ -85,15 +73,8 @@ enum State {
#[default]
Unbootstrapped,
Bootstrapping {
declared_operators: u64,
current_passphrases: HashMap<OperatorIdentityId, SafeCell<Vec<u8>>>,
},
Sealed {
threshold: u64, // basically, quorum size
root_key_history_id: RootKeyHistoryId,
current_shares: HashMap<OperatorId, SafeCell<Vec<u8>>>,
},
Unsealed(Unsealed),
}
@@ -109,6 +90,7 @@ pub struct Vault {
events: ActorRef<MessageBus>,
}
#[messages]
impl Vault {
pub async fn new(db: db::DatabasePool, events: ActorRef<MessageBus>) -> Result<Self, Error> {
let state = {
@@ -121,17 +103,9 @@ impl Vault {
.await?;
match root_key_history {
Some(root_key_history) => {
let operator_count: i64 = schema::operator::table
.count()
.get_result(&mut conn)
.await?;
State::Sealed {
Some(root_key_history) => State::Sealed {
root_key_history_id: root_key_history.id,
current_shares: HashMap::default(),
threshold: shamir_threshold(operator_count.cast_unsigned()), // invariant: db couldn't return negative number of rows
}
}
},
None => State::Unbootstrapped,
}
};
@@ -148,11 +122,12 @@ impl Vault {
let mut conn = pool.get().await?;
let nonce = conn
.exclusive_transaction(async |conn| {
.exclusive_transaction(|conn| {
Box::pin(async move {
let current_nonce: Vec<u8> = schema::root_key_history::table
.filter(schema::root_key_history::id.eq(root_key_id))
.select(schema::root_key_history::data_encryption_nonce)
.first(&mut *conn)
.first(conn)
.await?;
let mut nonce = Nonce::try_from(current_nonce.as_slice()).map_err(|()| {
@@ -167,11 +142,12 @@ impl Vault {
update(schema::root_key_history::table)
.filter(schema::root_key_history::id.eq(root_key_id))
.set(schema::root_key_history::data_encryption_nonce.eq(nonce.to_vec()))
.execute(&mut *conn)
.execute(conn)
.await?;
Result::<_, Error>::Ok(nonce)
})
})
.await?;
Ok(nonce)
@@ -180,28 +156,19 @@ impl Vault {
const fn expect_unsealed(state: &mut State) -> Result<&mut Unsealed, Error> {
match state {
State::Unsealed(unsealed) => Ok(unsealed),
State::Bootstrapping { .. } => Err(Error::NotBootstrapped),
State::Unbootstrapped => Err(Error::NotBootstrapped),
State::Sealed { .. } => Err(Error::Sealed),
}
}
pub async fn finalize_bootstrap(&mut self) -> Result<(), Error> {
let State::Bootstrapping {
declared_operators,
current_passphrases,
} = &mut self.state
else {
#[message]
pub async fn bootstrap(&mut self, seal_key_raw: SafeCell<Vec<u8>>) -> Result<(), Error> {
if !matches!(self.state, State::Unbootstrapped) {
return Err(Error::AlreadyBootstrapped);
};
}
let salt = v1::generate_salt();
let mut seal_key = derive_key(seal_key_raw, &salt);
let mut root_key = KeyCell::new_secure_random();
let root_key_salt = v1::generate_salt();
let mut seal_key = KeyCell::new_secure_random();
let shares = seal_key.0.read_inline(|seal_key| {
generate_shamir_shares(current_passphrases.len() as u64, seal_key.as_slice())
});
// Zero nonces are fine because they are one-time
let root_key_nonce = Nonce::default();
@@ -217,42 +184,33 @@ impl Vault {
})
})?;
let data_encryption_nonce_bytes = data_encryption_nonce.to_vec();
let mut conn = self.db.get().await?;
let data_encryption_nonce_bytes = data_encryption_nonce.to_vec();
let root_key_history_id = conn
.transaction(async |conn| {
for ((operator_id, raw_passphrase), raw_share) in
current_passphrases.iter_mut().zip(shares.iter())
{
let salt = v1::generate_salt();
let mut share_seal_key = derive_key(&mut raw_passphrase, &salt);
let share_encryption_nonce = Nonce::default();
let share_key = derive_key(&mut raw_passphrase, &salt);
}
let root_key_history_id = insert_into(schema::root_key_history::table)
.transaction(|conn| {
Box::pin(async move {
let root_key_history_id: RootKeyHistoryId =
insert_into(schema::root_key_history::table)
.values(&models::NewRootKeyHistory {
ciphertext: root_key_ciphertext.clone(),
ciphertext: root_key_ciphertext,
tag: v1::ROOT_KEY_TAG.to_vec(),
root_key_encryption_nonce: root_key_nonce.to_vec(),
data_encryption_nonce: data_encryption_nonce_bytes.clone(),
data_encryption_nonce: data_encryption_nonce_bytes,
schema_version: 1,
salt: root_key_salt.to_vec(),
salt: salt.to_vec(),
})
.returning(schema::root_key_history::id)
.get_result(&mut *conn)
.get_result(conn)
.await?;
update(schema::arbiter_settings::table)
.set(schema::arbiter_settings::root_key_id.eq(root_key_history_id))
.execute(&mut *conn)
.execute(conn)
.await?;
Result::<_, diesel::result::Error>::Ok(RootKeyHistoryId::from_raw(
root_key_history_id,
))
Result::<_, diesel::result::Error>::Ok(root_key_history_id)
})
})
.await?;
@@ -266,59 +224,11 @@ impl Vault {
Ok(())
}
}
// Seal / unseal / bootstrap stuff. Will be separated into another actor, eventually
#[messages]
impl Vault {
#[message]
pub async fn start_bootstrap(&mut self, declared_operators: u64) -> Result<(), Error> {
if !matches!(&self.state, State::Unbootstrapped) {
return Err(Error::AlreadyBootstrapped);
}
self.state = State::Bootstrapping {
declared_operators,
current_passphrases: HashMap::default(),
};
Ok(())
}
#[message]
pub async fn contribute_bootstrap(
&mut self,
operator: OperatorIdentityId,
key_raw: SafeCell<Vec<u8>>,
) -> Result<(), Error> {
let State::Bootstrapping {
current_passphrases,
declared_operators,
} = &mut self.state
else {
return Err(Error::AlreadyBootstrapped);
};
if current_passphrases.contains_key(&operator) {
return Err(Error::AlreadyBootstrapped);
}
current_passphrases.insert(operator, key_raw);
if current_passphrases.len() == declared_operators {
return self.finalize_bootstrap(seal_key_raw);
}
Ok(())
}
#[message]
pub async fn contribute_unseal(
&mut self,
operator: OperatorId,
key_raw: SafeCell<Vec<u8>>,
) -> Result<(), Error> {
pub async fn try_unseal(&mut self, seal_key_raw: SafeCell<Vec<u8>>) -> Result<(), Error> {
let State::Sealed {
root_key_history_id,
current_shares,
} = &self.state
else {
return Err(Error::NotBootstrapped);
@@ -339,7 +249,7 @@ impl Vault {
error!("Broken database: invalid salt for root key");
Error::BrokenDatabase
})?;
let mut seal_key = derive_key(key_raw, &salt);
let mut seal_key = derive_key(seal_key_raw, &salt);
let mut root_key = SafeCell::new(current_key.ciphertext.clone());
@@ -370,25 +280,6 @@ impl Vault {
Ok(())
}
#[message]
pub async fn seal(&mut self) -> Result<(), Error> {
let Unsealed {
root_key_history_id,
..
} = Self::expect_unsealed(&mut self.state)?;
self.state = State::Sealed {
root_key_history_id: *root_key_history_id,
current_shares: HashMap::new(),
};
let _ = self.events.tell(Publish(events::VaultResealed)).await;
Ok(())
}
}
// Server-side cryptographic operations
#[messages]
impl Vault {
#[message]
pub async fn decrypt(&mut self, aead_id: i32) -> Result<SafeCell<Vec<u8>>, Error> {
let Unsealed { root_key, .. } = Self::expect_unsealed(&mut self.state)?;
@@ -506,47 +397,25 @@ impl Vault {
Ok(hmac.verify_slice(&expected_mac).is_ok())
}
}
/// According to the spec, the quorum is 50% + 1
/// with exception for 1 and 2 operators, those require exactly the number of operators registered
fn shamir_threshold(comittee_size: u64) -> u64 {
if comittee_size == 2 || comittee_size == 1 {
return comittee_size;
}
#[message]
pub async fn seal(&mut self) -> Result<(), Error> {
let Unsealed {
root_key_history_id,
..
} = Self::expect_unsealed(&mut self.state)?;
let half_comittee = match comittee_size % 2 != 0 {
true => (comittee_size - 1) / 2,
false => comittee_size / 2,
self.state = State::Sealed {
root_key_history_id: *root_key_history_id,
};
half_comittee + 1
let _ = self.events.tell(Publish(events::VaultResealed)).await;
Ok(())
}
/// Beware: this function accepts raw key references (without memory protection)
fn generate_shamir_shares(threshold: u64, key: &[u8]) -> Vec<SafeCell<Vec<u8>>> {
use vsss_rs::{shamir, *};
type P256Share = DefaultShare<IdentifierPrimeField<Scalar>, IdentifierPrimeField<Scalar>>;
let mut osrng = rand_core::OsRng::default();
let sk = SecretKey::random(&mut osrng);
let nzs = sk.to_nonzero_scalar();
let shared_secret = IdentifierPrimeField(*nzs.as_ref());
let res = shamir::split_secret::<P256Share>(2, 3, &shared_secret, &mut osrng);
assert!(res.is_ok());
let shares = res.unwrap();
let res = shares.combine();
assert!(res.is_ok());
let scalar = res.unwrap();
let nzs_dup = NonZeroScalar::from_repr(scalar.0.to_repr()).unwrap();
let sk_dup = SecretKey::from(nzs_dup);
assert_eq!(sk_dup.to_bytes(), sk.to_bytes());
}
#[cfg(test)]
mod tests {
use crate::actors::GlobalActors;
use crate::{actors::GlobalActors, db::models::RootKeyHistory};
use arbiter_crypto::safecell::SafeCellHandle as _;
use super::*;
@@ -556,7 +425,7 @@ mod tests {
.await
.unwrap();
let seal_key = SafeCell::new(b"test-seal-key".to_vec());
actor.finalize_bootstrap(seal_key).await.unwrap();
actor.bootstrap(seal_key).await.unwrap();
actor
}

View File

@@ -174,7 +174,8 @@ impl TlsManager {
{
let mut conn = db.get().await?;
conn.transaction(async |conn| {
conn.transaction(|conn| {
Box::pin(async {
let new_tls_history = NewTlsHistory {
cert: new_cert.cert.pem(),
cert_key: new_cert.cert_key.serialize_pem(),
@@ -185,16 +186,17 @@ impl TlsManager {
let inserted_tls_history: i32 = diesel::insert_into(tls_history::table)
.values(&new_tls_history)
.returning(tls_history::id)
.get_result(&mut *conn)
.get_result(conn)
.await?;
diesel::update(arbiter_settings::table)
.set(arbiter_settings::tls_id.eq(inserted_tls_history))
.execute(&mut *conn)
.execute(conn)
.await?;
Result::<_, diesel::result::Error>::Ok(())
})
})
.await?;
}

View File

@@ -28,7 +28,7 @@ impl TryFrom<SafeCell<Vec<u8>>> for KeyCell {
if value.len() != size_of::<Key>() {
return Err(());
}
let cell = SafeCell::new_inline_default(|cell_write: &mut Key| {
let cell = SafeCell::new_inline(|cell_write: &mut Key| {
cell_write.copy_from_slice(&value);
});
Ok(Self(cell))
@@ -37,7 +37,7 @@ impl TryFrom<SafeCell<Vec<u8>>> for KeyCell {
impl KeyCell {
pub fn new_secure_random() -> Self {
let key = SafeCell::new_inline_default(|key_buffer: &mut Key| {
let key = SafeCell::new_inline(|key_buffer: &mut Key| {
let mut rng = StdRng::try_from_rng(&mut SysRng)
.expect("Rng failure is unrecoverable and should panic");
rng.fill_bytes(key_buffer);
@@ -94,7 +94,7 @@ impl KeyCell {
}
/// Derive a fixed-length key from the password using Argon2id, which is designed for password hashing and key derivation.
pub fn derive_key(password: &mut SafeCell<Vec<u8>>, salt: &Salt) -> KeyCell {
pub fn derive_key(mut password: SafeCell<Vec<u8>>, salt: &Salt) -> KeyCell {
let params = {
#[cfg(debug_assertions)]
{

View File

@@ -271,8 +271,8 @@ pub struct ProgramClient {
}
#[derive(Queryable, Debug)]
#[diesel(table_name = schema::operator_client, check_for_backend(Sqlite))]
pub struct OperatorClient {
#[diesel(table_name = schema::operator_identity, check_for_backend(Sqlite))]
pub struct OperatorIdentity {
pub id: OperatorIdentityId,
pub public_key: Vec<u8>,
pub created_at: SqliteTimestamp,

View File

@@ -179,7 +179,8 @@ impl Engine {
}
if run_kind == RunKind::Execution {
conn.transaction(async |conn| {
conn.transaction(|conn| {
Box::pin(async move {
let log_id: i32 = insert_into(evm_transaction_log::table)
.values(&NewEvmTransactionLog {
grant_id: grant.common_settings_id,
@@ -189,13 +190,14 @@ impl Engine {
signed_at: Utc::now().into(),
})
.returning(evm_transaction_log::id)
.get_result(&mut *conn)
.get_result(conn)
.await?;
P::record_transaction(&context, meaning, log_id, &grant, &mut *conn).await?;
P::record_transaction(&context, meaning, log_id, &grant, conn).await?;
QueryResult::Ok(())
})
})
.await
.map_err(DatabaseError::from)?;
}
@@ -220,7 +222,8 @@ impl Engine {
let vault = self.vault.clone();
let id = conn
.transaction(async |conn| {
.transaction(|conn| {
Box::pin(async move {
use schema::evm_basic_grant;
#[expect(
@@ -256,17 +259,18 @@ impl Engine {
revoked_at: None,
})
.returning(evm_basic_grant::all_columns)
.get_result(&mut *conn)
.get_result(conn)
.await?;
P::create_grant(&basic_grant, &full_grant.specific, &mut *conn).await?;
P::create_grant(&basic_grant, &full_grant.specific, conn).await?;
integrity::sign_entity(&mut *conn, &vault, &full_grant, basic_grant.id)
integrity::sign_entity(conn, &vault, &full_grant, basic_grant.id)
.await
.map_err(|_| diesel::result::Error::RollbackTransaction)?;
QueryResult::Ok(basic_grant.id)
})
})
.await?;
Ok(id)

View File

@@ -44,7 +44,7 @@ impl std::fmt::Debug for SafeSigner {
/// Returns the protected key bytes and the derived Ethereum address.
pub fn generate(rng: &mut impl rand::Rng) -> (SafeCell<[u8; 32]>, Address) {
loop {
let mut cell = SafeCell::new_inline_default(|w: &mut [u8; 32]| {
let mut cell = SafeCell::new_inline(|w: &mut [u8; 32]| {
rng.fill_bytes(w);
});

View File

@@ -31,7 +31,6 @@ pub(super) async fn dispatch(
VaultRequestPayload::QueryState(()) => {
let state = match actor.ask(HandleQueryVaultState {}).await {
Ok(VaultState::Unbootstrapped) => ProtoVaultState::Unbootstrapped,
Ok(VaultState::Bootstrapping) => ProtoVaultState::Boostrapping,
Ok(VaultState::Sealed) => ProtoVaultState::Sealed,
Ok(VaultState::Unsealed) => ProtoVaultState::Unsealed,
Err(SendError::HandlerError(Error::Internal)) => ProtoVaultState::Error,

View File

@@ -1,5 +1,5 @@
use crate::{
db::models::EvmWalletAccess,
db::models::{EvmWalletAccess, EvmWalletId},
evm::policies::{SharedGrantSettings, SpecificGrant, TransactionRateLimit, VolumeRateLimit},
grpc::Convert,
};

View File

@@ -1,5 +1,5 @@
use crate::{
db::models::NewEvmWalletAccess,
db::models::{ClientId, NewEvmWalletAccess},
grpc::Convert,
peers::operator::{
OperatorSession, OutOfBand,

View File

@@ -3,6 +3,7 @@ use crate::{
peers::operator::{OperatorSession, session::handlers::HandleQueryVaultState},
};
use arbiter_proto::{
proto::shared::VaultState as ProtoVaultState,
proto::operator::{
operator_response::Payload as OperatorResponsePayload,
vault::{
@@ -10,7 +11,6 @@ use arbiter_proto::{
response::Payload as VaultResponsePayload,
},
},
proto::shared::VaultState as ProtoVaultState,
};
use kameo::actor::ActorRef;
@@ -47,7 +47,6 @@ async fn handle_query_vault_state(
let state = match actor.ask(HandleQueryVaultState {}).await {
Ok(VaultState::Unbootstrapped) => ProtoVaultState::Unbootstrapped,
Ok(VaultState::Sealed) => ProtoVaultState::Sealed,
Ok(VaultState::Bootstrapping) => ProtoVaultState::Boostrapping,
Ok(VaultState::Unsealed) => ProtoVaultState::Unsealed,
Err(err) => {
warn!(error = ?err, "Failed to query vault state");

View File

@@ -4,6 +4,7 @@ use crate::{
peers::operator::vault_gate::{self as vault_gate},
};
use arbiter_proto::proto::{
shared::VaultState as ProtoVaultState,
operator::{
operator_response::Payload as OperatorResponsePayload,
vault::{
@@ -16,7 +17,6 @@ use arbiter_proto::proto::{
},
},
},
shared::VaultState as ProtoVaultState,
};
use tonic::Status;
@@ -46,7 +46,6 @@ impl Convert for VaultState {
fn convert(self) -> OperatorResponsePayload {
let proto_state = match self {
Self::Unbootstrapped => ProtoVaultState::Unbootstrapped,
Self::Bootstrapping => ProtoVaultState::Boostrapping,
Self::Sealed => ProtoVaultState::Sealed,
Self::Unsealed => ProtoVaultState::Unsealed,
};

View File

@@ -171,7 +171,10 @@ async fn insert_client(
Error::DatabasePoolUnavailable
})?;
conn.exclusive_transaction(async |conn| {
conn.exclusive_transaction(|conn| {
let vault = vault.clone();
let pubkey = pubkey.clone();
Box::pin(async move {
let metadata_id = insert_into(client_metadata::table)
.values((
client_metadata::name.eq(&metadata.name),
@@ -179,7 +182,7 @@ async fn insert_client(
client_metadata::version.eq(&metadata.version),
))
.returning(client_metadata::id)
.get_result::<i32>(&mut *conn)
.get_result::<i32>(conn)
.await?;
let client_id = insert_into(program_client::table)
@@ -189,12 +192,12 @@ async fn insert_client(
))
.on_conflict_do_nothing()
.returning(program_client::id)
.get_result::<i32>(&mut *conn)
.get_result::<i32>(conn)
.await?;
integrity::sign_entity(
&mut *conn,
vault,
conn,
&vault,
&ClientCredentials {
pubkey: pubkey.clone(),
},
@@ -208,6 +211,7 @@ async fn insert_client(
Ok(client_id)
})
})
.await
}
@@ -225,15 +229,18 @@ async fn sync_client_metadata(
Error::DatabasePoolUnavailable
})?;
conn.exclusive_transaction(async |conn| {
let (current_metadata_id, current): (i32, ProgramClientMetadata) = program_client::table
conn.exclusive_transaction(|conn| {
let metadata = metadata.clone();
Box::pin(async move {
let (current_metadata_id, current): (i32, ProgramClientMetadata) =
program_client::table
.find(client_id)
.inner_join(client_metadata::table)
.select((
program_client::metadata_id,
ProgramClientMetadata::as_select(),
))
.first(&mut *conn)
.first(conn)
.await?;
let unchanged = current.name == metadata.name
@@ -248,7 +255,7 @@ async fn sync_client_metadata(
client_metadata_history::metadata_id.eq(current_metadata_id),
client_metadata_history::client_id.eq(client_id),
))
.execute(&mut *conn)
.execute(conn)
.await?;
let metadata_id = insert_into(client_metadata::table)
@@ -258,7 +265,7 @@ async fn sync_client_metadata(
client_metadata::version.eq(&metadata.version),
))
.returning(client_metadata::id)
.get_result::<i32>(&mut *conn)
.get_result::<i32>(conn)
.await?;
update(program_client::table.find(client_id))
@@ -266,11 +273,12 @@ async fn sync_client_metadata(
program_client::metadata_id.eq(metadata_id),
program_client::updated_at.eq(now),
))
.execute(&mut *conn)
.execute(conn)
.await?;
Ok::<(), diesel::result::Error>(())
})
})
.await
.map_err(|e| {
error!(error = ?e, "Database error");

View File

@@ -175,19 +175,21 @@ impl OperatorSession {
entries: Vec<NewEvmWalletAccess>,
) -> Result<(), Error> {
let mut conn = self.props.db.get().await?;
conn.transaction(async |conn| {
conn.transaction(|conn| {
Box::pin(async move {
use crate::db::schema::evm_wallet_access;
for entry in entries {
diesel::insert_into(evm_wallet_access::table)
.values(&entry)
.on_conflict_do_nothing()
.execute(&mut *conn)
.execute(conn)
.await?;
}
Result::<_, Error>::Ok(())
})
})
.await?;
Ok(())
}
@@ -198,17 +200,19 @@ impl OperatorSession {
entries: Vec<i32>,
) -> Result<(), Error> {
let mut conn = self.props.db.get().await?;
conn.transaction(async |conn| {
conn.transaction(|conn| {
Box::pin(async move {
use crate::db::schema::evm_wallet_access;
for entry in entries {
diesel::delete(evm_wallet_access::table)
.filter(evm_wallet_access::wallet_id.eq(entry))
.execute(&mut *conn)
.execute(conn)
.await?;
}
Result::<_, Error>::Ok(())
})
})
.await?;
Ok(())
}