NCCL integration (#11585)

This change integrates basic NCCL support into the CUDA runtime, enough to
run a minimal collective test. Much more remains to be built on top of this.

Two environment variables are introduced to set the number of
processes and the process ID:
1. `IREE_CUDA_NCCL_NPROCS`
2. `IREE_CUDA_NCCL_PROCID`

The GPU to use can be selected with `--device=cuda://<index>` or
`--device=cuda://GPU-<uuid>` when invoking `iree-run-module`.
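For example, the UUID form can be obtained from `nvidia-smi -L` (the UUID
below is a placeholder):

```zsh
# Prints one line per device, including its UUID (GPU-xxxxxxxx-...).
nvidia-smi -L
# Either form works for device selection:
#   --device=cuda://0
#   --device=cuda://GPU-00000000-0000-0000-0000-000000000000
```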

The NCCL dynamic library is loaded only when `IREE_CUDA_NCCL_NPROCS` is set
to >= 1. (Without knowing the number of processes, we can't create the
unique ID needed to create a channel.)
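
The initialization flow this enables looks roughly like the following. This is
a minimal sketch, not the actual IREE runtime code: the helper name is
hypothetical, and it assumes `NCCL_COMM_ID` is set to the same `host:port` on
every process so each rank can derive the same unique ID locally.

```c
// Hypothetical sketch of the NCCL channel setup driven by the new
// environment variables; not the actual IREE implementation.
#include <stdlib.h>
#include <nccl.h>

static ncclResult_t create_nccl_channel(ncclComm_t* out_comm) {
  const char* nprocs_str = getenv("IREE_CUDA_NCCL_NPROCS");
  const char* procid_str = getenv("IREE_CUDA_NCCL_PROCID");
  if (!nprocs_str) {
    // NCCL is never loaded when IREE_CUDA_NCCL_NPROCS is unset.
    return ncclInvalidUsage;
  }
  int nprocs = atoi(nprocs_str);
  int rank = procid_str ? atoi(procid_str) : 0;

  // With NCCL_COMM_ID pointing at the same address on every process, each
  // rank can produce a matching unique ID without an out-of-band exchange.
  ncclUniqueId id;
  ncclResult_t result = ncclGetUniqueId(&id);
  if (result != ncclSuccess) return result;

  return ncclCommInitRank(out_comm, nprocs, id, rank);
}
```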

There are many things to be done on top of this work. We need

1. a full set of E2E tests from stream async ops down to the runtime,
2. support for high-level ops such as StableHLO, and
3. a CI test setup.

Here is a sample allgather test:

```mlir
func.func @main() -> !hal.buffer_view {
  %c0 = arith.constant 0 : index
  %c2 = arith.constant 2 : index
  %c8 = arith.constant 8 : index
  %c16 = arith.constant 16 : index
  %input_cst = stream.tensor.constant : tensor<2xi32> in !stream.resource<constant> =
    dense<[101, 102]> : tensor<2xi32>
  %input = stream.async.transfer %input_cst : !stream.resource<constant>{%c8} -> !stream.resource<*>{%c8}
  %fill_val = arith.constant -1 : i32
  %output = stream.tensor.splat %fill_val :
    i32 -> tensor<2x2xi32> in !stream.resource<*>{%c16}
  %channel = stream.channel.default on(#hal.affinity.queue<[0]>) : !stream.channel

  %0 = stream.async.collective<all_gather : si32>[%c2]
      on(#hal.affinity.queue<[0]>) channel(%channel)
      %input[%c0 to %c8 for %c8],
      %output[%c0 to %c16 for %c16] :
      !stream.resource<*>{%c8} -> %output as !stream.resource<*>{%c16}
  %1 = stream.async.transfer %0 : !stream.resource<*>{%c16} -> !stream.resource<external>{%c16}
  %result = stream.tensor.export %1 :
    tensor<2x2xi32> in !stream.resource<external>{%c16} -> !hal.buffer_view
  return %result : !hal.buffer_view
}
```

Here is a sample command to compile it:
```zsh
iree-compile --iree-hal-cuda-llvm-target-arch=sm_86 --iree-hal-target-backends=cuda -o allgather.vmfb allgather.mlir
```

Here is a sample command line for a host with two CUDA devices, along with
the result:
```zsh
IREE_CUDA_NCCL_NPROCS=2 NCCL_COMM_ID=127.0.0.1:8000 IREE_CUDA_NCCL_PROCID=0 iree-run-module --device=cuda://0 --module_file=allgather.vmfb --entry_function=main & \
IREE_CUDA_NCCL_NPROCS=2 NCCL_COMM_ID=127.0.0.1:8000 IREE_CUDA_NCCL_PROCID=1 iree-run-module --device=cuda://1 --module_file=allgather.vmfb --entry_function=main
EXEC @main
EXEC @main
result[0]: hal.buffer_view
2x2xi32=[101 102][101 102]
result[0]: hal.buffer_view
2x2xi32=[101 102][101 102]
```
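
For reference, here is a minimal standalone NCCL program (a sketch, not part
of this change) that performs the same two-rank allgather from a single
process driving two GPUs via `ncclCommInitAll`. It assumes two visible CUDA
devices and omits error checking for brevity; it should print the same
`2x2xi32=[101 102][101 102]` values for each rank.

```c
// allgather_check.c: direct-NCCL equivalent of the allgather.mlir test above.
#include <stdio.h>
#include <cuda_runtime.h>
#include <nccl.h>

int main(void) {
  const int ndev = 2;
  int devs[2] = {0, 1};
  ncclComm_t comms[2];
  cudaStream_t streams[2];
  int* sendbuf[2];
  int* recvbuf[2];

  // One communicator per device, all created from this single process.
  ncclCommInitAll(comms, ndev, devs);

  for (int i = 0; i < ndev; ++i) {
    int host_send[2] = {101, 102};  // Same constant input as the MLIR test.
    cudaSetDevice(i);
    cudaStreamCreate(&streams[i]);
    cudaMalloc((void**)&sendbuf[i], 2 * sizeof(int));
    cudaMalloc((void**)&recvbuf[i], 4 * sizeof(int));
    cudaMemcpy(sendbuf[i], host_send, sizeof(host_send), cudaMemcpyHostToDevice);
  }

  // A single thread drives both communicators, so the per-rank calls must be
  // fused into one group (see the group semantics notes in nccl.h below).
  ncclGroupStart();
  for (int i = 0; i < ndev; ++i) {
    ncclAllGather(sendbuf[i], recvbuf[i], /*sendcount=*/2, ncclInt32,
                  comms[i], streams[i]);
  }
  ncclGroupEnd();

  for (int i = 0; i < ndev; ++i) {
    int host_recv[4];
    cudaSetDevice(i);
    cudaStreamSynchronize(streams[i]);
    cudaMemcpy(host_recv, recvbuf[i], sizeof(host_recv), cudaMemcpyDeviceToHost);
    printf("rank %d: 2x2xi32=[%d %d][%d %d]\n", i, host_recv[0], host_recv[1],
           host_recv[2], host_recv[3]);
    cudaFree(sendbuf[i]);
    cudaFree(recvbuf[i]);
    cudaStreamDestroy(streams[i]);
    ncclCommDestroy(comms[i]);
  }
  return 0;
}
```

This can be built with something like `nvcc allgather_check.c -lnccl -o
allgather_check`, assuming the NCCL headers and library are installed.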
diff --git a/third_party/nccl/nccl.h b/third_party/nccl/nccl.h
new file mode 100644
index 0000000..262d2fc
--- /dev/null
+++ b/third_party/nccl/nccl.h
@@ -0,0 +1,398 @@
+/*************************************************************************
+ * Copyright (c) 2015-2021, NVIDIA CORPORATION. All rights reserved.
+ *
+ * See LICENSE.txt for license information
+ ************************************************************************/
+
+#ifndef NCCL_H_
+#define NCCL_H_
+
+#include <cuda_runtime.h>
+#include <cuda_fp16.h>
+#if CUDART_VERSION >= 11000
+#include <cuda_bf16.h>
+#endif
+
+#define NCCL_MAJOR 2
+#define NCCL_MINOR 15
+#define NCCL_PATCH 5
+#define NCCL_SUFFIX ""
+
+#define NCCL_VERSION_CODE 21505
+#define NCCL_VERSION(X,Y,Z) (((X) <= 2 && (Y) <= 8) ? (X) * 1000 + (Y) * 100 + (Z) : (X) * 10000 + (Y) * 100 + (Z))
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* Opaque handle to communicator */
+typedef struct ncclComm* ncclComm_t;
+
+#define NCCL_UNIQUE_ID_BYTES 128
+typedef struct { char internal[NCCL_UNIQUE_ID_BYTES]; } ncclUniqueId;
+
+/* Error type */
+typedef enum { ncclSuccess                 =  0,
+               ncclUnhandledCudaError      =  1,
+               ncclSystemError             =  2,
+               ncclInternalError           =  3,
+               ncclInvalidArgument         =  4,
+               ncclInvalidUsage            =  5,
+               ncclRemoteError             =  6,
+               ncclInProgress              =  7,
+               ncclNumResults              =  8 } ncclResult_t;
+
+/* Communicator configuration. Users can assign value to attributes to specify the
+ * behavior of a communicator. */
+typedef struct ncclConfig_v21400 {
+  /* attributes that users should never touch. */
+  size_t size;
+  unsigned int magic;
+  unsigned int version;
+  /* attributes that users are able to customize. */
+  int blocking;
+} ncclConfig_t;
+
+/* Config initializer must be assigned to initialize config structure when it is created.
+ * Not initialized config will result in NCCL error. */
+#define NCCL_CONFIG_INITIALIZER {                                       \
+  sizeof(ncclConfig_t), /* size */                                      \
+  0xcafebeef,           /* magic */                                     \
+  NCCL_VERSION(NCCL_MAJOR, NCCL_MINOR, NCCL_PATCH), /* version */       \
+  1                     /* blocking */                                  \
+}
+
+/* Return the NCCL_VERSION_CODE of the NCCL library in the supplied integer.
+ * This integer is coded with the MAJOR, MINOR and PATCH level of the
+ * NCCL library
+ */
+ncclResult_t  ncclGetVersion(int *version);
+ncclResult_t pncclGetVersion(int *version);
+
+/* Generates an Id to be used in ncclCommInitRank. ncclGetUniqueId should be
+ * called once and the Id should be distributed to all ranks in the
+ * communicator before calling ncclCommInitRank. */
+ncclResult_t  ncclGetUniqueId(ncclUniqueId* uniqueId);
+ncclResult_t pncclGetUniqueId(ncclUniqueId* uniqueId);
+
+/* Create a new communicator (multi thread/process version) with a configuration
+ * set by users. */
+ncclResult_t  ncclCommInitRankConfig(ncclComm_t* comm, int nranks, ncclUniqueId commId, int rank, ncclConfig_t* config);
+ncclResult_t pncclCommInitRankConfig(ncclComm_t* comm, int nranks, ncclUniqueId commId, int rank, ncclConfig_t* config);
+
+/* Creates a new communicator (multi thread/process version).
+ * rank must be between 0 and nranks-1 and unique within a communicator clique.
+ * Each rank is associated to a CUDA device, which has to be set before calling
+ * ncclCommInitRank.
+ * ncclCommInitRank implicitly syncronizes with other ranks, so it must be
+ * called by different threads/processes or use ncclGroupStart/ncclGroupEnd. */
+ncclResult_t  ncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueId commId, int rank);
+ncclResult_t pncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueId commId, int rank);
+
+/* Creates a clique of communicators (single process version).
+ * This is a convenience function to create a single-process communicator clique.
+ * Returns an array of ndev newly initialized communicators in comm.
+ * comm should be pre-allocated with size at least ndev*sizeof(ncclComm_t).
+ * If devlist is NULL, the first ndev CUDA devices are used.
+ * Order of devlist defines user-order of processors within the communicator. */
+ncclResult_t  ncclCommInitAll(ncclComm_t* comm, int ndev, const int* devlist);
+ncclResult_t pncclCommInitAll(ncclComm_t* comm, int ndev, const int* devlist);
+
+/* Finalize a communicator. ncclCommFinalize flushes all issued communications,
+ * and marks communicator state as ncclInProgress. The state will change to ncclSuccess
+ * when the communicator is globally quiescent and related resources are freed; then,
+ * calling ncclCommDestroy can locally free the rest of the resources (e.g. communicator
+ * itself) without blocking. */
+ncclResult_t  ncclCommFinalize(ncclComm_t comm);
+ncclResult_t pncclCommFinalize(ncclComm_t comm);
+
+/* Frees local resources associated with communicator object. */
+ncclResult_t  ncclCommDestroy(ncclComm_t comm);
+ncclResult_t pncclCommDestroy(ncclComm_t comm);
+
+/* Frees resources associated with communicator object and aborts any operations
+ * that might still be running on the device. */
+ncclResult_t  ncclCommAbort(ncclComm_t comm);
+ncclResult_t pncclCommAbort(ncclComm_t comm);
+
+/* Returns a string for each error code. */
+const char*  ncclGetErrorString(ncclResult_t result);
+const char* pncclGetErrorString(ncclResult_t result);
+
+/* Returns a human-readable message of the last error that occurred.
+ * comm is currently unused and can be set to NULL
+ */
+const char*  ncclGetLastError(ncclComm_t comm);
+const char* pncclGetError(ncclComm_t comm);
+
+/* Checks whether the comm has encountered any asynchronous errors */
+ncclResult_t  ncclCommGetAsyncError(ncclComm_t comm, ncclResult_t *asyncError);
+ncclResult_t pncclCommGetAsyncError(ncclComm_t comm, ncclResult_t *asyncError);
+
+/* Gets the number of ranks in the communicator clique. */
+ncclResult_t  ncclCommCount(const ncclComm_t comm, int* count);
+ncclResult_t pncclCommCount(const ncclComm_t comm, int* count);
+
+/* Returns the cuda device number associated with the communicator. */
+ncclResult_t  ncclCommCuDevice(const ncclComm_t comm, int* device);
+ncclResult_t pncclCommCuDevice(const ncclComm_t comm, int* device);
+
+/* Returns the user-ordered "rank" associated with the communicator. */
+ncclResult_t  ncclCommUserRank(const ncclComm_t comm, int* rank);
+ncclResult_t pncclCommUserRank(const ncclComm_t comm, int* rank);
+
+/* Reduction operation selector */
+typedef enum { ncclNumOps_dummy = 5 } ncclRedOp_dummy_t;
+typedef enum { ncclSum        = 0,
+               ncclProd       = 1,
+               ncclMax        = 2,
+               ncclMin        = 3,
+               ncclAvg        = 4,
+               /* ncclNumOps: The number of built-in ncclRedOp_t values. Also
+                * serves as the least possible value for dynamic ncclRedOp_t's
+                * as constructed by ncclRedOpCreate*** functions. */
+               ncclNumOps     = 5,
+               /* ncclMaxRedOp: The largest valid value for ncclRedOp_t.
+                * It is defined to be the largest signed value (since compilers
+                * are permitted to use signed enums) that won't grow
+                * sizeof(ncclRedOp_t) when compared to previous NCCL versions to
+                * maintain ABI compatibility. */
+               ncclMaxRedOp   = 0x7fffffff>>(32-8*sizeof(ncclRedOp_dummy_t))
+             } ncclRedOp_t;
+
+/* Data types */
+typedef enum { ncclInt8       = 0, ncclChar       = 0,
+               ncclUint8      = 1,
+               ncclInt32      = 2, ncclInt        = 2,
+               ncclUint32     = 3,
+               ncclInt64      = 4,
+               ncclUint64     = 5,
+               ncclFloat16    = 6, ncclHalf       = 6,
+               ncclFloat32    = 7, ncclFloat      = 7,
+               ncclFloat64    = 8, ncclDouble     = 8,
+#if defined(__CUDA_BF16_TYPES_EXIST__)
+               ncclBfloat16   = 9,
+               ncclNumTypes   = 10
+#else
+               ncclNumTypes   = 9
+#endif
+} ncclDataType_t;
+
+/* ncclScalarResidence_t: Location and dereferencing logic for scalar arguments. */
+typedef enum {
+  /* ncclScalarDevice: The scalar is in device-visible memory and will be
+   * dereferenced while the collective is running. */
+  ncclScalarDevice = 0,
+
+  /* ncclScalarHostImmediate: The scalar is in host-visible memory and will be
+   * dereferenced before the ncclRedOpCreate***() function returns. */
+  ncclScalarHostImmediate = 1
+} ncclScalarResidence_t;
+
+/*
+ * ncclRedOpCreatePreMulSum
+ *
+ * Creates a new reduction operator which pre-multiplies input values by a given
+ * scalar locally before reducing them with peer values via summation. For use
+ * only with collectives launched against *comm* and *datatype*. The
+ * *residence* argument indicates how/when the memory pointed to by *scalar*
+ * will be dereferenced. Upon return, the newly created operator's handle
+ * is stored in *op*.
+ */
+ncclResult_t  ncclRedOpCreatePreMulSum(ncclRedOp_t *op, void *scalar, ncclDataType_t datatype, ncclScalarResidence_t residence, ncclComm_t comm);
+ncclResult_t pncclRedOpCreatePreMulSum(ncclRedOp_t *op, void *scalar, ncclDataType_t datatype, ncclScalarResidence_t residence, ncclComm_t comm);
+
+/*
+ * ncclRedOpDestroy
+ *
+ * Destroys the reduction operator *op*. The operator must have been created by
+ * ncclRedOpCreatePreMul with the matching communicator *comm*. An operator may be
+ * destroyed as soon as the last NCCL function which is given that operator returns.
+ */
+ncclResult_t ncclRedOpDestroy(ncclRedOp_t op, ncclComm_t comm);
+ncclResult_t pncclRedOpDestroy(ncclRedOp_t op, ncclComm_t comm);
+
+/*
+ * Collective communication operations
+ *
+ * Collective communication operations must be called separately for each
+ * communicator in a communicator clique.
+ *
+ * They return when operations have been enqueued on the CUDA stream.
+ *
+ * Since they may perform inter-CPU synchronization, each call has to be done
+ * from a different thread or process, or need to use Group Semantics (see
+ * below).
+ */
+
+/*
+ * Reduce
+ *
+ * Reduces data arrays of length count in sendbuff into recvbuff using op
+ * operation.
+ * recvbuff may be NULL on all calls except for root device.
+ * root is the rank (not the CUDA device) where data will reside after the
+ * operation is complete.
+ *
+ * In-place operation will happen if sendbuff == recvbuff.
+ */
+ncclResult_t  ncclReduce(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype,
+    ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream);
+ncclResult_t pncclReduce(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype,
+    ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream);
+
+/*
+ * (deprecated) Broadcast (in-place)
+ *
+ * Copies count values from root to all other devices.
+ * root is the rank (not the CUDA device) where data resides before the
+ * operation is started.
+ *
+ * This operation is implicitely in place.
+ */
+ncclResult_t  ncclBcast(void* buff, size_t count, ncclDataType_t datatype, int root,
+    ncclComm_t comm, cudaStream_t stream);
+ncclResult_t pncclBcast(void* buff, size_t count, ncclDataType_t datatype, int root,
+    ncclComm_t comm, cudaStream_t stream);
+
+/*
+ * Broadcast
+ *
+ * Copies count values from root to all other devices.
+ * root is the rank (not the CUDA device) where data resides before the
+ * operation is started.
+ *
+ * In-place operation will happen if sendbuff == recvbuff.
+ */
+ncclResult_t  ncclBroadcast(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype, int root,
+    ncclComm_t comm, cudaStream_t stream);
+ncclResult_t pncclBroadcast(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype, int root,
+    ncclComm_t comm, cudaStream_t stream);
+
+/*
+ * All-Reduce
+ *
+ * Reduces data arrays of length count in sendbuff using op operation, and
+ * leaves identical copies of result on each recvbuff.
+ *
+ * In-place operation will happen if sendbuff == recvbuff.
+ */
+ncclResult_t  ncclAllReduce(const void* sendbuff, void* recvbuff, size_t count,
+    ncclDataType_t datatype, ncclRedOp_t op, ncclComm_t comm, cudaStream_t stream);
+ncclResult_t pncclAllReduce(const void* sendbuff, void* recvbuff, size_t count,
+    ncclDataType_t datatype, ncclRedOp_t op, ncclComm_t comm, cudaStream_t stream);
+
+/*
+ * Reduce-Scatter
+ *
+ * Reduces data in sendbuff using op operation and leaves reduced result
+ * scattered over the devices so that recvbuff on rank i will contain the i-th
+ * block of the result.
+ * Assumes sendcount is equal to nranks*recvcount, which means that sendbuff
+ * should have a size of at least nranks*recvcount elements.
+ *
+ * In-place operations will happen if recvbuff == sendbuff + rank * recvcount.
+ */
+ncclResult_t  ncclReduceScatter(const void* sendbuff, void* recvbuff,
+    size_t recvcount, ncclDataType_t datatype, ncclRedOp_t op, ncclComm_t comm,
+    cudaStream_t stream);
+ncclResult_t pncclReduceScatter(const void* sendbuff, void* recvbuff,
+    size_t recvcount, ncclDataType_t datatype, ncclRedOp_t op, ncclComm_t comm,
+    cudaStream_t stream);
+
+/*
+ * All-Gather
+ *
+ * Each device gathers sendcount values from other GPUs into recvbuff,
+ * receiving data from rank i at offset i*sendcount.
+ * Assumes recvcount is equal to nranks*sendcount, which means that recvbuff
+ * should have a size of at least nranks*sendcount elements.
+ *
+ * In-place operations will happen if sendbuff == recvbuff + rank * sendcount.
+ */
+ncclResult_t  ncclAllGather(const void* sendbuff, void* recvbuff, size_t sendcount,
+    ncclDataType_t datatype, ncclComm_t comm, cudaStream_t stream);
+ncclResult_t pncclAllGather(const void* sendbuff, void* recvbuff, size_t sendcount,
+    ncclDataType_t datatype, ncclComm_t comm, cudaStream_t stream);
+
+/*
+ * Send
+ *
+ * Send data from sendbuff to rank peer.
+ *
+ * Rank peer needs to call ncclRecv with the same datatype and the same count from this
+ * rank.
+ *
+ * This operation is blocking for the GPU. If multiple ncclSend and ncclRecv operations
+ * need to progress concurrently to complete, they must be fused within a ncclGroupStart/
+ * ncclGroupEnd section.
+ */
+ncclResult_t  ncclSend(const void* sendbuff, size_t count, ncclDataType_t datatype, int peer,
+    ncclComm_t comm, cudaStream_t stream);
+ncclResult_t pncclSend(const void* sendbuff, size_t count, ncclDataType_t datatype, int peer,
+    ncclComm_t comm, cudaStream_t stream);
+
+/*
+ * Receive
+ *
+ * Receive data from rank peer into recvbuff.
+ *
+ * Rank peer needs to call ncclSend with the same datatype and the same count to this
+ * rank.
+ *
+ * This operation is blocking for the GPU. If multiple ncclSend and ncclRecv operations
+ * need to progress concurrently to complete, they must be fused within a ncclGroupStart/
+ * ncclGroupEnd section.
+ */
+ncclResult_t pncclRecv(void* recvbuff, size_t count, ncclDataType_t datatype, int peer,
+    ncclComm_t comm, cudaStream_t stream);
+ncclResult_t  ncclRecv(void* recvbuff, size_t count, ncclDataType_t datatype, int peer,
+    ncclComm_t comm, cudaStream_t stream);
+
+/*
+ * Group semantics
+ *
+ * When managing multiple GPUs from a single thread, and since NCCL collective
+ * calls may perform inter-CPU synchronization, we need to "group" calls for
+ * different ranks/devices into a single call.
+ *
+ * Grouping NCCL calls as being part of the same collective operation is done
+ * using ncclGroupStart and ncclGroupEnd. ncclGroupStart will enqueue all
+ * collective calls until the ncclGroupEnd call, which will wait for all calls
+ * to be complete. Note that for collective communication, ncclGroupEnd only
+ * guarantees that the operations are enqueued on the streams, not that
+ * the operation is effectively done.
+ *
+ * Both collective communication and ncclCommInitRank can be used in conjunction
+ * of ncclGroupStart/ncclGroupEnd, but not together.
+ *
+ * Group semantics also allow to fuse multiple operations on the same device
+ * to improve performance (for aggregated collective calls), or to permit
+ * concurrent progress of multiple send/receive operations.
+ */
+
+/*
+ * Group Start
+ *
+ * Start a group call. All calls to NCCL until ncclGroupEnd will be fused into
+ * a single NCCL operation. Nothing will be started on the CUDA stream until
+ * ncclGroupEnd.
+ */
+ncclResult_t  ncclGroupStart();
+ncclResult_t pncclGroupStart();
+
+/*
+ * Group End
+ *
+ * End a group call. Start a fused NCCL operation consisting of all calls since
+ * ncclGroupStart. Operations on the CUDA stream depending on the NCCL operations
+ * need to be called after ncclGroupEnd.
+ */
+ncclResult_t  ncclGroupEnd();
+ncclResult_t pncclGroupEnd();
+
+#ifdef __cplusplus
+} // end extern "C"
+#endif
+
+#endif // end include guard