Commit 73d03be
Merge pull request GoogleCloudPlatform#63 from Khan/improve_sharding_of_inequality_filters

Improve sharding of inequality filters
tkaitchuck committed Jun 24, 2015
2 parents 01d93b0 + 3763a97 commit 73d03be
Showing 4 changed files with 133 additions and 4 deletions.
79 changes: 79 additions & 0 deletions python/src/mapreduce/datastore_range_iterators.py
@@ -5,6 +5,8 @@

# pylint: disable=g-bad-name

import itertools

from google.appengine.datastore import datastore_query
from google.appengine.datastore import datastore_rpc
from google.appengine.ext import db
@@ -52,6 +54,19 @@ def create_property_range_iterator(cls,
                                       ns_range,
                                       query_spec)

  @classmethod
  def create_multi_property_range_iterator(cls,
                                           p_range_iters):
    """Create a RangeIterator.

    Args:
      p_range_iters: a list of RangeIterator objects to chain together.

    Returns:
      a RangeIterator.
    """
    return _MultiPropertyRangeModelIterator(p_range_iters)

  @classmethod
  def create_key_ranges_iterator(cls,
                                 k_ranges,
@@ -209,6 +224,69 @@ def from_json(cls, json):
    return obj


class _MultiPropertyRangeModelIterator(RangeIterator):
  """Yields db/ndb model entities within a list of disjoint property ranges."""

  def __init__(self, p_range_iters):
    """Init.

    Args:
      p_range_iters: a list of _PropertyRangeModelIterator objects to chain
        together.
    """
    self._iters = p_range_iters

  def __repr__(self):
    return "MultiPropertyRangeIterator combining %s" % str(
        [str(it) for it in self._iters])

  def __iter__(self):
    """Iterate over entities.

    Yields:
      db model entities or ndb model entities if the model is defined with ndb.
    """
    for model_instance in itertools.chain.from_iterable(self._iters):
      yield model_instance

  def to_json(self):
    """Inherit doc."""
    json = {"name": self.__class__.__name__,
            "num_ranges": len(self._iters)}

    for i in xrange(len(self._iters)):
      json_item = self._iters[i].to_json()
      query_spec = json_item["query_spec"]
      item_name = json_item["name"]
      # Delete and move one level up
      del json_item["query_spec"]
      del json_item["name"]
      json[str(i)] = json_item
    # Store once to save space
    json["query_spec"] = query_spec
    json["item_name"] = item_name

    return json

  @classmethod
  def from_json(cls, json):
    """Inherit doc."""
    num_ranges = int(json["num_ranges"])
    query_spec = json["query_spec"]
    item_name = json["item_name"]

    p_range_iters = []
    for i in xrange(num_ranges):
      json_item = json[str(i)]
      # Place query_spec, name back into each iterator
      json_item["query_spec"] = query_spec
      json_item["name"] = item_name
      p_range_iters.append(_PropertyRangeModelIterator.from_json(json_item))

    obj = cls(p_range_iters)
    return obj

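For reference, `to_json` above strips the `query_spec` and `name` fields that every child iterator would otherwise repeat, storing them once at the top level; `from_json` pushes them back down before rebuilding each child. A sketch of the resulting layout with two hypothetical children (the per-range keys and values are placeholders for whatever `_PropertyRangeModelIterator.to_json` actually emits):

# Hypothetical serialized form of a _MultiPropertyRangeModelIterator
# wrapping two child iterators; only the shape matters here.
serialized = {
    "name": "_MultiPropertyRangeModelIterator",
    "num_ranges": 2,
    "0": {"property_range": "<child 0 state>"},  # shared fields removed
    "1": {"property_range": "<child 1 state>"},
    # Shared fields, stored once; from_json copies them back into each item.
    "query_spec": "<query spec shared by all children>",
    "item_name": "_PropertyRangeModelIterator",
}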

class _KeyRangesIterator(RangeIterator):
  """Create an iterator over a key_ranges.KeyRanges object."""

@@ -279,6 +357,7 @@ def from_json(cls, json):
# A map from class name to class of all RangeIterators.
_RANGE_ITERATORS = {
    _PropertyRangeModelIterator.__name__: _PropertyRangeModelIterator,
    _MultiPropertyRangeModelIterator.__name__: _MultiPropertyRangeModelIterator,
    _KeyRangesIterator.__name__: _KeyRangesIterator
}

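The registry supports name-keyed deserialization: each `to_json` records the concrete class under `"name"`, so a factory can dispatch on it when rebuilding an iterator. A minimal sketch of that dispatch (the helper name is illustrative, not the module's actual entry point):

def _range_iterator_from_json(json):
  # Dispatch on the class name written by to_json and let the concrete
  # RangeIterator subclass rebuild itself.
  return _RANGE_ITERATORS[json["name"]].from_json(json)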
7 changes: 5 additions & 2 deletions python/src/mapreduce/handlers.py
@@ -29,6 +29,7 @@
import sys
import time
import traceback
import zlib

try:
  import json
@@ -1450,7 +1451,8 @@ def _get_input_readers(self, state):
      readers = input_reader_class.split_input(split_param)
    else:
      readers = [input_reader_class.from_json_str(_json) for _json in
-                json.loads(serialized_input_readers.payload)]
+                json.loads(zlib.decompress(
+                    serialized_input_readers.payload))]

    if not readers:
      return None, None
@@ -1465,7 +1467,8 @@ def _get_input_readers(self, state):
    serialized_input_readers = model._HugeTaskPayload(
        key_name=serialized_input_readers_key, parent=state)
    readers_json_str = [i.to_json_str() for i in readers]
-   serialized_input_readers.payload = json.dumps(readers_json_str)
+   serialized_input_readers.payload = zlib.compress(json.dumps(
+       readers_json_str))
    return readers, serialized_input_readers

  def _setup_output_writer(self, state):
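The two hunks above are halves of one change: the serialized reader list is now zlib-compressed before being stored in the `_HugeTaskPayload` and decompressed when read back, which keeps the payload small even when oversplitting multiplies the number of serialized readers. A self-contained sketch of the round-trip under Python 2 (the reader JSON is a placeholder):

import json
import zlib

# Placeholder for [i.to_json_str() for i in readers]; repetitive JSON
# like this compresses very well.
readers_json_str = ['{"key_range": "..."}'] * 256

payload = zlib.compress(json.dumps(readers_json_str))   # what gets stored
restored = json.loads(zlib.decompress(payload))         # what gets read back

assert restored == readers_json_str
print("%d bytes raw, %d bytes compressed"
      % (len(json.dumps(readers_json_str)), len(payload)))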
45 changes: 43 additions & 2 deletions python/src/mapreduce/input_readers.py
@@ -253,6 +253,9 @@ class AbstractDatastoreInputReader(InputReader):
  # Maximum number of shards we'll create.
  _MAX_SHARD_COUNT = 256

  # Factor for additional ranges to split when using inequality filters.
  _OVERSPLIT_FACTOR = 1

  # The maximum number of namespaces that will be sharded by datastore key
  # before switching to a strategy where sharding is done lexicographically by
  # namespace.
@@ -264,6 +267,7 @@ class AbstractDatastoreInputReader(InputReader):
  BATCH_SIZE_PARAM = "batch_size"
  KEY_RANGE_PARAM = "key_range"
  FILTERS_PARAM = "filters"
  OVERSPLIT_FACTOR_PARAM = "oversplit_factor"

  _KEY_RANGE_ITER_CLS = db_iters.AbstractKeyRangeIterator

@@ -320,6 +324,8 @@ def _get_query_spec(cls, mapper_spec):
        keys_only=bool(params.get(cls.KEYS_ONLY_PARAM, False)),
        filters=filters,
        batch_size=int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE)),
        oversplit_factor=int(params.get(cls.OVERSPLIT_FACTOR_PARAM,
                                        cls._OVERSPLIT_FACTOR)),
        model_class_path=entity_kind,
        app=app,
        ns=ns)
@@ -508,6 +514,14 @@ def validate(cls, mapper_spec):
        raise BadReaderParamsError("Bad batch size: %s" % batch_size)
      except ValueError, e:
        raise BadReaderParamsError("Bad batch size: %s" % e)
    if cls.OVERSPLIT_FACTOR_PARAM in params:
      try:
        oversplit_factor = int(params[cls.OVERSPLIT_FACTOR_PARAM])
        if oversplit_factor < 1:
          raise BadReaderParamsError("Bad oversplit factor:"
                                     " %s" % oversplit_factor)
      except ValueError, e:
        raise BadReaderParamsError("Bad oversplit factor: %s" % e)
    try:
      bool(params.get(cls.KEYS_ONLY_PARAM, False))
    except:
@@ -690,9 +704,19 @@ def split_input(cls, mapper_spec):
    if not property_range.should_shard_by_property_range(query_spec.filters):
      return super(DatastoreInputReader, cls).split_input(mapper_spec)

    # Artificially increase the number of shards to get a more even split.
    # For example, if we are creating 7 shards for one week of data based on a
    # Day property and the data points tend to be clumped on certain days (say,
    # Monday and Wednesday), instead of assigning each shard a single day of
    # the week, we will split each day into "oversplit_factor" pieces, and
    # assign each shard "oversplit_factor" pieces with "1 / oversplit_factor"
    # the work, so that the data from Monday and Wednesday is more evenly
    # spread across all shards.
    oversplit_factor = query_spec.oversplit_factor
    oversplit_shard_count = oversplit_factor * shard_count
    p_range = property_range.PropertyRange(query_spec.filters,
                                           query_spec.model_class_path)
-   p_ranges = p_range.split(shard_count)
+   p_ranges = p_range.split(oversplit_shard_count)

    # User specified a namespace.
    if query_spec.ns is not None:
@@ -713,7 +737,7 @@ def split_input(cls, mapper_spec):
                   for _ in p_ranges]
    # Lots of namespaces. Split by ns.
    else:
-     ns_ranges = namespace_range.NamespaceRange.split(n=shard_count,
+     ns_ranges = namespace_range.NamespaceRange.split(n=oversplit_shard_count,
                                                       contiguous=False,
                                                       can_query=lambda: True,
                                                       _app=query_spec.app)
@@ -724,6 +748,23 @@ def split_input(cls, mapper_spec):
    iters = [
        db_iters.RangeIteratorFactory.create_property_range_iterator(
            p, ns, query_spec) for p, ns in zip(p_ranges, ns_ranges)]

    # Reduce the number of ranges back down to the shard count.
    # It's possible that we didn't split into enough shards even
    # after oversplitting, in which case we don't need to do anything.
    if len(iters) > shard_count:
      # We cycle through the iterators and chain them together, e.g.
      # if we look at the indices chained together, we get:
      # Shard #0 gets 0, num_shards, 2 * num_shards, ...
      # Shard #1 gets 1, num_shards + 1, 2 * num_shards + 1, ...
      # Shard #2 gets 2, num_shards + 2, 2 * num_shards + 2, ...
      # and so on. This should split fairly evenly.
      iters = [
          db_iters.RangeIteratorFactory.create_multi_property_range_iterator(
              [iters[i] for i in xrange(start_index, len(iters), shard_count)]
          ) for start_index in xrange(shard_count)
      ]

    return [cls(i) for i in iters]


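Concretely, with shard_count = 3 and oversplit_factor = 2, `split_input` produces six ranges and the stride-shard_count selection above deals them back out round-robin. A standalone sketch of just that index arithmetic (toy range labels, Python 2 `xrange` as in the codebase):

shard_count = 3
oversplit_factor = 2  # illustrative values
ranges = ["r%d" % i for i in xrange(shard_count * oversplit_factor)]

# xrange(start, len(ranges), shard_count) takes every shard_count-th range,
# so shard k chains ranges k, k + shard_count, k + 2 * shard_count, ...
shards = [[ranges[i] for i in xrange(start, len(ranges), shard_count)]
          for start in xrange(shard_count)]

assert shards == [["r0", "r3"], ["r1", "r4"], ["r2", "r5"]]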
6 changes: 6 additions & 0 deletions python/src/mapreduce/model.py
@@ -1182,19 +1182,23 @@ class QuerySpec(object):
  """Encapsulates everything about a query needed by DatastoreInputReader."""

  DEFAULT_BATCH_SIZE = 50
  DEFAULT_OVERSPLIT_FACTOR = 1

  def __init__(self,
               entity_kind,
               keys_only=None,
               filters=None,
               batch_size=None,
               oversplit_factor=None,
               model_class_path=None,
               app=None,
               ns=None):
    self.entity_kind = entity_kind
    self.keys_only = keys_only or False
    self.filters = filters or None
    self.batch_size = batch_size or self.DEFAULT_BATCH_SIZE
    self.oversplit_factor = (oversplit_factor or
                             self.DEFAULT_OVERSPLIT_FACTOR)
    self.model_class_path = model_class_path
    self.app = app
    self.ns = ns
@@ -1204,6 +1208,7 @@ def to_json(self):
            "keys_only": self.keys_only,
            "filters": self.filters,
            "batch_size": self.batch_size,
            "oversplit_factor": self.oversplit_factor,
            "model_class_path": self.model_class_path,
            "app": self.app,
            "ns": self.ns}
@@ -1214,6 +1219,7 @@ def from_json(cls, json):
                json["keys_only"],
                json["filters"],
                json["batch_size"],
                json["oversplit_factor"],
                json["model_class_path"],
                json["app"],
                json["ns"])

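End to end, the new knob is an ordinary input-reader parameter: `validate` checks it, `_get_query_spec` threads it into `QuerySpec`, and `split_input` applies it when the filters contain an inequality. A hedged sketch of mapper parameters that would exercise it (the kind, property, and values are hypothetical):

# Hypothetical input_reader params for DatastoreInputReader; the
# "oversplit_factor" key matches OVERSPLIT_FACTOR_PARAM above.
input_reader_params = {
    "entity_kind": "main.DailyReport",                # hypothetical model path
    "filters": [("day", ">=", 1), ("day", "<=", 7)],  # inequality filters
    "oversplit_factor": 4,  # split each property range into 4 pieces
    "batch_size": 50,
}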