pw_rpc: Use pw_function callbacks in RPC client

This updates the nanopb RPC client to use pw::Function for callbacks
instead of custom virtual interfaces.

Change-Id: I7c548cde00223ec32e45cc831fe7f9f79577842b
Requires: pigweed-internal:13201
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/48484
Reviewed-by: Wyatt Hepler <hepler@google.com>
diff --git a/pw_function/docs.rst b/pw_function/docs.rst
index fa6e68e..8a2cf76 100644
--- a/pw_function/docs.rst
+++ b/pw_function/docs.rst
@@ -172,7 +172,7 @@
 from Fuchsia with some changes to make it more suitable for embedded
 development.
 
-Functions are moveable, but not copyable. This allows them to store and manage
+Functions are movable, but not copyable. This allows them to store and manage
 callables without having to perform bookkeeping such as reference counting, and
 avoids any reliance on dynamic memory management. The result is a simpler
 implementation which is easy to conceptualize and use in an embedded context.
diff --git a/pw_function/public/pw_function/internal/function.h b/pw_function/public/pw_function/internal/function.h
index 5bf5afa..1a28c3b 100644
--- a/pw_function/public/pw_function/internal/function.h
+++ b/pw_function/public/pw_function/internal/function.h
@@ -65,7 +65,7 @@
   virtual bool IsNull() const = 0;
 
   // Invoke the callable stored by the function target.
-  virtual Return operator()(Args&&... args) const = 0;
+  virtual Return operator()(Args... args) const = 0;
 
   // Move initialize the function target to a provided location.
   virtual void MoveInitializeTo(void* ptr) = 0;
@@ -86,7 +86,7 @@
 
   bool IsNull() const final { return true; }
 
-  Return operator()(Args&&...) const final { PW_ASSERT(false); }
+  Return operator()(Args...) const final { PW_ASSERT(false); }
 
   void MoveInitializeTo(void* ptr) final { new (ptr) NullFunctionTarget(); }
 };
@@ -109,9 +109,7 @@
 
   bool IsNull() const final { return false; }
 
-  Return operator()(Args&&... args) const final {
-    return callable_(std::forward<Args>(args)...);
-  }
+  Return operator()(Args... args) const final { return callable_(args...); }
 
   void MoveInitializeTo(void* ptr) final {
     new (ptr) InlineFunctionTarget(std::move(*this));
@@ -155,9 +153,7 @@
 
   bool IsNull() const final { return false; }
 
-  Return operator()(Args&&... args) const final {
-    return callable()(std::forward<Args>(args)...);
-  }
+  Return operator()(Args... args) const final { return callable()(args...); }
 
   void MoveInitializeTo(void* ptr) final {
     new (ptr) MemoryFunctionTarget(std::move(*this));
@@ -287,9 +283,7 @@
 
   ~Function() { holder_.DestructTarget(); }
 
-  Return operator()(Args&&... args) const {
-    return holder_.target()(std::forward<Args>(args)...);
-  };
+  Return operator()(Args... args) const { return holder_.target()(args...); };
 
   explicit operator bool() const { return !holder_.target().IsNull(); }
 
diff --git a/pw_protobuf_compiler/proto.cmake b/pw_protobuf_compiler/proto.cmake
index 911ba3a..62f6eaf 100644
--- a/pw_protobuf_compiler/proto.cmake
+++ b/pw_protobuf_compiler/proto.cmake
@@ -299,6 +299,7 @@
   target_link_libraries("${NAME}.nanopb_rpc"
     INTERFACE
       "${NAME}.nanopb"
+      pw_rpc.nanopb.client
       pw_rpc.nanopb.method_union
       pw_rpc.server
       ${DEPS}
diff --git a/pw_rpc/docs.rst b/pw_rpc/docs.rst
index 3377b26..5cde965 100644
--- a/pw_rpc/docs.rst
+++ b/pw_rpc/docs.rst
@@ -857,32 +857,26 @@
   #include "pw_rpc/echo_service_nanopb.h"
 
   namespace {
-
-  // RPC response handler for pw.rpc.EchoService/Echo.
-  class EchoResponseHandler
-      : public pw::rpc::UnaryResponseHandler<pw_rpc_EchoMessage> {
-   public:
-    // Callback invoked when a response is received. This is called
-    // synchronously from Client::ProcessPacket.
-    void ReceivedResponse(pw::Status status,
-                          const pw_rpc_EchoMessage& response) final {
-      if (status.ok()) {
-        PW_LOG_INFO("Received echo response: %s", response.msg);
-      } else {
-        PW_LOG_ERROR("Echo failed with status %d",
-                     static_cast<int>(status.code()));
-      }
-    }
-  };
-
   // Generated clients are namespaced with their proto library.
   using pw::rpc::nanopb::EchoServiceClient;
 
   EchoServiceClient::EchoCall echo_call;
-  EchoResponseHandler response_handler;
+
+  // Callback invoked when a response is received. This is called synchronously
+  // from Client::ProcessPacket.
+  void EchoResponse(const pw_rpc_EchoMessage& response,
+                    pw::Status status) {
+    if (status.ok()) {
+      PW_LOG_INFO("Received echo response: %s", response.msg);
+    } else {
+      PW_LOG_ERROR("Echo failed with status %d",
+                   static_cast<int>(status.code()));
+    }
+  }
 
   }  // namespace
 
+
   void CallEcho(const char* message) {
     pw_rpc_EchoMessage request = pw_rpc_EchoMessage_init_default;
     pw::string::Copy(message, request.msg);
@@ -890,7 +884,7 @@
     // By assigning the returned ClientCall to the global echo_call, the RPC
     // call is kept alive until it completes. When a response is received, it
     // will be logged by the handler function and the call will complete.
-    echo_call = EchoServiceClient::Echo(my_channel, request, response_handler);
+    echo_call = EchoServiceClient::Echo(my_channel, request, EchoResponse);
   }
 
 Client implementation details
diff --git a/pw_rpc/nanopb/BUILD.gn b/pw_rpc/nanopb/BUILD.gn
index c2e7788..72cc611 100644
--- a/pw_rpc/nanopb/BUILD.gn
+++ b/pw_rpc/nanopb/BUILD.gn
@@ -50,6 +50,7 @@
   public_deps = [
     ":common",
     "..:client",
+    dir_pw_function,
   ]
   public = [ "public/pw_rpc/nanopb_client_call.h" ]
   sources = [ "nanopb_client_call.cc" ]
diff --git a/pw_rpc/nanopb/CMakeLists.txt b/pw_rpc/nanopb/CMakeLists.txt
index 87552a0..66547f9 100644
--- a/pw_rpc/nanopb/CMakeLists.txt
+++ b/pw_rpc/nanopb/CMakeLists.txt
@@ -37,6 +37,7 @@
   SOURCES
     nanopb_client_call.cc
   PUBLIC_DEPS
+    pw_function
     pw_rpc.nanopb.common
     pw_rpc.common
 )
diff --git a/pw_rpc/nanopb/codegen_test.cc b/pw_rpc/nanopb/codegen_test.cc
index aae7b00..e601d1e 100644
--- a/pw_rpc/nanopb/codegen_test.cc
+++ b/pw_rpc/nanopb/codegen_test.cc
@@ -131,17 +131,16 @@
 }
 
 using TestServiceClient = test::nanopb::TestServiceClient;
-using internal::TestServerStreamingResponseHandler;
-using internal::TestUnaryResponseHandler;
 
 TEST(NanopbCodegen, Client_GeneratesCallAliases) {
   static_assert(
-      std::is_same_v<
-          TestServiceClient::TestRpcCall,
-          NanopbClientCall<UnaryResponseHandler<pw_rpc_test_TestResponse>>>);
-  static_assert(std::is_same_v<TestServiceClient::TestStreamRpcCall,
-                               NanopbClientCall<ServerStreamingResponseHandler<
-                                   pw_rpc_test_TestStreamResponse>>>);
+      std::is_same_v<TestServiceClient::TestRpcCall,
+                     NanopbClientCall<
+                         internal::UnaryCallbacks<pw_rpc_test_TestResponse>>>);
+  static_assert(
+      std::is_same_v<TestServiceClient::TestStreamRpcCall,
+                     NanopbClientCall<internal::ServerStreamingCallbacks<
+                         pw_rpc_test_TestStreamResponse>>>);
 }
 
 TEST(NanopbCodegen, Client_InvokesUnaryRpcWithCallback) {
@@ -149,10 +148,20 @@
   constexpr uint32_t method_id = internal::Hash("TestRpc");
 
   ClientContextForTest<128, 128, 99, service_id, method_id> context;
-  TestUnaryResponseHandler<pw_rpc_test_TestResponse> handler;
+
+  struct {
+    Status last_status = Status::Unknown();
+    int response_value = -1;
+  } result;
 
   auto call = TestServiceClient::TestRpc(
-      context.channel(), {.integer = 123, .status_code = 0}, handler);
+      context.channel(),
+      {.integer = 123, .status_code = 0},
+      [&result](const pw_rpc_test_TestResponse& response, Status status) {
+        result.last_status = status;
+        result.response_value = response.value;
+      });
+
   EXPECT_EQ(context.output().packet_count(), 1u);
   auto packet = context.output().sent_packet();
   EXPECT_EQ(packet.channel_id(), context.channel().id());
