refactor(server::db): introduced newtype wrappers for entity id's in database

This commit is contained in:
Skipper
2026-04-27 13:07:12 +02:00
parent 3f801abdff
commit a773255935
18 changed files with 175 additions and 101 deletions

View File

@@ -43,13 +43,24 @@ create table if not exists arbiter_settings (
insert into arbiter_settings (id) values (1) on conflict do nothing;
-- ensure singleton row exists
create table if not exists operator_client (
create table if not exists operator_identity (
id integer not null primary key,
public_key blob not null,
created_at integer not null default(unixepoch ('now')),
updated_at integer not null default(unixepoch ('now'))
) STRICT;
create unique index if not exists uniq_operator_client_public_key on operator_client (public_key);
create unique index if not exists uniq_operator_identity_public_key on operator_identity (public_key);
create table if not exists operator (
id integer primary key references operator_identity(id) on delete restrict, -- same id as operator_identity
share blob not null,
share_nonce blob not null,
created_at integer not null default(unixepoch ('now')),
updated_at integer not null default(unixepoch ('now'))
) STRICT;
create table if not exists client_metadata (
id integer not null primary key,

View File

@@ -48,7 +48,7 @@ impl Bootstrapper {
let row_count: i64 = {
let mut conn = db.get().await?;
schema::operator_client::table
schema::operator::table
.count()
.get_result(&mut conn)
.await?

View File

@@ -3,7 +3,7 @@ use crate::{
crypto::integrity,
db::{
DatabaseError, DatabasePool,
models::{self},
models::{self, EvmWalletId},
schema,
},
evm::{
@@ -116,7 +116,7 @@ impl EvmActor {
}
#[message]
pub async fn list_wallets(&self) -> Result<Vec<(i32, Address)>, Error> {
pub async fn list_wallets(&self) -> Result<Vec<(EvmWalletId, Address)>, Error> {
let mut conn = self.db.get().await.map_err(DatabaseError::from)?;
let rows: Vec<models::EvmWallet> = schema::evm_wallet::table
.select(models::EvmWallet::as_select())

View File

@@ -6,7 +6,7 @@ use crate::{
},
db::{
self,
models::{self, RootKeyHistory},
models::{self, RootKeyHistory, RootKeyHistoryId},
schema::{self},
},
};
@@ -25,7 +25,6 @@ use strum::{EnumDiscriminants, IntoDiscriminant};
use tracing::{error, info};
pub mod events {
#[derive(Clone, Copy)]
pub struct Bootstrapped;
@@ -64,7 +63,7 @@ pub enum Error {
}
struct Unsealed {
root_key_history_id: i32,
root_key_history_id: RootKeyHistoryId,
root_key: KeyCell,
}
@@ -73,8 +72,9 @@ struct Unsealed {
enum State {
#[default]
Unbootstrapped,
Sealed {
root_key_history_id: i32,
root_key_history_id: RootKeyHistoryId,
},
Unsealed(Unsealed),
}
@@ -115,7 +115,10 @@ impl Vault {
// Exclusive transaction to avoid race condtions if multiple vaults write
// additional layer of protection against nonce-reuse
async fn get_new_nonce(pool: &db::DatabasePool, root_key_id: i32) -> Result<Nonce, Error> {
async fn get_new_nonce(
pool: &db::DatabasePool,
root_key_id: RootKeyHistoryId,
) -> Result<Nonce, Error> {
let mut conn = pool.get().await?;
let nonce = conn
@@ -128,7 +131,7 @@ impl Vault {
let mut nonce = Nonce::try_from(current_nonce.as_slice()).map_err(|()| {
error!(
"Broken database: invalid nonce for root key history id={}",
"Broken database: invalid nonce for root key history id={:#?}",
root_key_id
);
Error::BrokenDatabase
@@ -184,7 +187,7 @@ impl Vault {
let data_encryption_nonce_bytes = data_encryption_nonce.to_vec();
let root_key_history_id = conn
.transaction(async |conn| {
let root_key_history_id: i32 = insert_into(schema::root_key_history::table)
let root_key_history_id = insert_into(schema::root_key_history::table)
.values(&models::NewRootKeyHistory {
ciphertext: root_key_ciphertext.clone(),
tag: v1::ROOT_KEY_TAG.to_vec(),
@@ -202,7 +205,9 @@ impl Vault {
.execute(&mut *conn)
.await?;
Result::<_, diesel::result::Error>::Ok(root_key_history_id)
Result::<_, diesel::result::Error>::Ok(RootKeyHistoryId::from_raw(
root_key_history_id,
))
})
.await?;
@@ -340,7 +345,10 @@ impl Vault {
}
#[message]
pub fn sign_integrity(&mut self, mac_input: Vec<u8>) -> Result<(i32, Vec<u8>), Error> {
pub fn sign_integrity(
&mut self,
mac_input: Vec<u8>,
) -> Result<(RootKeyHistoryId, Vec<u8>), Error> {
let Unsealed {
root_key,
root_key_history_id,
@@ -352,7 +360,7 @@ impl Vault {
Ok(v) => v,
Err(_) => unreachable!("HMAC accepts keys of any size"),
});
hmac.update(&root_key_history_id.to_be_bytes());
hmac.update(&root_key_history_id.to_raw().to_be_bytes());
hmac.update(&mac_input);
let mac = hmac.finalize().into_bytes().to_vec();
@@ -364,7 +372,7 @@ impl Vault {
&mut self,
mac_input: Vec<u8>,
expected_mac: Vec<u8>,
key_version: i32,
key_version: RootKeyHistoryId,
) -> Result<bool, Error> {
let Unsealed {
root_key,
@@ -381,7 +389,7 @@ impl Vault {
Ok(v) => v,
Err(_) => unreachable!("HMAC accepts keys of any size"),
});
hmac.update(&key_version.to_be_bytes());
hmac.update(&key_version.to_raw().to_be_bytes());
hmac.update(&mac_input);
Ok(hmac.verify_slice(&expected_mac).is_ok())
@@ -405,6 +413,7 @@ impl Vault {
#[cfg(test)]
mod tests {
use crate::actors::GlobalActors;
use crate::db::models::RootKeyHistory;
use arbiter_crypto::safecell::SafeCellHandle as _;
use super::*;
@@ -440,8 +449,8 @@ mod tests {
assert!(n2.to_vec() > n1.to_vec(), "nonce must increase");
let mut conn = db.get().await.unwrap();
let root_row: models::RootKeyHistory = schema::root_key_history::table
.select(models::RootKeyHistory::as_select())
let root_row: RootKeyHistory = schema::root_key_history::table
.select(RootKeyHistory::as_select())
.first(&mut conn)
.await
.unwrap();

View File

@@ -79,10 +79,41 @@ pub mod types {
}
}
#[derive(Debug, FromSqlRow, AsExpression, Clone)]
#[diesel(sql_type = Integer)]
#[repr(transparent)] // hint compiler to optimize the wrapper struct away
pub struct ChainId(pub i32);
macro_rules! declare_id {
($name:ident) => {
#[derive(Debug, FromSqlRow, AsExpression, Clone, Hash, Copy, PartialEq, Eq)]
#[diesel(sql_type = Integer)]
#[repr(transparent)] // hint compiler to optimize the wrapper struct away
pub struct $name(i32);
impl $name {
pub const fn to_raw(self) -> i32 {
self.0
}
pub const fn from_raw(raw: i32) -> Self {
Self(raw)
}
}
impl FromSql<Integer, Sqlite> for $name {
fn from_sql(
bytes: <Sqlite as diesel::backend::Backend>::RawValue<'_>,
) -> diesel::deserialize::Result<Self> {
FromSql::<Integer, Sqlite>::from_sql(bytes).map(Self)
}
}
impl ToSql<Integer, Sqlite> for $name {
fn to_sql<'b>(
&'b self,
out: &mut diesel::serialize::Output<'b, '_, Sqlite>,
) -> diesel::serialize::Result {
ToSql::<Integer, Sqlite>::to_sql(&self.0, out)
}
}
};
}
declare_id!(ChainId);
#[expect(
clippy::cast_sign_loss,
@@ -103,21 +134,13 @@ pub mod types {
}
};
impl FromSql<Integer, Sqlite> for ChainId {
fn from_sql(
bytes: <Sqlite as diesel::backend::Backend>::RawValue<'_>,
) -> diesel::deserialize::Result<Self> {
FromSql::<Integer, Sqlite>::from_sql(bytes).map(Self)
}
}
impl ToSql<Integer, Sqlite> for ChainId {
fn to_sql<'b>(
&'b self,
out: &mut diesel::serialize::Output<'b, '_, Sqlite>,
) -> diesel::serialize::Result {
ToSql::<Integer, Sqlite>::to_sql(&self.0, out)
}
}
declare_id!(OperatorId);
declare_id!(OperatorIdentityId);
declare_id!(AeadEncryptedId);
declare_id!(RootKeyHistoryId);
declare_id!(TlsHistoryId);
declare_id!(EvmWalletId);
declare_id!(ClientId);
}
pub use types::*;
@@ -130,12 +153,12 @@ pub use types::*;
)]
#[diesel(table_name = aead_encrypted, check_for_backend(Sqlite))]
pub struct AeadEncrypted {
pub id: i32,
pub id: AeadEncryptedId,
pub ciphertext: Vec<u8>,
pub tag: Vec<u8>,
pub current_nonce: Vec<u8>,
pub schema_version: i32,
pub associated_root_key_id: i32, // references root_key_history.id
pub associated_root_key_id: RootKeyHistoryId,
pub created_at: SqliteTimestamp,
}
@@ -148,7 +171,7 @@ pub struct AeadEncrypted {
attributes_with = "deriveless"
)]
pub struct RootKeyHistory {
pub id: i32,
pub id: RootKeyHistoryId,
pub ciphertext: Vec<u8>,
pub tag: Vec<u8>,
pub root_key_encryption_nonce: Vec<u8>,
@@ -166,7 +189,7 @@ pub struct RootKeyHistory {
attributes_with = "deriveless"
)]
pub struct TlsHistory {
pub id: i32,
pub id: TlsHistoryId,
pub cert: String,
pub cert_key: String, // PEM Encoded private key
pub ca_cert: String, // PEM Encoded certificate for cert signing
@@ -191,7 +214,7 @@ pub struct ArbiterSettings {
attributes_with = "deriveless"
)]
pub struct EvmWallet {
pub id: i32,
pub id: EvmWalletId,
pub address: Vec<u8>,
pub aead_encrypted_id: i32,
pub created_at: SqliteTimestamp,
@@ -213,7 +236,7 @@ pub struct EvmWallet {
)]
pub struct EvmWalletAccess {
pub id: i32,
pub wallet_id: i32,
pub wallet_id: EvmWalletId,
pub client_id: i32,
pub created_at: SqliteTimestamp,
}
@@ -240,7 +263,7 @@ pub struct ProgramClientMetadataHistory {
#[derive(Models, Queryable, Debug, Insertable, Selectable)]
#[diesel(table_name = schema::program_client, check_for_backend(Sqlite))]
pub struct ProgramClient {
pub id: i32,
pub id: ClientId,
pub public_key: Vec<u8>,
pub metadata_id: i32,
pub created_at: SqliteTimestamp,
@@ -250,12 +273,22 @@ pub struct ProgramClient {
#[derive(Queryable, Debug)]
#[diesel(table_name = schema::operator_client, check_for_backend(Sqlite))]
pub struct OperatorClient {
pub id: i32,
pub id: OperatorIdentityId,
pub public_key: Vec<u8>,
pub created_at: SqliteTimestamp,
pub updated_at: SqliteTimestamp,
}
#[derive(Queryable, Debug)]
#[diesel(table_name = schema::operator, check_for_backend(Sqlite))]
pub struct Operator {
pub id: OperatorId,
pub share: Vec<u8>,
pub share_nonce: Vec<u8>,
pub created_at: SqliteTimestamp,
pub updated_at: SqliteTimestamp,
}
#[derive(Models, Queryable, Debug, Insertable, Selectable)]
#[diesel(table_name = evm_ether_transfer_limit, check_for_backend(Sqlite))]
#[view(
@@ -399,7 +432,7 @@ pub struct IntegrityEnvelope {
pub entity_kind: String,
pub entity_id: Vec<u8>,
pub payload_version: i32,
pub key_version: i32,
pub key_version: RootKeyHistoryId,
pub mac: Vec<u8>,
pub signed_at: SqliteTimestamp,
pub created_at: SqliteTimestamp,

View File

@@ -152,6 +152,25 @@ diesel::table! {
}
}
diesel::table! {
operator (id) {
id -> Nullable<Integer>,
share -> Binary,
share_nonce -> Binary,
created_at -> Integer,
updated_at -> Integer,
}
}
diesel::table! {
operator_identity (id) {
id -> Integer,
public_key -> Binary,
created_at -> Integer,
updated_at -> Integer,
}
}
diesel::table! {
program_client (id) {
id -> Integer,
@@ -185,15 +204,6 @@ diesel::table! {
}
}
diesel::table! {
operator_client (id) {
id -> Integer,
public_key -> Binary,
created_at -> Integer,
updated_at -> Integer,
}
}
diesel::joinable!(aead_encrypted -> root_key_history (associated_root_key_id));
diesel::joinable!(arbiter_settings -> root_key_history (root_key_id));
diesel::joinable!(arbiter_settings -> tls_history (tls_id));
@@ -212,6 +222,7 @@ diesel::joinable!(evm_transaction_log -> evm_wallet_access (wallet_access_id));
diesel::joinable!(evm_wallet -> aead_encrypted (aead_encrypted_id));
diesel::joinable!(evm_wallet_access -> evm_wallet (wallet_id));
diesel::joinable!(evm_wallet_access -> program_client (client_id));
diesel::joinable!(operator -> operator_identity (id));
diesel::joinable!(program_client -> client_metadata (metadata_id));
diesel::allow_tables_to_appear_in_same_query!(
@@ -230,8 +241,9 @@ diesel::allow_tables_to_appear_in_same_query!(
evm_wallet,
evm_wallet_access,
integrity_envelope,
operator,
operator_identity,
program_client,
root_key_history,
tls_history,
operator_client,
);

View File

@@ -359,7 +359,8 @@ mod tests {
use crate::db::{
self, DatabaseConnection,
models::{
EvmBasicGrant, EvmWalletAccess, NewEvmBasicGrant, NewEvmTransactionLog, SqliteTimestamp,
EvmBasicGrant, EvmWalletAccess, EvmWalletId, NewEvmBasicGrant, NewEvmTransactionLog,
SqliteTimestamp,
},
schema::{evm_basic_grant, evm_transaction_log},
};
@@ -377,7 +378,7 @@ mod tests {
EvalContext {
target: EvmWalletAccess {
id: WALLET_ACCESS_ID,
wallet_id: 10,
wallet_id: EvmWalletId::from_raw(5),
client_id: 20,
created_at: SqliteTimestamp(Utc::now()),
},

View File

@@ -3,7 +3,8 @@ use crate::{
db::{
self, DatabaseConnection,
models::{
EvmBasicGrant, EvmWalletAccess, NewEvmBasicGrant, NewEvmTransactionLog, SqliteTimestamp,
EvmBasicGrant, EvmWalletAccess, EvmWalletId, NewEvmBasicGrant, NewEvmTransactionLog,
SqliteTimestamp,
},
schema::{evm_basic_grant, evm_transaction_log},
},
@@ -31,7 +32,7 @@ fn ctx(to: Address, value: U256) -> EvalContext {
EvalContext {
target: EvmWalletAccess {
id: WALLET_ACCESS_ID,
wallet_id: 10,
wallet_id: EvmWalletId::from_raw(10),
client_id: 20,
created_at: SqliteTimestamp(Utc::now()),
},

View File

@@ -2,7 +2,7 @@ use super::{Settings, TokenTransfer};
use crate::{
db::{
self, DatabaseConnection,
models::{EvmBasicGrant, EvmWalletAccess, NewEvmBasicGrant, SqliteTimestamp},
models::{EvmBasicGrant, EvmWalletAccess, EvmWalletId, NewEvmBasicGrant, SqliteTimestamp},
schema::evm_basic_grant,
},
evm::{
@@ -45,7 +45,7 @@ fn ctx(to: Address, calldata: Bytes) -> EvalContext {
EvalContext {
target: EvmWalletAccess {
id: WALLET_ACCESS_ID,
wallet_id: 10,
wallet_id: EvmWalletId::from_raw(10),
client_id: 20,
created_at: SqliteTimestamp(Utc::now()),
},

View File

@@ -90,7 +90,7 @@ async fn handle_wallet_list(
.into_iter()
.map(|(id, address)| WalletEntry {
address: address.to_vec(),
id,
id: id.to_raw(),
})
.collect(),
}),

View File

@@ -1,11 +1,10 @@
use crate::{
db::models::{CoreEvmWalletAccess, NewEvmWalletAccess},
db::models::{CoreEvmWalletAccess, EvmWalletId, NewEvmWalletAccess},
evm::policies::{
SharedGrantSettings, SpecificGrant, TransactionRateLimit, VolumeRateLimit, ether_transfer,
token_transfers,
},
grpc::Convert,
grpc::TryConvert,
grpc::{Convert, TryConvert},
};
use arbiter_proto::{
proto::evm::{
@@ -150,7 +149,7 @@ impl Convert for WalletAccess {
fn convert(self) -> Self::Output {
NewEvmWalletAccess {
wallet_id: self.wallet_id,
wallet_id: EvmWalletId::from_raw(self.wallet_id),
client_id: self.sdk_client_id,
}
}
@@ -165,7 +164,7 @@ impl TryConvert for SdkClientWalletAccess {
return Err(Status::invalid_argument("Missing wallet access entry"));
};
Ok(CoreEvmWalletAccess {
wallet_id: access.wallet_id,
wallet_id: EvmWalletId::from_raw(access.wallet_id),
client_id: access.sdk_client_id,
id: self.id,
})

View File

@@ -103,7 +103,7 @@ impl Convert for EvmWalletAccess {
Self::Output {
id: self.id,
access: Some(WalletAccess {
wallet_id: self.wallet_id,
wallet_id: self.wallet_id.to_raw(),
sdk_client_id: self.client_id,
}),
}

View File

@@ -2,7 +2,7 @@ use crate::{
db::models::NewEvmWalletAccess,
grpc::Convert,
peers::operator::{
OutOfBand, OperatorSession,
OperatorSession, OutOfBand,
session::handlers::{
HandleGrantEvmWalletAccess, HandleListWalletAccess, HandleNewClientApprove,
HandleRevokeEvmWalletAccess, HandleSdkClientList,
@@ -11,8 +11,8 @@ use crate::{
};
use arbiter_crypto::authn;
use arbiter_proto::proto::{
shared::ClientInfo as ProtoClientMetadata,
operator::{
operator_response::Payload as OperatorResponsePayload,
sdk_client::{
self as proto_sdk_client, ConnectionCancel as ProtoSdkClientConnectionCancel,
ConnectionRequest as ProtoSdkClientConnectionRequest,
@@ -24,8 +24,8 @@ use arbiter_proto::proto::{
request::Payload as SdkClientRequestPayload,
response::Payload as SdkClientResponsePayload,
},
operator_response::Payload as OperatorResponsePayload,
},
shared::ClientInfo as ProtoClientMetadata,
};
use kameo::actor::ActorRef;
@@ -115,7 +115,7 @@ async fn handle_list(
clients: clients
.into_iter()
.map(|(client, metadata)| ProtoSdkClientEntry {
id: client.id,
id: client.id.to_raw(),
pubkey: client.public_key.clone(),
info: Some(ProtoClientMetadata {
name: metadata.name,

View File

@@ -4,7 +4,7 @@ use super::{
};
use crate::{
actors::bootstrap::ConsumeToken,
db::{DatabasePool, schema::operator_client},
db::{DatabasePool, schema::operator_identity},
peers::operator::auth::Outbound,
};
use arbiter_crypto::authn::{self, AuthChallenge, OPERATOR_CONTEXT};
@@ -44,9 +44,9 @@ async fn get_client_id(db: &DatabasePool, pubkey: &authn::PublicKey) -> Result<O
Error::internal("Database unavailable")
})?;
operator_client::table
.filter(operator_client::public_key.eq(pubkey.to_bytes()))
.select(operator_client::id)
operator_identity::table
.filter(operator_identity::public_key.eq(pubkey.to_bytes()))
.select(operator_identity::id)
.first::<i32>(&mut conn)
.await
.optional()
@@ -63,9 +63,9 @@ async fn register_key(db: &DatabasePool, pubkey: &authn::PublicKey) -> Result<i3
Error::internal("Database unavailable")
})?;
let id: i32 = diesel::insert_into(operator_client::table)
.values((operator_client::public_key.eq(pubkey_bytes),))
.returning(operator_client::id)
let id: i32 = diesel::insert_into(operator_identity::table)
.values((operator_identity::public_key.eq(pubkey_bytes),))
.returning(operator_identity::id)
.get_result(&mut conn)
.await
.map_err(|e| {

View File

@@ -1,12 +1,16 @@
use super::{Error, OperatorSession};
use crate::{
actors::evm::{
ClientSignTransaction, Generate, ListWallets, OperatorCreateGrant, OperatorListGrants,
SignTransactionError as EvmSignError,
actors::{
evm::{
ClientSignTransaction, Generate, ListWallets, OperatorCreateGrant, OperatorListGrants,
SignTransactionError as EvmSignError,
},
flow_coordinator::client_connect_approval::ClientApprovalAnswer,
vault::VaultState,
},
db::models::{
EvmWalletAccess, EvmWalletId, NewEvmWalletAccess, ProgramClient, ProgramClientMetadata,
},
actors::flow_coordinator::client_connect_approval::ClientApprovalAnswer,
actors::vault::VaultState,
db::models::{EvmWalletAccess, NewEvmWalletAccess, ProgramClient, ProgramClientMetadata},
evm::policies::{Grant, SpecificGrant},
};
use arbiter_crypto::authn;
@@ -70,7 +74,9 @@ impl OperatorSession {
}
#[message]
pub(crate) async fn handle_evm_wallet_list(&mut self) -> Result<Vec<(i32, Address)>, Error> {
pub(crate) async fn handle_evm_wallet_list(
&mut self,
) -> Result<Vec<(EvmWalletId, Address)>, Error> {
match self.props.actors.evm.ask(ListWallets {}).await {
Ok(wallets) => Ok(wallets),
Err(err) => {

View File

@@ -86,8 +86,8 @@ async fn insert_bootstrap_sentinel_operator(db: &db::DatabasePool) {
.0
.to_vec();
insert_into(schema::operator_client::table)
.values((schema::operator_client::public_key.eq(sentinel_key),))
insert_into(schema::operator_identity::table)
.values((schema::operator_identity::public_key.eq(sentinel_key),))
.execute(&mut conn)
.await
.unwrap();

View File

@@ -206,8 +206,8 @@ pub async fn bootstrap_token_auth() {
task.await.unwrap().unwrap();
let mut conn = db.get().await.unwrap();
let stored_pubkey: Vec<u8> = schema::operator_client::table
.select(schema::operator_client::public_key)
let stored_pubkey: Vec<u8> = schema::operator_identity::table
.select(schema::operator_identity::public_key)
.first::<Vec<u8>>(&mut conn)
.await
.unwrap();
@@ -259,7 +259,7 @@ pub async fn bootstrap_invalid_token_auth() {
));
let mut conn = db.get().await.unwrap();
let count: i64 = schema::operator_client::table
let count: i64 = schema::operator_identity::table
.count()
.get_result::<i64>(&mut conn)
.await
@@ -285,9 +285,9 @@ pub async fn challenge_auth() {
{
let mut conn = db.get().await.unwrap();
let id: i32 = insert_into(schema::operator_client::table)
.values((schema::operator_client::public_key.eq(pubkey_bytes.clone()),))
.returning(schema::operator_client::id)
let id: i32 = insert_into(schema::operator_identity::table)
.values((schema::operator_identity::public_key.eq(pubkey_bytes.clone()),))
.returning(schema::operator_identity::id)
.get_result(&mut conn)
.await
.unwrap();
@@ -371,8 +371,8 @@ pub async fn challenge_auth_rejects_integrity_tag_mismatch_when_unsealed() {
{
let mut conn = db.get().await.unwrap();
insert_into(schema::operator_client::table)
.values((schema::operator_client::public_key.eq(pubkey_bytes.clone()),))
insert_into(schema::operator_identity::table)
.values((schema::operator_identity::public_key.eq(pubkey_bytes.clone()),))
.execute(&mut conn)
.await
.unwrap();
@@ -444,9 +444,9 @@ pub async fn challenge_auth_rejects_invalid_signature() {
{
let mut conn = db.get().await.unwrap();
let id: i32 = insert_into(schema::operator_client::table)
.values((schema::operator_client::public_key.eq(pubkey_bytes.clone()),))
.returning(schema::operator_client::id)
let id: i32 = insert_into(schema::operator_identity::table)
.values((schema::operator_identity::public_key.eq(pubkey_bytes.clone()),))
.returning(schema::operator_identity::id)
.get_result(&mut conn)
.await
.unwrap();