refactor(useragent): using request/response for correct multiplexing behaviour
This commit is contained in:
@@ -5,33 +5,113 @@ import 'package:grpc/grpc.dart';
|
||||
import 'package:mtcore/markettakers.dart';
|
||||
|
||||
class Connection {
|
||||
final ClientChannel channel;
|
||||
final StreamController<UserAgentRequest> _tx;
|
||||
final StreamIterator<UserAgentResponse> _rx;
|
||||
|
||||
Connection({
|
||||
required this.channel,
|
||||
required StreamController<UserAgentRequest> tx,
|
||||
required ResponseStream<UserAgentResponse> rx,
|
||||
}) : _tx = tx,
|
||||
_rx = StreamIterator(rx);
|
||||
|
||||
Future<void> send(UserAgentRequest request) async {
|
||||
talker.debug('Sending request: ${request.toDebugString()}');
|
||||
_tx.add(request);
|
||||
}) : _tx = tx {
|
||||
_rxSubscription = rx.listen(
|
||||
_handleResponse,
|
||||
onError: _handleError,
|
||||
onDone: _handleDone,
|
||||
cancelOnError: true,
|
||||
);
|
||||
}
|
||||
|
||||
Future<UserAgentResponse> receive() async {
|
||||
final hasValue = await _rx.moveNext();
|
||||
if (!hasValue) {
|
||||
throw Exception('Connection closed while waiting for server response.');
|
||||
final ClientChannel channel;
|
||||
final StreamController<UserAgentRequest> _tx;
|
||||
final Map<int, Completer<UserAgentResponse>> _pendingRequests = {};
|
||||
final StreamController<UserAgentResponse> _outOfBandMessages =
|
||||
StreamController<UserAgentResponse>.broadcast();
|
||||
|
||||
StreamSubscription<UserAgentResponse>? _rxSubscription;
|
||||
int _nextRequestId = 0;
|
||||
|
||||
Stream<UserAgentResponse> get outOfBandMessages => _outOfBandMessages.stream;
|
||||
|
||||
Future<UserAgentResponse> request(UserAgentRequest message) async {
|
||||
_ensureOpen();
|
||||
|
||||
final requestId = _nextRequestId++;
|
||||
final completer = Completer<UserAgentResponse>();
|
||||
_pendingRequests[requestId] = completer;
|
||||
|
||||
message.id = requestId;
|
||||
talker.debug('Sending request: ${message.toDebugString()}');
|
||||
|
||||
try {
|
||||
_tx.add(message);
|
||||
} catch (error, stackTrace) {
|
||||
_pendingRequests.remove(requestId);
|
||||
completer.completeError(error, stackTrace);
|
||||
}
|
||||
talker.debug('Received response: ${_rx.current.toDebugString()}');
|
||||
return _rx.current;
|
||||
|
||||
return completer.future;
|
||||
}
|
||||
|
||||
Future<void> close() async {
|
||||
final rxSubscription = _rxSubscription;
|
||||
if (rxSubscription == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
_rxSubscription = null;
|
||||
await rxSubscription.cancel();
|
||||
_failPendingRequests(Exception('Connection closed.'));
|
||||
await _outOfBandMessages.close();
|
||||
await _tx.close();
|
||||
await channel.shutdown();
|
||||
}
|
||||
|
||||
void _handleResponse(UserAgentResponse response) {
|
||||
talker.debug('Received response: ${response.toDebugString()}');
|
||||
|
||||
if (response.hasId()) {
|
||||
final completer = _pendingRequests.remove(response.id);
|
||||
if (completer == null) {
|
||||
talker.warning('Received response for unknown request id ${response.id}');
|
||||
return;
|
||||
}
|
||||
completer.complete(response);
|
||||
return;
|
||||
}
|
||||
|
||||
_outOfBandMessages.add(response);
|
||||
}
|
||||
|
||||
void _handleError(Object error, StackTrace stackTrace) {
|
||||
_rxSubscription = null;
|
||||
_failPendingRequests(error, stackTrace);
|
||||
_outOfBandMessages.addError(error, stackTrace);
|
||||
}
|
||||
|
||||
void _handleDone() {
|
||||
if (_rxSubscription == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
_rxSubscription = null;
|
||||
final error = Exception(
|
||||
'Connection closed while waiting for server response.',
|
||||
);
|
||||
_failPendingRequests(error);
|
||||
_outOfBandMessages.close();
|
||||
}
|
||||
|
||||
void _failPendingRequests(Object error, [StackTrace? stackTrace]) {
|
||||
final pendingRequests = _pendingRequests.values.toList(growable: false);
|
||||
_pendingRequests.clear();
|
||||
|
||||
for (final completer in pendingRequests) {
|
||||
if (!completer.isCompleted) {
|
||||
completer.completeError(error, stackTrace);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void _ensureOpen() {
|
||||
if (_rxSubscription == null) {
|
||||
throw StateError('Connection is closed.');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user