@@ -163,9 +172,8 @@
 
   PW_ENCODE_PB(pw_rpc_test_TestResponse, response, .value = 42);
   context.SendResponse(OkStatus(), response);
-  ASSERT_EQ(handler.responses_received(), 1u);
-  EXPECT_EQ(handler.last_status(), OkStatus());
-  EXPECT_EQ(handler.last_response().value, 42);
+  EXPECT_EQ(result.last_status, OkStatus());
+  EXPECT_EQ(result.response_value, 42);
 }
 
 TEST(NanopbCodegen, Client_InvokesServerStreamingRpcWithCallback) {
@@ -173,10 +181,25 @@
   constexpr uint32_t method_id = internal::Hash("TestStreamRpc");
 
   ClientContextForTest<128, 128, 99, service_id, method_id> context;
-  TestServerStreamingResponseHandler<pw_rpc_test_TestStreamResponse> handler;
+
+  struct {
+    bool active = true;
+    Status stream_status = Status::Unknown();
+    int response_value = -1;
+  } result;
 
   auto call = TestServiceClient::TestStreamRpc(
-      context.channel(), {.integer = 123, .status_code = 0}, handler);
+      context.channel(),
+      {.integer = 123, .status_code = 0},
+      [&result](const pw_rpc_test_TestStreamResponse& response) {
+        result.active = true;
+        result.response_value = response.number;
+      },
+      [&result](Status status) {
+        result.active = false;
+        result.stream_status = status;
+      });
+
   EXPECT_EQ(context.output().packet_count(), 1u);
   auto packet = context.output().sent_packet();
   EXPECT_EQ(packet.channel_id(), context.channel().id());
@@ -188,13 +211,13 @@
   PW_ENCODE_PB(
       pw_rpc_test_TestStreamResponse, response, .chunk = {}, .number = 11u);
   context.SendResponse(OkStatus(), response);
-  ASSERT_EQ(handler.responses_received(), 1u);
-  EXPECT_EQ(handler.last_response().number, 11u);
+  EXPECT_TRUE(result.active);
+  EXPECT_EQ(result.response_value, 11);
 
   context.SendPacket(internal::PacketType::SERVER_STREAM_END,
                      Status::NotFound());
-  EXPECT_FALSE(handler.active());
-  EXPECT_EQ(handler.status(), Status::NotFound());
+  EXPECT_FALSE(result.active);
+  EXPECT_EQ(result.stream_status, Status::NotFound());
 }
 
 }  // namespace
diff --git a/pw_rpc/nanopb/docs.rst b/pw_rpc/nanopb/docs.rst
index 0d61766..859e466 100644
--- a/pw_rpc/nanopb/docs.rst
+++ b/pw_rpc/nanopb/docs.rst
@@ -43,7 +43,7 @@
 
 Generated code API
 ==================
-Take the following RPC service as an example.
+All examples in this document use the following RPC service definition.
 
 .. code:: protobuf
 
@@ -145,146 +145,65 @@
 ``ChatService`` would create a ``generated::ChatServiceClient``.
 
 The client class contains static methods to call each of the service's methods.
-It is not meant to be instantiated. The signatures for the methods all follow
-the same format, taking a channel through which to communicate, the initial
-request struct, and a response handler.
+It is not meant to be instantiated.
 
 .. code-block:: c++
 
-  static NanopbClientCall<UnaryResponseHandler<RoomInfoResponse>>
-  GetRoomInformation(Channel& channel,
-                     const RoomInfoRequest& request,
-                     UnaryResponseHandler<RoomInfoResponse> handler);
+  static GetRoomInformationCall GetRoomInformation(
+      Channel& channel,
+      const RoomInfoRequest& request,
+      ::pw::Function<void(Status, const RoomInfoResponse&)> on_response,
+      ::pw::Function<void(Status)> on_rpc_error = nullptr);
 
 The ``NanopbClientCall`` object returned by the RPC invocation stores the active
 RPC's context. For more information on ``ClientCall`` objects, refer to the
