pw_multisink: Lock multisink operations Guards all multisink transactions with a lock. The new configuration option PW_MULTISINK_LOCK_INTERRUPT_SAFE allows the project to select the type of lock used to guard transactions. By default, it is enabled and makes use of an interrupt spin-lock. If disabled, a mutex is used instead. Change-Id: I71ab2729d130c524da27e0d06beb0c3fdf73d145 Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/45720 Commit-Queue: Prashanth Swaminathan <prashanthsw@google.com> Pigweed-Auto-Submit: Prashanth Swaminathan <prashanthsw@google.com> Reviewed-by: Ewout van Bekkum <ewout@google.com>
diff --git a/pw_multisink/BUILD b/pw_multisink/BUILD index 11f772c..f7840a2 100644 --- a/pw_multisink/BUILD +++ b/pw_multisink/BUILD
@@ -29,6 +29,7 @@ "multisink.cc", ], hdrs = [ + "public/pw_multisink/config.h", "public/pw_multisink/drain.h", "public/pw_multisink/multisink.h", ], @@ -36,6 +37,9 @@ deps = [ "//pw_assert", "//pw_bytes", + "//pw_sync:interrupt_spin_lock", + "//pw_sync:lock_annotations", + "//pw_sync:mutex", "//pw_result", "//pw_ring_buffer", "//pw_varint",
diff --git a/pw_multisink/BUILD.gn b/pw_multisink/BUILD.gn index c8446d0..dac134c 100644 --- a/pw_multisink/BUILD.gn +++ b/pw_multisink/BUILD.gn
@@ -26,6 +26,7 @@ pw_source_set("pw_multisink") { public_configs = [ ":default_config" ] public = [ + "public/pw_multisink/config.h", "public/pw_multisink/drain.h", "public/pw_multisink/multisink.h", ] @@ -34,6 +35,9 @@ "$dir_pw_result", "$dir_pw_ring_buffer", "$dir_pw_status", + "$dir_pw_sync:interrupt_spin_lock", + "$dir_pw_sync:lock_annotations", + "$dir_pw_sync:mutex", ] deps = [ "$dir_pw_assert",
diff --git a/pw_multisink/docs.rst b/pw_multisink/docs.rst index cf406f3..7052e14 100644 --- a/pw_multisink/docs.rst +++ b/pw_multisink/docs.rst
@@ -1,9 +1,25 @@ .. _module-pw_multisink: ------------- +============ pw_multisink ------------- +============ This is an module that forwards messages to multiple attached sinks, which consume messages asynchronously. It is not ready for use and is under construction. +Module Configuration Options +============================ +The following configurations can be adjusted via compile-time configuration +of this module, see the +:ref:`module documentation <module-structure-compile-time-configuration>` for +more details. + +.. c:macro:: PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE + + Whether an interrupt-safe lock is used to guard multisink read/write operations. + + By default, this option is enabled and the multisink uses an interrupt spin-lock + to guard its transactions. If disabled, a mutex is used instead. + + Disabling this will alter the entry precondition of the multisink, requiring that + it not be called from an interrupt context.
diff --git a/pw_multisink/drain.cc b/pw_multisink/drain.cc index e4548c5..cc684f1 100644 --- a/pw_multisink/drain.cc +++ b/pw_multisink/drain.cc
@@ -13,15 +13,19 @@ // the License. #include "pw_multisink/drain.h" +#include "pw_assert/check.h" + namespace pw { namespace multisink { Result<ConstByteSpan> Drain::GetEntry(ByteSpan entry, uint32_t& drop_count_out) { + PW_DCHECK_NOTNULL(multisink_); uint32_t entry_sequence_id = 0; drop_count_out = 0; + const Result<ConstByteSpan> result = - MultiSink::GetEntry(*this, entry, entry_sequence_id); + multisink_->GetEntry(*this, entry, entry_sequence_id); // Exit immediately if the result isn't OK or OUT_OF_RANGE, as the // entry_sequence_id cannot be used for computation. Later invocations to
diff --git a/pw_multisink/multisink.cc b/pw_multisink/multisink.cc index a956071..c1eca3c 100644 --- a/pw_multisink/multisink.cc +++ b/pw_multisink/multisink.cc
@@ -28,17 +28,15 @@ uint32_t& sequence_id_out) { size_t bytes_read = 0; - // Exit immediately if there's no multisink attached to this drain. - if (drain.multisink_ == nullptr) { - return Status::FailedPrecondition(); - } + std::lock_guard lock(lock_); + PW_DCHECK_PTR_EQ(drain.multisink_, this); const Status status = drain.reader_.PeekFrontWithPreamble(buffer, sequence_id_out, bytes_read); if (status.IsOutOfRange()) { // If the drain has caught up, report the last handled sequence ID so that // it can still process any dropped entries. - sequence_id_out = drain.multisink_->sequence_id_ - 1; + sequence_id_out = sequence_id_ - 1; return status; } PW_CHECK(drain.reader_.PopFront().ok()); @@ -46,14 +44,16 @@ } Status MultiSink::AttachDrain(Drain& drain) { - PW_DCHECK(drain.multisink_ == nullptr); + std::lock_guard lock(lock_); + PW_DCHECK_PTR_EQ(drain.multisink_, nullptr); drain.multisink_ = this; drain.last_handled_sequence_id_ = sequence_id_ - 1; return ring_buffer_.AttachReader(drain.reader_); } Status MultiSink::DetachDrain(Drain& drain) { - PW_DCHECK(drain.multisink_ == this); + std::lock_guard lock(lock_); + PW_DCHECK_PTR_EQ(drain.multisink_, this); drain.multisink_ = nullptr; return ring_buffer_.DetachReader(drain.reader_); }
diff --git a/pw_multisink/public/pw_multisink/config.h b/pw_multisink/public/pw_multisink/config.h new file mode 100644 index 0000000..98de4a7 --- /dev/null +++ b/pw_multisink/public/pw_multisink/config.h
@@ -0,0 +1,41 @@ +// Copyright 2021 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. +#pragma once + +// PW_MULTISINK_LOCK_INTERRUPT_SAFE controls whether an interrupt-safe lock is +// used when reading and writing from the underlying ring-buffer. This is +// enabled by default, using an interrupt spin-lock instead of a mutex. +// Disabling this alters the entry precondition of the multisink, requiring that +// it not be invoked from an interrupt context. +#if !defined(PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE) +#define PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE 1 +#endif // !defined(PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE) + +#if PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE +#include "pw_sync/interrupt_spin_lock.h" +#else // !PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE +#include "pw_sync/mutex.h" +#endif // PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE + +namespace pw { +namespace multisink { + +#if PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE +using LockType = pw::sync::InterruptSpinLock; +#else // !PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE +using LockType = pw::sync::Mutex; +#endif // PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE + +} // namespace multisink +} // namespace pw
diff --git a/pw_multisink/public/pw_multisink/multisink.h b/pw_multisink/public/pw_multisink/multisink.h index 097e4a3..d86ad35 100644 --- a/pw_multisink/public/pw_multisink/multisink.h +++ b/pw_multisink/public/pw_multisink/multisink.h
@@ -13,10 +13,14 @@ // the License. #pragma once +#include <mutex> + #include "pw_bytes/span.h" +#include "pw_multisink/config.h" #include "pw_result/result.h" #include "pw_ring_buffer/prefixed_entry_ring_buffer.h" #include "pw_status/status.h" +#include "pw_sync/lock_annotations.h" namespace pw { namespace multisink { @@ -25,10 +29,12 @@ // An asynchronous single-writer multi-reader queue that ensures readers can // poll for dropped message counts, which is useful for logging or similar // scenarios where readers need to be aware of the input message sequence. +// +// This class is thread-safe but NOT IRQ-safe when +// PW_MULTISINK_LOCK_INTERRUPT_SAFE is disabled. +// // TODO(pwbug/342): Support notifying readers when the queue is readable, // rather than requiring them to poll to check for new entries. -// TODO(pwbug/343): Add thread-safety, separate from the thread-safety work -// planned for the underlying ring buffer. class MultiSink { public: // Constructs a multisink using a ring buffer backed by the provided buffer. @@ -42,12 +48,16 @@ // The sequence ID of the multisink will always increment as a result of // calling HandleEntry, regardless of whether pushing the entry succeeds. // + // Precondition: If PW_MULTISINK_LOCK_INTERRUPT_SAFE is disabled, this + // function must not be called from an interrupt context. + // // Return values: // Ok - Entry was successfully pushed to the ring buffer. // InvalidArgument - Size of data to write is zero bytes. // OutOfRange - Size of data is greater than buffer size. // FailedPrecondition - Buffer was not initialized. - Status HandleEntry(ConstByteSpan entry) { + Status HandleEntry(ConstByteSpan entry) PW_LOCKS_EXCLUDED(lock_) { + std::lock_guard lock(lock_); return ring_buffer_.PushBack(entry, sequence_id_++); } @@ -56,7 +66,10 @@ // before being sent to the multisink (e.g. the writer failed to encode // the message). This API increments the sequence ID of the multisink by // the provided `drop_count`. - void HandleDropped(uint32_t drop_count = 1) { sequence_id_ += drop_count; } + void HandleDropped(uint32_t drop_count = 1) PW_LOCKS_EXCLUDED(lock_) { + std::lock_guard lock(lock_); + sequence_id_ += drop_count; + } // Attach a drain to the multisink. Drains may not be associated with more // than one multisink at a time. Entries pushed before the drain was attached @@ -66,7 +79,7 @@ // Return values: // Ok - Drain was successfully attached. // InvalidArgument - Drain is currently associated with another multisink. - Status AttachDrain(Drain& drain); + Status AttachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_); // Detaches a drain from the multisink. Drains may only be detached if they // were previously attached to this multisink. @@ -74,11 +87,14 @@ // Return values: // Ok - Drain was successfully detached. // InvalidArgument - Drain is not currently associated with this multisink. - Status DetachDrain(Drain& drain); + Status DetachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_); // Removes all data from the internal buffer. The multisink's sequence ID is // not modified, so readers may interpret this event as droppping entries. - void Clear() { ring_buffer_.Clear(); } + void Clear() PW_LOCKS_EXCLUDED(lock_) { + std::lock_guard lock(lock_); + ring_buffer_.Clear(); + } protected: friend Drain; @@ -94,13 +110,15 @@ // the next available entry. // DataLoss - An entry was read from the multisink, but did not contains an // encoded sequence ID. - static Result<ConstByteSpan> GetEntry(Drain& drain, - ByteSpan buffer, - uint32_t& sequence_id_out); + Result<ConstByteSpan> GetEntry(Drain& drain, + ByteSpan buffer, + uint32_t& sequence_id_out) + PW_LOCKS_EXCLUDED(lock_); private: - ring_buffer::PrefixedEntryRingBufferMulti ring_buffer_; - uint32_t sequence_id_ = 0; + ring_buffer::PrefixedEntryRingBufferMulti ring_buffer_ PW_GUARDED_BY(lock_); + uint32_t sequence_id_ PW_GUARDED_BY(lock_); + LockType lock_; }; } // namespace multisink