housekeeping(server): deps upgrade + diesel migration to AsyncFnOnce #92

Merged
Skipper merged 1 commits from push-olyuomynmmus into main 2026-05-01 11:25:42 +00:00
9 changed files with 394 additions and 486 deletions

432
server/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -6,7 +6,7 @@ resolver = "3"
[workspace.dependencies] [workspace.dependencies]
alloy = "2.0.0" alloy = "2.0.4"
async-trait = "0.1.89" async-trait = "0.1.89"
base64 = "0.22.1" base64 = "0.22.1"
chrono = { version = "0.4.44", features = ["serde"] } chrono = { version = "0.4.44", features = ["serde"] }
@@ -16,15 +16,15 @@ kameo = {git = "https://github.com/hdbg/kameo.git", rev = "805b417"}
kameo_actors = {git = "https://github.com/hdbg/kameo.git", rev = "805b417"} kameo_actors = {git = "https://github.com/hdbg/kameo.git", rev = "805b417"}
hmac = "0.13.0" hmac = "0.13.0"
miette = { version = "7.6.0", features = ["fancy", "serde"] } miette = { version = "7.6.0", features = ["fancy", "serde"] }
ml-dsa = { version = "0.1.0-rc.8", features = ["zeroize"] } ml-dsa = { version = "0.1.0-rc.9", features = ["zeroize"] }
mutants = "0.0.4" mutants = "0.0.4"
prost = "0.14.3" prost = "0.14.3"
prost-types = { version = "0.14.3", features = ["chrono"] } prost-types = { version = "0.14.3", features = ["chrono"] }
rand = "0.10.1" rand = "0.10.1"
rcgen = { version = "0.14.7", features = [ "aws_lc_rs", "pem", "x509-parser", "zeroize" ], default-features = false } rcgen = { version = "0.14.7", features = [ "aws_lc_rs", "pem", "x509-parser", "zeroize" ], default-features = false }
rstest = "0.26.1" rstest = "0.26.1"
rustls = { version = "0.23.38", features = ["aws-lc-rs", "logging", "prefer-post-quantum", "std"], default-features = false } rustls = { version = "0.23.40", features = ["aws-lc-rs", "logging", "prefer-post-quantum", "std"], default-features = false }
rustls-pki-types = "1.14.0" rustls-pki-types = "1.14.1"
sha2 = "0.11" sha2 = "0.11"
smlang = "0.8.0" smlang = "0.8.0"
thiserror = "2.0.18" thiserror = "2.0.18"

View File

@@ -21,7 +21,7 @@ tokio.workspace = true
tokio-stream.workspace = true tokio-stream.workspace = true
thiserror.workspace = true thiserror.workspace = true
http = "1.4.0" http = "1.4.0"
rustls-webpki = { version = "0.103.12", features = ["aws-lc-rs"] } rustls-webpki = { version = "0.103.13", features = ["aws-lc-rs"] }
async-trait.workspace = true async-trait.workspace = true
chrono.workspace = true chrono.workspace = true

View File