-:ref:`core RPC documentation <module-pw_rpc-making-calls>`.
+:ref:`core RPC documentation <module-pw_rpc-making-calls>`. The type of the
+returned object is complex, so it is aliased using the method name.
 
-Response handlers
-^^^^^^^^^^^^^^^^^
-RPC responses are sent back to the caller through a response handler object.
-These are classes with virtual callback functions implemented by the RPC caller
-to handle RPC events.
+.. admonition:: Callback invocation
 
-There are two types of response handlers: unary and server-streaming, which are
-used depending whether the method's responses are a stream or not.
+  RPC callbacks are invoked synchronously from ``Client::ProcessPacket``.
 
-Unary / client streaming RPC
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-A ``UnaryResponseHandler`` is used by methods where the server returns a single
-response. It contains a callback for the response, which is only called once.
+Method APIs
+^^^^^^^^^^^
+The first argument to each client call method is the channel through which to
+send the RPC. Following that, the arguments depend on the method type.
+
+Unary RPC
+~~~~~~~~~
+A unary RPC call takes the request struct and a callback to invoke when a
+response is received. The callback receives the RPC's status and response
+struct.
+
+An optional second callback can be provided to handle internal errors.
 
 .. code-block:: c++
 
-  template <typename Response>
-  class UnaryResponseHandler {
-   public:
-    virtual ~UnaryResponseHandler() = default;
+  static GetRoomInformationCall GetRoomInformation(
+      Channel& channel,
+      const RoomInfoRequest& request,
+      ::pw::Function<void(const RoomInfoResponse&, Status)> on_response,
+      ::pw::Function<void(Status)> on_rpc_error = nullptr);
 
-    // Called when the response is received from the server with the method's
-    // status and the deserialized response struct.
-    virtual void ReceivedResponse(Status status, const Response& response) = 0;
+Server streaming RPC
+~~~~~~~~~~~~~~~~~~~~
+A server streaming RPC call takes the initial request struct and two callbacks.
+The first is invoked on every stream response received, and the second is
+invoked once the stream is complete with its overall status.
 
-    // Called when an error occurs internally in the RPC client or server.
-    virtual void RpcError(Status) {}
-  };
-
-.. cpp:class:: template <typename Response> UnaryResponseHandler
-
-  A handler for RPC methods which return a single response (i.e. unary and
-  client streaming).
-
-.. cpp:function:: virtual void UnaryResponseHandler::ReceivedResponse(Status status, const Response& response)
-
-  Callback invoked when the response is recieved from the server. Guaranteed to
-  only be called once.
-
-.. cpp:function:: virtual void UnaryResponseHandler::RpcError(Status status)
-
-  Callback invoked if an internal error occurs in the RPC system. Optional;
-  defaults to a no-op.
-
-**Example implementation**
+An optional third callback can be provided to handle internal errors.
 
 .. code-block:: c++
 
-  class RoomInfoHandler : public UnaryResponseHandler<RoomInfoResponse> {
-   public:
-    void ReceivedResponse(Status status,
-                          const RoomInfoResponse& response) override {
-      if (status.ok()) {
-        response_ = response;
-      }
-    }
-
-    constexpr RoomInfoResponse& response() { return response_; }
-
-   private:
-    RoomInfoResponse response_;
-  };
-
-Server streaming / bidirectional streaming RPC
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-For methods which return a response stream, a ``ServerStreamingResponseHandler``
-is used.
-
-.. code:: c++
-
-  class ServerStreamingResponseHandler {
-   public:
-    virtual ~ServerStreamingResponseHandler() = default;
-
-    // Called on every response received from the server with the deserialized
-    // response struct.
-    virtual void ReceivedResponse(const Response& response) = 0;
-
-    // Called when the server ends the stream with the overall RPC status.
-    virtual void Complete(Status status) = 0;
-
-    // Called when an error occurs internally in the RPC client or server.
-    virtual void RpcError(Status) {}
-  };
-
-.. cpp:class:: template <typename Response> ServerStreamingResponseHandler
-
-  A handler for RPC methods which return zero or more responses (i.e. server
-  and bidirectional streaming).
-
-.. cpp:function:: virtual void ServerStreamingResponseHandler::ReceivedResponse(const Response& response)
-
-  Callback invoked whenever a response is received from the server.
-
-.. cpp:function:: virtual void ServerStreamingResponseHandler::Complete(Status status)
-
-  Callback invoked when the server ends the stream, with the overall status for
-  the RPC.
-
-.. cpp:function:: virtual void ServerStreamingResponseHandler::RpcError(Status status)
-
-  Callback invoked if an internal error occurs in the RPC system. Optional;
-  defaults to a no-op.
-
-**Example implementation**
-
-.. code-block:: c++
-
-  class ChatHandler : public UnaryResponseHandler<ChatMessage> {
-   public:
-    void ReceivedResponse(const ChatMessage& response) override {
-      gui_.RenderChatMessage(response);
-    }
-
-    void Complete(Status status) override {
-      client_.Exit(status);
-    }
-
-   private:
-    ChatGui& gui_;
-    ChatClient& client_;
-  };
+  static ListUsersInRoomCall ListUsersInRoom(
+      Channel& channel,
+      const ListUsersRequest& request,
+      ::pw::Function<void(const ListUsersResponse&)> on_response,
+      ::pw::Function<void(Status)> on_stream_end,
+      ::pw::Function<void(Status)> on_rpc_error = nullptr);
 
 Example usage
-~~~~~~~~~~~~~
+^^^^^^^^^^^^^
 The following example demonstrates how to call an RPC method using a nanopb
 service client and receive the response.
 
@@ -296,15 +215,16 @@
     MyChannelOutput output;
     pw::rpc::Channel channels[] = {pw::rpc::Channel::Create<0>(&output)};
     pw::rpc::Client client(channels);
+
+    // Callback function for GetRoomInformation.
+    void LogRoomInformation(const RoomInfoResponse& response, Status status);
   }
 
   void InvokeSomeRpcs() {
-    RoomInfoHandler handler;
-
     // The RPC will remain active as long as `call` is alive.
     auto call = ChatServiceClient::GetRoomInformation(channels[0],
                                                       {.room = "pigweed"},
-                                                      handler);
+                                                      LogRoomInformation);
 
     // For simplicity, block here. An actual implementation would likely
     // std::move the call somewhere to keep it active while doing other work.
@@ -312,5 +232,5 @@
       Wait();
     }
 
-    DoStuff(handler.response());
+    // Do other stuff now that we have the room information.
   }
