// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "iree/hal/vulkan/serializing_command_queue.h"
#include <memory>
#include "absl/types/span.h"
#include "iree/base/api.h"
#include "iree/base/memory.h"
#include "iree/base/tracing.h"
#include "iree/hal/api.h"
#include "iree/hal/vulkan/direct_command_buffer.h"
#include "iree/hal/vulkan/emulated_semaphore.h"
#include "iree/hal/vulkan/status_util.h"
namespace iree {
namespace hal {
namespace vulkan {
namespace {
// Tries to prepare all necessary binary `VkSemaphore`s for emulating the time
// points specified in the given submission's |batch_wait_semaphores| and
// |batch_signal_semaphores|. |out_ready_to_submit| is set to true when the
// batch is ready to be submitted to the GPU, in which case |wait_semaphores|
// and |signal_semaphores| are filled with the binary `VkSemaphore`s to use.
iree_status_t TryToPrepareSemaphores(
const absl::InlinedVector<SemaphoreValue, 4>& batch_wait_semaphores,
const absl::InlinedVector<SemaphoreValue, 4>& batch_signal_semaphores,
const ref_ptr<TimePointFence>& batch_fence,
absl::InlinedVector<VkSemaphore, 4>* wait_semaphores,
absl::InlinedVector<VkSemaphore, 4>* signal_semaphores,
bool* out_ready_to_submit) {
IREE_TRACE_SCOPE0("TryToPrepareSemaphores");
*out_ready_to_submit = false;
wait_semaphores->clear();
for (const auto& timeline_semaphore : batch_wait_semaphores) {
// Query first to advance this timeline semaphore as far as currently possible.
uint64_t signaled_value = 0;
IREE_RETURN_IF_ERROR(
iree_hal_semaphore_query(timeline_semaphore.first, &signaled_value));
// If it's already signaled to a value greater than or equal to what we
// require here we can just ignore this semaphore now.
if (signaled_value >= timeline_semaphore.second) {
continue;
}
// Otherwise try to get a binary semaphore for this time point that we can
// wait on.
// TODO(antiagainst): if this fails we need to cancel.
VkSemaphore wait_semaphore = VK_NULL_HANDLE;
IREE_RETURN_IF_ERROR(iree_hal_vulkan_emulated_semaphore_acquire_wait_handle(
timeline_semaphore.first, timeline_semaphore.second, batch_fence,
&wait_semaphore));
wait_semaphores->push_back(wait_semaphore);
if (wait_semaphore == VK_NULL_HANDLE) {
// We cannot wait on this time point yet: there are no previous semaphores
// submitted to the GPU that can signal a value greater than what's
// desired here.
// Cancel the wait so others may make progress.
// TODO(antiagainst): if any of these fail we need to cancel.
for (iree_host_size_t i = 0; i < batch_wait_semaphores.size(); ++i) {
if (!wait_semaphores->at(i)) break;
IREE_RETURN_IF_ERROR(
iree_hal_vulkan_emulated_semaphore_cancel_wait_handle(
batch_wait_semaphores[i].first, wait_semaphores->at(i)));
}
// This batch cannot be submitted to GPU yet.
return iree_ok_status();
}
}
// We've collected all necessary binary semaphores for each timeline we need
// to wait on. Now prepare binary semaphores for signaling.
signal_semaphores->clear();
for (const auto& timeline_semaphore : batch_signal_semaphores) {
// SerializingCommandQueue only works with emulated timeline semaphores.
VkSemaphore signal_semaphore = VK_NULL_HANDLE;
IREE_RETURN_IF_ERROR(
iree_hal_vulkan_emulated_semaphore_acquire_signal_handle(
timeline_semaphore.first, timeline_semaphore.second, batch_fence,
&signal_semaphore));
signal_semaphores->push_back(signal_semaphore);
}
// Good to submit!
*out_ready_to_submit = true;
return iree_ok_status();
}
// Prepares a `VkSubmitInfo` for submitting the given |command_buffer_handles|,
// waiting on |wait_semaphore_handles| and signaling |signal_semaphore_handles|.
// The arrays referenced by the `VkSubmitInfo` are allocated from |arena| and
// the result is written to |submit_info|.
void PrepareSubmitInfo(absl::Span<const VkSemaphore> wait_semaphore_handles,
absl::Span<const VkCommandBuffer> command_buffer_handles,
absl::Span<const VkSemaphore> signal_semaphore_handles,
VkSubmitInfo* submit_info, Arena* arena) {
// TODO(benvanik): see if we can go to finer-grained stages.
// For example, if this was just queue ownership transfers then we can use
// the pseudo-stage of VK_PIPELINE_STAGE_BOTTOM_OF_PIPE_BIT.
auto wait_dst_stage_masks =
arena->AllocateSpan<VkPipelineStageFlags>(wait_semaphore_handles.size());
for (size_t i = 0, e = wait_semaphore_handles.size(); i < e; ++i) {
wait_dst_stage_masks[i] =
VK_PIPELINE_STAGE_TRANSFER_BIT | VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT;
}
// NOTE: this code does some very weird things - the handles we take in as
// args are mutated in-place after this function is called so we can't
// reference them here. If we were going to preserve this code post-Vulkan 1.2
// then we'd really want to rework all of this to properly use the arena from
// the start instead of all this InlinedVector tomfoolery.
auto wait_semaphores =
arena->AllocateSpan<VkSemaphore>(wait_semaphore_handles.size());
for (size_t i = 0, e = wait_semaphore_handles.size(); i < e; ++i) {
wait_semaphores[i] = wait_semaphore_handles[i];
}
auto command_buffers =
arena->AllocateSpan<VkCommandBuffer>(command_buffer_handles.size());
for (size_t i = 0, e = command_buffer_handles.size(); i < e; ++i) {
command_buffers[i] = command_buffer_handles[i];
}
auto signal_semaphores =
arena->AllocateSpan<VkSemaphore>(signal_semaphore_handles.size());
for (size_t i = 0, e = signal_semaphore_handles.size(); i < e; ++i) {
signal_semaphores[i] = signal_semaphore_handles[i];
}
submit_info->sType = VK_STRUCTURE_TYPE_SUBMIT_INFO;
submit_info->pNext = nullptr;
submit_info->waitSemaphoreCount =
static_cast<uint32_t>(wait_semaphores.size());
submit_info->pWaitSemaphores = wait_semaphores.data();
submit_info->pWaitDstStageMask = wait_dst_stage_masks.data();
submit_info->commandBufferCount =
static_cast<uint32_t>(command_buffers.size());
submit_info->pCommandBuffers = command_buffers.data();
submit_info->signalSemaphoreCount =
static_cast<uint32_t>(signal_semaphores.size());
submit_info->pSignalSemaphores = signal_semaphores.data();
}
} // namespace
SerializingCommandQueue::SerializingCommandQueue(
VkDeviceHandle* logical_device, std::string name,
iree_hal_command_category_t supported_categories, VkQueue queue,
TimePointFencePool* fence_pool)
: CommandQueue(logical_device, std::move(name), supported_categories,
queue),
fence_pool_(fence_pool) {}
SerializingCommandQueue::~SerializingCommandQueue() = default;
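// Records the given |batches| as fenced submissions and immediately attempts
// to flush them to the GPU. Batches whose emulated timeline semaphore waits
// cannot be satisfied yet remain deferred until a later flush.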
iree_status_t SerializingCommandQueue::Submit(
iree_host_size_t batch_count, const iree_hal_submission_batch_t* batches) {
IREE_TRACE_SCOPE0("SerializingCommandQueue::Submit");
IntrusiveList<std::unique_ptr<FencedSubmission>> new_submissions;
for (iree_host_size_t i = 0; i < batch_count; ++i) {
const iree_hal_submission_batch_t* batch = &batches[i];
// Grab a fence for this submission first. This will be used to check the
// progress of emulated timeline semaphores later.
auto submission = std::make_unique<FencedSubmission>();
IREE_ASSIGN_OR_RETURN(submission->fence, fence_pool_->Acquire());
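// Copy the batch's wait semaphores, command buffers, and signal semaphores
// into the submission so the caller's arrays need not outlive this call.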
submission->wait_semaphores.resize(batch->wait_semaphores.count);
for (iree_host_size_t j = 0; j < batch->wait_semaphores.count; ++j) {
submission->wait_semaphores[j] = {
batch->wait_semaphores.semaphores[j],
batch->wait_semaphores.payload_values[j]};
}
submission->command_buffers.resize(batch->command_buffer_count);
for (iree_host_size_t j = 0; j < batch->command_buffer_count; ++j) {
submission->command_buffers[j] =
iree_hal_vulkan_direct_command_buffer_handle(
batch->command_buffers[j]);
}
submission->signal_semaphores.resize(batch->signal_semaphores.count);
for (iree_host_size_t j = 0; j < batch->signal_semaphores.count; ++j) {
submission->signal_semaphores[j] = {
batch->signal_semaphores.semaphores[j],
batch->signal_semaphores.payload_values[j]};
}
new_submissions.push_back(std::move(submission));
}
iree_slim_mutex_lock(&queue_mutex_);
deferred_submissions_.merge_from(&new_submissions);
iree_status_t status = ProcessDeferredSubmissions();
iree_slim_mutex_unlock(&queue_mutex_);
return status;
}
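// Attempts to submit all deferred submissions whose wait semaphores can be
// resolved to binary `VkSemaphore`s right now; submissions that are not yet
// ready remain in the deferred list. If |out_work_submitted| is non-null it
// is set to true when at least one batch was passed to vkQueueSubmit.
// The caller must hold queue_mutex_.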
iree_status_t SerializingCommandQueue::ProcessDeferredSubmissions(
bool* out_work_submitted) {
IREE_TRACE_SCOPE0("SerializingCommandQueue::ProcessDeferredSubmissions");
if (out_work_submitted) *out_work_submitted = false;
// Any submissions that could not be submitted must be returned to the
// deferred queue at the end so no work is dropped.
IntrusiveList<std::unique_ptr<FencedSubmission>> remaining_submissions;
auto submission_cleanup = MakeCleanup([this, &remaining_submissions]() {
// Disable thread-safety-analysis as it doesn't understand this lambda.
// - This entire function is ABSL_EXCLUSIVE_LOCKS_REQUIRED(queue_mutex_)
// - This Cleanup object is destroyed when it drops out of scope
// - The mutex is always held when executing this function
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wthread-safety-analysis"
#endif
while (!remaining_submissions.empty()) {
deferred_submissions_.push_back(
remaining_submissions.take(remaining_submissions.front()));
}
#ifdef __clang__
#pragma clang diagnostic pop
#endif
});
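// Scratch arena backing the arrays referenced by each VkSubmitInfo; it must
// stay alive until the vkQueueSubmit calls at the end of this function.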
Arena arena(4 * 1024);
absl::InlinedVector<VkSubmitInfo, 4> submit_infos;
absl::InlinedVector<VkFence, 4> submit_fences;
while (!deferred_submissions_.empty()) {
FencedSubmission* submission = deferred_submissions_.front();
ref_ptr<TimePointFence>& fence = submission->fence;
absl::InlinedVector<VkSemaphore, 4> wait_semaphores;
absl::InlinedVector<VkSemaphore, 4> signal_semaphores;
bool ready_to_submit = false;
IREE_RETURN_IF_ERROR(TryToPrepareSemaphores(
submission->wait_semaphores, submission->signal_semaphores, fence,
&wait_semaphores, &signal_semaphores, &ready_to_submit));
if (ready_to_submit) {
submit_infos.emplace_back();
PrepareSubmitInfo(wait_semaphores, submission->command_buffers,
signal_semaphores, &submit_infos.back(), &arena);
submit_fences.push_back(fence->value());
pending_fences_.emplace_back(std::move(fence));
deferred_submissions_.pop_front();
} else {
// We need to defer the submission until later.
remaining_submissions.push_back(deferred_submissions_.take(submission));
}
}
if (submit_infos.empty()) {
if (out_work_submitted) *out_work_submitted = false;
return iree_ok_status();
}
// NOTE: the submissions could potentially be batched into a single
// vkQueueSubmit call, but that requires non-trivial fence handling; revisit
// if it ever proves necessary.
for (size_t i = 0, e = submit_infos.size(); i < e; ++i) {
VK_RETURN_IF_ERROR(
syms()->vkQueueSubmit(queue_, /*submitCount=*/1, &submit_infos[i],
submit_fences[i]),
"vkQueueSubmit");
}
if (out_work_submitted) *out_work_submitted = true;
return iree_ok_status();
}
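// Waits until all submitted and deferred work on the queue has completed or
// |deadline_ns| is reached. An infinite deadline takes a vkQueueWaitIdle fast
// path; otherwise deferred submissions are repeatedly flushed and their
// fences waited on with vkWaitForFences until the queue drains.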
iree_status_t SerializingCommandQueue::WaitIdle(iree_time_t deadline_ns) {
iree_status_t status = iree_ok_status();
if (deadline_ns == IREE_TIME_INFINITE_FUTURE) {
IREE_TRACE_SCOPE0("SerializingCommandQueue::WaitIdle#vkQueueWaitIdle");
// Fast path for using vkQueueWaitIdle, which is usually cheaper (as it
// requires fewer calls into the driver).
iree_slim_mutex_lock(&queue_mutex_);
// Complete all pending work on the queue.
status =
VK_RESULT_TO_STATUS(syms()->vkQueueWaitIdle(queue_), "vkQueueWaitIdle");
if (!iree_status_is_ok(status)) {
iree_slim_mutex_unlock(&queue_mutex_);
return status;
}
pending_fences_.clear();
// Submit and complete all deferred work.
while (!deferred_submissions_.empty()) {
bool work_submitted = false;
status = ProcessDeferredSubmissions(&work_submitted);
if (!iree_status_is_ok(status)) break;
if (work_submitted) {
status = VK_RESULT_TO_STATUS(syms()->vkQueueWaitIdle(queue_),
"vkQueueWaitIdle");
if (!iree_status_is_ok(status)) break;
pending_fences_.clear();
}
}
iree_slim_mutex_unlock(&queue_mutex_);
return status;
}
IREE_TRACE_SCOPE0("SerializingCommandQueue::WaitIdle#Fence");
// Keep flushing deferred work to the GPU and waiting on its fences until the
// queue is idle or the deadline is reached.
iree_slim_mutex_lock(&queue_mutex_);
do {
status = ProcessDeferredSubmissions();
bool has_deferred_submissions = !deferred_submissions_.empty();
absl::InlinedVector<VkFence, 8> fence_handles(pending_fences_.size());
for (size_t i = 0; i < pending_fences_.size(); ++i) {
fence_handles[i] = pending_fences_[i]->value();
}
if (!iree_status_is_ok(status)) {
break; // unable to process submissions
} else if (!has_deferred_submissions && fence_handles.empty()) {
break; // no more work - idle achieved
}
uint64_t timeout_ns;
if (deadline_ns == IREE_TIME_INFINITE_FUTURE) {
timeout_ns = UINT64_MAX;
} else if (deadline_ns == IREE_TIME_INFINITE_PAST) {
timeout_ns = 0;
} else {
// Convert the deadline to a timeout relative to now, in nanoseconds. Note
// that the driver may wait at a much coarser granularity than requested.
iree_time_t now_ns = iree_time_now();
if (deadline_ns < now_ns) {
status = iree_status_from_code(IREE_STATUS_DEADLINE_EXCEEDED);
break;
}
timeout_ns = (uint64_t)(deadline_ns - now_ns);
}
VkResult result = syms()->vkWaitForFences(
*logical_device_, static_cast<uint32_t>(fence_handles.size()),
fence_handles.data(),
/*waitAll=*/VK_TRUE, timeout_ns);
switch (result) {
case VK_SUCCESS:
pending_fences_.clear();
break;
case VK_TIMEOUT:
status = iree_status_from_code(IREE_STATUS_DEADLINE_EXCEEDED);
break;
default:
status = VK_RESULT_TO_STATUS(result, "vkWaitForFences");
break;
}
// Keep looping while everything succeeds; the breaks above exit the loop
// once the queue is idle or the deadline is exceeded.
} while (iree_status_is_ok(status));
iree_slim_mutex_unlock(&queue_mutex_);
return status;
}
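// Attempts to flush any deferred submissions that have become ready to the
// GPU.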
iree_status_t SerializingCommandQueue::AdvanceQueueSubmission() {
// ProcessDeferredSubmissions() reports whether newly ready submissions were
// handed to the GPU. Other callers may be interested in that information,
// but here we only want to advance the queue submission if possible, so the
// result is ignored.
iree_slim_mutex_lock(&queue_mutex_);
iree_status_t status = ProcessDeferredSubmissions();
iree_slim_mutex_unlock(&queue_mutex_);
return status;
}
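// Drops all deferred (not yet submitted) work and blocks until the fences of
// all already-submitted work have signaled before releasing them.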
void SerializingCommandQueue::AbortQueueSubmission() {
iree_slim_mutex_lock(&queue_mutex_);
// The fences held by deferred_submissions_ were never submitted to the GPU,
// so they do not need to be reset.
deferred_submissions_.clear();
absl::InlinedVector<VkFence, 8> fence_handles(pending_fences_.size());
for (size_t i = 0; i < pending_fences_.size(); ++i) {
fence_handles[i] = pending_fences_[i]->value();
}
syms()->vkWaitForFences(*logical_device_,
static_cast<uint32_t>(fence_handles.size()),
fence_handles.data(),
/*waitAll=*/VK_TRUE, /*timeout=*/UINT64_MAX);
// Clear the list; each fence is automatically returned to the fence pool
// once its refcount reaches zero.
pending_fences_.clear();
iree_slim_mutex_unlock(&queue_mutex_);
}
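// Removes from the pending list every fence whose VkFence handle appears in
// |fences|; used to retire fences that have been observed as signaled.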
void SerializingCommandQueue::SignalFences(absl::Span<VkFence> fences) {
const auto span_contains = [fences](VkFence fence) {
for (VkFence f : fences) {
if (f == fence) return true;
}
return false;
};
iree_slim_mutex_lock(&queue_mutex_);
auto it = pending_fences_.begin();
while (it != pending_fences_.end()) {
if (span_contains((*it)->value())) {
it = pending_fences_.erase(it);
} else {
++it;
}
}
iree_slim_mutex_unlock(&queue_mutex_);
}
} // namespace vulkan
} // namespace hal
} // namespace iree