Skip to content

Commit

Permalink
Dask support for first, last, first_n and last_n reductions (#1214)
Browse files Browse the repository at this point in the history
* New _max_row_index and _min_row_index reductions

* first and last reductions using dask on cpu

* New _max_n_row_index and _min_n_row_index reductions

* first_n and last_n reductions using dask on cpu

* Correct where(first) and similar

* Handle valid/invalid values in append call using where

* Improved tests

* Handle checking NaNs in a separate column

* Better cuda error messages
  • Loading branch information
ianthomas23 authored May 16, 2023
1 parent 8092f4d commit d385061
Show file tree
Hide file tree
Showing 9 changed files with 834 additions and 201 deletions.
78 changes: 55 additions & 23 deletions datashader/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


@memoize
def compile_components(agg, schema, glyph, *, antialias=False, cuda=False):
def compile_components(agg, schema, glyph, *, antialias=False, cuda=False, partitioned=False):
"""Given an ``Aggregation`` object and a schema, return 5 sub-functions
and information on how to perform the second stage aggregation if
antialiasing is requested.
Expand All @@ -24,6 +24,21 @@ def compile_components(agg, schema, glyph, *, antialias=False, cuda=False):
agg : Aggregation
The expression describing the aggregation(s) to be computed.
schema : DataShape
Columns and dtypes in the source dataset.
glyph : Glyph
The glyph to render.
antialias : bool
Whether to render using antialiasing.
cuda : bool
Whether to render using CUDA (on the GPU) or CPU.
partitioned : bool
Whether the source dataset is partitioned using dask.
Returns
-------
A tuple of the following functions:
Expand Down Expand Up @@ -57,8 +72,8 @@ def compile_components(agg, schema, glyph, *, antialias=False, cuda=False):
reds = list(traverse_aggregation(agg))

# List of base reductions (actually computed)
bases = list(unique(concat(r._build_bases(cuda) for r in reds)))
dshapes = [b.out_dshape(schema, antialias) for b in bases]
bases = list(unique(concat(r._build_bases(cuda, partitioned) for r in reds)))
dshapes = [b.out_dshape(schema, antialias, cuda, partitioned) for b in bases]

# Information on how to perform second stage aggregation of antialiased lines,
# including whether antialiased lines self-intersect or not as we need a single
Expand All @@ -77,20 +92,23 @@ def compile_components(agg, schema, glyph, *, antialias=False, cuda=False):
antialias_stage_2 = False

# List of tuples of (append, base, input columns, nan_check_column, temps, combine temps, uses cuda mutex)
calls = [_get_call_tuples(b, d, schema, cuda, antialias, self_intersect)
calls = [_get_call_tuples(b, d, schema, cuda, antialias, self_intersect, partitioned)
for (b, d) in zip(bases, dshapes)]

# List of unique column names needed
cols = list(unique(concat(pluck(2, calls))))
# List of unique column names needed, including nan_check_columns
cols = list(concat(pluck(2, calls)))
nan_check_cols = list(c[3] for c in calls if c[3] is not None)
cols = list(unique(cols + nan_check_cols))

# List of temps needed
temps = list(pluck(3, calls))
combine_temps = list(pluck(4, calls))
temps = list(pluck(4, calls))
combine_temps = list(pluck(5, calls))

create = make_create(bases, dshapes, cuda)
append, uses_cuda_mutex = make_append(bases, cols, calls, glyph, isinstance(agg, by), antialias)
info = make_info(cols, uses_cuda_mutex)
combine = make_combine(bases, dshapes, temps, combine_temps, antialias, cuda)
finalize = make_finalize(bases, agg, schema, cuda)
combine = make_combine(bases, dshapes, temps, combine_temps, antialias, cuda, partitioned)
finalize = make_finalize(bases, agg, schema, cuda, partitioned)

return create, info, append, combine, finalize, antialias_stage_2

Expand All @@ -105,14 +123,15 @@ def traverse_aggregation(agg):
yield agg


def _get_call_tuples(base, dshape, schema, cuda, antialias, self_intersect):
def _get_call_tuples(base, dshape, schema, cuda, antialias, self_intersect, partitioned):
# Comments refer to usage in make_append()
return (
base._build_append(dshape, schema, cuda, antialias, self_intersect), # func
(base,), # bases
base.inputs, # cols
base.inputs, # cols, arrays of these are passed to reduction append functions
base.nan_check_column, # column used to check for NaNs in some where reductions
base._build_temps(cuda), # temps
base._build_combine_temps(cuda), # combine temps
base._build_combine_temps(cuda, partitioned), # combine temps
cuda and base.uses_cuda_mutex(), # uses cuda mutex
)

Expand Down Expand Up @@ -147,7 +166,7 @@ def info(df, canvas_shape):
def make_append(bases, cols, calls, glyph, categorical, antialias):
names = ('_{0}'.format(i) for i in count())
inputs = list(bases) + list(cols)
any_uses_cuda_mutex = any(call[5] for call in calls)
any_uses_cuda_mutex = any(call[6] for call in calls)
if any_uses_cuda_mutex:
# This adds an argument to the append() function that is the cuda mutex
# generated in make_info.
Expand All @@ -163,7 +182,7 @@ def make_append(bases, cols, calls, glyph, categorical, antialias):
else:
subscript = None

for func, bases, cols, temps, _, uses_cuda_mutex in calls:
for func, bases, cols, nan_check_column, temps, _, uses_cuda_mutex in calls:
local_lk.update(zip(temps, (next(names) for i in temps)))
func_name = next(names)
namespace[func_name] = func
Expand Down Expand Up @@ -194,10 +213,23 @@ def make_append(bases, cols, calls, glyph, categorical, antialias):
# where reduction needs access to the return of the contained
# reduction, which is the preceding one here.
body[-1] = f'{update_index_arg_name} = {body[-1]}'
body.append(f'if {update_index_arg_name} >= 0:')
call = ' {0}(x, y, {1})'.format(func_name, ', '.join(args))

# If nan_check_column is defined, check whether the value in the
# corresponding row of that column is NaN and, if so, do nothing.
# This check needs to occur before the where.selector is called.
if nan_check_column is None:
whitespace = ''
else:
var = f"{arg_lk[nan_check_column]}[{subscript}]"
prev_body = body[-1]
body[-1] = f'if {var}<=0 or {var}>0:' # Inline CUDA-friendly 'is not nan' test
body.append(f' {prev_body}')
whitespace = ' '

body.append(f'{whitespace}if {update_index_arg_name} >= 0:')
call = f' {whitespace}{func_name}(x, y, {", ".join(args)})'
else:
call = '{0}(x, y, {1})'.format(func_name, ', '.join(args))
call = f'{func_name}(x, y, {", ".join(args)})'

body.append(call)

Expand Down Expand Up @@ -225,14 +257,14 @@ def make_append(bases, cols, calls, glyph, categorical, antialias):
return ngjit(namespace['append']), any_uses_cuda_mutex


def make_combine(bases, dshapes, temps, combine_temps, antialias, cuda):
def make_combine(bases, dshapes, temps, combine_temps, antialias, cuda, partitioned):
arg_lk = dict((k, v) for (v, k) in enumerate(bases))

# where._combine() deals with combine of preceding reduction so exclude
# it from explicit combine calls.
base_is_where = [isinstance(b, where) for b in bases]
next_base_is_where = base_is_where[1:] + [False]
calls = [(None if n else b._build_combine(d, antialias, cuda), [arg_lk[i] for i in (b,) + t + ct])
calls = [(None if n else b._build_combine(d, antialias, cuda, partitioned), [arg_lk[i] for i in (b,) + t + ct])
for (b, d, t, ct, n) in zip(bases, dshapes, temps, combine_temps, next_base_is_where)]

def combine(base_tuples):
Expand All @@ -253,15 +285,15 @@ def combine(base_tuples):
return combine


def make_finalize(bases, agg, schema, cuda):
def make_finalize(bases, agg, schema, cuda, partitioned):
arg_lk = dict((k, v) for (v, k) in enumerate(bases))
if isinstance(agg, summary):
calls = []
for key, val in zip(agg.keys, agg.values):
f = make_finalize(bases, val, schema, cuda)
f = make_finalize(bases, val, schema, cuda, partitioned)
try:
# Override bases if possible
bases = val._build_bases(cuda)
bases = val._build_bases(cuda, partitioned)
except AttributeError:
pass
inds = [arg_lk[b] for b in bases]
Expand Down
12 changes: 7 additions & 5 deletions datashader/data_libraries/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,16 @@ def default(glyph, df, schema, canvas, summary, *, antialias=False, cuda=False):
shape, bounds, st, axis = shape_bounds_st_and_axis(df, canvas, glyph)

# Compile functions
create, info, append, combine, finalize, antialias_stage_2 = \
compile_components(summary, schema, glyph, antialias=antialias, cuda=cuda)
partitioned = isinstance(df, dd.DataFrame) and df.npartitions > 1
create, info, append, combine, finalize, antialias_stage_2 = compile_components(
summary, schema, glyph, antialias=antialias, cuda=cuda, partitioned=partitioned)
x_mapper = canvas.x_axis.mapper
y_mapper = canvas.y_axis.mapper
extend = glyph._build_extend(x_mapper, y_mapper, info, append, antialias_stage_2)
x_range = bounds[:2]
y_range = bounds[2:]

if summary.uses_row_index() and isinstance(df, dd.DataFrame) and df.npartitions > 1:
if summary.uses_row_index(cuda, partitioned):
def func(partition: pd.DataFrame, cumulative_lens, partition_info=None):
# This function is called once for each dask dataframe partition.
# It sets the _datashader_row_offset attribute so that row indexes
Expand Down Expand Up @@ -207,8 +208,9 @@ def line(glyph, df, schema, canvas, summary, *, antialias=False, cuda=False):
shape, bounds, st, axis = shape_bounds_st_and_axis(df, canvas, glyph)

# Compile functions
create, info, append, combine, finalize, antialias_stage_2 = \
compile_components(summary, schema, glyph, antialias=antialias, cuda=cuda)
partitioned = isinstance(df, dd.DataFrame) and df.npartitions > 1
create, info, append, combine, finalize, antialias_stage_2 = compile_components(
summary, schema, glyph, antialias=antialias, cuda=cuda, partitioned=partitioned)
x_mapper = canvas.x_axis.mapper
y_mapper = canvas.y_axis.mapper
extend = glyph._build_extend(x_mapper, y_mapper, info, append, antialias_stage_2)
Expand Down
12 changes: 6 additions & 6 deletions datashader/data_libraries/dask_xarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ def dask_rectilinear(glyph, xr_ds, schema, canvas, summary, *, antialias=False,
shape, bounds, st, axis = shape_bounds_st_and_axis(xr_ds, canvas, glyph)

# Compile functions
create, info, append, combine, finalize, antialias_stage_2 = \
compile_components(summary, schema, glyph, antialias=antialias, cuda=cuda)
create, info, append, combine, finalize, antialias_stage_2 = compile_components(
summary, schema, glyph, antialias=antialias, cuda=cuda, partitioned=True)
x_mapper = canvas.x_axis.mapper
y_mapper = canvas.y_axis.mapper
extend = glyph._build_extend(x_mapper, y_mapper, info, append, antialias_stage_2)
Expand Down Expand Up @@ -141,8 +141,8 @@ def dask_raster(glyph, xr_ds, schema, canvas, summary, *, antialias=False, cuda=
shape, bounds, st, axis = shape_bounds_st_and_axis(xr_ds, canvas, glyph)

# Compile functions
create, info, append, combine, finalize, antialias_stage_2 = \
compile_components(summary, schema, glyph, antialias=antialias, cuda=cuda)
create, info, append, combine, finalize, antialias_stage_2 = compile_components(
summary, schema, glyph, antialias=antialias, cuda=cuda, partitioned=True)
x_mapper = canvas.x_axis.mapper
y_mapper = canvas.y_axis.mapper
extend = glyph._build_extend(x_mapper, y_mapper, info, append, antialias_stage_2)
Expand Down Expand Up @@ -235,8 +235,8 @@ def dask_curvilinear(glyph, xr_ds, schema, canvas, summary, *, antialias=False,
shape, bounds, st, axis = shape_bounds_st_and_axis(xr_ds, canvas, glyph)

# Compile functions
create, info, append, combine, finalize, antialias_stage_2 = \
compile_components(summary, schema, glyph, antialias=antialias, cuda=cuda)
create, info, append, combine, finalize, antialias_stage_2 = compile_components(
summary, schema, glyph, antialias=antialias, cuda=cuda, partitioned=True)
x_mapper = canvas.x_axis.mapper
y_mapper = canvas.y_axis.mapper
extend = glyph._build_extend(x_mapper, y_mapper, info, append, antialias_stage_2)
Expand Down
2 changes: 1 addition & 1 deletion datashader/data_libraries/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def pandas_pipeline(df, schema, canvas, glyph, summary, *, antialias=False):
@glyph_dispatch.register(_AreaToLineLike)
def default(glyph, source, schema, canvas, summary, *, antialias=False, cuda=False):
create, info, append, _, finalize, antialias_stage_2 = compile_components(
summary, schema, glyph, antialias=antialias, cuda=cuda)
summary, schema, glyph, antialias=antialias, cuda=cuda, partitioned=False)
x_mapper = canvas.x_axis.mapper
y_mapper = canvas.y_axis.mapper
extend = glyph._build_extend(x_mapper, y_mapper, info, append, antialias_stage_2)
Expand Down
Loading

0 comments on commit d385061

Please sign in to comment.