diff --git a/pw_rpc/nanopb/nanopb_client_call_test.cc b/pw_rpc/nanopb/nanopb_client_call_test.cc
index 171ae28..ddac23b 100644
--- a/pw_rpc/nanopb/nanopb_client_call_test.cc
+++ b/pw_rpc/nanopb/nanopb_client_call_test.cc
@@ -28,46 +28,49 @@
 
 class FakeGeneratedServiceClient {
  public:
-  static NanopbClientCall<UnaryResponseHandler<pw_rpc_test_TestResponse>>
+  static NanopbClientCall<internal::UnaryCallbacks<pw_rpc_test_TestResponse>>
   TestRpc(Channel& channel,
           const pw_rpc_test_TestRequest& request,
-          UnaryResponseHandler<pw_rpc_test_TestResponse>& callback) {
-    auto call = NanopbClientCall(&channel,
-                                 kServiceId,
-                                 kUnaryMethodId,
-                                 callback,
-                                 pw_rpc_test_TestRequest_fields,
-                                 pw_rpc_test_TestResponse_fields);
+          Function<void(const pw_rpc_test_TestResponse&, Status)> on_response,
+          Function<void(Status)> on_error = nullptr) {
+    auto call = NanopbClientCall(
+        &channel,
+        kServiceId,
+        kUnaryMethodId,
+        internal::UnaryCallbacks(std::move(on_response), std::move(on_error)),
+        pw_rpc_test_TestRequest_fields,
+        pw_rpc_test_TestResponse_fields);
     call.SendRequest(&request);
     return call;
   }
 
   static NanopbClientCall<
-      ServerStreamingResponseHandler<pw_rpc_test_TestStreamResponse>>
-  TestStreamRpc(Channel& channel,
-                const pw_rpc_test_TestRequest& request,
-                ServerStreamingResponseHandler<pw_rpc_test_TestStreamResponse>&
-                    callback) {
-    auto call = NanopbClientCall(&channel,
-                                 kServiceId,
-                                 kServerStreamingMethodId,
-                                 callback,
-                                 pw_rpc_test_TestRequest_fields,
-                                 pw_rpc_test_TestStreamResponse_fields);
+      internal::ServerStreamingCallbacks<pw_rpc_test_TestStreamResponse>>
+  TestStreamRpc(
+      Channel& channel,
+      const pw_rpc_test_TestRequest& request,
+      Function<void(const pw_rpc_test_TestStreamResponse&)> on_response,
+      Function<void(Status)> on_stream_end,
+      Function<void(Status)> on_error = nullptr) {
+    auto call = NanopbClientCall(
+        &channel,
+        kServiceId,
+        kServerStreamingMethodId,
+        internal::ServerStreamingCallbacks(std::move(on_response),
+                                           std::move(on_stream_end),
+                                           std::move(on_error)),
+        pw_rpc_test_TestRequest_fields,
+        pw_rpc_test_TestStreamResponse_fields);
     call.SendRequest(&request);
     return call;
   }
 };
 
-using internal::TestServerStreamingResponseHandler;
-using internal::TestUnaryResponseHandler;
-
 TEST(NanopbClientCall, Unary_SendsRequestPacket) {
   ClientContextForTest context;
-  TestUnaryResponseHandler<pw_rpc_test_TestResponse> handler;
 
   auto call = FakeGeneratedServiceClient::TestRpc(
-      context.channel(), {.integer = 123, .status_code = 0}, handler);
+      context.channel(), {.integer = 123, .status_code = 0}, nullptr);
 
   EXPECT_EQ(context.output().packet_count(), 1u);
   auto packet = context.output().sent_packet();
@@ -79,55 +82,116 @@
   EXPECT_EQ(sent_proto.integer, 123);
 }
 
-TEST(NanopbClientCall, Unary_InvokesCallbackOnValidResponse) {
+class UnaryClientCall : public ::testing::Test {
+ protected:
+  Status last_status_ = Status::Unknown();
+  Status last_error_ = Status::Unknown();
+  int responses_received_ = 0;
+  int last_response_value_ = 0;
+};
+
+TEST_F(UnaryClientCall, InvokesCallbackOnValidResponse) {
   ClientContextForTest context;
-  TestUnaryResponseHandler<pw_rpc_test_TestResponse> handler;
 
   auto call = FakeGeneratedServiceClient::TestRpc(
-      context.channel(), {.integer = 123, .status_code = 0}, handler);
+      context.channel(),
+      {.integer = 123, .status_code = 0},
+      [this](const pw_rpc_test_TestResponse& response, Status status) {
+        ++responses_received_;
+        last_status_ = status;
+        last_response_value_ = response.value;
+      });
 
   PW_ENCODE_PB(pw_rpc_test_TestResponse, response, .value = 42);
   context.SendResponse(OkStatus(), response);
 
-  ASSERT_EQ(handler.responses_received(), 1u);
-  EXPECT_EQ(handler.last_status(), OkStatus());
-  EXPECT_EQ(handler.last_response().value, 42);
+  ASSERT_EQ(responses_received_, 1);
+  EXPECT_EQ(last_status_, OkStatus());
+  EXPECT_EQ(last_response_value_, 42);
 }
 
-TEST(NanopbClientCall, Unary_InvokesErrorCallbackOnInvalidResponse) {
+TEST_F(UnaryClientCall, DoesNothingOnNullCallback) {
   ClientContextForTest context;
-  TestUnaryResponseHandler<pw_rpc_test_TestResponse> handler;
 
   auto call = FakeGeneratedServiceClient::TestRpc(
-      context.channel(), {.integer = 123, .status_code = 0}, handler);
+      context.channel(), {.integer = 123, .status_code = 0}, nullptr);
+
+  PW_ENCODE_PB(pw_rpc_test_TestResponse, response, .value = 42);
+  context.SendResponse(OkStatus(), response);
+
+  ASSERT_EQ(responses_received_, 0);
+}
+
+TEST_F(UnaryClientCall, InvokesErrorCallbackOnInvalidResponse) {
+  ClientContextForTest context;
+
+  auto call = FakeGeneratedServiceClient::TestRpc(
+      context.channel(),
+      {.integer = 123, .status_code = 0},
+      [this](const pw_rpc_test_TestResponse& response, Status status) {
+        ++responses_received_;
+        last_status_ = status;
+        last_response_value_ = response.value;
+      },
+      [this](Status status) { last_error_ = status; });
 
   constexpr std::byte bad_payload[]{
       std::byte{0xab}, std::byte{0xcd}, std::byte{0xef}};
   context.SendResponse(OkStatus(), bad_payload);
 
-  EXPECT_EQ(handler.responses_received(), 0u);
-  EXPECT_EQ(handler.rpc_error(), Status::DataLoss());
+  EXPECT_EQ(responses_received_, 0);
+  EXPECT_EQ(last_error_, Status::DataLoss());
 }
 
