refactor(useragent): using request/response for correct multiplexing behaviour
This commit is contained in:
@@ -30,15 +30,18 @@ Future<Connection> connectAndAuthorize(
|
||||
KeyAlgorithm.ed25519 => KeyType.KEY_TYPE_ED25519,
|
||||
},
|
||||
);
|
||||
await connection.send(UserAgentRequest(authChallengeRequest: req));
|
||||
final response = await connection.request(
|
||||
UserAgentRequest(authChallengeRequest: req),
|
||||
);
|
||||
talker.info(
|
||||
"Sent auth challenge request with pubkey ${base64Encode(pubkey)}",
|
||||
);
|
||||
|
||||
final response = await connection.receive();
|
||||
talker.info('Received response from server, checking auth flow...');
|
||||
|
||||
if (response.hasAuthOk()) {
|
||||
if (response.hasAuthResult()) {
|
||||
if (response.authResult != AuthResult.AUTH_RESULT_SUCCESS) {
|
||||
throw Exception('Authentication failed: ${response.authResult}');
|
||||
}
|
||||
talker.info('Authentication successful, connection established');
|
||||
return connection;
|
||||
}
|
||||
@@ -55,18 +58,20 @@ Future<Connection> connectAndAuthorize(
|
||||
);
|
||||
|
||||
final signature = await key.sign(challenge);
|
||||
await connection.send(
|
||||
final solutionResponse = await connection.request(
|
||||
UserAgentRequest(authChallengeSolution: AuthChallengeSolution(signature: signature)),
|
||||
);
|
||||
|
||||
talker.info('Sent auth challenge solution, waiting for server response...');
|
||||
|
||||
final solutionResponse = await connection.receive();
|
||||
if (!solutionResponse.hasAuthOk()) {
|
||||
if (!solutionResponse.hasAuthResult()) {
|
||||
throw Exception(
|
||||
'Expected AuthChallengeSolutionResponse, got ${solutionResponse.whichPayload()}',
|
||||
);
|
||||
}
|
||||
if (solutionResponse.authResult != AuthResult.AUTH_RESULT_SUCCESS) {
|
||||
throw Exception('Authentication failed: ${solutionResponse.authResult}');
|
||||
}
|
||||
|
||||
talker.info('Authentication successful, connection established');
|
||||
return connection;
|
||||
|
||||
@@ -5,33 +5,113 @@ import 'package:grpc/grpc.dart';
|
||||
import 'package:mtcore/markettakers.dart';
|
||||
|
||||
class Connection {
|
||||
final ClientChannel channel;
|
||||
final StreamController<UserAgentRequest> _tx;
|
||||
final StreamIterator<UserAgentResponse> _rx;
|
||||
|
||||
Connection({
|
||||
required this.channel,
|
||||
required StreamController<UserAgentRequest> tx,
|
||||
required ResponseStream<UserAgentResponse> rx,
|
||||
}) : _tx = tx,
|
||||
_rx = StreamIterator(rx);
|
||||
|
||||
Future<void> send(UserAgentRequest request) async {
|
||||
talker.debug('Sending request: ${request.toDebugString()}');
|
||||
_tx.add(request);
|
||||
}) : _tx = tx {
|
||||
_rxSubscription = rx.listen(
|
||||
_handleResponse,
|
||||
onError: _handleError,
|
||||
onDone: _handleDone,
|
||||
cancelOnError: true,
|
||||
);
|
||||
}
|
||||
|
||||
Future<UserAgentResponse> receive() async {
|
||||
final hasValue = await _rx.moveNext();
|
||||
if (!hasValue) {
|
||||
throw Exception('Connection closed while waiting for server response.');
|
||||
final ClientChannel channel;
|
||||
final StreamController<UserAgentRequest> _tx;
|
||||
final Map<int, Completer<UserAgentResponse>> _pendingRequests = {};
|
||||
final StreamController<UserAgentResponse> _outOfBandMessages =
|
||||
StreamController<UserAgentResponse>.broadcast();
|
||||
|
||||
StreamSubscription<UserAgentResponse>? _rxSubscription;
|
||||
int _nextRequestId = 0;
|
||||
|
||||
Stream<UserAgentResponse> get outOfBandMessages => _outOfBandMessages.stream;
|
||||
|
||||
Future<UserAgentResponse> request(UserAgentRequest message) async {
|
||||
_ensureOpen();
|
||||
|
||||
final requestId = _nextRequestId++;
|
||||
final completer = Completer<UserAgentResponse>();
|
||||
_pendingRequests[requestId] = completer;
|
||||
|
||||
message.id = requestId;
|
||||
talker.debug('Sending request: ${message.toDebugString()}');
|
||||
|
||||
try {
|
||||
_tx.add(message);
|
||||
} catch (error, stackTrace) {
|
||||
_pendingRequests.remove(requestId);
|
||||
completer.completeError(error, stackTrace);
|
||||
}
|
||||
talker.debug('Received response: ${_rx.current.toDebugString()}');
|
||||
return _rx.current;
|
||||
|
||||
return completer.future;
|
||||
}
|
||||
|
||||
Future<void> close() async {
|
||||
final rxSubscription = _rxSubscription;
|
||||
if (rxSubscription == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
_rxSubscription = null;
|
||||
await rxSubscription.cancel();
|
||||
_failPendingRequests(Exception('Connection closed.'));
|
||||
await _outOfBandMessages.close();
|
||||
await _tx.close();
|
||||
await channel.shutdown();
|
||||
}
|
||||
|
||||
void _handleResponse(UserAgentResponse response) {
|
||||
talker.debug('Received response: ${response.toDebugString()}');
|
||||
|
||||
if (response.hasId()) {
|
||||
final completer = _pendingRequests.remove(response.id);
|
||||
if (completer == null) {
|
||||
talker.warning('Received response for unknown request id ${response.id}');
|
||||
return;
|
||||
}
|
||||
completer.complete(response);
|
||||
return;
|
||||
}
|
||||
|
||||
_outOfBandMessages.add(response);
|
||||
}
|
||||
|
||||
void _handleError(Object error, StackTrace stackTrace) {
|
||||
_rxSubscription = null;
|
||||
_failPendingRequests(error, stackTrace);
|
||||
_outOfBandMessages.addError(error, stackTrace);
|
||||
}
|
||||
|
||||
void _handleDone() {
|
||||
if (_rxSubscription == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
_rxSubscription = null;
|
||||
final error = Exception(
|
||||
'Connection closed while waiting for server response.',
|
||||
);
|
||||
_failPendingRequests(error);
|
||||
_outOfBandMessages.close();
|
||||
}
|
||||
|
||||
void _failPendingRequests(Object error, [StackTrace? stackTrace]) {
|
||||
final pendingRequests = _pendingRequests.values.toList(growable: false);
|
||||
_pendingRequests.clear();
|
||||
|
||||
for (final completer in pendingRequests) {
|
||||
if (!completer.isCompleted) {
|
||||
completer.completeError(error, stackTrace);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void _ensureOpen() {
|
||||
if (_rxSubscription == null) {
|
||||
throw StateError('Connection is closed.');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,9 +4,9 @@ import 'package:arbiter/proto/user_agent.pb.dart';
|
||||
import 'package:protobuf/well_known_types/google/protobuf/empty.pb.dart';
|
||||
|
||||
Future<List<WalletEntry>> listEvmWallets(Connection connection) async {
|
||||
await connection.send(UserAgentRequest(evmWalletList: Empty()));
|
||||
|
||||
final response = await connection.receive();
|
||||
final response = await connection.request(
|
||||
UserAgentRequest(evmWalletList: Empty()),
|
||||
);
|
||||
if (!response.hasEvmWalletList()) {
|
||||
throw Exception(
|
||||
'Expected EVM wallet list response, got ${response.whichPayload()}',
|
||||
@@ -25,9 +25,9 @@ Future<List<WalletEntry>> listEvmWallets(Connection connection) async {
|
||||
}
|
||||
|
||||
Future<void> createEvmWallet(Connection connection) async {
|
||||
await connection.send(UserAgentRequest(evmWalletCreate: Empty()));
|
||||
|
||||
final response = await connection.receive();
|
||||
final response = await connection.request(
|
||||
UserAgentRequest(evmWalletCreate: Empty()),
|
||||
);
|
||||
if (!response.hasEvmWalletCreate()) {
|
||||
throw Exception(
|
||||
'Expected EVM wallet create response, got ${response.whichPayload()}',
|
||||
|
||||
@@ -13,9 +13,9 @@ Future<List<GrantEntry>> listEvmGrants(
|
||||
request.walletId = walletId;
|
||||
}
|
||||
|
||||
await connection.send(UserAgentRequest(evmGrantList: request));
|
||||
|
||||
final response = await connection.receive();
|
||||
final response = await connection.request(
|
||||
UserAgentRequest(evmGrantList: request),
|
||||
);
|
||||
if (!response.hasEvmGrantList()) {
|
||||
throw Exception(
|
||||
'Expected EVM grant list response, got ${response.whichPayload()}',
|
||||
@@ -45,7 +45,7 @@ Future<int> createEvmGrant(
|
||||
TransactionRateLimit? rateLimit,
|
||||
required SpecificGrant specific,
|
||||
}) async {
|
||||
await connection.send(
|
||||
final response = await connection.request(
|
||||
UserAgentRequest(
|
||||
evmGrantCreate: EvmGrantCreateRequest(
|
||||
clientId: clientId,
|
||||
@@ -62,8 +62,6 @@ Future<int> createEvmGrant(
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
final response = await connection.receive();
|
||||
if (!response.hasEvmGrantCreate()) {
|
||||
throw Exception(
|
||||
'Expected EVM grant create response, got ${response.whichPayload()}',
|
||||
@@ -82,11 +80,9 @@ Future<int> createEvmGrant(
|
||||
}
|
||||
|
||||
Future<void> deleteEvmGrant(Connection connection, int grantId) async {
|
||||
await connection.send(
|
||||
final response = await connection.request(
|
||||
UserAgentRequest(evmGrantDelete: EvmGrantDeleteRequest(grantId: grantId)),
|
||||
);
|
||||
|
||||
final response = await connection.receive();
|
||||
if (!response.hasEvmGrantDelete()) {
|
||||
throw Exception(
|
||||
'Expected EVM grant delete response, got ${response.whichPayload()}',
|
||||
|
||||
@@ -10,7 +10,7 @@ Future<BootstrapResult> bootstrapVault(
|
||||
) async {
|
||||
final encryptedKey = await _encryptVaultKeyMaterial(connection, password);
|
||||
|
||||
await connection.send(
|
||||
final response = await connection.request(
|
||||
UserAgentRequest(
|
||||
bootstrapEncryptedKey: BootstrapEncryptedKey(
|
||||
nonce: encryptedKey.nonce,
|
||||
@@ -19,8 +19,6 @@ Future<BootstrapResult> bootstrapVault(
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
final response = await connection.receive();
|
||||
if (!response.hasBootstrapResult()) {
|
||||
throw Exception(
|
||||
'Expected bootstrap result, got ${response.whichPayload()}',
|
||||
@@ -33,7 +31,7 @@ Future<BootstrapResult> bootstrapVault(
|
||||
Future<UnsealResult> unsealVault(Connection connection, String password) async {
|
||||
final encryptedKey = await _encryptVaultKeyMaterial(connection, password);
|
||||
|
||||
await connection.send(
|
||||
final response = await connection.request(
|
||||
UserAgentRequest(
|
||||
unsealEncryptedKey: UnsealEncryptedKey(
|
||||
nonce: encryptedKey.nonce,
|
||||
@@ -42,8 +40,6 @@ Future<UnsealResult> unsealVault(Connection connection, String password) async {
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
final response = await connection.receive();
|
||||
if (!response.hasUnsealResult()) {
|
||||
throw Exception('Expected unseal result, got ${response.whichPayload()}');
|
||||
}
|
||||
@@ -60,11 +56,9 @@ Future<_EncryptedVaultKey> _encryptVaultKeyMaterial(
|
||||
final clientKeyPair = await keyExchange.newKeyPair();
|
||||
final clientPublicKey = await clientKeyPair.extractPublicKey();
|
||||
|
||||
await connection.send(
|
||||
final handshakeResponse = await connection.request(
|
||||
UserAgentRequest(unsealStart: UnsealStart(clientPubkey: clientPublicKey.bytes)),
|
||||
);
|
||||
|
||||
final handshakeResponse = await connection.receive();
|
||||
if (!handshakeResponse.hasUnsealStartResponse()) {
|
||||
throw Exception(
|
||||
'Expected unseal handshake response, got ${handshakeResponse.whichPayload()}',
|
||||
|
||||
Reference in New Issue
Block a user