Progress with ragged implementation...
danpovey committed Sep 2, 2020
2 parents cb0796d + 25d73ef commit 8c85850
Showing 6 changed files with 206 additions and 23 deletions.
1 change: 1 addition & 0 deletions k2/csrc/cuda/array.h
@@ -160,6 +160,7 @@ struct Array2Accessor {
__host__ __device__ T &operator () (int32_t i, int32_t j) {
return data[i * elem_stride0 + j];
}
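// Returns a pointer to the start of row i.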
T *Row(int32_t i) { return data + elem_stride0 * i; }
Array2Accessor(T *data, int32_t elem_stride0):
data(data), elem_stride0(elem_stride0) { }
__host__ __device__ Array2Accessor(const Array2Accessor &other) = default;
28 changes: 28 additions & 0 deletions k2/csrc/cuda/context.h
@@ -384,6 +384,33 @@ inline void Eval2(ContextPtrType c, int32_t m, int32_t n, LambdaT &lambda) {
}


// This is for use by ParallelRunner and Context.  Users probably should not
// interact with this directly.  The idea is that the Context object will call
// this to possibly override its default stream (called "thread" below).
class CudaThreadOverride {
 public:
inline cudaStream_t OverrideThread(cudaStream_t thread) {
if (thread_override_ != 0 && thread != k2_cudaStreamInvalid)
return thread_override_;
else
return thread;
}
void Push(cudaStream_t thread) {
stack_.push_back(thread);
thread_override_ = thread;
}
void Pop() {
assert(!stack_.empty());
stack_.pop_back();
// Restore the previous override; 0 means "no override".
thread_override_ = (stack_.empty() ? (cudaStream_t)0x0 : stack_.back());
}

CudaThreadOverride(): thread_override_(0x0) { }

cudaStream_t thread_override_;
std::vector<cudaStream_t> stack_;
};
thread_local CudaThreadOverride thread_override;
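// Hypothetical usage sketch (the call sites are not shown in this file, and
// `parallel_stream` / `default_stream` are made-up names): ParallelRunner
// would push one of its streams so that work launched through the Context is
// redirected onto it, and pop it when done:
//
//   thread_override.Push(parallel_stream);
//   // ... launch kernels; the Context calls
//   // thread_override.OverrideThread(default_stream) to decide which stream
//   // to actually use ...
//   thread_override.Pop();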

/*
Class ParallelRunner allows you to invoke Eval(), but in parallel.
It works for CUDA and CPU, but for CPU it currently just executes things
@@ -413,6 +440,7 @@ class ParallelRunner {




// OK, want to do:
// ContextPtr c = ...; ///
// auto d = Dependency({out_region1, out_region2},
118 changes: 110 additions & 8 deletions k2/csrc/cuda/ragged.cc
@@ -67,6 +67,27 @@ RaggedShape RaggedShape3(Array1<int32_t> *row_splits1,
RaggedShape2(row_splits2, row_ids2, cached_tot_size2));
}

// See declaration in ragged.h for documentation of its purpose and interface.
RaggedShape Unsqueeze(const RaggedShape &src, int32_t axis) {
// If axis == 0, initial row_splits and row_ids will look like the following,
// if for example src.Dim0() was 5: [ 0 5 ], [ 0 0 0 0 0 1 ]. The other axes
// would be pushed forward.
//
// If 0 < axis <= src.NumAxes(), the inserted row_splits and row_ids would
// look like the following, if for instance the src.TotSize(axis-1) = 8:
// [ 0 1 2 3 4 5 6 7 8 ], [ 0 1 2 3 4 5 6 7 8 ].
//
// The reason why the code is different for axis == 0 is that in that case we
// are really making visible an "implicit" axis of the input `src`; we could
// call it axis 0 of the original RaggedShape.  Imagine that "implicit" axis's
// row_splits and row_ids map respectively from an idx_minus1 to an idx0 and
// from an idx0 to an idx_minus1, where idx_minus1 is always 0 and
// 0 <= idx0 < Dim0().
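//
// A possible sketch of the axis == 0 case only, in terms of functions
// declared in ragged.h.  It assumes Array1<int32_t> can be constructed from a
// std::vector<int32_t> on a given context (that constructor, and ignoring the
// constness of `src` when passing it to ComposeRaggedShapes(), are
// assumptions here):
//
//   ContextPtr c = src.GetContext();
//   int32_t dim0 = src.Dim0();
//   std::vector<int32_t> splits_vec = {0, dim0};
//   std::vector<int32_t> ids_vec(dim0 + 1, 0);
//   ids_vec[dim0] = 1;  // terminating element, as in the example above
//   Array1<int32_t> row_splits(c, splits_vec), row_ids(c, ids_vec);
//   return ComposeRaggedShapes(RaggedShape2(&row_splits, &row_ids, dim0),
//                              src);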



}


/*
TODO: fix this documentation...
@@ -155,7 +176,8 @@ struct RowInfoWithOffsets {




RaggedShape Stack(int32_t num_srcs, RaggedShape **src, int32_t axis) {
CHECK_GT(num_srcs, 0);
ContextPtr c = src[0]->GetContext();
int32_t num_axes_in = src[0]->NumAxes(),
@@ -176,10 +198,14 @@ RaggedShape Stack(int32_t axis, int32_t num_srcs, RaggedShape **src) {
GetRowInfoMulti(num_srcs, src, &src_row_splits, &src_row_ids);


// offsets has shape ((src[0]->NumAxes()+1), (num_srcs + 1)). Its first row
// (axis=0) is 0,1,2,3,..., and for axis > 0, that row is the exclusive-sum of
// the TotSize(axis-1) for the respective sources, defining TotSize(-1) as 1.
// Its last column gives the total size of the output on each axis.
//
// Note: dimensionally, a value in the N'th row of `offsets` is an `idx...N`
// w.r.t. the output ragged array, meaning if N is 0, an idx0; if N is 1 an idx01, and so
// on.
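//
// For example (hypothetical sizes): with num_srcs == 2, where src[0] has
// NumAxes() == 2, Dim0() == 2, TotSize(1) == 5 and src[1] has Dim0() == 3,
// TotSize(1) == 4, `offsets` would be the 3-by-3 array
//   [ 0 1 2 ]
//   [ 0 2 5 ]
//   [ 0 5 9 ]
// whose last column (2, 5, 9) gives the sizes of the output's three axes.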
Array2<int32_t> offsets = GetOffsets(num_srcs, src);
auto offsets_acc = offsets.Accessor();

@@ -208,8 +234,10 @@ RaggedShape Stack(int32_t axis, int32_t num_srcs, RaggedShape **src) {

std::vector<cudaStream_t> streams(num_axes_in + 1);
int32_t num_jobs = num_srcs * 2;
// task_redirects is a device array (if using GPU).
Array2<TaskRedirect> task_redirects(c, num_axes_in + 1, num_jobs);
auto task_redirects_acc = task_redirects.Accessor();

// populate task_redirects (these allocate blocks of threads roughly
// in proportion to the amount of data to process from each source).
for (int32_t axis = 0; axis <= num_axes_in; axis++) {
@@ -227,7 +255,6 @@ RaggedShape Stack(int32_t axis, int32_t num_srcs, RaggedShape **src) {
{
// first do the row-splits.
TaskRedirect *tr = &(task_redirects_acc(axis, 0));
cudaStream_t stream = streams[axis];

const int32_t **this_src_row_splits = &(src_row_splits_acc(axis, 0)),
**this_src_row_ids = &(src_row_ids_acc(axis, 0));
@@ -236,17 +263,92 @@ RaggedShape Stack(int32_t axis, int32_t num_srcs, RaggedShape **src) {
const int32_t *offsets_this_axis = &(offsets_acc(axis, 0)),
*offsets_next_axis = &(offsets_acc(axis + 1, 0));

{
auto lambda_set_row_splits = [=] __host__ __device__ (
int32_t src_idx, int32_t num_threads, int32_t thread_idx) -> void {
// Reminder of how row_splits work dimensionally: they are a map
// from, e.g. an idx0 to an idx01. An offsets_acc(0,n) is
// dimensionally an idx0; an offsets_acc(1,n) an idx01, and so on.
int32_t this_offset = offsets_this_axis[src_idx],
next_offset = offsets_this_axis[src_idx + 1],
this_value_offset = offsets_next_axis[src_idx],
num_rows = next_offset - this_offset;
int32_t *src_row_splits_ptr = this_src_row_splits[src_idx];
// Using <= instead of < below causes threads for different src_idx to
// write a single overlapping value, but also ensures that the
// terminating value is written. This only works because row_splits
// vectors always start with 0, which is not necessarily the case
// for row-ids.
for (; thread_idx <= num_rows; thread_idx += num_threads) {
this_dest_row_splits[this_offset + thread_idx] =
this_value_offset + src_row_splits_ptr[thread_idx];
}
};

int32_t min_threads_per_job = 2,
tot_work = tot_sizes_out[axis],
target_num_loops = (tot_work > 1000000 ? 4 : 2);
bool include_final_task = false;
EvalWithRedirect(streams[axis], num_jobs,
task_redirects_acc.Row(axis), min_threads_per_job,
tot_work, target_num_loops, include_final_task,
lambda_set_row_splits);
}

{
auto lambda_set_row_ids = [=] __host__ __device__ (
int32_t src_idx, int32_t num_threads, int32_t thread_idx) -> void {
// Reminder of how row_ids work dimensionally: they are a map
// from, e.g. an idx01 to an idx0. An offsets_acc(0,n) is
// dimensionally an idx0; an offsets_acc(1,n) an idx01, and so on.
int32_t this_offset = offsets_next_axis[src_idx],
next_offset = offsets_next_axis[src_idx + 1],
this_value_offset = offsets_this_axis[src_idx],
num_elems = next_offset - this_offset;
int32_t *src_row_ids_ptr = this_src_row_ids[src_idx];
// We need to write the very last value at the end of all the
// arrays; the last job (for src_idx == num_srcs - 1) does this
// by adding 1 to num_elems. We can't let them all write an
// extra value, because unlike row_splits, row_ids vectors may not
// start with 0 in general; so having 2 threads write that
// value (the 1st of each; one past the last of each) would cause
// indeterminacy.
if (src_idx == num_srcs - 1)
num_elems++;
for (; thread_idx < num_elems; thread_idx += num_threads) {
this_dest_row_ids[this_offset + thread_idx] =
this_value_offset + src_row_ids_ptr[thread_idx];
}
};
int32_t min_threads_per_job = 2,
tot_work = tot_sizes_out[axis+1],
target_num_loops = (tot_work > 1000000 ? 4 : 2);
bool include_final_task = false;
EvalWithRedirect(streams[axis+1], num_jobs,
task_redirects_acc.Row(axis+1), min_threads_per_job,
tot_work, target_num_loops, include_final_task,
lambda_set_row_ids);
}





auto lambda_set_row_splits = [=] __host__ __device__ (
int32_t src_idx, int32_t num_threads, int32_t thread_idx) -> void {
int32_t this_offset = offsets_this_axis[src_idx],
next_offset = offsets_this_axis[src_idx + 1],
this_value_offset = offsets_next_axis[src_idx],
num_rows = next_offset - this_offset;
int32_t *src_row_splits_ptr = this_src_row_splits[src_idx];
// Using <= instead of < below causes threads for different src_idx to
// write a single overlapping value, but also ensures that the
// terminating value is written.
for (; thread_idx <= num_rows; thread_idx += num_threads) {
this_dest_row_splits[this_offset + thread_idx] =
this_value_offset + src_row_splits_ptr[thread_idx];
}
};

RowInfo src_row = *src_info, dest_row = *dst_info;
int32_t num_rows = src_info->num_rows;
67 changes: 57 additions & 10 deletions k2/csrc/cuda/ragged.h
@@ -8,6 +8,7 @@
#define K2_CSRC_CUDA_RAGGED_H_

#include "k2/csrc/cuda/algorithms.h"
#include "k2/csrc/cuda/array.h"

namespace k2 {

@@ -110,7 +111,6 @@ class RaggedShape {
*/
RaggedShape Index(int32_t axis, int32_t value);


RaggedShape ComposeRaggedShapes(RaggedShape &a, RaggedShape &b);

RaggedShape(std::vector<RaggedShapeDim> &axes): axes_(axes) { }
@@ -128,14 +128,13 @@ class RaggedShape {
Similar to TF/PyTorch's Stack. The result will have Dim0 == src_size. All
the source RaggedShapes must have the same NumAxes().
@param [in] src_size The number of `RaggedShape`s in `src`
@param [in] src The shapes to be stacked
@param [in] axis The new axis whose dimension will equal src_size.
CAUTION: only axis == 0 and axis == 1 are supported
right now, and for the axis==1 case we have a
requirement that all the src[i]->Dim0() have the
same value.
@return The appended result.
@@ -148,7 +147,52 @@ class RaggedShape {
and these are just the shapes of arrays..).
See also the version of Stack for class Ragged.
*/
RaggedShape Stack(int32_t src_size, const RaggedShape **src, int32_t axis);
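/*
  Example usage sketch (hypothetical; `a` and `b` are RaggedShapes with the
  same NumAxes()):

    const RaggedShape *srcs[] = { &a, &b };
    RaggedShape stacked = Stack(2, srcs, 0);
    // stacked.NumAxes() == a.NumAxes() + 1 and stacked.Dim0() == 2.
*/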

/*
Insert a new axis at position `axis`, with 0 <= axis <= src.NumAxes(), for
which the only allowed index will be 0 (which is another way of saying: all
list sizes on that axis will be 1).
@param [in] src Source shape. Row-ids and Row-splits of answer will
share memory with those in `src` (although it goes
without saying).
@param [in] axis Axis to insert.
Note: you probably shouldn't be using this very often; if you are using this,
you may be thinking in a PyTorch-y way but you should be relying on things
like Eval() with custom lambdas more. Read algorithms like in compose.cc to
understand why. Also: axis==0 is probably the only really useful case.
*/
RaggedShape Unsqueeze(const RaggedShape &src, int32_t axis);
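/*
  Example usage sketch (hypothetical `shape` with 2 axes and Dim0() == 3):

    RaggedShape unsqueezed = Unsqueeze(shape, 0);
    // unsqueezed.NumAxes() == shape.NumAxes() + 1, unsqueezed.Dim0() == 1,
    // and the single top-level list contains the 3 lists of `shape`.
*/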

/*
Append a list of RaggedShape to form a single RaggedShape
@param [in] num_srcs Number of source shapes to append
@param [in] src Array of sources to append
@param [in] axis Axis to append them on. Previous axes must
have the same shape! CAUTION: currently
we only support axis == 0.
@return Returns the appended RaggedShape.
*/
RaggedShape Append(int32_t num_srcs, const RaggedShape **src, int32_t axis);
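/*
  Example usage sketch (hypothetical; `a` and `b` have the same NumAxes()):

    const RaggedShape *srcs[] = { &a, &b };
    RaggedShape appended = Append(2, srcs, 0);
    // appended.NumAxes() == a.NumAxes() and
    // appended.Dim0() == a.Dim0() + b.Dim0().
*/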


/*
Renumber axis 0 of a ragged shape
@param [in] src Shape to renumber
@param [in] new2old Mapping from new to old numbering of array, of
length src.Dim0(); must contain the numbers
0 through src.Dim0() - 1 in some order.
@return Returns the renumbered shape. Will satisfy:
ret[i,j,k] = src[new2old[i],j,k]. (Note, this is
not actual C++ code, it represents a conceptual
indexing operator).
*/
RaggedShape Renumber(const RaggedShape &src, Array1<int32_t> &new2old);
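/*
  Example usage sketch (hypothetical 2-axis `shape` with Dim0() == 3):

    Array1<int32_t> new2old = ...;  // e.g. containing [ 2, 0, 1 ]
    RaggedShape renumbered = Renumber(shape, new2old);
    // Conceptually (as in the note above),
    // renumbered[i,j] == shape[new2old[i],j].
*/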



template <typename T>
struct Ragged {
@@ -191,15 +235,15 @@ RaggedShape SubsampleRaggedShape(RaggedShape &src,

/*
Stack a list of Ragged arrays to create a Ragged array with one more axis.
Similar to TF/PyTorch's Stack. The result will have Dim0 == num_srcs. All
the source Ragged arrays' shapes must have the same NumAxes().
@param [in] axis The new axis whose dimension will equal src_size.
@param [in] axis The new axis whose dimension will equal num_srcs.
CAUTION: only axis == 0 and axis == 1 are supported right now,
and for the axis==1 case we have a requirement that all the
src->Dim0() return the same value.
@param [in] src_size The number of `RaggedShape`s in `src`
@param [in] num_srcs The number of `RaggedShape`s in `src`
@param [in] src The shapes to be stacked
@return The appended result.
@@ -210,7 +254,7 @@ RaggedShape SubsampleRaggedShape(RaggedShape &src,
result[i,j,k,l] = (*src[j])[i,k,l]
*/
template <typename T>
Ragged<T> Stack(int32_t axis, int32_t num_srcs, Ragged<T> **src);
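/*
  Example usage sketch (hypothetical; `a` and `b` are Ragged<int32_t> whose
  shapes have the same NumAxes()):

    Ragged<int32_t> *srcs[] = { &a, &b };
    Ragged<int32_t> stacked = Stack(0, 2, srcs);
    // For axis == 0 the new leading axis has size 2 and, conceptually,
    // stacked[j,i,k] == (*srcs[j])[i,k].
*/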

/*
Create a RaggedShape from an array of row-ids. (which maps each element to
@@ -310,6 +354,9 @@ RaggedShape RaggedShapeFromTotSizes(int32_t num_axes, int32_t *tot_sizes);



// TODO(dan), include guard maybe.
#include "k2/csrc/cuda/ragged_inl.h"

} // namespace k2

#endif // K2_CSRC_CUDA_RAGGED_H_
4 changes: 4 additions & 0 deletions k2/csrc/cuda/ragged_inl.h
@@ -9,6 +9,10 @@

namespace k2 {





template <typename T>
Ragged<T> Stack(int32_t axis, int32_t src_size, const Ragged<T> *src) {
CHECK_GT(src_size, 0); // can later relax this, maybe
11 changes: 6 additions & 5 deletions k2/csrc/cuda/utils.h
@@ -369,9 +369,10 @@ void GetTaskRedirect(ContextPtr &c, int32_t num_tasks,
this after calling GetTaskRedirect().
@param [in] stream Stream to execute this in (or k2_cudaStreamInvalid for CPU).
@param [in] num_tasks The num_tasks provided to GetTaskRedirect().
@param [in] num_jobs Size of the array of tasks; this will be equal to
num_tasks * 2, where `num_tasks` is the number of
tasks given to GetTaskRedirect().
@param [in] redirect The array written to by GetTaskRedirect().
@param [in] min_threads_per_task This would typically be something like 8, 16 or 32.
It is the smallest allowed num_threads that we allocate
for each task; the number of threads per job is a multiple of
@@ -394,7 +395,7 @@ void GetTaskRedirect(ContextPtr &c, int32_t num_tasks,
number of `work items` per thread this code aims
for when deciding the threads_per_job.
@param [in] include_final_task If true, the lambda will be called once
with task_idx=num_tasks=num_jobs/2, num_threads=1, thread_idx=0;
This happens to be useful quite a bit.
@param [in] lambda The lambda expression to run; this is to be run
as lambda(task_idx, num_threads_this_task, thread_idx), which
@@ -408,7 +409,7 @@ void GetTaskRedirect(ContextPtr &c, int32_t num_tasks,
to do a 'one-off task' (invoked once in the resulting kernel).
*/
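/*
  Illustrative example (hypothetical: `data`, `row_start` and `num_rows` are
  made-up device arrays captured by the lambda): a lambda of the required form
  that zeroes the rows belonging to each task, spreading the work over the
  threads allocated to that task:

    auto lambda = [=] __host__ __device__ (int32_t task_idx,
                                           int32_t num_threads,
                                           int32_t thread_idx) -> void {
      for (int32_t i = thread_idx; i < num_rows[task_idx]; i += num_threads)
        data[row_start[task_idx] + i] = 0;
    };
    EvalWithRedirect(stream, num_jobs, redirect, 16 /*min_threads_per_job*/,
                     tot_work, 2 /*target_num_loops*/,
                     false /*include_final_task*/, lambda);
*/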
template <typename LambdaT, typename LambdaU>
void EvalWithRedirect(cudaStream_t stream, int32_t num_jobs,
TaskRedirect *redirect, int32_t min_threads_per_job,
int32_t tot_work, int32_t target_num_loops,
bool include_final_task, LambdaT &lambda) {