-TEST(NanopbClientCall, Unary_InvokesErrorCallbackOnServerError) {
+TEST_F(UnaryClientCall, InvokesErrorCallbackOnServerError) {
   ClientContextForTest context;
-  TestUnaryResponseHandler<pw_rpc_test_TestResponse> handler;
 
   auto call = FakeGeneratedServiceClient::TestRpc(
-      context.channel(), {.integer = 123, .status_code = 0}, handler);
+      context.channel(),
+      {.integer = 123, .status_code = 0},
+      [this](const pw_rpc_test_TestResponse& response, Status status) {
+        ++responses_received_;
+        last_status_ = status;
+        last_response_value_ = response.value;
+      },
+      [this](Status status) { last_error_ = status; });
 
   context.SendPacket(internal::PacketType::SERVER_ERROR, Status::NotFound());
 
-  EXPECT_EQ(handler.responses_received(), 0u);
-  EXPECT_EQ(handler.rpc_error(), Status::NotFound());
+  EXPECT_EQ(responses_received_, 0);
+  EXPECT_EQ(last_error_, Status::NotFound());
 }
 
-TEST(NanopbClientCall, Unary_OnlyReceivesOneResponse) {
+TEST_F(UnaryClientCall, DoesNothingOnErrorWithoutCallback) {
   ClientContextForTest context;
-  TestUnaryResponseHandler<pw_rpc_test_TestResponse> handler;
 
   auto call = FakeGeneratedServiceClient::TestRpc(
-      context.channel(), {.integer = 123, .status_code = 0}, handler);
+      context.channel(),
+      {.integer = 123, .status_code = 0},
+      [this](const pw_rpc_test_TestResponse& response, Status status) {
+        ++responses_received_;
+        last_status_ = status;
+        last_response_value_ = response.value;
+      });
+
+  constexpr std::byte bad_payload[]{
+      std::byte{0xab}, std::byte{0xcd}, std::byte{0xef}};
+  context.SendResponse(OkStatus(), bad_payload);
+
+  EXPECT_EQ(responses_received_, 0);
+}
+
+TEST_F(UnaryClientCall, OnlyReceivesOneResponse) {
+  ClientContextForTest context;
+
+  auto call = FakeGeneratedServiceClient::TestRpc(
+      context.channel(),
+      {.integer = 123, .status_code = 0},
+      [this](const pw_rpc_test_TestResponse& response, Status status) {
+        ++responses_received_;
+        last_status_ = status;
+        last_response_value_ = response.value;
+      });
 
   PW_ENCODE_PB(pw_rpc_test_TestResponse, r1, .value = 42);
   context.SendResponse(Status::Unimplemented(), r1);
@@ -136,18 +200,26 @@
   PW_ENCODE_PB(pw_rpc_test_TestResponse, r3, .value = 46);
   context.SendResponse(Status::Internal(), r3);
 
-  EXPECT_EQ(handler.responses_received(), 1u);
-  EXPECT_EQ(handler.last_status(), Status::Unimplemented());
-  EXPECT_EQ(handler.last_response().value, 42);
+  EXPECT_EQ(responses_received_, 1);
+  EXPECT_EQ(last_status_, Status::Unimplemented());
+  EXPECT_EQ(last_response_value_, 42);
 }
 
-TEST(NanopbClientCall, ServerStreaming_SendsRequestPacket) {
+class ServerStreamingClientCall : public ::testing::Test {
+ protected:
+  bool active_ = true;
+  Status stream_status_ = Status::Unknown();
+  Status rpc_error_ = Status::Unknown();
+  int responses_received_ = 0;
+  int last_response_number_ = 0;
+};
+
+TEST_F(ServerStreamingClientCall, SendsRequestPacket) {
   ClientContextForTest<128, 128, 99, kServiceId, kServerStreamingMethodId>
       context;
-  TestServerStreamingResponseHandler<pw_rpc_test_TestStreamResponse> handler;
 
   auto call = FakeGeneratedServiceClient::TestStreamRpc(
-      context.channel(), {.integer = 71, .status_code = 0}, handler);
+      context.channel(), {.integer = 71, .status_code = 0}, nullptr, nullptr);
 
   EXPECT_EQ(context.output().packet_count(), 1u);
   auto packet = context.output().sent_packet();
@@ -159,48 +231,64 @@
   EXPECT_EQ(sent_proto.integer, 71);
 }
 
-TEST(NanopbClientCall, ServerStreaming_InvokesCallbackOnValidResponse) {
+TEST_F(ServerStreamingClientCall, InvokesCallbackOnValidResponse) {
   ClientContextForTest<128, 128, 99, kServiceId, kServerStreamingMethodId>
       context;
-  TestServerStreamingResponseHandler<pw_rpc_test_TestStreamResponse> handler;
 
   auto call = FakeGeneratedServiceClient::TestStreamRpc(
-      context.channel(), {.integer = 71, .status_code = 0}, handler);
+      context.channel(),
+      {.integer = 71, .status_code = 0},
+      [this](const pw_rpc_test_TestStreamResponse& response) {
+        ++responses_received_;
+        last_response_number_ = response.number;
+      },
+      [this](Status status) {
+        active_ = false;
+        stream_status_ = status;
+      });
 
   PW_ENCODE_PB(pw_rpc_test_TestStreamResponse, r1, .chunk = {}, .number = 11u);
   context.SendResponse(OkStatus(), r1);
-  EXPECT_TRUE(handler.active());
-  EXPECT_EQ(handler.responses_received(), 1u);
-  EXPECT_EQ(handler.last_response().number, 11u);
+  EXPECT_TRUE(active_);
+  EXPECT_EQ(responses_received_, 1);
+  EXPECT_EQ(last_response_number_, 11);
 
   PW_ENCODE_PB(pw_rpc_test_TestStreamResponse, r2, .chunk = {}, .number = 22u);
   context.SendResponse(OkStatus(), r2);
-  EXPECT_TRUE(handler.active());
-  EXPECT_EQ(handler.responses_received(), 2u);
-  EXPECT_EQ(handler.last_response().number, 22u);
+  EXPECT_TRUE(active_);
+  EXPECT_EQ(responses_received_, 2);
+  EXPECT_EQ(last_response_number_, 22);
 
   PW_ENCODE_PB(pw_rpc_test_TestStreamResponse, r3, .chunk = {}, .number = 33u);
   context.SendResponse(OkStatus(), r3);
-  EXPECT_TRUE(handler.active());
-  EXPECT_EQ(handler.responses_received(), 3u);
-  EXPECT_EQ(handler.last_response().number, 33u);
+  EXPECT_TRUE(active_);
+  EXPECT_EQ(responses_received_, 3);
+  EXPECT_EQ(last_response_number_, 33);
 }
 
