pw_rpc: Python RPC client

Define the pw_rpc.client module. The pw_rpc.client.Client class handles
sending and receiving RPCs packet for one or more pw_rpc channels.

Change-Id: I26dd8532b1ee68777d7b232c08cfa16a9b6b0139
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/13163
Reviewed-by: Alexei Frolov <frolv@google.com>
Commit-Queue: Wyatt Hepler <hepler@google.com>
diff --git a/.pylintrc b/.pylintrc
index 85cf4fa..2feb460 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -299,6 +299,7 @@
            db,
            ex,
            fd,
+           id,
            ok,
            Run,
            T,
@@ -437,7 +438,7 @@
 max-statements=100
 
 # Minimum number of public methods for a class (see R0903).
-min-public-methods=1
+min-public-methods=0
 
 
 [CLASSES]
diff --git a/pw_rpc/py/client_test.py b/pw_rpc/py/client_test.py
new file mode 100755
index 0000000..7569cf0
--- /dev/null
+++ b/pw_rpc/py/client_test.py
@@ -0,0 +1,258 @@
+#!/usr/bin/env python3
+# Copyright 2020 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Tests creating pw_rpc client."""
+
+import unittest
+import tempfile
+from pathlib import Path
+from typing import List, Tuple
+
+from pw_protobuf_compiler import python_protos
+from pw_rpc import client, ids, packets
+from pw_status import Status
+
+TEST_PROTO_1 = b"""\
+syntax = "proto3";
+
+package pw.call.test1;
+
+message SomeMessage {
+  uint32 magic_number = 1;
+}
+
+message AnotherMessage {
+  enum Result {
+    FAILED = 0;
+    FAILED_MISERABLY = 1;
+    I_DONT_WANT_TO_TALK_ABOUT_IT = 2;
+  }
+
+  Result result = 1;
+  string payload = 2;
+}
+
+service PublicService {
+  rpc SomeUnary(SomeMessage) returns (AnotherMessage) {}
+  rpc SomeServerStreaming(SomeMessage) returns (stream AnotherMessage) {}
+  rpc SomeClientStreaming(stream SomeMessage) returns (AnotherMessage) {}
+  rpc SomeBidiStreaming(stream SomeMessage) returns (stream AnotherMessage) {}
+}
+"""
+
+TEST_PROTO_2 = b"""\
+syntax = "proto2";
+
+package pw.call.test2;
+
+message Request {
+  optional float magic_number = 1;
+}
+
+message Response {
+}
+
+service Alpha {
+  rpc Unary(Request) returns (Response) {}
+}
+
+service Bravo {
+  rpc BidiStreaming(stream Request) returns (stream Response) {}
+}
+"""
+
+
+class ClientTest(unittest.TestCase):
+    """Tests the pw_rpc Python client."""
+    def setUp(self):
+        self._proto_dir = tempfile.TemporaryDirectory(prefix='proto_test')
+        protos = []
+
+        for i, contents in enumerate([TEST_PROTO_1, TEST_PROTO_2]):
+            protos.append(Path(self._proto_dir.name, f'test_{i}.proto'))
+            protos[-1].write_bytes(contents)
+
+        self._protos = python_protos.Library(
+            python_protos.compile_and_import(protos))
+
+        self._impl = client.SimpleSynchronousClient()
+        self._client = client.Client.from_modules(
+            self._impl, [client.Channel(1, self._handle_request)],
+            self._protos.modules())
+
+        self._last_request: packets.RpcPacket = None
+        self._next_packets: List[Tuple[bytes, bool]] = []
+
+    def tearDown(self):
+        self._proto_dir.cleanup()
+
+    def _enqueue_response(self,
+                          channel_id: int,
+                          service_id: int,
+                          method_id: int,
+                          status: Status = Status.OK,
+                          response=b'',
+                          valid=True):
+        if isinstance(response, bytes):
+            payload = response
+        else:
+            payload = response.SerializeToString()
+
+        self._next_packets.append(
+            (packets.RpcPacket(channel_id=channel_id,
+                               service_id=service_id,
+                               method_id=method_id,
+                               status=status.value,
+                               payload=payload).SerializeToString(), valid))
+
+    def _handle_request(self, data: bytes):
+        self._last_request = packets.decode(data)
+
+        self.assertTrue(self._next_packets)
+        for packet, valid in self._next_packets:
+            self.assertEqual(valid, self._client.process_packet(packet))
+
+    def _last_payload(self, message_type):
+        self.assertIsNotNone(self._last_request)
+        message = message_type()
+        message.ParseFromString(self._last_request.payload)
+        return message
+
+    def test_access_service_client_as_attribute_or_index(self):
+        self.assertIs(
+            self._client.channel(1).call.PublicService,
+            self._client.channel(1).call['PublicService'])
+        self.assertIs(
+            self._client.channel(1).call.PublicService,
+            self._client.channel(1).call[ids.calculate('PublicService')])
+
+    def test_access_method_client_as_attribute_or_index(self):
+        self.assertIs(
+            self._client.channel(1).call.Alpha.Unary,
+            self._client.channel(1).call['Alpha']['Unary'])
+        self.assertIs(
+            self._client.channel(1).call.Alpha.Unary,
+            self._client.channel(1).call['Alpha'][ids.calculate('Unary')])
+
+    def test_check_for_presence_of_services(self):
+        self.assertIn('PublicService', self._client.channel(1).call)
+        self.assertIn(ids.calculate('PublicService'),
+                      self._client.channel(1).call)
+        self.assertNotIn('NotAService', self._client.channel(1).call)
+        self.assertNotIn(-1213, self._client.channel(1).call)
+
+    def test_check_for_presence_of_methods(self):
+        self.assertIn('SomeUnary', self._client.channel(1).call.PublicService)
+        self.assertIn(ids.calculate('SomeUnary'),
+                      self._client.channel(1).call.PublicService)
+
+        self.assertNotIn('Unary', self._client.channel(1).call.PublicService)
+        self.assertNotIn(12345, self._client.channel(1).call.PublicService)
+
+    def test_invoke_unary_rpc(self):
+        rpcs = self._client.channel(1).call
+        method = rpcs.PublicService.SomeUnary.method
+
+        for _ in range(3):
+            self._enqueue_response(1, method.service.id, method.id,
+                                   Status.ABORTED,
+                                   method.response_type(payload='0_o'))
+
+            status, response = rpcs.PublicService.SomeUnary(magic_number=6)
+
+            self.assertEqual(
+                6,
+                self._last_payload(method.request_type).magic_number)
+
+            self.assertIs(Status.ABORTED, status)
+            self.assertEqual('0_o', response.payload)
+
+    def test_ignore_bad_packets_with_pending_rpc(self):
+        rpcs = self._client.channel(1).call
+        method = rpcs.PublicService.SomeUnary.method
+        service_id = method.service.id
+
+        # Unknown channel
+        self._enqueue_response(999, service_id, method.id, valid=False)
+        # Bad service
+        self._enqueue_response(1, 999, method.id, valid=False)
+        # Bad method
+        self._enqueue_response(1, service_id, 999, valid=False)
+        # For RPC not pending (valid=True because the packet is processed)
+        self._enqueue_response(1,
+                               service_id,
+                               rpcs.PublicService.SomeBidiStreaming.method.id,
+                               valid=True)
+
+        self._enqueue_response(1, service_id, method.id, valid=True)
+
+        status, response = rpcs.PublicService.SomeUnary(magic_number=6)
+        self.assertIs(Status.OK, status)
+        self.assertEqual('', response.payload)
+
+    def test_pass_none_if_payload_fails_to_decode(self):
+        rpcs = self._client.channel(1).call
+        method = rpcs.PublicService.SomeUnary.method
+
+        self._enqueue_response(1,
+                               method.service.id,
+                               method.id,
+                               Status.OK,
+                               b'INVALID DATA!!!',
+                               valid=True)
+
+        status, response = rpcs.PublicService.SomeUnary(magic_number=6)
+        self.assertIs(status, Status.OK)
+        self.assertIsNone(response)
+
+    def test_call_method_with_both_message_and_kwargs(self):
+        req = self._client.services['Alpha'].methods['Unary'].request_type()
+
+        with self.assertRaisesRegex(TypeError, r'either'):
+            self._client.channel(1).call.Alpha.Unary(req, magic_number=1.0)
+
+    def test_call_method_with_wrong_type(self):
+        with self.assertRaisesRegex(TypeError, r'pw\.call\.test2\.Request'):
+            self._client.channel(1).call.Alpha.Unary('This is str!')
+
+    def test_call_method_with_incorrect_message_type(self):
+        msg = self._protos.packages.pw.call.test1.AnotherMessage()
+        with self.assertRaisesRegex(TypeError,
+                                    r'pw\.call\.test1\.SomeMessage'):
+            self._client.channel(1).call.PublicService.SomeUnary(msg)
+
+    def test_process_packet_invalid_proto_data(self):
+        self.assertFalse(self._client.process_packet(b'NOT a packet!'))
+
+    def test_process_packet_unrecognized_channel(self):
+        self.assertFalse(
+            self._client.process_packet(
+                packets.encode((123, 456, 789),
+                               self._protos.packages.pw.call.test2.Request())))
+
+    def test_process_packet_unrecognized_service(self):
+        self.assertFalse(
+            self._client.process_packet(
+                packets.encode((1, 456, 789),
+                               self._protos.packages.pw.call.test2.Request())))
+
+    def test_process_packet_unrecognized_method(self):
+        self.assertFalse(
+            self._client.process_packet(
+                packets.encode((1, next(iter(self._client.services)).id, 789),
+                               self._protos.packages.pw.call.test2.Request())))
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/pw_rpc/py/packets_test.py b/pw_rpc/py/packets_test.py
new file mode 100755
index 0000000..4b54caf
--- /dev/null
+++ b/pw_rpc/py/packets_test.py
@@ -0,0 +1,42 @@
+#!/usr/bin/env python3
+# Copyright 2020 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Tests creating pw_rpc client."""
+
+import unittest
+
+from pw_rpc import packets
+
+_TEST_PACKET = packets.RpcPacket(
+    channel_id=1,
+    service_id=2,
+    method_id=3,
+    payload=packets.RpcPacket(status=321).SerializeToString())
+
+
+class PacketsTest(unittest.TestCase):
+    def test_encode(self):
+        data = packets.encode((1, 2, 3), packets.RpcPacket(status=321))
+        packet = packets.RpcPacket()
+        packet.ParseFromString(data)
+
+        self.assertEqual(_TEST_PACKET, packet)
+
+    def test_decode(self):
+        self.assertEqual(_TEST_PACKET,
+                         packets.decode(_TEST_PACKET.SerializeToString()))
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/pw_rpc/py/pw_rpc/client.py b/pw_rpc/py/pw_rpc/client.py
new file mode 100644
index 0000000..d69a57f
--- /dev/null
+++ b/pw_rpc/py/pw_rpc/client.py
@@ -0,0 +1,274 @@
+# Copyright 2020 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Creates an RPC client."""
+
+import abc
+from collections import defaultdict
+from dataclasses import dataclass
+import logging
+from queue import SimpleQueue
+from typing import Any, Collection, Dict, Iterable
+
+from pw_rpc import descriptors, packets
+from pw_rpc.descriptors import Channel, Service, Method, PendingRpc
+from pw_status import Status
+
+_LOG = logging.getLogger(__name__)
+
+
+class ClientImpl(abc.ABC):
+    """The internal interface of the RPC client.
+
+    This interface defines the semantics for invoking an RPC on a particular
+    client. The return values can objects that provide for synchronous or
+    asynchronous behavior.
+    """
+    @abc.abstractmethod
+    def invoke_unary(self, rpc: PendingRpc, request) -> Any:
+        """Invokes a unary RPC."""
+
+    @abc.abstractmethod
+    def invoke_server_streaming(self, rpc: PendingRpc, request) -> Any:
+        """Invokes a server streaming RPC."""
+
+    @abc.abstractmethod
+    def invoke_client_streaming(self, rpc: PendingRpc) -> Any:
+        """Invokes a client streaming streaming RPC."""
+
+    @abc.abstractmethod
+    def invoke_bidirectional_streaming(self, rpc: PendingRpc) -> Any:
+        """Invokes a bidirectional streaming streaming RPC."""
+
+    @abc.abstractmethod
+    def process_response(self, rpc: PendingRpc, payload,
+                         status: Status) -> None:
+        """Processes a response from the RPC server."""
+
+
+class SimpleSynchronousClient(ClientImpl):
+    """A client that blocks until a response is received for unary RPCs."""
+    def __init__(self):
+        self._responses: Dict[PendingRpc,
+                              SimpleQueue] = defaultdict(SimpleQueue)
+        self._pending: Dict[PendingRpc, bool] = defaultdict(bool)
+
+    def invoke_unary(self, rpc: PendingRpc, request: packets.Message):
+        queue = self._responses[rpc]
+
+        assert not self._pending[rpc], f'{rpc} is already pending!'
+        self._pending[rpc] = True
+
+        try:
+            rpc.channel.output(packets.encode(rpc, request))
+            result = queue.get()
+        finally:
+            self._pending[rpc] = False
+        return result
+
+    def invoke_server_streaming(self, rpc: PendingRpc, request):
+        raise NotImplementedError
+
+    def invoke_client_streaming(self, rpc: PendingRpc):
+        raise NotImplementedError
+
+    def invoke_bidirectional_streaming(self, rpc: PendingRpc):
+        raise NotImplementedError
+
+    def process_response(self, rpc: PendingRpc, payload,
+                         status: Status) -> None:
+        if not self._pending[rpc]:
+            _LOG.warning('Discarding packet for %s', rpc)
+            return
+
+        self._responses[rpc].put((status, payload))
+
+
+class _MethodClient:
+    """A method that can be invoked for a particular channel."""
+    @classmethod
+    def create(cls, client_impl: ClientImpl, channel: Channel, method: Method):
+        """Instantiates a _MethodClient according to the RPC type."""
+        if method.type is Method.Type.UNARY:
+            return UnaryMethodClient(client_impl, channel, method)
+
+        raise NotImplementedError('Streaming methods are not yet supported')
+
+    def __init__(self, client_impl: ClientImpl, channel: Channel,
+                 method: Method):
+        self._client_impl = client_impl
+        self.channel = channel
+        self.method = method
+
+    def _get_request(self, proto: packets.Message,
+                     kwargs: dict) -> packets.Message:
+        if proto and kwargs:
+            raise TypeError(
+                'Requests must be provided either as a message object or a '
+                'series of keyword args, but both were provided')
+
+        if proto is None:
+            return self.method.request_type(**kwargs)
+
+        if not isinstance(proto, self.method.request_type):
+            try:
+                bad_type = proto.DESCRIPTOR.full_name
+            except AttributeError:
+                bad_type = type(proto).__name__
+
+            raise TypeError(
+                f'Expected a message of type '
+                f'{self.method.request_type.DESCRIPTOR.full_name}, '
+                f'got {bad_type}')
+
+        return proto
+
+
+class UnaryMethodClient(_MethodClient):
+    # TODO(hepler): This function should make _request a positional-only
+    #     argument, to avoid confusion with keyword-specified protobuf fields.
+    #     However, yapf does not yet support Python 3.8's grammar, and
+    #     positional-only arguments crash it.
+    def __call__(self, _request=None, **request_fields):
+        """Invokes this unary method using its associated channel.
+
+        The request can be provided as either a message object or as keyword
+        arguments for the message's fields (but not both).
+        """
+        return self._client_impl.invoke_unary(
+            PendingRpc(self.channel, self.method.service, self.method),
+            self._get_request(_request, request_fields))
+
+
+class _MethodClients(descriptors.ServiceAccessor[_MethodClient]):
+    """Navigates the methods in a service provided by a ChannelClient."""
+    def __init__(self, client_impl: ClientImpl, channel: Channel,
+                 methods: Collection[Method]):
+        super().__init__(
+            {
+                method.name: _MethodClient.create(client_impl, channel, method)
+                for method in methods
+            },
+            as_attrs=True)
+
+
+class _ServiceClients(descriptors.ServiceAccessor[_MethodClients]):
+    """Navigates the services provided by a ChannelClient."""
+    def __init__(self, client_impl, channel: Channel,
+                 services: Collection[Service]):
+        super().__init__(
+            {
+                s.name: _MethodClients(client_impl, channel, s.methods)
+                for s in services
+            },
+            as_attrs=True)
+
+
+@dataclass(frozen=True, eq=False)
+class ChannelClient:
+    """RPC services and methods bound to a particular channel.
+
+    RPCs are invoked from a ChannelClient using its call member. The service and
+    method may be selected as attributes or by indexing call with service and
+    method name or ID:
+
+      response = client.channel(1).call.FooService.SomeMethod(foo=bar)
+
+      response = client.channel(1).call[foo_service_id]['SomeMethod'](foo=bar)
+
+    The type and semantics of the return value, if there is one, are determined
+    by the ClientImpl instance used by the Client.
+    """
+    channel: Channel
+    call: _ServiceClients
+
+
+class Client:
+    """Sends requests and handles responses for a set of channels.
+
+    RPC invocations occur through a ChannelClient.
+    """
+    @classmethod
+    def from_modules(cls, impl: ClientImpl, channels: Iterable[Channel],
+                     modules: Iterable):
+        return cls(
+            impl, channels,
+            (Service.from_descriptor(module, service) for module in modules
+             for service in module.DESCRIPTOR.services_by_name.values()))
+
+    def __init__(self, impl: ClientImpl, channels: Iterable[Channel],
+                 services: Iterable[Service]):
+        self.services = descriptors.Services(services)
+        self._impl = impl
+        self._channels_by_id = {
+            channel.id:
+            ChannelClient(channel,
+                          _ServiceClients(self._impl, channel, self.services))
+            for channel in channels
+        }
+
+    def channel(self, channel_id: int) -> ChannelClient:
+        """Returns a ChannelClient, which is used to call RPCs on a channel."""
+        return self._channels_by_id[channel_id]
+
+    def process_packet(self, data: bytes) -> bool:
+        """Processes an incoming packet.
+
+        Args:
+            data: raw binary data for exactly one RPC packet
+
+        Returns:
+            True if the packet was decoded and handled by this client
+        """
+        try:
+            packet = packets.decode(data)
+        except packets.DecodeError as err:
+            _LOG.warning('Failed to decode packet: %s', err)
+            _LOG.debug('Raw packet: %r', data)
+            return False
+
+        try:
+            rpc = self._lookup_packet(packet)
+        except ValueError as err:
+            _LOG.warning('Unable to process packet: %s', err)
+            return False
+
+        try:
+            response = packets.decode_payload(packet, rpc.method.response_type)
+        except packets.DecodeError as err:
+            response = None
+            _LOG.warning('Failed to decode %s response for %s: %s',
+                         rpc.method.response_type.DESCRIPTOR.full_name,
+                         rpc.method.full_name, err)
+
+        self._impl.process_response(rpc, response, Status(packet.status))
+        return True
+
+    def _lookup_packet(self, packet: packets.RpcPacket) -> PendingRpc:
+        try:
+            channel_client = self._channels_by_id[packet.channel_id]
+        except KeyError:
+            raise ValueError(f'Unrecognized channel ID {packet.channel_id}')
+
+        try:
+            service = self.services[packet.service_id]
+        except KeyError:
+            raise ValueError(f'Unrecognized service ID {packet.service_id}')
+
+        try:
+            method = service.methods[packet.method_id]
+        except KeyError:
+            raise ValueError(
+                f'No method ID {packet.method_id} in service {service.name}')
+
+        return PendingRpc(channel_client.channel, service, method)
diff --git a/pw_rpc/py/pw_rpc/descriptors.py b/pw_rpc/py/pw_rpc/descriptors.py
new file mode 100644
index 0000000..1299e0f
--- /dev/null
+++ b/pw_rpc/py/pw_rpc/descriptors.py
@@ -0,0 +1,161 @@
+# Copyright 2020 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Types representing the basic pw_rpc concepts: channel, service, method."""
+
+from dataclasses import dataclass
+import enum
+from typing import Any, Callable, Collection, Iterable, Iterator, NamedTuple
+from typing import TypeVar, Union
+
+from pw_rpc import ids
+
+
+@dataclass(frozen=True)
+class Channel:
+    id: int
+    output: Callable[[bytes], None]
+
+    def __repr__(self) -> str:
+        return f'Channel({self.id})'
+
+
+@dataclass(frozen=True, eq=False)
+class Service:
+    """Describes an RPC service."""
+    name: str
+    id: int
+    methods: 'ServiceAccessor'
+
+    @classmethod
+    def from_descriptor(cls, module, descriptor):
+        service = cls(descriptor.name, ids.calculate(descriptor.name), None)
+        object.__setattr__(
+            service, 'methods',
+            Methods(
+                Method.from_descriptor(module, method_descriptor, service)
+                for method_descriptor in descriptor.methods))
+
+        return service
+
+    def __repr__(self) -> str:
+        return f'Service({self.name!r})'
+
+
+@dataclass(frozen=True, eq=False)
+class Method:
+    """Describes a method in a service."""
+
+    service: Service
+    name: str
+    id: int
+    type: 'Method.Type'
+    request_type: Any
+    response_type: Any
+
+    @property
+    def full_name(self) -> str:
+        return f'{self.service.name}.{self.name}'
+
+    class Type(enum.Enum):
+        UNARY = 0
+        SERVER_STREAMING = 1
+        CLIENT_STREAMING = 2
+        BIDI_STREAMING = 3
+
+        @classmethod
+        def from_descriptor(cls, unused_descriptor) -> 'Method.Type':
+            # TODO(hepler): Add server_streaming and client_streaming to
+            #     protobuf generated code, or access these attributes by
+            #     deserializing the FileDescriptor.
+            return cls.UNARY
+
+    @classmethod
+    def from_descriptor(cls, module, descriptor, service: Service):
+        return Method(
+            service,
+            descriptor.name,
+            ids.calculate(descriptor.name),
+            cls.Type.from_descriptor(descriptor),
+            getattr(module, descriptor.input_type.name),
+            getattr(module, descriptor.output_type.name),
+        )
+
+    def __repr__(self) -> str:
+        return f'Method({self.name!r})'
+
+
+class PendingRpc(NamedTuple):
+    """Uniquely identifies an RPC call."""
+    channel: Channel
+    service: Service
+    method: Method
+
+
+T = TypeVar('T')
+
+
+class ServiceAccessor(Collection[T]):
+    """Navigates RPC services by name or ID."""
+    def __init__(self, members, as_attrs: bool):
+        """Creates accessor from a {name: value} dict or [values] iterable."""
+        if isinstance(members, dict):
+            by_name = members
+            self._by_id = {
+                ids.calculate(name): m
+                for name, m in by_name.items()
+            }
+        else:
+            by_name = {m.name: m for m in members}
+            self._by_id = {m.id: m for m in by_name.values()}
+
+        if as_attrs:
+            for name, member in by_name.items():
+                setattr(self, name, member)
+
+    def __getitem__(self, name_or_id: Union[str, int]):
+        """Accesses a service/method by the string name or ID."""
+        try:
+            return self._by_id[_id(name_or_id)]
+        except KeyError:
+            name = ' (name_or_id)' if isinstance(name_or_id, str) else ''
+            raise KeyError(f'Unknown ID {_id(name_or_id)}{name}')
+
+    def __iter__(self) -> Iterator[T]:
+        return iter(self._by_id.values())
+
+    def __len__(self) -> int:
+        return len(self._by_id)
+
+    def __contains__(self, name_or_id) -> bool:
+        return _id(name_or_id) in self._by_id
+
+    def __repr__(self) -> str:
+        members = ', '.join(repr(m) for m in self._by_id.values())
+        return f'{self.__class__.__name__}({members})'
+
+
+def _id(handle: Union[str, int]) -> int:
+    return ids.calculate(handle) if isinstance(handle, str) else handle
+
+
+class Services(ServiceAccessor[Service]):
+    """A collection of Service descriptors."""
+    def __init__(self, services: Iterable[Service]):
+        super().__init__(services, as_attrs=False)
+
+
+class Methods(ServiceAccessor[Method]):
+    """A collection of Method descriptors in a Service."""
+    def __init__(self, method: Iterable[Method]):
+        super().__init__(method, as_attrs=False)
diff --git a/pw_rpc/py/pw_rpc/packets.py b/pw_rpc/py/pw_rpc/packets.py
new file mode 100644
index 0000000..152d837
--- /dev/null
+++ b/pw_rpc/py/pw_rpc/packets.py
@@ -0,0 +1,50 @@
+# Copyright 2020 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Functions for working with pw_rpc packets."""
+
+import os
+
+from google.protobuf import message
+from pw_protobuf_compiler import python_protos
+
+packet_pb2 = python_protos.compile_and_import_file(
+    os.path.join(__file__, '..', '..', '..', 'pw_rpc_protos', 'packet.proto'))
+
+RpcPacket = packet_pb2.RpcPacket
+
+DecodeError = message.DecodeError
+Message = message.Message
+
+
+def decode(data: bytes):
+    packet = RpcPacket()
+    packet.MergeFromString(data)
+    return packet
+
+
+def decode_payload(packet, payload_type):
+    payload = payload_type()
+    payload.MergeFromString(packet.payload)
+    return payload
+
+
+def encode(rpc: tuple, request: message.Message) -> bytes:
+    channel, service, method = rpc
+
+    return packet_pb2.RpcPacket(
+        type=packet_pb2.PacketType.RPC,
+        channel_id=channel if isinstance(channel, int) else channel.id,
+        service_id=service if isinstance(service, int) else service.id,
+        method_id=method if isinstance(method, int) else method.id,
+        payload=request.SerializeToString()).SerializeToString()
diff --git a/pw_rpc/py/setup.py b/pw_rpc/py/setup.py
index 07c6b2c..44779c9 100644
--- a/pw_rpc/py/setup.py
+++ b/pw_rpc/py/setup.py
@@ -11,7 +11,7 @@
 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 # License for the specific language governing permissions and limitations under
 # the License.
-"""pw_protobuf"""
+"""pw_rpc"""
 
 import setuptools