pw_rpc: Utilities for testing RPC service methods

Define the PW_RPC_TEST_METHOD_CONTEXT macro, which declares a context
object that can be used to invoke service methods in tests.

Change-Id: I829c7e4f73265ba4c6c2e9ae6ff0cd16fe1fb2a5
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/14025
Commit-Queue: Wyatt Hepler <hepler@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
diff --git a/pw_rpc/BUILD b/pw_rpc/BUILD
index fccebc7..aafcdba 100644
--- a/pw_rpc/BUILD
+++ b/pw_rpc/BUILD
@@ -36,6 +36,7 @@
         "public/pw_rpc/server_context.h",
         "public/pw_rpc/internal/channel.h",
         "public/pw_rpc/internal/server.h",
+        "public/pw_rpc/internal/hash.h",
         # TODO(hepler): Only building the test version of the server for now.
         "test_impl/public_overrides/pw_rpc/internal/method.h",
     ],
@@ -76,8 +77,8 @@
 )
 
 pw_cc_library(
-    name = "test_utils",
-    hdrs = ["pw_rpc_private/test_utils.h"],
+    name = "internal_test_utils",
+    hdrs = ["pw_rpc_private/internal_test_utils.h"],
     visibility = ["//visibility:private"],
     deps = [
         ":common",
@@ -97,6 +98,7 @@
         "nanopb/test.pb.c",
         "nanopb/test.pb.h",
         "nanopb/test_rpc.pb.h",
+        "public/pw_rpc/test_method_context.h",
     ],
 )
 
@@ -106,8 +108,8 @@
         "base_server_writer_test.cc",
     ],
     deps = [
+        ":internal_test_utils",
         ":pw_rpc",
-        ":test_utils",
     ],
 )
 
@@ -143,8 +145,8 @@
         "server_test.cc",
     ],
     deps = [
+        ":internal_test_utils",
         ":pw_rpc",
-        ":test_utils",
         "//pw_assert",
     ],
 )
diff --git a/pw_rpc/BUILD.gn b/pw_rpc/BUILD.gn
index 13b88cb..83b2cc7 100644
--- a/pw_rpc/BUILD.gn
+++ b/pw_rpc/BUILD.gn
@@ -64,8 +64,8 @@
     friend = [ "./*" ]
   }
 