-TEST(NanopbClientCall, ServerStreaming_ClosesOnFinish) {
+TEST_F(ServerStreamingClientCall, InvokesStreamEndOnFinish) {
   ClientContextForTest<128, 128, 99, kServiceId, kServerStreamingMethodId>
       context;
-  TestServerStreamingResponseHandler<pw_rpc_test_TestStreamResponse> handler;
 
   auto call = FakeGeneratedServiceClient::TestStreamRpc(
-      context.channel(), {.integer = 71, .status_code = 0}, handler);
+      context.channel(),
+      {.integer = 71, .status_code = 0},
+      [this](const pw_rpc_test_TestStreamResponse& response) {
+        ++responses_received_;
+        last_response_number_ = response.number;
+      },
+      [this](Status status) {
+        active_ = false;
+        stream_status_ = status;
+      });
 
   PW_ENCODE_PB(pw_rpc_test_TestStreamResponse, r1, .chunk = {}, .number = 11u);
   context.SendResponse(OkStatus(), r1);
-  EXPECT_TRUE(handler.active());
+  EXPECT_TRUE(active_);
 
   PW_ENCODE_PB(pw_rpc_test_TestStreamResponse, r2, .chunk = {}, .number = 22u);
   context.SendResponse(OkStatus(), r2);
-  EXPECT_TRUE(handler.active());
+  EXPECT_TRUE(active_);
 
   // Close the stream.
   context.SendPacket(internal::PacketType::SERVER_STREAM_END,
@@ -208,40 +296,46 @@
 
   PW_ENCODE_PB(pw_rpc_test_TestStreamResponse, r3, .chunk = {}, .number = 33u);
   context.SendResponse(OkStatus(), r3);
-  EXPECT_FALSE(handler.active());
+  EXPECT_FALSE(active_);
 
-  EXPECT_EQ(handler.responses_received(), 2u);
+  EXPECT_EQ(responses_received_, 2);
 }
 
-TEST(NanopbClientCall, ServerStreaming_InvokesErrorCallbackOnInvalidResponses) {
+TEST_F(ServerStreamingClientCall, InvokesErrorCallbackOnInvalidResponses) {
   ClientContextForTest<128, 128, 99, kServiceId, kServerStreamingMethodId>
       context;
-  TestServerStreamingResponseHandler<pw_rpc_test_TestStreamResponse> handler;
 
   auto call = FakeGeneratedServiceClient::TestStreamRpc(
-      context.channel(), {.integer = 71, .status_code = 0}, handler);
+      context.channel(),
+      {.integer = 71, .status_code = 0},
+      [this](const pw_rpc_test_TestStreamResponse& response) {
+        ++responses_received_;
+        last_response_number_ = response.number;
+      },
+      nullptr,
+      [this](Status error) { rpc_error_ = error; });
 
   PW_ENCODE_PB(pw_rpc_test_TestStreamResponse, r1, .chunk = {}, .number = 11u);
   context.SendResponse(OkStatus(), r1);
-  EXPECT_TRUE(handler.active());
-  EXPECT_EQ(handler.responses_received(), 1u);
-  EXPECT_EQ(handler.last_response().number, 11u);
+  EXPECT_TRUE(active_);
+  EXPECT_EQ(responses_received_, 1);
+  EXPECT_EQ(last_response_number_, 11);
 
   constexpr std::byte bad_payload[]{
       std::byte{0xab}, std::byte{0xcd}, std::byte{0xef}};
   context.SendResponse(OkStatus(), bad_payload);
-  EXPECT_EQ(handler.responses_received(), 1u);
-  EXPECT_EQ(handler.rpc_error(), Status::DataLoss());
+  EXPECT_EQ(responses_received_, 1);
+  EXPECT_EQ(rpc_error_, Status::DataLoss());
 
   PW_ENCODE_PB(pw_rpc_test_TestStreamResponse, r2, .chunk = {}, .number = 22u);
   context.SendResponse(OkStatus(), r2);
-  EXPECT_TRUE(handler.active());
-  EXPECT_EQ(handler.responses_received(), 2u);
-  EXPECT_EQ(handler.last_response().number, 22u);
+  EXPECT_TRUE(active_);
+  EXPECT_EQ(responses_received_, 2);
+  EXPECT_EQ(last_response_number_, 22);
 
   context.SendPacket(internal::PacketType::SERVER_ERROR, Status::NotFound());
-  EXPECT_EQ(handler.responses_received(), 2u);
-  EXPECT_EQ(handler.rpc_error(), Status::NotFound());
+  EXPECT_EQ(responses_received_, 2);
+  EXPECT_EQ(rpc_error_, Status::NotFound());
 }
 
 }  // namespace
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb_client_call.h b/pw_rpc/nanopb/public/pw_rpc/nanopb_client_call.h
index 368d36f..bd46f84 100644
--- a/pw_rpc/nanopb/public/pw_rpc/nanopb_client_call.h
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb_client_call.h
@@ -16,6 +16,7 @@
 #include <new>
 
 #include "pw_bytes/span.h"
+#include "pw_function/function.h"
 #include "pw_rpc/internal/base_client_call.h"
 #include "pw_rpc/internal/method_type.h"
 #include "pw_rpc/internal/nanopb_common.h"
@@ -62,13 +63,12 @@
   Status SendRequest(const void* request_struct);
 
  protected:
-  constexpr BaseNanopbClientCall(
-      rpc::Channel* channel,
-      uint32_t service_id,
-      uint32_t method_id,
-      ResponseHandler handler,
-      internal::NanopbMessageDescriptor request_fields,
-      internal::NanopbMessageDescriptor response_fields)
+  constexpr BaseNanopbClientCall(rpc::Channel* channel,
+                                 uint32_t service_id,
+                                 uint32_t method_id,
+                                 ResponseHandler handler,
+                                 NanopbMessageDescriptor request_fields,
+                                 NanopbMessageDescriptor response_fields)
       : BaseClientCall(channel, service_id, method_id, handler),
         serde_(request_fields, response_fields) {}
 
@@ -81,38 +81,61 @@
   BaseNanopbClientCall(BaseNanopbClientCall&&) = default;
   BaseNanopbClientCall& operator=(BaseNanopbClientCall&&) = default;
 
-  constexpr const internal::NanopbMethodSerde& serde() const { return serde_; }
+  constexpr const NanopbMethodSerde& serde() const { return serde_; }
 
  private:
-  internal::NanopbMethodSerde serde_;
+  NanopbMethodSerde serde_;
 };
 
-template <typename Callback>
-struct CallbackTraits {};
+struct ErrorCallbacks {
+  ErrorCallbacks(Function<void(Status)> error) : rpc_error(std::move(error)) {}
+
+  void InvokeRpcError(Status status) {
+    if (rpc_error != nullptr) {
+      rpc_error(status);
+    }
+  }
+
+  Function<void(Status)> rpc_error;
+};
 
 template <typename ResponseType>