@@ -9,8 +9,8 @@ license = "Apache-2.0"
workspace = true workspace = true
[dependencies] [dependencies]
diesel = { version = "2.3.7", features = ["chrono", "returning_clauses_for_sqlite_3_35", "serde_json", "time", "uuid"] } diesel = { version = "2.3.9", features = ["chrono", "returning_clauses_for_sqlite_3_35", "serde_json", "time", "uuid"] }
diesel-async = { version = "0.8.0", features = [ diesel-async = { version = "0.9.0", features = [
"bb8", "bb8",
"migrations", "migrations",
"sqlite", "sqlite",
@@ -27,7 +27,7 @@ tokio.workspace = true
rustls.workspace = true rustls.workspace = true
smlang.workspace = true smlang.workspace = true
thiserror.workspace = true thiserror.workspace = true
diesel_migrations = { version = "2.3.1", features = ["sqlite"] } diesel_migrations = { version = "2.3.2", features = ["sqlite"] }
async-trait.workspace = true async-trait.workspace = true
tokio-stream.workspace = true tokio-stream.workspace = true
rand.workspace = true rand.workspace = true

View File

@@ -119,31 +119,29 @@ impl Vault {
let mut conn = pool.get().await?; let mut conn = pool.get().await?;
let nonce = conn let nonce = conn
.exclusive_transaction(|conn| { .exclusive_transaction(async |conn| {
Box::pin(async move { let current_nonce: Vec<u8> = schema::root_key_history::table
let current_nonce: Vec<u8> = schema::root_key_history::table .filter(schema::root_key_history::id.eq(root_key_id))
.filter(schema::root_key_history::id.eq(root_key_id)) .select(schema::root_key_history::data_encryption_nonce)
.select(schema::root_key_history::data_encryption_nonce) .first(&mut *conn)
.first(conn) .await?;
.await?;
let mut nonce = Nonce::try_from(current_nonce.as_slice()).map_err(|()| { let mut nonce = Nonce::try_from(current_nonce.as_slice()).map_err(|()| {
error!( error!(
"Broken database: invalid nonce for root key history id={}", "Broken database: invalid nonce for root key history id={}",
root_key_id root_key_id
); );
Error::BrokenDatabase Error::BrokenDatabase
})?; })?;
nonce.increment(); nonce.increment();
update(schema::root_key_history::table) update(schema::root_key_history::table)
.filter(schema::root_key_history::id.eq(root_key_id)) .filter(schema::root_key_history::id.eq(root_key_id))
.set(schema::root_key_history::data_encryption_nonce.eq(nonce.to_vec())) .set(schema::root_key_history::data_encryption_nonce.eq(nonce.to_vec()))
.execute(conn) .execute(&mut *conn)
.await?; .await?;
Result::<_, Error>::Ok(nonce) Result::<_, Error>::Ok(nonce)
})
}) })
.await?; .await?;
@@ -185,28 +183,26 @@ impl Vault {
let data_encryption_nonce_bytes = data_encryption_nonce.to_vec(); let data_encryption_nonce_bytes = data_encryption_nonce.to_vec();
let root_key_history_id = conn let root_key_history_id = conn
.transaction(|conn| { .transaction(async |conn| {
Box::pin(async move { let root_key_history_id: i32 = insert_into(schema::root_key_history::table)
let root_key_history_id: i32 = insert_into(schema::root_key_history::table) .values(&models::NewRootKeyHistory {
.values(&models::NewRootKeyHistory { ciphertext: root_key_ciphertext.clone(),
ciphertext: root_key_ciphertext, tag: v1::ROOT_KEY_TAG.to_vec(),
tag: v1::ROOT_KEY_TAG.to_vec(), root_key_encryption_nonce: root_key_nonce.to_vec(),
root_key_encryption_nonce: root_key_nonce.to_vec(), data_encryption_nonce: data_encryption_nonce_bytes.clone(),
data_encryption_nonce: data_encryption_nonce_bytes, schema_version: 1,
schema_version: 1, salt: salt.to_vec(),
salt: salt.to_vec(), })
}) .returning(schema::root_key_history::id)
.returning(schema::root_key_history::id) .get_result(&mut *conn)
.get_result(conn) .await?;
.await?;
update(schema::arbiter_settings::table) update(schema::arbiter_settings::table)
.set(schema::arbiter_settings::root_key_id.eq(root_key_history_id)) .set(schema::arbiter_settings::root_key_id.eq(root_key_history_id))
.execute(conn) .execute(&mut *conn)
.await?; .await?;
Result::<_, diesel::result::Error>::Ok(root_key_history_id) Result::<_, diesel::result::Error>::Ok(root_key_history_id)
})
}) })
.await?; .await?;

View File

@@ -174,28 +174,26 @@ impl TlsManager {
{ {
let mut conn = db.get().await?; let mut conn = db.get().await?;
conn.transaction(|conn| { conn.transaction(async |conn| {
Box::pin(async { let new_tls_history = NewTlsHistory {
let new_tls_history = NewTlsHistory { cert: new_cert.cert.pem(),
cert: new_cert.cert.pem(), cert_key: new_cert.cert_key.serialize_pem(),
cert_key: new_cert.cert_key.serialize_pem(), ca_cert: encode_cert_to_pem(&ca.cert),
ca_cert: encode_cert_to_pem(&ca.cert), ca_key: ca.issuer.key().serialize_pem(),
ca_key: ca.issuer.key().serialize_pem(), };
};
let inserted_tls_history: i32 = diesel::insert_into(tls_history::table) let inserted_tls_history: i32 = diesel::insert_into(tls_history::table)
.values(&new_tls_history) .values(&new_tls_history)
.returning(tls_history::id) .returning(tls_history::id)
.get_result(conn) .get_result(&mut *conn)
.await?; .await?;
diesel::update(arbiter_settings::table) diesel::update(arbiter_settings::table)
.set(arbiter_settings::tls_id.eq(inserted_tls_history)) .set(arbiter_settings::tls_id.eq(inserted_tls_history))
.execute(conn) .execute(&mut *conn)
.await?; .await?;
Result::<_, diesel::result::Error>::Ok(()) Result::<_, diesel::result::Error>::Ok(())
})
}) })
.await?; .await?;
} }

View File

@@ -179,24 +179,22 @@ impl Engine {
} }
if run_kind == RunKind::Execution { if run_kind == RunKind::Execution {
conn.transaction(|conn| { conn.transaction(async |conn| {
Box::pin(async move { let log_id: i32 = insert_into(evm_transaction_log::table)
let log_id: i32 = insert_into(evm_transaction_log::table) .values(&NewEvmTransactionLog {
.values(&NewEvmTransactionLog { grant_id: grant.common_settings_id,
grant_id: grant.common_settings_id, wallet_access_id: context.target.id,
wallet_access_id: context.target.id, chain_id: context.chain.into(),
chain_id: context.chain.into(), eth_value: utils::u256_to_bytes(context.value).to_vec(),
eth_value: utils::u256_to_bytes(context.value).to_vec(), signed_at: Utc::now().into(),
signed_at: Utc::now().into(), })
}) .returning(evm_transaction_log::id)
.returning(evm_transaction_log::id) .get_result(&mut *conn)
.get_result(conn) .await?;
.await?;
P::record_transaction(&context, meaning, log_id, &grant, conn).await?; P::record_transaction(&context, meaning, log_id, &grant, &mut *conn).await?;
QueryResult::Ok(()) QueryResult::Ok(())
})
}) })
.await .await
.map_err(DatabaseError::from)?; .map_err(DatabaseError::from)?;
@@ -222,54 +220,52 @@ impl Engine {
let vault = self.vault.clone(); let vault = self.vault.clone();
let id = conn let id = conn
.transaction(|conn| { .transaction(async |conn| {
Box::pin(async move { use schema::evm_basic_grant;
use schema::evm_basic_grant;
#[expect( #[expect(
clippy::cast_possible_truncation, clippy::cast_possible_truncation,
clippy::cast_possible_wrap, clippy::cast_possible_wrap,
clippy::as_conversions, clippy::as_conversions,
reason = "fixme! #86" reason = "fixme! #86"
)] )]
let basic_grant: EvmBasicGrant = insert_into(evm_basic_grant::table) let basic_grant: EvmBasicGrant = insert_into(evm_basic_grant::table)
.values(&NewEvmBasicGrant { .values(&NewEvmBasicGrant {
chain_id: full_grant.shared.chain.into(), chain_id: full_grant.shared.chain.into(),
wallet_access_id: full_grant.shared.wallet_access_id, wallet_access_id: full_grant.shared.wallet_access_id,
valid_from: full_grant.shared.valid_from.map(SqliteTimestamp), valid_from: full_grant.shared.valid_from.map(SqliteTimestamp),
valid_until: full_grant.shared.valid_until.map(SqliteTimestamp), valid_until: full_grant.shared.valid_until.map(SqliteTimestamp),
max_gas_fee_per_gas: full_grant max_gas_fee_per_gas: full_grant
.shared .shared
.max_gas_fee_per_gas .max_gas_fee_per_gas
.map(|fee| utils::u256_to_bytes(fee).to_vec()), .map(|fee| utils::u256_to_bytes(fee).to_vec()),
max_priority_fee_per_gas: full_grant max_priority_fee_per_gas: full_grant
.shared .shared
.max_priority_fee_per_gas .max_priority_fee_per_gas
.map(|fee| utils::u256_to_bytes(fee).to_vec()), .map(|fee| utils::u256_to_bytes(fee).to_vec()),
rate_limit_count: full_grant rate_limit_count: full_grant
.shared .shared
.rate_limit .rate_limit
.as_ref() .as_ref()
.map(|rl| rl.count as i32), .map(|rl| rl.count as i32),
rate_limit_window_secs: full_grant rate_limit_window_secs: full_grant
.shared .shared
.rate_limit .rate_limit
.as_ref() .as_ref()
.map(|rl| rl.window.num_seconds() as i32), .map(|rl| rl.window.num_seconds() as i32),
revoked_at: None, revoked_at: None,
}) })
.returning(evm_basic_grant::all_columns) .returning(evm_basic_grant::all_columns)
.get_result(conn) .get_result(&mut *conn)
.await?; .await?;
P::create_grant(&basic_grant, &full_grant.specific, conn).await?; P::create_grant(&basic_grant, &full_grant.specific, &mut *conn).await?;
integrity::sign_entity(conn, &vault, &full_grant, basic_grant.id) integrity::sign_entity(&mut *conn, &vault, &full_grant, basic_grant.id)
.await .await
.map_err(|_| diesel::result::Error::RollbackTransaction)?; .map_err(|_| diesel::result::Error::RollbackTransaction)?;
QueryResult::Ok(basic_grant.id) QueryResult::Ok(basic_grant.id)
})
}) })
.await?; .await?;

View File

@@ -171,46 +171,42 @@ async fn insert_client(
Error::DatabasePoolUnavailable Error::DatabasePoolUnavailable
})?; })?;
conn.exclusive_transaction(|conn| { conn.exclusive_transaction(async |conn| {
let vault = vault.clone(); let metadata_id = insert_into(client_metadata::table)
let pubkey = pubkey.clone(); .values((
Box::pin(async move { client_metadata::name.eq(&metadata.name),
let metadata_id = insert_into(client_metadata::table) client_metadata::description.eq(&metadata.description),
.values(( client_metadata::version.eq(&metadata.version),
client_metadata::name.eq(&metadata.name), ))
client_metadata::description.eq(&metadata.description), .returning(client_metadata::id)
client_metadata::version.eq(&metadata.version), .get_result::<i32>(&mut *conn)
)) .await?;
.returning(client_metadata::id)
.get_result::<i32>(conn)
.await?;
let client_id = insert_into(program_client::table) let client_id = insert_into(program_client::table)
.values(( .values((
program_client::public_key.eq(pubkey.to_bytes()), program_client::public_key.eq(pubkey.to_bytes()),
program_client::metadata_id.eq(metadata_id), program_client::metadata_id.eq(metadata_id),
)) ))
.on_conflict_do_nothing() .on_conflict_do_nothing()
.returning(program_client::id) .returning(program_client::id)
.get_result::<i32>(conn) .get_result::<i32>(&mut *conn)
.await?; .await?;
integrity::sign_entity( integrity::sign_entity(
conn, &mut *conn,
&vault, vault,
&ClientCredentials { &ClientCredentials {
pubkey: pubkey.clone(), pubkey: pubkey.clone(),
}, },
client_id, client_id,
) )
.await .await
.map_err(|e| { .map_err(|e| {
error!(error = ?e, "Failed to sign integrity tag for new client key"); error!(error = ?e, "Failed to sign integrity tag for new client key");
Error::DatabaseOperationFailed Error::DatabaseOperationFailed
})?; })?;
Ok(client_id) Ok(client_id)
})
}) })
.await .await
} }
@@ -229,55 +225,51 @@ async fn sync_client_metadata(
Error::DatabasePoolUnavailable Error::DatabasePoolUnavailable
})?; })?;
conn.exclusive_transaction(|conn| { conn.exclusive_transaction(async |conn| {
let metadata = metadata.clone(); let (current_metadata_id, current): (i32, ProgramClientMetadata) = program_client::table
Box::pin(async move { .find(client_id)
let (current_metadata_id, current): (i32, ProgramClientMetadata) = .inner_join(client_metadata::table)
program_client::table .select((
.find(client_id) program_client::metadata_id,
.inner_join(client_metadata::table) ProgramClientMetadata::as_select(),
.select(( ))
program_client::metadata_id, .first(&mut *conn)
ProgramClientMetadata::as_select(), .await?;
))
.first(conn)
.await?;
let unchanged = current.name == metadata.name let unchanged = current.name == metadata.name
&& current.description == metadata.description && current.description == metadata.description
&& current.version == metadata.version; && current.version == metadata.version;
if unchanged { if unchanged {
return Ok(()); return Ok(());
} }
insert_into(client_metadata_history::table) insert_into(client_metadata_history::table)
.values(( .values((
client_metadata_history::metadata_id.eq(current_metadata_id), client_metadata_history::metadata_id.eq(current_metadata_id),
client_metadata_history::client_id.eq(client_id), client_metadata_history::client_id.eq(client_id),
)) ))
.execute(conn) .execute(&mut *conn)
.await?; .await?;
let metadata_id = insert_into(client_metadata::table) let metadata_id = insert_into(client_metadata::table)
.values(( .values((
client_metadata::name.eq(&metadata.name), client_metadata::name.eq(&metadata.name),
client_metadata::description.eq(&metadata.description), client_metadata::description.eq(&metadata.description),
client_metadata::version.eq(&metadata.version), client_metadata::version.eq(&metadata.version),
)) ))
.returning(client_metadata::id) .returning(client_metadata::id)
.get_result::<i32>(conn) .get_result::<i32>(&mut *conn)
.await?; .await?;
update(program_client::table.find(client_id)) update(program_client::table.find(client_id))
.set(( .set((
program_client::metadata_id.eq(metadata_id), program_client::metadata_id.eq(metadata_id),
program_client::updated_at.eq(now), program_client::updated_at.eq(now),
)) ))
.execute(conn) .execute(&mut *conn)
.await?; .await?;
Ok::<(), diesel::result::Error>(()) Ok::<(), diesel::result::Error>(())
})
}) })
.await .await
.map_err(|e| { .map_err(|e| {

View File

@@ -1,8 +1,8 @@
use super::{Error, OperatorSession}; use super::{Error, OperatorSession};
use crate::{ use crate::{
actors::evm::{ actors::evm::{
ClientSignTransaction, Generate, ListWallets, SignTransactionError as EvmSignError, ClientSignTransaction, Generate, ListWallets, OperatorCreateGrant, OperatorListGrants,
OperatorCreateGrant, OperatorListGrants, SignTransactionError as EvmSignError,
}, },
actors::flow_coordinator::client_connect_approval::ClientApprovalAnswer, actors::flow_coordinator::client_connect_approval::ClientApprovalAnswer,
actors::vault::VaultState, actors::vault::VaultState,
@@ -169,20 +169,18 @@ impl OperatorSession {
entries: Vec<NewEvmWalletAccess>, entries: Vec<NewEvmWalletAccess>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut conn = self.props.db.get().await?; let mut conn = self.props.db.get().await?;
conn.transaction(|conn| { conn.transaction(async |conn| {
Box::pin(async move { use crate::db::schema::evm_wallet_access;
use crate::db::schema::evm_wallet_access;
for entry in entries { for entry in entries {
diesel::insert_into(evm_wallet_access::table) diesel::insert_into(evm_wallet_access::table)
.values(&entry) .values(&entry)
.on_conflict_do_nothing() .on_conflict_do_nothing()
.execute(conn) .execute(&mut *conn)
.await?; .await?;
} }
Result::<_, Error>::Ok(()) Result::<_, Error>::Ok(())
})
}) })
.await?; .await?;
Ok(()) Ok(())
@@ -194,18 +192,16 @@ impl OperatorSession {
entries: Vec<i32>, entries: Vec<i32>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut conn = self.props.db.get().await?; let mut conn = self.props.db.get().await?;
conn.transaction(|conn| { conn.transaction(async |conn| {
Box::pin(async move { use crate::db::schema::evm_wallet_access;
use crate::db::schema::evm_wallet_access; for entry in entries {
for entry in entries { diesel::delete(evm_wallet_access::table)
diesel::delete(evm_wallet_access::table) .filter(evm_wallet_access::wallet_id.eq(entry))
.filter(evm_wallet_access::wallet_id.eq(entry)) .execute(&mut *conn)
.execute(conn) .await?;
.await?; }
}
Result::<_, Error>::Ok(()) Result::<_, Error>::Ok(())
})
}) })
.await?; .await?;
Ok(()) Ok(())