-  pw_source_set("test_utils_$_target_name") {
-    public = [ "pw_rpc_private/test_utils.h" ]
+  pw_source_set("internal_test_utils_$_target_name") {
+    public = [ "pw_rpc_private/internal_test_utils.h" ]
     public_configs = [ ":private_includes" ]
     public_deps = [
       ":$_target_name",
@@ -119,6 +119,17 @@
   implementation = "nanopb"
 }
 
+pw_source_set("nanopb_test_method_context") {
+  public_configs = [ ":default_config" ]
+  public = [ "public/pw_rpc/test_method_context.h" ]
+  sources = [ "public/pw_rpc/internal/hash.h" ]
+  public_deps = [
+    ":nanopb_server",
+    dir_pw_containers,
+    dir_pw_unit_test,
+  ]
+}
+
 config("private_includes") {
   include_dirs = [ "." ]
   visibility = [ ":*" ]
@@ -165,8 +176,8 @@
 
 pw_test("base_server_writer_test") {
   deps = [
+    ":internal_test_utils_test_server",
     ":test_server",
-    ":test_utils_test_server",
   ]
   sources = [ "base_server_writer_test.cc" ]
 }
@@ -174,7 +185,7 @@
 pw_test("channel_test") {
   deps = [
     ":common",
-    ":test_utils_test_server",
+    ":internal_test_utils_test_server",
   ]
   sources = [ "channel_test.cc" ]
 }
@@ -189,9 +200,9 @@
 
 pw_test("server_test") {
   deps = [
+    ":internal_test_utils_test_server",
     ":protos_pwpb",
     ":test_server",
-    ":test_utils_test_server",
     dir_pw_assert,
   ]
   sources = [ "server_test.cc" ]
diff --git a/pw_rpc/base_server_writer_test.cc b/pw_rpc/base_server_writer_test.cc
index ce9e806..2100d5a 100644
--- a/pw_rpc/base_server_writer_test.cc
+++ b/pw_rpc/base_server_writer_test.cc
@@ -21,7 +21,7 @@
 #include "gtest/gtest.h"
 #include "pw_rpc/internal/service.h"
 #include "pw_rpc/server_context.h"
-#include "pw_rpc_private/test_utils.h"
+#include "pw_rpc_private/internal_test_utils.h"
 
 namespace pw::rpc {
 
diff --git a/pw_rpc/channel_test.cc b/pw_rpc/channel_test.cc
index c2938dc..74e691a 100644
--- a/pw_rpc/channel_test.cc
+++ b/pw_rpc/channel_test.cc
@@ -16,7 +16,7 @@
 
 #include "gtest/gtest.h"
 #include "pw_rpc/internal/packet.h"
-#include "pw_rpc_private/test_utils.h"
+#include "pw_rpc_private/internal_test_utils.h"
 
 namespace pw::rpc::internal {
 namespace {
diff --git a/pw_rpc/nanopb/BUILD.gn b/pw_rpc/nanopb/BUILD.gn
index 75ed96f..197e1a1 100644
--- a/pw_rpc/nanopb/BUILD.gn
+++ b/pw_rpc/nanopb/BUILD.gn
@@ -46,6 +46,7 @@
 pw_test("codegen_test") {
   deps = [
     "..:nanopb_server",
+    "..:nanopb_test_method_context",
     "..:test_protos_nanopb_rpc",
   ]
   sources = [ "codegen_test.cc" ]
@@ -54,9 +55,9 @@
 
 pw_test("method_test") {
   deps = [
+    "..:internal_test_utils_nanopb_server",
     "..:nanopb_server",
     "..:test_protos_nanopb",
-    "..:test_utils_nanopb_server",
   ]
   sources = [ "method_test.cc" ]
   enable_if = dir_pw_third_party_nanopb != ""
diff --git a/pw_rpc/nanopb/codegen_test.cc b/pw_rpc/nanopb/codegen_test.cc
index b2de196..9e57ded 100644
--- a/pw_rpc/nanopb/codegen_test.cc
+++ b/pw_rpc/nanopb/codegen_test.cc
@@ -13,9 +13,33 @@
 // the License.
 
 #include "gtest/gtest.h"
+#include "pw_rpc/test_method_context.h"
 #include "pw_rpc_test_protos/test_rpc.pb.h"
 
-namespace pw::rpc::internal {
+namespace pw::rpc {
+namespace test {
+
+Status TestService::TestRpc(ServerContext&,
+                            const pw_rpc_test_TestRequest& request,
+                            pw_rpc_test_TestResponse& response) {
+  response.value = request.integer + 1;
+  return static_cast<Status::Code>(request.status_code);
+}
+
+void TestService::TestStreamRpc(
+    ServerContext&,
+    const pw_rpc_test_TestRequest& request,
+    ServerWriter<pw_rpc_test_TestStreamResponse>& writer) {
+  for (int i = 0; i < request.integer; ++i) {
+    writer.Write({.number = static_cast<uint32_t>(i)});
+  }
+
+  writer.Finish(static_cast<Status::Code>(request.status_code));
+}
+
+}  // namespace test
+
+namespace internal {
 namespace {
 
 TEST(NanopbCodegen, CompilesProperly) {
@@ -24,5 +48,57 @@
   EXPECT_STREQ(service.name(), "TestService");
 }
 
+TEST(NanopbCodegen, InvokeUnaryRpc) {
+  PW_RPC_TEST_METHOD_CONTEXT(test::TestService, TestRpc) context;
+
+  EXPECT_EQ(Status::OK,
+            context.call({.integer = 123, .status_code = Status::OK}));
+
+  EXPECT_EQ(124, context.response().value);
+
+  EXPECT_EQ(
+      Status::INVALID_ARGUMENT,
+      context.call({.integer = 999, .status_code = Status::INVALID_ARGUMENT}));
+  EXPECT_EQ(1000, context.response().value);
+}
+
+TEST(NanopbCodegen, InvokeStreamingRpc) {
+  PW_RPC_TEST_METHOD_CONTEXT(test::TestService, TestStreamRpc) context;
+
+  context.call({.integer = 0, .status_code = Status::ABORTED});
+
+  EXPECT_EQ(Status::ABORTED, context.status());
+  EXPECT_TRUE(context.done());
+  EXPECT_TRUE(context.responses().empty());
+  EXPECT_EQ(0u, context.total_responses());
+
+  context.call({.integer = 4, .status_code = Status::OK});
+
+  ASSERT_EQ(4u, context.responses().size());
+  ASSERT_EQ(4u, context.total_responses());
+
+  for (size_t i = 0; i < context.responses().size(); ++i) {
+    EXPECT_EQ(context.responses()[i].number, i);
+  }
+
+  EXPECT_EQ(Status::OK, context.status());
+}
+
+TEST(NanopbCodegen, InvokeStreamingRpc_ContextKeepsFixedNumberOfResponses) {
+  PW_RPC_TEST_METHOD_CONTEXT(test::TestService, TestStreamRpc, 3) context;
+
+  ASSERT_EQ(3u, context.responses().max_size());
+
+  context.call({.integer = 5, .status_code = Status::NOT_FOUND});
+
+  ASSERT_EQ(3u, context.responses().size());
+  ASSERT_EQ(5u, context.total_responses());
+
+  EXPECT_EQ(context.responses()[0].number, 0u);
+  EXPECT_EQ(context.responses()[1].number, 1u);
+  EXPECT_EQ(context.responses()[2].number, 4u);
+}
+
 }  // namespace
-}  // namespace pw::rpc::internal
+}  // namespace internal
+}  // namespace pw::rpc
diff --git a/pw_rpc/nanopb/method.cc b/pw_rpc/nanopb/method.cc
index 6c96d0a..87997b5 100644
--- a/pw_rpc/nanopb/method.cc
+++ b/pw_rpc/nanopb/method.cc
@@ -49,6 +49,13 @@
   return StatusWithSize::INTERNAL;
 }
 
+bool Method::DecodeResponse(std::span<const byte> response,
+                            void* proto_struct) const {
+  auto input = pb_istream_from_buffer(
+      reinterpret_cast<const pb_byte_t*>(response.data()), response.size());
+  return pb_decode(&input, static_cast<Fields>(response_fields_), proto_struct);
+}
+
 void Method::CallUnary(ServerCall& call,
                        const Packet& request,
                        void* request_struct,
diff --git a/pw_rpc/nanopb/method_test.cc b/pw_rpc/nanopb/method_test.cc
index b90406f..350dec2 100644
--- a/pw_rpc/nanopb/method_test.cc
+++ b/pw_rpc/nanopb/method_test.cc
@@ -20,7 +20,7 @@
 #include "pb_encode.h"
 #include "pw_rpc/internal/service.h"
 #include "pw_rpc/server_context.h"
-#include "pw_rpc_private/test_utils.h"
+#include "pw_rpc_private/internal_test_utils.h"
 #include "pw_rpc_test_protos/test.pb.h"
 
 namespace pw::rpc::internal {
diff --git a/pw_rpc/nanopb/public_overrides/pw_rpc/internal/method.h b/pw_rpc/nanopb/public_overrides/pw_rpc/internal/method.h
index 0a5d2a2..956c4ef 100644
--- a/pw_rpc/nanopb/public_overrides/pw_rpc/internal/method.h
+++ b/pw_rpc/nanopb/public_overrides/pw_rpc/internal/method.h
@@ -49,6 +49,8 @@
 // Use a void* to cover both Nanopb 3's pb_field_s and Nanopb 4's pb_msgdesc_s.
 using NanopbMessageDescriptor = const void*;
 
+enum class Type { kUnary, kServerStreaming, kClientStreaming, kBidiStreaming };
+
 // Extracts the request and response proto types from a method.
 template <typename Method>
 struct RpcTraits;
@@ -59,6 +61,10 @@
     ServerContext&, const RequestType&, ResponseType&)> {
   using Request = RequestType;
   using Response = ResponseType;
+
+  static constexpr Type kType = Type::kUnary;
+  static constexpr bool kServerStreaming = false;
+  static constexpr bool kClientStreaming = false;
 };
 
 // Specialization for server streaming RPCs.
@@ -67,6 +73,10 @@
     ServerContext&, const RequestType&, ServerWriter<ResponseType>&)> {
   using Request = RequestType;
   using Response = ResponseType;
+
+  static constexpr Type kType = Type::kServerStreaming;
+  static constexpr bool kServerStreaming = true;
+  static constexpr bool kClientStreaming = false;
 };
 
 template <auto method>
@@ -145,6 +155,11 @@
   StatusWithSize EncodeResponse(const void* proto_struct,
                                 std::span<std::byte> buffer) const;
 
+  // Decodes a response protobuf with Nanopb to the provided buffer. For testing
+  // use.
+  bool DecodeResponse(std::span<const std::byte> response,
+                      void* proto_struct) const;
+
  private:
   // Generic version of the unary RPC function signature:
   //
diff --git a/pw_rpc/public/pw_rpc/internal/hash.h b/pw_rpc/public/pw_rpc/internal/hash.h
new file mode 100644
index 0000000..57fee52
--- /dev/null
+++ b/pw_rpc/public/pw_rpc/internal/hash.h
@@ -0,0 +1,47 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include <string_view>
+
+#include "pw_preprocessor/compiler.h"
+
+namespace pw::rpc::internal {
+
+// This is the hash function pw_rpc uses internally to calculate IDs from
+// service and method names.
+//
+// This is the same hash function that is used in pw_tokenizer, with the maximum
+// length removed. It is chosen due to its simplicity. The tokenizer code is
+// duplicated here to avoid unnecessary dependencies between modules.
+constexpr uint32_t Hash(std::string_view string)
+    PW_NO_SANITIZE("unsigned-integer-overflow") {
+  constexpr uint32_t kHashConstant = 65599;
+
+  // The length is hashed as if it were the first character.
+  uint32_t hash = string.size();
+  uint32_t coefficient = kHashConstant;
+
+  // Hash all of the characters in the string as unsigned ints.
+  // The coefficient calculation is done modulo 0x100000000, so the unsigned
+  // integer overflows are intentional.
+  for (uint8_t ch : string) {
+    hash += coefficient * ch;
+    coefficient *= kHashConstant;
+  }
+
+  return hash;
+}
+
+}  // namespace pw::rpc::internal
diff --git a/pw_rpc/public/pw_rpc/test_method_context.h b/pw_rpc/public/pw_rpc/test_method_context.h
new file mode 100644
index 0000000..ea5dc17
--- /dev/null
+++ b/pw_rpc/public/pw_rpc/test_method_context.h
@@ -0,0 +1,285 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include <tuple>
+
+#include "pw_containers/vector.h"
+#include "pw_preprocessor/macro_arg_count.h"
+#include "pw_rpc/channel.h"
+#include "pw_rpc/internal/hash.h"
+#include "pw_rpc/internal/method.h"
+#include "pw_rpc/internal/packet.h"
+#include "pw_rpc/internal/server.h"
+#include "pw_unit_test/framework.h"
+
+// Declares a context object that may be used to invoke an RPC. The context is
+// declared with the RPC service and method name. The RPC is then invoked with
+// the call method.
+//
+// For a unary RPC, context.call(request) returns the status, and the response
+// struct can be accessed via context.response().
+//
+//   PW_RPC_TEST_METHOD_CONTEXT(my::CoolService, TheMethod) context;
+//   EXPECT_EQ(Status::OK, context.call({.some_arg = 123}));
+//   EXPECT_EQ(500, context.response().some_response_value);
+//
+// For a server streaming RPC, context.call(request) invokes the method. As in a
+// normal RPC, the method completes when the ServerWriter's Finish method is
+// called (or it goes out of scope).
+//
+//
+//   PW_RPC_TEST_METHOD_CONTEXT(my::CoolService, TheStreamingMethod) context;
+//   context.call({.some_arg = 123});
+//
+//   EXPECT_TRUE(context.done());  // Check that the RPC completed
+//   EXPECT_EQ(Status::OK, context.status());  // Check the status
+//
+//   EXPECT_EQ(3u, context.responses().size());
+//   EXPECT_EQ(123, context.responses()[0].value); // check individual responses
+//
+//   for (const MyResponse& response : context.responses()) {
+//     // iterate over the responses
+//   }
+//
+// PW_RPC_TEST_METHOD_CONTEXT takes two optional arguments:
+//
+//   size_t max_responses: maximum responses to store; ignored unless streaming
+//   size_t output_buffer_size: buffer size; must be large enough for a packet
+//
+// Example:
+//
+//   PW_RPC_TEST_METHOD_CONTEXT(MyService, BestMethod, 3, 256) context;
+//   ASSERT_EQ(3u, context.responses().max_size());
+//
+#define PW_RPC_TEST_METHOD_CONTEXT(service, method_name, ...) \
+  ::pw::rpc::test_internal::MethodInvocationContext<          \
+      ::pw::rpc::test_internal::ServiceTestUtilities<         \
+          service,                                            \
+          ::pw::rpc::internal::Hash(#method_name)>,           \
+      service::method_name PW_COMMA_ARGS(__VA_ARGS__)>
+
+// Internal classes that implement PW_RPC_TEST_METHOD_CONTEXT.
+namespace pw::rpc::test_internal {
+
+// Finds the method object in a service at compile time. This class friended by
+// the generated service classes to give it access to the internal method list.
+template <typename ServiceType, uint32_t method_hash>
+class ServiceTestUtilities {
+ public:
+  using Service = ServiceType;
+
+  static constexpr const internal::Method& method() { return *FindMethod(); }
+
+ private:
+  static constexpr const internal::Method* FindMethod() {
+    for (const internal::Method& method : Service::kMethods) {
+      if (method.id() == method_hash) {
+        return &method;
+      }
+    }
+    return nullptr;
+  }
+
+  static_assert(FindMethod() != nullptr,
+                "The specified service method does not exist");
+};
+
+// A ChannelOutput implementation that stores the outgoing payloads and status.
+template <typename Response>
+class MessageOutput final : public ChannelOutput {
+ public:
+  MessageOutput(const internal::Method& method,
+                Vector<Response>& responses,
+                std::span<std::byte> buffer)
+      : ChannelOutput("test_internal::MessageOutput"),
+        method_(method),
+        responses_(responses),
+        buffer_(buffer) {
+    clear();
+  }
+
+  Status last_status() const { return last_status_; }
+  void set_last_status(Status status) { last_status_ = status; }
+
+  size_t total_responses() const { return total_responses_; }
+
+  bool stream_ended() const { return stream_ended_; }
+
+  void clear();
+
+ private:
+  std::span<std::byte> AcquireBuffer() override { return buffer_; }
+
+  void SendAndReleaseBuffer(size_t size) override;
+
+  const internal::Method& method_;
+  Vector<Response>& responses_;
+  std::span<std::byte> buffer_;
+  size_t total_responses_;
+  bool stream_ended_;
+  Status last_status_;
+};
+
+// Collects everything needed to invoke a particular RPC.
+template <typename ServiceUtils,
+          auto function,
+          size_t max_responses,
+          size_t output_size>
+struct InvocationContext {
+  using Request = internal::Request<function>;
+  using Response = internal::Response<function>;
+
+  InvocationContext()
+      : output(ServiceUtils::method(), responses, buffer),
+        channel(Channel::Create<123>(&output)),
+        server(std::span(&channel, 1)),
+        call(static_cast<internal::Server&>(server),
+             static_cast<internal::Channel&>(channel),
+             service,
+             ServiceUtils::method()) {}
+
+  MessageOutput<Response> output;
+
+  Channel channel;
+  Server server;
+  typename ServiceUtils::Service service;
+  Vector<Response, max_responses> responses;
+  std::array<std::byte, output_size> buffer = {};
+
+  internal::ServerCall call;
+};
+
+// Method invocation context for a unary RPC. Returns the status in call() and
+// provides the response through the response() method.
+template <typename ServiceUtils, auto function, size_t output_size>
+class UnaryContext {
+ private:
+  InvocationContext<ServiceUtils, function, 1, output_size> ctx_;
+
+ public:
+  using Request = typename decltype(ctx_)::Request;
+  using Response = typename decltype(ctx_)::Response;
+
+  // Invokes the RPC with the provided request. Returns the status.
+  Status call(const Request& request) {
+    ctx_.output.clear();
+    ctx_.responses.emplace_back();
+    ctx_.responses.back() = {};
+    return function(ctx_.call.context(), request, ctx_.responses.back());
+  }
+
+  // Gives access to the RPC's response.
+  const Response& response() const {
+    EXPECT_FALSE(ctx_.responses.empty());
+    return ctx_.responses.back();
+  }
+};
+
+// Method invocation context for a server streaming RPC.
+template <typename ServiceUtils,
+          auto function,
+          size_t max_responses,
+          size_t output_size>
+class ServerStreamingContext {
+ private:
+  InvocationContext<ServiceUtils, function, max_responses, output_size> ctx_;
+
+ public:
+  using Request = typename decltype(ctx_)::Request;
+  using Response = typename decltype(ctx_)::Response;
+
+  // Invokes the RPC with the provided request.
+  void call(const Request& request) {
+    ctx_.output.clear();
+    internal::BaseServerWriter server_writer(ctx_.call);
+    function(ctx_.call.context(),
+             request,
+             static_cast<ServerWriter<Response>&>(server_writer));
+  }
+
+  // Returns the responses that have been recorded. The maximum number of
+  // responses is responses().max_size(). responses().back() is always the most
+  // recent response, even if total_responses() > responses().max_size().
+  const Vector<Response>& responses() const { return ctx_.responses; }
+
+  // The total number of responses sent, which may be larger than
+  // responses.max_size().
+  size_t total_responses() const { return ctx_.output.total_responses(); }
+
+  // True if the stream has terminated.
+  bool done() const { return ctx_.output.stream_ended(); }
+
+  // The status of the stream. Only valid if done() is true.
+  Status status() const {
+    EXPECT_TRUE(done());
+    return ctx_.output.last_status();
+  }
+};
+
+// Alias to select the type of the context object to use based on which type of
+// RPC it is for.
+template <typename T,
+          auto function,
+          size_t responses = 4,
+          size_t output_size = 128>
+using MethodInvocationContext = std::tuple_element_t<
+    static_cast<size_t>(internal::RpcTraits<decltype(function)>::kType),
+    std::tuple<UnaryContext<T, function, output_size>,
+               ServerStreamingContext<T, function, responses, output_size>
+               // TODO(hepler): Support client and bidi streaming
+               >>;
+
+template <typename Response>
+void MessageOutput<Response>::clear() {
+  responses_.clear();
+  total_responses_ = 0;
+  stream_ended_ = false;
+  last_status_ = Status::UNKNOWN;
+}
+
+template <typename Response>
+void MessageOutput<Response>::SendAndReleaseBuffer(size_t size) {
+  EXPECT_FALSE(stream_ended_);
+
+  if (size == 0u) {
+    return;
+  }
+
+  internal::Packet packet;
+  EXPECT_EQ(
+      Status::OK,
+      internal::Packet::FromBuffer(std::span(buffer_.data(), size), packet));
+
+  last_status_ = packet.status();
+
+  switch (packet.type()) {
+    case internal::PacketType::RPC:
+      // If we run out of space, the back message is always the most recent.
+      responses_.emplace_back();
+      responses_.back() = {};
+      EXPECT_TRUE(method_.DecodeResponse(packet.payload(), &responses_.back()));
+      total_responses_ += 1;
+      break;
+    case internal::PacketType::STREAM_END:
+      stream_ended_ = true;
+      break;
+    case internal::PacketType::CANCEL:
+    case internal::PacketType::ERROR:
+      FAIL();
+      break;
+  }
+}
+
+}  // namespace pw::rpc::test_internal
diff --git a/pw_rpc/pw_rpc_private/test_utils.h b/pw_rpc/pw_rpc_private/internal_test_utils.h
similarity index 95%
rename from pw_rpc/pw_rpc_private/test_utils.h
rename to pw_rpc/pw_rpc_private/internal_test_utils.h
index 0bf3272..c3759e7 100644
--- a/pw_rpc/pw_rpc_private/test_utils.h
+++ b/pw_rpc/pw_rpc_private/internal_test_utils.h
@@ -11,6 +11,9 @@
 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 // License for the specific language governing permissions and limitations under
 // the License.
+
+// Internal-only testing utilities. public/pw_rpc/test_method_context.h provides
+// improved public-facing utilities for testing RPC services.
 #pragma once
 
 #include <array>
diff --git a/pw_rpc/pw_rpc_test_protos/test.proto b/pw_rpc/pw_rpc_test_protos/test.proto
index 16c4022..20ab3a2 100644
--- a/pw_rpc/pw_rpc_test_protos/test.proto
+++ b/pw_rpc/pw_rpc_test_protos/test.proto
@@ -17,6 +17,7 @@
 
 message TestRequest {
   int64 integer = 1;
+  uint32 status_code = 2;
 }
 
 message TestResponse {
@@ -25,11 +26,12 @@
 
 message TestStreamResponse {
   bytes chunk = 1;
+  uint32 number = 2;
 }
 
 message Empty {}
 
 service TestService {
   rpc TestRpc(TestRequest) returns (TestResponse) {}
-  rpc TestStreamRpc(Empty) returns (stream TestStreamResponse) {}
+  rpc TestStreamRpc(TestRequest) returns (stream TestStreamResponse) {}
 }
diff --git a/pw_rpc/py/pw_rpc/codegen_nanopb.py b/pw_rpc/py/pw_rpc/codegen_nanopb.py
index fbfbe1c..5928ca2 100644
--- a/pw_rpc/py/pw_rpc/codegen_nanopb.py
+++ b/pw_rpc/py/pw_rpc/codegen_nanopb.py
@@ -145,7 +145,11 @@
             for method in service.methods():
                 _generate_method_descriptor(method, output)
 
-        output.write_line('};')
+        output.write_line('};\n')
+
+        output.write_line('template <typename, uint32_t>')
+        output.write_line(
+            'friend class ::pw::rpc::test_internal::ServiceTestUtilities;')
 
     output.write_line('};')
 
@@ -163,9 +167,9 @@
     output.write_line('#pragma once\n')
     output.write_line('#include <cstddef>')
     output.write_line('#include <cstdint>\n')
-    output.write_line('#include "pw_rpc/server_context.h"')
     output.write_line('#include "pw_rpc/internal/method.h"')
     output.write_line('#include "pw_rpc/internal/service.h"')
+    output.write_line('#include "pw_rpc/server_context.h"')
 
     # Include the corresponding nanopb header file for this proto file, in which
     # the file's messages and enums are generated.
@@ -177,6 +181,11 @@
         generated_header = _proto_filename_to_nanopb_header(imported_file)
         output.write_line(f'#include "{generated_header}"')
 
+    output.write_line('namespace pw::rpc::test_internal {\n')
+    output.write_line('template <typename, uint32_t>')
+    output.write_line('class ServiceTestUtilities;')
+    output.write_line('\n}  // namespace pw::rpc::test_internal')
+
     if package.cpp_namespace():
         file_namespace = package.cpp_namespace()
         if file_namespace.startswith('::'):
diff --git a/pw_rpc/server_test.cc b/pw_rpc/server_test.cc
index a78118c..832a129 100644
--- a/pw_rpc/server_test.cc
+++ b/pw_rpc/server_test.cc
@@ -21,7 +21,7 @@
 #include "pw_assert/assert.h"
 #include "pw_rpc/internal/packet.h"
 #include "pw_rpc/internal/service.h"
-#include "pw_rpc_private/test_utils.h"
+#include "pw_rpc_private/internal_test_utils.h"
 
 namespace pw::rpc {
 namespace {