cancelable<S> method

CancelableOperation<S> cancelable<S>(
  1. Future<S> callback(
    1. StreamQueue<T>
    )
)

Passes a copy of this queue to callback, and updates this queue to match the copy's position once callback completes.

If the returned CancelableOperation is canceled, this queue instead continues as though cancelable hadn't been called. Otherwise, it emits the same value or error as callback.

See also startTransaction and withTransaction.

final _stdinQueue = StreamQueue(stdin);

/// Returns an operation that completes when the user sends a line to
/// standard input.
///
/// If the operation is canceled, stops waiting for user input.
CancelableOperation<String> nextStdinLine() =>
    _stdinQueue.cancelable((queue) => queue.next);

Implementation

CancelableOperation<S> cancelable<S>(
    Future<S> Function(StreamQueue<T>) callback) {
  var transaction = startTransaction();
  var completer = CancelableCompleter<S>(onCancel: () {
    transaction.reject();
  });

  var queue = transaction.newQueue();
  completer.complete(callback(queue).whenComplete(() {
    if (!completer.isCanceled) transaction.commit(queue);
  }));

  return completer.operation;
}