merge: new flow into main

This commit is contained in:
hdbg
2026-03-22 12:23:07 +01:00
24 changed files with 995 additions and 206 deletions

View File

@@ -5,4 +5,22 @@ edition = "2024"
repository = "https://git.markettakers.org/MarketTakers/arbiter"
license = "Apache-2.0"
[lints]
workspace = true
[features]
evm = ["dep:alloy"]
[dependencies]
arbiter-proto.path = "../arbiter-proto"
alloy = { workspace = true, optional = true }
tonic.workspace = true
tonic.features = ["tls-aws-lc"]
tokio.workspace = true
tokio-stream.workspace = true
ed25519-dalek.workspace = true
thiserror.workspace = true
http = "1.4.0"
rustls-webpki = { version = "0.103.10", features = ["aws-lc-rs"] }
async-trait.workspace = true
rand.workspace = true

View File

@@ -0,0 +1,147 @@
use arbiter_proto::{
ClientMetadata, format_challenge,
proto::client::{
AuthChallengeRequest, AuthChallengeSolution, AuthResult, ClientInfo as ProtoClientInfo,
ClientRequest, client_request::Payload as ClientRequestPayload,
client_response::Payload as ClientResponsePayload,
},
};
use ed25519_dalek::Signer as _;
use crate::{
storage::StorageError,
transport::{ClientTransport, next_request_id},
};
#[derive(Debug, thiserror::Error)]
pub enum ConnectError {
#[error("Could not establish connection")]
Connection(#[from] tonic::transport::Error),
#[error("Invalid server URI")]
InvalidUri(#[from] http::uri::InvalidUri),
#[error("Invalid CA certificate")]
InvalidCaCert(#[from] webpki::Error),
#[error("gRPC error")]
Grpc(#[from] tonic::Status),
#[error("Auth challenge was not returned by server")]
MissingAuthChallenge,
#[error("Client approval denied by User Agent")]
ApprovalDenied,
#[error("No User Agents online to approve client")]
NoUserAgentsOnline,
#[error("Unexpected auth response payload")]
UnexpectedAuthResponse,
#[error("Signing key storage error")]
Storage(#[from] StorageError),
}
fn map_auth_result(code: i32) -> ConnectError {
match AuthResult::try_from(code).unwrap_or(AuthResult::Unspecified) {
AuthResult::ApprovalDenied => ConnectError::ApprovalDenied,
AuthResult::NoUserAgentsOnline => ConnectError::NoUserAgentsOnline,
AuthResult::Unspecified
| AuthResult::Success
| AuthResult::InvalidKey
| AuthResult::InvalidSignature
| AuthResult::Internal => ConnectError::UnexpectedAuthResponse,
}
}
async fn send_auth_challenge_request(
transport: &mut ClientTransport,
metadata: ClientMetadata,
key: &ed25519_dalek::SigningKey,
) -> std::result::Result<(), ConnectError> {
transport
.send(ClientRequest {
request_id: next_request_id(),
payload: Some(ClientRequestPayload::AuthChallengeRequest(
AuthChallengeRequest {
pubkey: key.verifying_key().to_bytes().to_vec(),
client_info: Some(ProtoClientInfo {
name: metadata.name,
description: metadata.description,
version: metadata.version,
}),
},
)),
})
.await
.map_err(|_| ConnectError::UnexpectedAuthResponse)
}
async fn receive_auth_challenge(
transport: &mut ClientTransport,
) -> std::result::Result<arbiter_proto::proto::client::AuthChallenge, ConnectError> {
let response = transport
.recv()
.await
.map_err(|_| ConnectError::MissingAuthChallenge)?;
let payload = response.payload.ok_or(ConnectError::MissingAuthChallenge)?;
match payload {
ClientResponsePayload::AuthChallenge(challenge) => Ok(challenge),
ClientResponsePayload::AuthResult(result) => Err(map_auth_result(result)),
_ => Err(ConnectError::UnexpectedAuthResponse),
}
}
async fn send_auth_challenge_solution(
transport: &mut ClientTransport,
key: &ed25519_dalek::SigningKey,
challenge: arbiter_proto::proto::client::AuthChallenge,
) -> std::result::Result<(), ConnectError> {
let challenge_payload = format_challenge(challenge.nonce, &challenge.pubkey);
let signature = key.sign(&challenge_payload).to_bytes().to_vec();
transport
.send(ClientRequest {
request_id: next_request_id(),
payload: Some(ClientRequestPayload::AuthChallengeSolution(
AuthChallengeSolution { signature },
)),
})
.await
.map_err(|_| ConnectError::UnexpectedAuthResponse)
}
async fn receive_auth_confirmation(
transport: &mut ClientTransport,
) -> std::result::Result<(), ConnectError> {
let response = transport
.recv()
.await
.map_err(|_| ConnectError::UnexpectedAuthResponse)?;
let payload = response
.payload
.ok_or(ConnectError::UnexpectedAuthResponse)?;
match payload {
ClientResponsePayload::AuthResult(result)
if AuthResult::try_from(result).ok() == Some(AuthResult::Success) =>
{
Ok(())
}
ClientResponsePayload::AuthResult(result) => Err(map_auth_result(result)),
_ => Err(ConnectError::UnexpectedAuthResponse),
}
}
pub(crate) async fn authenticate(
transport: &mut ClientTransport,
metadata: ClientMetadata,
key: &ed25519_dalek::SigningKey,
) -> std::result::Result<(), ConnectError> {
send_auth_challenge_request(transport, metadata, key).await?;
let challenge = receive_auth_challenge(transport).await?;
send_auth_challenge_solution(transport, key, challenge).await?;
receive_auth_confirmation(transport).await
}

View File

@@ -0,0 +1,78 @@
use arbiter_proto::{ClientMetadata, proto::arbiter_service_client::ArbiterServiceClient, url::ArbiterUrl};
use std::sync::Arc;
use tokio::sync::{Mutex, mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::ClientTlsConfig;
use crate::{
auth::{ConnectError, authenticate},
storage::{FileSigningKeyStorage, SigningKeyStorage},
transport::{BUFFER_LENGTH, ClientTransport},
};
#[cfg(feature = "evm")]
use crate::wallets::evm::ArbiterEvmWallet;
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
#[error("gRPC error")]
Grpc(#[from] tonic::Status),
#[error("Connection closed by server")]
ConnectionClosed,
}
pub struct ArbiterClient {
#[allow(dead_code)]
transport: Arc<Mutex<ClientTransport>>,
}
impl ArbiterClient {
pub async fn connect(url: ArbiterUrl, metadata: ClientMetadata) -> Result<Self, ConnectError> {
let storage = FileSigningKeyStorage::from_default_location()?;
Self::connect_with_storage(url, metadata, &storage).await
}
pub async fn connect_with_storage<S: SigningKeyStorage>(
url: ArbiterUrl,
metadata: ClientMetadata,
storage: &S,
) -> Result<Self, ConnectError> {
let key = storage.load_or_create()?;
Self::connect_with_key(url, metadata, key).await
}
pub async fn connect_with_key(
url: ArbiterUrl,
metadata: ClientMetadata,
key: ed25519_dalek::SigningKey,
) -> Result<Self, ConnectError> {
let anchor = webpki::anchor_from_trusted_cert(&url.ca_cert)?.to_owned();
let tls = ClientTlsConfig::new().trust_anchor(anchor);
let channel = tonic::transport::Channel::from_shared(format!("{}:{}", url.host, url.port))?
.tls_config(tls)?
.connect()
.await?;
let mut client = ArbiterServiceClient::new(channel);
let (tx, rx) = mpsc::channel(BUFFER_LENGTH);
let response_stream = client.client(ReceiverStream::new(rx)).await?.into_inner();
let mut transport = ClientTransport {
sender: tx,
receiver: response_stream,
};
authenticate(&mut transport, metadata, &key).await?;
Ok(Self {
transport: Arc::new(Mutex::new(transport)),
})
}
#[cfg(feature = "evm")]
pub async fn evm_wallets(&self) -> Result<Vec<ArbiterEvmWallet>, ClientError> {
todo!("fetch EVM wallet list from server")
}
}

View File

@@ -1,14 +1,12 @@
pub fn add(left: u64, right: u64) -> u64 {
left + right
}
mod auth;
mod client;
mod storage;
mod transport;
pub mod wallets;
#[cfg(test)]
mod tests {
use super::*;
pub use auth::ConnectError;
pub use client::{ArbiterClient, ClientError};
pub use storage::{FileSigningKeyStorage, SigningKeyStorage, StorageError};
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
}
#[cfg(feature = "evm")]
pub use wallets::evm::ArbiterEvmWallet;

View File

@@ -0,0 +1,132 @@
use arbiter_proto::home_path;
use std::path::{Path, PathBuf};
#[derive(Debug, thiserror::Error)]
pub enum StorageError {
#[error("I/O error")]
Io(#[from] std::io::Error),
#[error("Invalid signing key length in storage: expected {expected} bytes, got {actual} bytes")]
InvalidKeyLength { expected: usize, actual: usize },
}
pub trait SigningKeyStorage {
fn load_or_create(&self) -> std::result::Result<ed25519_dalek::SigningKey, StorageError>;
}
#[derive(Debug, Clone)]
pub struct FileSigningKeyStorage {
path: PathBuf,
}
impl FileSigningKeyStorage {
pub const DEFAULT_FILE_NAME: &str = "sdk_client_ed25519.key";
pub fn new(path: impl Into<PathBuf>) -> Self {
Self { path: path.into() }
}
pub fn from_default_location() -> std::result::Result<Self, StorageError> {
Ok(Self::new(home_path()?.join(Self::DEFAULT_FILE_NAME)))
}
fn read_key(path: &Path) -> std::result::Result<ed25519_dalek::SigningKey, StorageError> {
let bytes = std::fs::read(path)?;
let raw: [u8; 32] =
bytes
.try_into()
.map_err(|v: Vec<u8>| StorageError::InvalidKeyLength {
expected: 32,
actual: v.len(),
})?;
Ok(ed25519_dalek::SigningKey::from_bytes(&raw))
}
}
impl SigningKeyStorage for FileSigningKeyStorage {
fn load_or_create(&self) -> std::result::Result<ed25519_dalek::SigningKey, StorageError> {
if let Some(parent) = self.path.parent() {
std::fs::create_dir_all(parent)?;
}
if self.path.exists() {
return Self::read_key(&self.path);
}
let key = ed25519_dalek::SigningKey::generate(&mut rand::rng());
let raw_key = key.to_bytes();
// Use create_new to prevent accidental overwrite if another process creates the key first.
match std::fs::OpenOptions::new()
.create_new(true)
.write(true)
.open(&self.path)
{
Ok(mut file) => {
use std::io::Write as _;
file.write_all(&raw_key)?;
Ok(key)
}
Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
Self::read_key(&self.path)
}
Err(err) => Err(StorageError::Io(err)),
}
}
}
#[cfg(test)]
mod tests {
use super::{FileSigningKeyStorage, SigningKeyStorage, StorageError};
fn unique_temp_key_path() -> std::path::PathBuf {
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("clock should be after unix epoch")
.as_nanos();
std::env::temp_dir().join(format!(
"arbiter-client-key-{}-{}.bin",
std::process::id(),
nanos
))
}
#[test]
fn file_storage_creates_and_reuses_key() {
let path = unique_temp_key_path();
let storage = FileSigningKeyStorage::new(path.clone());
let key_a = storage
.load_or_create()
.expect("first load_or_create should create key");
let key_b = storage
.load_or_create()
.expect("second load_or_create should read same key");
assert_eq!(key_a.to_bytes(), key_b.to_bytes());
assert!(path.exists());
std::fs::remove_file(path).expect("temp key file should be removable");
}
#[test]
fn file_storage_rejects_invalid_key_length() {
let path = unique_temp_key_path();
std::fs::write(&path, [42u8; 31]).expect("should write invalid key file");
let storage = FileSigningKeyStorage::new(path.clone());
let err = storage
.load_or_create()
.expect_err("storage should reject non-32-byte key file");
match err {
StorageError::InvalidKeyLength { expected, actual } => {
assert_eq!(expected, 32);
assert_eq!(actual, 31);
}
other => panic!("unexpected error: {other:?}"),
}
std::fs::remove_file(path).expect("temp key file should be removable");
}
}

View File

@@ -0,0 +1,48 @@
use arbiter_proto::proto::{
client::{ClientRequest, ClientResponse},
};
use std::sync::atomic::{AtomicI32, Ordering};
use tokio::sync::mpsc;
pub(crate) const BUFFER_LENGTH: usize = 16;
static NEXT_REQUEST_ID: AtomicI32 = AtomicI32::new(1);
pub(crate) fn next_request_id() -> i32 {
NEXT_REQUEST_ID.fetch_add(1, Ordering::Relaxed)
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum ClientSignError {
#[error("Transport channel closed")]
ChannelClosed,
#[error("Connection closed by server")]
ConnectionClosed,
}
pub(crate) struct ClientTransport {
pub(crate) sender: mpsc::Sender<ClientRequest>,
pub(crate) receiver: tonic::Streaming<ClientResponse>,
}
impl ClientTransport {
pub(crate) async fn send(
&mut self,
request: ClientRequest,
) -> std::result::Result<(), ClientSignError> {
self.sender
.send(request)
.await
.map_err(|_| ClientSignError::ChannelClosed)
}
pub(crate) async fn recv(
&mut self,
) -> std::result::Result<ClientResponse, ClientSignError> {
match self.receiver.message().await {
Ok(Some(resp)) => Ok(resp),
Ok(None) => Err(ClientSignError::ConnectionClosed),
Err(_) => Err(ClientSignError::ConnectionClosed),
}
}
}

View File

@@ -0,0 +1,89 @@
use alloy::{
consensus::SignableTransaction,
network::TxSigner,
primitives::{Address, B256, ChainId, Signature},
signers::{Error, Result, Signer},
};
use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::transport::ClientTransport;
pub struct ArbiterEvmWallet {
transport: Arc<Mutex<ClientTransport>>,
address: Address,
chain_id: Option<ChainId>,
}
impl ArbiterEvmWallet {
pub(crate) fn new(transport: Arc<Mutex<ClientTransport>>, address: Address) -> Self {
Self {
transport,
address,
chain_id: None,
}
}
pub fn address(&self) -> Address {
self.address
}
pub fn with_chain_id(mut self, chain_id: ChainId) -> Self {
self.chain_id = Some(chain_id);
self
}
fn validate_chain_id(&self, tx: &mut dyn SignableTransaction<Signature>) -> Result<()> {
if let Some(chain_id) = self.chain_id
&& !tx.set_chain_id_checked(chain_id)
{
return Err(Error::TransactionChainIdMismatch {
signer: chain_id,
tx: tx.chain_id().unwrap(),
});
}
Ok(())
}
}
#[async_trait]
impl Signer for ArbiterEvmWallet {
async fn sign_hash(&self, _hash: &B256) -> Result<Signature> {
Err(Error::other(
"hash-only signing is not supported for ArbiterEvmWallet; use transaction signing",
))
}
fn address(&self) -> Address {
self.address
}
fn chain_id(&self) -> Option<ChainId> {
self.chain_id
}
fn set_chain_id(&mut self, chain_id: Option<ChainId>) {
self.chain_id = chain_id;
}
}
#[async_trait]
impl TxSigner<Signature> for ArbiterEvmWallet {
fn address(&self) -> Address {
self.address
}
async fn sign_transaction(
&self,
tx: &mut dyn SignableTransaction<Signature>,
) -> Result<Signature> {
let _transport = self.transport.lock().await;
self.validate_chain_id(tx)?;
Err(Error::other(
"transaction signing is not supported by current arbiter.client protocol",
))
}
}

View File

@@ -0,0 +1,2 @@
#[cfg(feature = "evm")]
pub mod evm;

View File

@@ -10,7 +10,7 @@ tonic.workspace = true
tokio.workspace = true
futures.workspace = true
hex = "0.4.3"
tonic-prost = "0.14.3"
tonic-prost = "0.14.5"
prost = "0.14.3"
kameo.workspace = true
url = "2.5.8"
@@ -24,7 +24,8 @@ async-trait.workspace = true
tokio-stream.workspace = true
[build-dependencies]
tonic-prost-build = "0.14.3"
tonic-prost-build = "0.14.5"
protoc-bin-vendored = "3"
[dev-dependencies]
rstest.workspace = true
@@ -33,5 +34,3 @@ rcgen.workspace = true
[package.metadata.cargo-shear]
ignored = ["tonic-prost", "prost", "kameo"]

View File

@@ -19,6 +19,13 @@ pub mod proto {
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientMetadata {
pub name: String,
pub description: Option<String>,
pub version: Option<String>,
}
pub static BOOTSTRAP_PATH: &str = "bootstrap_token";
pub fn home_path() -> Result<std::path::PathBuf, std::io::Error> {

View File

@@ -9,8 +9,8 @@ license = "Apache-2.0"
workspace = true
[dependencies]
diesel = { version = "2.3.6", features = ["chrono", "returning_clauses_for_sqlite_3_35", "serde_json", "time", "uuid"] }
diesel-async = { version = "0.7.4", features = [
diesel = { version = "2.3.7", features = ["chrono", "returning_clauses_for_sqlite_3_35", "serde_json", "time", "uuid"] }
diesel-async = { version = "0.8.0", features = [
"bb8",
"migrations",
"sqlite",
@@ -27,6 +27,7 @@ rustls.workspace = true
smlang.workspace = true
miette.workspace = true
thiserror.workspace = true
fatality = "0.1.1"
diesel_migrations = { version = "2.3.1", features = ["sqlite"] }
async-trait.workspace = true
secrecy = "0.10.3"
@@ -43,7 +44,7 @@ x25519-dalek.workspace = true
chacha20poly1305 = { version = "0.10.1", features = ["std"] }
argon2 = { version = "0.5.3", features = ["zeroize"] }
restructed = "0.2.2"
strum = { version = "0.27.2", features = ["derive"] }
strum = { version = "0.28.0", features = ["derive"] }
pem = "3.0.6"
k256.workspace = true
rsa.workspace = true

View File

@@ -80,6 +80,9 @@ create table if not exists program_client (
updated_at integer not null default(unixepoch ('now'))
) STRICT;
create unique index if not exists program_client_public_key_unique
on program_client (public_key);
create unique index if not exists uniq_program_client_public_key on program_client (public_key);
create table if not exists evm_wallet (

View File

@@ -1,6 +1,5 @@
use arbiter_proto::{
format_challenge,
transport::{Bi, expect_message},
ClientMetadata, format_challenge, transport::{Bi, expect_message}
};
use chrono::Utc;
use diesel::{
@@ -24,13 +23,6 @@ use crate::{
},
};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientMetadata {
pub name: String,
pub description: Option<String>,
pub version: Option<String>,
}
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum Error {
#[error("Database pool unavailable")]
@@ -72,9 +64,17 @@ pub enum Outbound {
AuthSuccess,
}
pub struct ClientInfo {
pub id: i32,
pub current_nonce: i32,
}
/// Atomically reads and increments the nonce for a known client.
/// Returns `None` if the pubkey is not registered.
async fn get_nonce(db: &db::DatabasePool, pubkey: &VerifyingKey) -> Result<Option<i32>, Error> {
async fn get_client_and_nonce(
db: &db::DatabasePool,
pubkey: &VerifyingKey,
) -> Result<Option<ClientInfo>, Error> {
let pubkey_bytes = pubkey.as_bytes().to_vec();
let mut conn = db.get().await.map_err(|e| {
@@ -85,10 +85,10 @@ async fn get_nonce(db: &db::DatabasePool, pubkey: &VerifyingKey) -> Result<Optio
conn.exclusive_transaction(|conn| {
let pubkey_bytes = pubkey_bytes.clone();
Box::pin(async move {
let Some(current_nonce) = program_client::table
let Some((client_id, current_nonce)) = program_client::table
.filter(program_client::public_key.eq(&pubkey_bytes))
.select(program_client::nonce)
.first::<i32>(conn)
.select((program_client::id, program_client::nonce))
.first::<(i32, i32)>(conn)
.await
.optional()?
else {
@@ -101,7 +101,10 @@ async fn get_nonce(db: &db::DatabasePool, pubkey: &VerifyingKey) -> Result<Optio
.execute(conn)
.await?;
Ok(Some(current_nonce))
Ok(Some(ClientInfo {
id: client_id,
current_nonce,
}))
})
})
.await
@@ -138,9 +141,8 @@ async fn insert_client(
db: &db::DatabasePool,
pubkey: &VerifyingKey,
metadata: &ClientMetadata,
) -> Result<(), Error> {
use crate::db::schema::client_metadata;
) -> Result<i32, Error> {
use crate::db::schema::{client_metadata, program_client};
let mut conn = db.get().await.map_err(|e| {
error!(error = ?e, "Database pool error");
Error::DatabasePoolUnavailable
@@ -160,38 +162,22 @@ async fn insert_client(
Error::DatabaseOperationFailed
})?;
insert_into(program_client::table)
let client_id = insert_into(program_client::table)
.values((
program_client::public_key.eq(pubkey.as_bytes().to_vec()),
program_client::metadata_id.eq(metadata_id),
program_client::nonce.eq(1), // pre-incremented; challenge uses 0
))
.execute(&mut conn)
.on_conflict_do_nothing()
.returning(program_client::id)
.get_result::<i32>(&mut conn)
.await
.map_err(|e| {
error!(error = ?e, "Failed to insert new client");
error!(error = ?e, "Failed to insert client metadata");
Error::DatabaseOperationFailed
})?;
Ok(())
}
async fn get_client_id(db: &db::DatabasePool, pubkey: &VerifyingKey) -> Result<Option<i32>, Error> {
let mut conn = db.get().await.map_err(|e| {
error!(error = ?e, "Database pool error");
Error::DatabasePoolUnavailable
})?;
program_client::table
.filter(program_client::public_key.eq(pubkey.as_bytes().to_vec()))
.select(program_client::id)
.first::<i32>(&mut conn)
.await
.optional()
.map_err(|e| {
error!(error = ?e, "Database error");
Error::DatabaseOperationFailed
})
Ok(client_id)
}
async fn sync_client_metadata(
@@ -312,7 +298,7 @@ where
return Err(Error::Transport);
};
let nonce = match get_nonce(&props.db, &pubkey).await? {
let info = match get_client_and_nonce(&props.db, &pubkey).await? {
Some(nonce) => nonce,
None => {
approve_new_client(
@@ -323,17 +309,18 @@ where
},
)
.await?;
insert_client(&props.db, &pubkey, &metadata).await?;
0
let client_id = insert_client(&props.db, &pubkey, &metadata).await?;
ClientInfo {
id: client_id,
current_nonce: 0,
}
}
};
let client_id = get_client_id(&props.db, &pubkey)
.await?
.ok_or(Error::DatabaseOperationFailed)?;
sync_client_metadata(&props.db, client_id, &metadata).await?;
sync_client_metadata(&props.db, info.id, &metadata).await?;
challenge_client(transport, pubkey, nonce).await?;
challenge_client(transport, pubkey, info.current_nonce).await?;
transport
.send(Ok(Outbound::AuthSuccess))
.await

View File

@@ -1,9 +1,9 @@
use arbiter_proto::transport::Bi;
use arbiter_proto::{ClientMetadata, transport::Bi};
use kameo::actor::Spawn;
use tracing::{error, info};
use crate::{
actors::{GlobalActors, client::{auth::ClientMetadata, session::ClientSession}},
actors::{GlobalActors, client::{ session::ClientSession}},
db,
};

View File

@@ -3,7 +3,7 @@ use std::{borrow::Cow, collections::HashMap};
use arbiter_proto::transport::Sender;
use async_trait::async_trait;
use ed25519_dalek::VerifyingKey;
use kameo::{Actor, actor::ActorRef, messages, prelude::Context};
use kameo::{Actor, actor::ActorRef, messages};
use thiserror::Error;
use tracing::error;

View File

@@ -1,12 +1,11 @@
use arbiter_proto::{
proto::client::{
ClientMetadata, proto::client::{
AuthChallenge as ProtoAuthChallenge, AuthChallengeRequest as ProtoAuthChallengeRequest,
AuthChallengeSolution as ProtoAuthChallengeSolution, AuthResult as ProtoAuthResult,
ClientInfo as ProtoClientInfo, ClientRequest, ClientResponse,
client_request::Payload as ClientRequestPayload,
client_response::Payload as ClientResponsePayload,
},
transport::{Bi, Error as TransportError, Receiver, Sender, grpc::GrpcBi},
}, transport::{Bi, Error as TransportError, Receiver, Sender, grpc::GrpcBi}
};
use async_trait::async_trait;
use tonic::Status;
@@ -170,8 +169,8 @@ impl Receiver<auth::Inbound> for AuthTransportAdapter<'_> {
impl Bi<auth::Inbound, Result<auth::Outbound, auth::Error>> for AuthTransportAdapter<'_> {}
fn client_metadata_from_proto(metadata: ProtoClientInfo) -> auth::ClientMetadata {
auth::ClientMetadata {
fn client_metadata_from_proto(metadata: ProtoClientInfo) -> ClientMetadata {
ClientMetadata {
name: metadata.name,
description: metadata.description,
version: metadata.version,

View File

@@ -20,15 +20,18 @@ use arbiter_proto::{
},
user_agent::{
BootstrapEncryptedKey as ProtoBootstrapEncryptedKey,
BootstrapResult as ProtoBootstrapResult, ClientConnectionCancel,
ClientConnectionRequest, UnsealEncryptedKey as ProtoUnsealEncryptedKey,
UnsealResult as ProtoUnsealResult, UnsealStart, UserAgentRequest, UserAgentResponse,
VaultState as ProtoVaultState, user_agent_request::Payload as UserAgentRequestPayload,
BootstrapResult as ProtoBootstrapResult,
SdkClientConnectionCancel as ProtoSdkClientConnectionCancel,
SdkClientConnectionRequest as ProtoSdkClientConnectionRequest,
UnsealEncryptedKey as ProtoUnsealEncryptedKey, UnsealResult as ProtoUnsealResult,
UnsealStart, UserAgentRequest, UserAgentResponse, VaultState as ProtoVaultState,
user_agent_request::Payload as UserAgentRequestPayload,
user_agent_response::Payload as UserAgentResponsePayload,
},
},
transport::{Error as TransportError, Receiver, Sender, grpc::GrpcBi},
};
use prost_types::{Timestamp as ProtoTimestamp, };
use async_trait::async_trait;
use chrono::{TimeZone, Utc};
use kameo::{
@@ -261,12 +264,14 @@ async fn dispatch_conn_message(
actor.ask(HandleGrantDelete { grant_id }).await,
))
}
UserAgentRequestPayload::ClientConnectionResponse(resp) => {
UserAgentRequestPayload::SdkClientConnectionResponse(resp) => {
let pubkey_bytes: [u8; 32] = match resp.pubkey.try_into() {
Ok(bytes) => bytes,
Err(_) => {
let _ = bi
.send(Err(Status::invalid_argument("Invalid Ed25519 public key length")))
.send(Err(Status::invalid_argument(
"Invalid Ed25519 public key length",
)))
.await;
return Err(());
}
@@ -289,13 +294,18 @@ async fn dispatch_conn_message(
.await
{
warn!(?err, "Failed to process client connection response");
let _ = bi.send(Err(Status::internal("Failed to process response"))).await;
let _ = bi
.send(Err(Status::internal("Failed to process response")))
.await;
return Err(());
}
return Ok(());
}
UserAgentRequestPayload::AuthChallengeRequest(..) | UserAgentRequestPayload::AuthChallengeSolution(..) => {
UserAgentRequestPayload::SdkClientRevoke(_sdk_client_revoke_request) => todo!(),
UserAgentRequestPayload::SdkClientList(_) => todo!(),
UserAgentRequestPayload::AuthChallengeRequest(..)
| UserAgentRequestPayload::AuthChallengeSolution(..) => {
warn!(?payload, "Unsupported post-auth user agent request");
let _ = bi
.send(Err(Status::invalid_argument(
@@ -304,7 +314,7 @@ async fn dispatch_conn_message(
.await;
return Err(());
}
};
bi.send(Ok(UserAgentResponse {
@@ -321,7 +331,7 @@ async fn send_out_of_band(
) -> Result<(), ()> {
let payload = match oob {
OutOfBand::ClientConnectionRequest { profile } => {
UserAgentResponsePayload::ClientConnectionRequest(ClientConnectionRequest {
UserAgentResponsePayload::SdkClientConnectionRequest(ProtoSdkClientConnectionRequest {
pubkey: profile.pubkey.to_bytes().to_vec(),
info: Some(ProtoClientMetadata {
name: profile.metadata.name,
@@ -331,7 +341,7 @@ async fn send_out_of_band(
})
}
OutOfBand::ClientConnectionCancel { pubkey } => {
UserAgentResponsePayload::ClientConnectionCancel(ClientConnectionCancel {
UserAgentResponsePayload::SdkClientConnectionCancel(ProtoSdkClientConnectionCancel {
pubkey: pubkey.to_bytes().to_vec(),
})
}
@@ -435,9 +445,7 @@ fn u256_from_proto_bytes(bytes: &[u8]) -> Result<U256, Status> {
Ok(U256::from_be_slice(bytes))
}
fn proto_timestamp_to_utc(
timestamp: prost_types::Timestamp,
) -> Result<chrono::DateTime<Utc>, Status> {
fn proto_timestamp_to_utc(timestamp: ProtoTimestamp) -> Result<chrono::DateTime<Utc>, Status> {
Utc.timestamp_opt(timestamp.seconds, timestamp.nanos as u32)
.single()
.ok_or_else(|| Status::invalid_argument("Invalid timestamp"))
@@ -447,11 +455,11 @@ fn shared_settings_to_proto(shared: SharedGrantSettings) -> ProtoSharedSettings
ProtoSharedSettings {
wallet_access_id: shared.wallet_access_id,
chain_id: shared.chain,
valid_from: shared.valid_from.map(|time| prost_types::Timestamp {
valid_from: shared.valid_from.map(|time| ProtoTimestamp {
seconds: time.timestamp(),
nanos: time.timestamp_subsec_nanos() as i32,
}),
valid_until: shared.valid_until.map(|time| prost_types::Timestamp {
valid_until: shared.valid_until.map(|time| ProtoTimestamp {
seconds: time.timestamp(),
nanos: time.timestamp_subsec_nanos() as i32,
}),

View File

@@ -1,6 +1,4 @@
#![forbid(unsafe_code)]
#![deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
use crate::context::ServerContext;
pub mod actors;

View File

@@ -1,3 +1,4 @@
use arbiter_proto::ClientMetadata;
use arbiter_proto::transport::{Receiver, Sender};
use arbiter_server::actors::GlobalActors;
use arbiter_server::{
@@ -10,8 +11,8 @@ use ed25519_dalek::Signer as _;
use super::common::ChannelTransport;
fn metadata(name: &str, description: Option<&str>, version: Option<&str>) -> auth::ClientMetadata {
auth::ClientMetadata {
fn metadata(name: &str, description: Option<&str>, version: Option<&str>) -> ClientMetadata {
ClientMetadata {
name: name.to_owned(),
description: description.map(str::to_owned),
version: version.map(str::to_owned),
@@ -21,7 +22,7 @@ fn metadata(name: &str, description: Option<&str>, version: Option<&str>) -> aut
async fn insert_registered_client(
db: &db::DatabasePool,
pubkey: Vec<u8>,
metadata: &auth::ClientMetadata,
metadata: &ClientMetadata,
) {
use arbiter_server::db::schema::{client_metadata, program_client};