63 std::shared_ptr<StreamHandler<T>> shared_handler(handler.release());
64 const MethodCodec<T>* codec = codec_;
65 const std::string channel_name = name_;
66 const BinaryMessenger* messenger = messenger_;
68 [shared_handler, codec, channel_name, messenger,
70 is_listening = bool(
false)](
const uint8_t* message,
71 const size_t message_size,
73 constexpr
char kOnListenMethod[] =
"listen";
74 constexpr
char kOnCancelMethod[] =
"cancel";
76 std::unique_ptr<MethodCall<T>> method_call =
77 codec->DecodeMethodCall(message, message_size);
80 <<
"Unable to construct method call from message on channel: "
81 << channel_name << std::endl;
86 const std::string& method = method_call->method_name();
87 if (method.compare(kOnListenMethod) == 0) {
89 std::unique_ptr<StreamHandlerError<T>> error =
90 shared_handler->OnCancel(
nullptr);
92 std::cerr <<
"Failed to cancel existing stream: "
93 << (error->error_code) <<
", "
94 << (error->error_message) <<
", "
95 << (error->error_details);
100 std::unique_ptr<std::vector<uint8_t>> result;
101 auto sink = std::make_unique<EventSinkImplementation>(
102 messenger, channel_name, codec);
103 std::unique_ptr<StreamHandlerError<T>> error =
104 shared_handler->OnListen(method_call->arguments(),
107 result = codec->EncodeErrorEnvelope(error->error_code,
108 error->error_message,
109 error->error_details.get());
111 result = codec->EncodeSuccessEnvelope();
113 reply(result->data(), result->size());
114 }
else if (method.compare(kOnCancelMethod) == 0) {
115 std::unique_ptr<std::vector<uint8_t>> result;
117 std::unique_ptr<StreamHandlerError<T>> error =
118 shared_handler->OnCancel(method_call->arguments());
120 result = codec->EncodeErrorEnvelope(error->error_code,
121 error->error_message,
122 error->error_details.get());
124 result = codec->EncodeSuccessEnvelope();
126 is_listening =
false;
128 result = codec->EncodeErrorEnvelope(
129 "error",
"No active stream to cancel",
nullptr);
131 reply(result->data(), result->size());