| // Copyright 2019 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include "third_party/mlir_edge/iree/hal/host/async_command_queue.h" |
| |
| #include "third_party/absl/base/thread_annotations.h" |
| #include "third_party/mlir_edge/iree/base/status.h" |
| #include "third_party/mlir_edge/iree/base/tracing.h" |
| |
| namespace iree { |
| namespace hal { |
| |
| AsyncCommandQueue::AsyncCommandQueue(std::unique_ptr<CommandQueue> target_queue) |
| : CommandQueue(target_queue->name(), target_queue->supported_categories()), |
| target_queue_(std::move(target_queue)) { |
| IREE_TRACE_SCOPE0("AsyncCommandQueue::ctor"); |
| thread_ = std::thread([this]() { ThreadMain(); }); |
| } |
| |
| AsyncCommandQueue::~AsyncCommandQueue() { |
| IREE_TRACE_SCOPE0("AsyncCommandQueue::dtor"); |
| { |
| // Signal to thread that we want to stop. Note that the thread may have |
| // already been stopped and that's ok (as we'll Join right away). |
| // The thread will finish processing any queued submissions. |
| absl::MutexLock lock(&submission_mutex_); |
| submission_queue_.SignalShutdown(); |
| } |
| thread_.join(); |
| |
| // Ensure we shut down OK. |
| { |
| absl::MutexLock lock(&submission_mutex_); |
| CHECK(submission_queue_.empty()) |
| << "Dirty shutdown of async queue (unexpected thread exit?)"; |
| } |
| } |
| |
| void AsyncCommandQueue::ThreadMain() { |
| // TODO(benvanik): make this safer (may die if trace is flushed late). |
| IREE_TRACE_THREAD_ENABLE(target_queue_->name().c_str()); |
| |
| bool is_exiting = false; |
| while (!is_exiting) { |
| // Block until we are either requested to exit or there are pending |
| // submissions. |
| submission_mutex_.Lock(); |
| submission_mutex_.Await(absl::Condition( |
| +[](HostSubmissionQueue* queue) { |
| return queue->has_shutdown() || !queue->empty(); |
| }, |
| &submission_queue_)); |
| if (!submission_queue_.empty()) { |
| // Run all ready submissions (this may be called many times). |
| submission_mutex_.AssertHeld(); |
| submission_queue_ |
| .ProcessBatches( |
| [this](absl::Span<CommandBuffer* const> command_buffers) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(submission_mutex_) { |
| // Release the lock while we perform the processing so that |
| // other threads can submit more work. |
| submission_mutex_.AssertHeld(); |
| submission_mutex_.Unlock(); |
| |
| // Relay the command buffers to the target queue. |
| // Since we are taking care of all synchronization they |
| // don't need any waiters or fences. |
| auto status = target_queue_->Submit( |
| {{}, command_buffers, {}}, {nullptr, 0u}); |
| |
| // Take back the lock so we can manipulate the queue safely. |
| submission_mutex_.Lock(); |
| submission_mutex_.AssertHeld(); |
| |
| return status; |
| }) |
| .IgnoreError(); |
| submission_mutex_.AssertHeld(); |
| } |
| if (submission_queue_.has_shutdown()) { |
| // Exit when there are no more submissions to process and an exit was |
| // requested (or we errored out). |
| is_exiting = true; |
| } |
| submission_mutex_.Unlock(); |
| } |
| } |
| |
| Status AsyncCommandQueue::Submit(absl::Span<const SubmissionBatch> batches, |
| FenceValue fence) { |
| IREE_TRACE_SCOPE0("AsyncCommandQueue::Submit"); |
| absl::MutexLock lock(&submission_mutex_); |
| return submission_queue_.Enqueue(batches, fence); |
| } |
| |
| Status AsyncCommandQueue::Flush() { |
| IREE_TRACE_SCOPE0("AsyncCommandQueue::Flush"); |
| // No-op (as we don't currently delay). |
| absl::MutexLock lock(&submission_mutex_); |
| return submission_queue_.permanent_error(); |
| } |
| |
| Status AsyncCommandQueue::WaitIdle(absl::Time deadline) { |
| IREE_TRACE_SCOPE0("AsyncCommandQueue::WaitIdle"); |
| |
| // Wait until the deadline, the thread exits, or there are no more pending |
| // submissions. |
| absl::MutexLock lock(&submission_mutex_); |
| if (!submission_mutex_.AwaitWithDeadline( |
| absl::Condition( |
| +[](HostSubmissionQueue* queue) { |
| return queue->empty() || !queue->permanent_error().ok(); |
| }, |
| &submission_queue_), |
| deadline)) { |
| return DeadlineExceededErrorBuilder(ABSL_LOC) |
| << "Deadline exceeded waiting for submission thread to go idle"; |
| } |
| return submission_queue_.permanent_error(); |
| } |
| |
| } // namespace hal |
| } // namespace iree |