Only check dask.DataFrame dtypes of columns actually used (#1236)
ianthomas23 authored Jun 19, 2023
1 parent 9f5b411 commit 6dce648
Showing 5 changed files with 39 additions and 22 deletions.
datashader/compiler.py (20 additions, 14 deletions)

@@ -6,7 +6,7 @@
 import numpy as np
 import xarray as xr
 
-from .reductions import by, category_codes, summary, where
+from .reductions import SpecialColumn, by, category_codes, summary, where
 from .utils import isnull, ngjit
 
 try:
@@ -46,33 +46,37 @@ def compile_components(agg, schema, glyph, *, antialias=False, cuda=False, parti
     Returns
     -------
-    A tuple of the following functions:
+    A tuple of the following:
     ``create(shape)``
-        Takes the aggregate shape, and returns a tuple of initialized numpy
-        arrays.
+        Function that takes the aggregate shape, and returns a tuple of
+        initialized numpy arrays.
     ``info(df, canvas_shape)``
-        Takes a dataframe, and returns preprocessed 1D numpy arrays of the
-        needed columns.
+        Function that takes a dataframe, and returns preprocessed 1D numpy
+        arrays of the needed columns.
     ``append(i, x, y, *aggs_and_cols)``
-        Appends the ``i``th row of the table to the ``(x, y)`` bin, given the
-        base arrays and columns in ``aggs_and_cols``. This does the bulk of the
-        work.
+        Function that appends the ``i``th row of the table to the ``(x, y)``
+        bin, given the base arrays and columns in ``aggs_and_cols``. This does
+        the bulk of the work.
     ``combine(base_tuples)``
-        Combine a list of base tuples into a single base tuple. This forms the
-        reducing step in a reduction tree.
+        Function that combines a list of base tuples into a single base tuple.
+        This forms the reducing step in a reduction tree.
     ``finalize(aggs, cuda)``
-        Given a tuple of base numpy arrays, returns the finalized ``DataArray``
-        or ``Dataset``.
+        Function that is given a tuple of base numpy arrays and returns the
+        finalized ``DataArray`` or ``Dataset``.
     ``antialias_stage_2``
         If using antialiased lines this is a tuple of the ``AntialiasCombination``
         values corresponding to the aggs. If not using antialiased lines then
         this is False.
+    ``column_names``
+        Names of DataFrame columns or DataArray variables that are used by the
+        agg.
     """
     reds = list(traverse_aggregation(agg))
 
@@ -115,7 +119,9 @@ def compile_components(agg, schema, glyph, *, antialias=False, cuda=False, parti
     combine = make_combine(bases, dshapes, temps, combine_temps, antialias, cuda, partitioned)
     finalize = make_finalize(bases, agg, schema, cuda, partitioned)
 
-    return create, info, append, combine, finalize, antialias_stage_2
+    column_names = [c.column for c in cols if c.column != SpecialColumn.RowIndex]
+
+    return create, info, append, combine, finalize, antialias_stage_2, column_names
 
 
 def traverse_aggregation(agg):
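The new ``column_names`` value is built by dropping the row-index pseudo-column from the compiled column list, and every caller below must unpack the seventh element (or discard it with ``_``). A minimal, self-contained sketch of that filtering pattern, using stand-in ``SpecialColumn`` and column objects rather than the real ones from ``datashader.reductions``:

from enum import Enum

class SpecialColumn(Enum):
    # Stand-in for datashader.reductions.SpecialColumn; only the member
    # the filter needs is modelled here.
    RowIndex = 0

class Col:
    # Stand-in for a compiled reduction's column reference.
    def __init__(self, column):
        self.column = column

# Mirrors the committed line:
# column_names = [c.column for c in cols if c.column != SpecialColumn.RowIndex]
cols = [Col("x"), Col("value"), Col(SpecialColumn.RowIndex)]
column_names = [c.column for c in cols if c.column != SpecialColumn.RowIndex]
print(column_names)  # ['x', 'value']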
datashader/data_libraries/dask.py (5 additions, 4 deletions)

@@ -72,7 +72,7 @@ def default(glyph, df, schema, canvas, summary, *, antialias=False, cuda=False):

     # Compile functions
     partitioned = isinstance(df, dd.DataFrame) and df.npartitions > 1
-    create, info, append, combine, finalize, antialias_stage_2 = compile_components(
+    create, info, append, combine, finalize, antialias_stage_2, column_names = compile_components(
         summary, schema, glyph, antialias=antialias, cuda=cuda, partitioned=partitioned)
     x_mapper = canvas.x_axis.mapper
     y_mapper = canvas.y_axis.mapper
@@ -102,9 +102,10 @@ def func(partition: pd.DataFrame, cumulative_lens, partition_info=None):
     graph = df.__dask_graph__()
 
     # Guess a reasonable output dtype from combination of dataframe dtypes
+    # Only consider columns used, not all columns in dataframe (issue #1235)
     dtypes = []
 
-    for dt in df.dtypes:
+    for dt in df.dtypes[column_names]:
         if isinstance(dt, pd.CategoricalDtype):
             continue
         elif isinstance(dt, pd.api.extensions.ExtensionDtype):
@@ -119,7 +120,7 @@
         else:
             dtypes.append(dt)
 
-    dtype = np.result_type(*dtypes)
+    dtype = np.result_type(*dtypes) if dtypes else np.float64
     # Create a meta object so that dask.array doesn't try to look
     # too closely at the type of the chunks it's wrapping
     # they're actually dataframes, tell dask they're ndarrays
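Both hunks in this file guard the same dtype guess. ``np.result_type`` cannot promote ``datetime64`` together with numeric dtypes, so an unused datetime column used to break the guess (issue #1235); and once only used columns are considered, a reduction such as ``ds.count()`` that reads no columns leaves ``dtypes`` empty, where ``np.result_type()`` raises, hence the ``np.float64`` fallback. A standalone demonstration of both failure modes:

import numpy as np

# An unused datetime column used to poison the promotion (issue #1235):
try:
    np.result_type(np.dtype("datetime64[ns]"), np.dtype("float64"))
except TypeError as exc:
    print("datetime64 + float64:", exc)

# ds.count() reads no columns, so the restricted loop collects no dtypes,
# and np.result_type() rejects an empty argument list:
try:
    np.result_type()
except ValueError as exc:
    print("no dtypes:", exc)

# The committed fallback:
dtypes = []
dtype = np.result_type(*dtypes) if dtypes else np.float64
print(dtype)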
@@ -209,7 +210,7 @@ def line(glyph, df, schema, canvas, summary, *, antialias=False, cuda=False):

     # Compile functions
     partitioned = isinstance(df, dd.DataFrame) and df.npartitions > 1
-    create, info, append, combine, finalize, antialias_stage_2 = compile_components(
+    create, info, append, combine, finalize, antialias_stage_2, _ = compile_components(
         summary, schema, glyph, antialias=antialias, cuda=cuda, partitioned=partitioned)
     x_mapper = canvas.x_axis.mapper
     y_mapper = canvas.y_axis.mapper
datashader/data_libraries/dask_xarray.py (3 additions, 3 deletions)

@@ -59,7 +59,7 @@ def dask_rectilinear(glyph, xr_ds, schema, canvas, summary, *, antialias=False,
     shape, bounds, st, axis = shape_bounds_st_and_axis(xr_ds, canvas, glyph)
 
     # Compile functions
-    create, info, append, combine, finalize, antialias_stage_2 = compile_components(
+    create, info, append, combine, finalize, antialias_stage_2, _ = compile_components(
         summary, schema, glyph, antialias=antialias, cuda=cuda, partitioned=True)
     x_mapper = canvas.x_axis.mapper
     y_mapper = canvas.y_axis.mapper
@@ -141,7 +141,7 @@ def dask_raster(glyph, xr_ds, schema, canvas, summary, *, antialias=False, cuda=
     shape, bounds, st, axis = shape_bounds_st_and_axis(xr_ds, canvas, glyph)
 
     # Compile functions
-    create, info, append, combine, finalize, antialias_stage_2 = compile_components(
+    create, info, append, combine, finalize, antialias_stage_2, _ = compile_components(
         summary, schema, glyph, antialias=antialias, cuda=cuda, partitioned=True)
     x_mapper = canvas.x_axis.mapper
     y_mapper = canvas.y_axis.mapper
@@ -235,7 +235,7 @@ def dask_curvilinear(glyph, xr_ds, schema, canvas, summary, *, antialias=False,
     shape, bounds, st, axis = shape_bounds_st_and_axis(xr_ds, canvas, glyph)
 
     # Compile functions
-    create, info, append, combine, finalize, antialias_stage_2 = compile_components(
+    create, info, append, combine, finalize, antialias_stage_2, _ = compile_components(
         summary, schema, glyph, antialias=antialias, cuda=cuda, partitioned=True)
     x_mapper = canvas.x_axis.mapper
     y_mapper = canvas.y_axis.mapper
datashader/data_libraries/pandas.py (1 addition, 1 deletion)

@@ -24,7 +24,7 @@ def pandas_pipeline(df, schema, canvas, glyph, summary, *, antialias=False):
 @glyph_dispatch.register(_GeometryLike)
 @glyph_dispatch.register(_AreaToLineLike)
 def default(glyph, source, schema, canvas, summary, *, antialias=False, cuda=False):
-    create, info, append, _, finalize, antialias_stage_2 = compile_components(
+    create, info, append, _, finalize, antialias_stage_2, _ = compile_components(
         summary, schema, glyph, antialias=antialias, cuda=cuda, partitioned=False)
     x_mapper = canvas.x_axis.mapper
     y_mapper = canvas.y_axis.mapper
datashader/tests/test_dask.py (10 additions, 0 deletions)

@@ -2118,3 +2118,13 @@ def test_canvas_size():
     for cvs in cvs_list:
         with pytest.raises(ValueError, match=msg):
             cvs.points(ddf, "x", "y", ds.mean("z"))
+
+
+@pytest.mark.parametrize('ddf', ddfs)
+@pytest.mark.parametrize('npartitions', [1, 2, 3])
+def test_dataframe_dtypes(ddf, npartitions):
+    # Issue #1235.
+    ddf['dates'] = pd.Series(['2007-07-13']*20, dtype='datetime64[ns]')
+    ddf = ddf.repartition(npartitions)
+    assert ddf.npartitions == npartitions
+    ds.Canvas(2, 2).points(ddf, 'x', 'y', ds.count())
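For context, a self-contained reproduction in the spirit of this test (column names and sizes here are illustrative): a small points aggregation over a dask DataFrame that carries an unused ``datetime64`` column. Before this commit the dtype guess in ``data_libraries/dask.py`` tripped over the unused column; with the fix it is simply ignored:

import dask.dataframe as dd
import pandas as pd
import datashader as ds

df = pd.DataFrame({
    "x": [0.0, 1.0, 2.0, 3.0],
    "y": [0.0, 1.0, 2.0, 3.0],
    # Unused by the aggregation below, but previously still fed into the
    # output-dtype guess for dask inputs.
    "dates": pd.to_datetime(["2007-07-13"] * 4),
})
ddf = dd.from_pandas(df, npartitions=2)

agg = ds.Canvas(plot_width=2, plot_height=2).points(ddf, "x", "y", ds.count())
print(agg)  # 2x2 xarray.DataArray of counts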