-struct CallbackTraits<UnaryResponseHandler<ResponseType>> {
+struct UnaryCallbacks : public ErrorCallbacks {
   using Response = ResponseType;
-
   static constexpr MethodType kType = MethodType::kUnary;
+
+  UnaryCallbacks(Function<void(const Response&, Status)> response,
+                 Function<void(Status)> error)
+      : ErrorCallbacks(std::move(error)), unary_response(std::move(response)) {}
+
+  Function<void(const Response&, Status)> unary_response;
 };
 
 template <typename ResponseType>
-struct CallbackTraits<ServerStreamingResponseHandler<ResponseType>> {
+struct ServerStreamingCallbacks : public ErrorCallbacks {
   using Response = ResponseType;
-
   static constexpr MethodType kType = MethodType::kServerStreaming;
+
+  ServerStreamingCallbacks(Function<void(const Response&)> response,
+                           Function<void(Status)> end,
+                           Function<void(Status)> error)
+      : ErrorCallbacks(std::move(error)),
+        stream_response(std::move(response)),
+        stream_end(std::move(end)) {}
+
+  Function<void(const Response&)> stream_response;
+  Function<void(Status)> stream_end;
 };
 
 }  // namespace internal
 
-template <typename Callback>
+template <typename Callbacks>
 class NanopbClientCall : public internal::BaseNanopbClientCall {
  public:
   constexpr NanopbClientCall(Channel* channel,
                              uint32_t service_id,
                              uint32_t method_id,
-                             Callback& callback,
+                             Callbacks callbacks,
                              internal::NanopbMessageDescriptor request_fields,
                              internal::NanopbMessageDescriptor response_fields)
       : BaseNanopbClientCall(channel,
@@ -121,9 +144,9 @@
                              &ResponseHandler,
                              request_fields,
                              response_fields),
-        callback_(&callback) {}
+        callbacks_(std::move(callbacks)) {}
 
-  constexpr NanopbClientCall() : BaseNanopbClientCall(), callback_(nullptr) {}
+  constexpr NanopbClientCall() : BaseNanopbClientCall(), callbacks_({}) {}
 
   NanopbClientCall(const NanopbClientCall&) = delete;
   NanopbClientCall& operator=(const NanopbClientCall&) = delete;
@@ -132,8 +155,7 @@
   NanopbClientCall& operator=(NanopbClientCall&&) = default;
 
  private:
-  using Traits = internal::CallbackTraits<Callback>;
-  using Response = typename Traits::Response;
+  using Response = typename Callbacks::Response;
 
   // Buffer into which the nanopb struct is decoded. Its contents are unknown,
   // so it is aligned to maximum alignment to accommodate any type.
@@ -144,32 +166,33 @@
 
   static void ResponseHandler(internal::BaseClientCall& call,
                               const internal::Packet& packet) {
-    static_cast<NanopbClientCall<Callback>&>(call).HandleResponse(packet);
+    static_cast<NanopbClientCall<Callbacks>&>(call).HandleResponse(packet);
   }
 
   void HandleResponse(const internal::Packet& packet) {
-    if constexpr (Traits::kType == internal::MethodType::kUnary) {
+    if constexpr (Callbacks::kType == internal::MethodType::kUnary) {
       InvokeUnaryCallback(packet);
     }
-    if constexpr (Traits::kType == internal::MethodType::kServerStreaming) {
+    if constexpr (Callbacks::kType == internal::MethodType::kServerStreaming) {
       InvokeServerStreamingCallback(packet);
     }
   }
 
   void InvokeUnaryCallback(const internal::Packet& packet) {
     if (packet.type() == internal::PacketType::SERVER_ERROR) {
-      callback_->RpcError(packet.status());
+      callbacks_.InvokeRpcError(packet.status());
       return;
     }
 
     ResponseBuffer response_struct{};
 
-    if (serde().DecodeResponse(&response_struct, packet.payload())) {
-      callback_->ReceivedResponse(
-          packet.status(),
-          *std::launder(reinterpret_cast<Response*>(&response_struct)));
+    if (callbacks_.unary_response &&
+        serde().DecodeResponse(&response_struct, packet.payload())) {
+      callbacks_.unary_response(
+          *std::launder(reinterpret_cast<Response*>(&response_struct)),
+          packet.status());
     } else {
-      callback_->RpcError(Status::DataLoss());
+      callbacks_.InvokeRpcError(Status::DataLoss());
     }
 
     Unregister();
@@ -177,26 +200,28 @@
 
   void InvokeServerStreamingCallback(const internal::Packet& packet) {
     if (packet.type() == internal::PacketType::SERVER_ERROR) {
-      callback_->RpcError(packet.status());
+      callbacks_.InvokeRpcError(packet.status());
       return;
     }
 
-    if (packet.type() == internal::PacketType::SERVER_STREAM_END) {
-      callback_->Complete(packet.status());
+    if (packet.type() == internal::PacketType::SERVER_STREAM_END &&
+        callbacks_.stream_end) {
+      callbacks_.stream_end(packet.status());
       return;
     }
 
     ResponseBuffer response_struct{};
 
-    if (serde().DecodeResponse(&response_struct, packet.payload())) {
-      callback_->ReceivedResponse(
+    if (callbacks_.stream_response &&
+        serde().DecodeResponse(&response_struct, packet.payload())) {
+      callbacks_.stream_response(
           *std::launder(reinterpret_cast<Response*>(&response_struct)));
     } else {
-      callback_->RpcError(Status::DataLoss());
+      callbacks_.InvokeRpcError(Status::DataLoss());
     }
   }
 
-  Callback* callback_;
+  Callbacks callbacks_;
 };
 
 }  // namespace pw::rpc
diff --git a/pw_rpc/nanopb/pw_rpc_nanopb_private/internal_test_utils.h b/pw_rpc/nanopb/pw_rpc_nanopb_private/internal_test_utils.h
index 1cd8df7..5312463 100644
--- a/pw_rpc/nanopb/pw_rpc_nanopb_private/internal_test_utils.h
+++ b/pw_rpc/nanopb/pw_rpc_nanopb_private/internal_test_utils.h
@@ -17,7 +17,6 @@
 
 #include "pb_decode.h"
 #include "pb_encode.h"
-#include "pw_rpc/nanopb_client_call.h"
 
 namespace pw::rpc::internal {
 
@@ -60,61 +59,4 @@
   EXPECT_TRUE(pb_decode(&input, fields, &protobuf));
 }
 
-// Client response handler for a unary RPC invocation which captures the
-// response it receives.
-template <typename Response>
-class TestUnaryResponseHandler : public UnaryResponseHandler<Response> {
- public:
-  void ReceivedResponse(Status status, const Response& response) override {
-    last_status_ = status;
-    last_response_ = response;
-    ++responses_received_;
-  }
-
-  void RpcError(Status status) override { rpc_error_ = status; }
-
-  constexpr Status last_status() const { return last_status_; }
-  constexpr const Response& last_response() const& { return last_response_; }
-  constexpr size_t responses_received() const { return responses_received_; }
-  constexpr Status rpc_error() const { return rpc_error_; }
-
- private:
-  Status last_status_;
-  Response last_response_;
-  size_t responses_received_ = 0;
-  Status rpc_error_;
-};
-
-// Client response handler for a unary RPC invocation which stores information
-// about the state of the stream.
-template <typename Response>
-class TestServerStreamingResponseHandler
-    : public ServerStreamingResponseHandler<Response> {
- public:
-  void ReceivedResponse(const Response& response) override {
-    last_response_ = response;
-    ++responses_received_;
-  }
-
-  void Complete(Status status) override {
-    active_ = false;
-    status_ = status;
-  }
-
-  void RpcError(Status status) override { rpc_error_ = status; }
-
-  constexpr bool active() const { return active_; }
-  constexpr Status status() const { return status_; }
-  constexpr const Response& last_response() const& { return last_response_; }
-  constexpr size_t responses_received() const { return responses_received_; }
-  constexpr Status rpc_error() const { return rpc_error_; }
-
- private:
-  Status status_;
-  Response last_response_;
-  size_t responses_received_ = 0;
-  bool active_ = true;
-  Status rpc_error_;
-};
-
 }  // namespace pw::rpc::internal
diff --git a/pw_rpc/py/pw_rpc/codegen_nanopb.py b/pw_rpc/py/pw_rpc/codegen_nanopb.py
index efb778c..c142340 100644
--- a/pw_rpc/py/pw_rpc/codegen_nanopb.py
+++ b/pw_rpc/py/pw_rpc/codegen_nanopb.py
@@ -14,7 +14,7 @@
 """This module generates the code for nanopb-based pw_rpc services."""
 
 import os
-from typing import Iterable, Iterator
+from typing import Iterable, Iterator, NamedTuple, Optional
 
 from pw_protobuf.output_file import OutputFile
 from pw_protobuf.proto_tree import ProtoNode, ProtoService, ProtoServiceMethod
@@ -71,6 +71,20 @@
                           'NanopbMethodUnion', _generate_method_descriptor)
 
 
+class _CallbackFunction(NamedTuple):
+    """Represents a callback function parameter in a client RPC call."""
+
+    function_type: str
+    name: str
+    default_value: Optional[str] = None
+
+    def __str__(self):
+        param = f'::pw::Function<{self.function_type}> {self.name}'
+        if self.default_value:
+            param += f' = {self.default_value}'
+        return param
+
+
 def _generate_code_for_client_method(method: ProtoServiceMethod,
                                      output: OutputFile) -> None:
     """Outputs client code for a single RPC method."""
@@ -79,10 +93,26 @@
     res = method.response_type().nanopb_name()
     method_id = pw_rpc.ids.calculate(method.name())
 
+    rpc_error = _CallbackFunction(
+        'void(::pw::Status)',
+        'on_rpc_error',
+        'nullptr',
+    )
+
     if method.type() == ProtoServiceMethod.Type.UNARY:
-        callback = f'{RPC_NAMESPACE}::UnaryResponseHandler<{res}>'
+        callbacks = f'{RPC_NAMESPACE}::internal::UnaryCallbacks'
+        functions = [
+            _CallbackFunction(f'void(const {res}&, ::pw::Status)',
+                              'on_response'),
+            rpc_error,
+        ]
     elif method.type() == ProtoServiceMethod.Type.SERVER_STREAMING:
-        callback = f'{RPC_NAMESPACE}::ServerStreamingResponseHandler<{res}>'
+        callbacks = f'{RPC_NAMESPACE}::internal::ServerStreamingCallbacks'
+        functions = [
+            _CallbackFunction(f'void(const {res}&)', 'on_response'),
+            _CallbackFunction('void(::pw::Status)', 'on_stream_end'),
+            rpc_error,
+        ]
     else:
         raise NotImplementedError(
             'Only unary and server streaming RPCs are currently supported')
@@ -92,13 +122,19 @@
     output.write_line()
     output.write_line(
         f'using {call_alias} = {RPC_NAMESPACE}::NanopbClientCall<')
-    output.write_line(f'    {callback}>;')
+    output.write_line(f'    {callbacks}<{res}>>;')
     output.write_line()
     output.write_line(f'static {call_alias} {method.name()}(')
     with output.indent(4):
         output.write_line(f'{RPC_NAMESPACE}::Channel& channel,')
         output.write_line(f'const {req}& request,')
-        output.write_line(f'{callback}& callback) {{')
+
+        # Write out each of the callback functions for the method type.
+        for i, function in enumerate(functions):
+            if i == len(functions) - 1:
+                output.write_line(f'{function}) {{')
+            else:
+                output.write_line(f'{function},')
 
     with output.indent():
         output.write_line(f'{call_alias} call(&channel,')
@@ -106,7 +142,11 @@
             output.write_line('kServiceId,')
             output.write_line(
                 f'0x{method_id:08x},  // Hash of "{method.name()}"')
-            output.write_line('callback,')
+
+            moved_functions = (f'std::move({function.name})'
+                               for function in functions)
+            output.write_line(f'{callbacks}({", ".join(moved_functions)}),')
+
             output.write_line(f'{req}_fields,')
             output.write_line(f'{res}_fields);')
         output.write_line('call.SendRequest(&request);')
@@ -166,7 +206,7 @@
 
 class StubGenerator(codegen.StubGenerator):
     def unary_signature(self, method: ProtoServiceMethod, prefix: str) -> str:
-        return (f'pw::Status {prefix}{method.name()}(ServerContext&, '
+        return (f'::pw::Status {prefix}{method.name()}(ServerContext&, '
                 f'const {method.request_type().nanopb_name()}& request, '
                 f'{method.response_type().nanopb_name()}& response)')
 
@@ -176,7 +216,7 @@
         output.write_line('static_cast<void>(request);')
         output.write_line(codegen.STUB_RESPONSE_TODO)
         output.write_line('static_cast<void>(response);')
-        output.write_line('return pw::Status::Unimplemented();')
+        output.write_line('return ::pw::Status::Unimplemented();')
 
     def server_streaming_signature(self, method: ProtoServiceMethod,
                                    prefix: str) -> str: