From ffed86444e9b586d9e01d368b2e4e0daa9951be6 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 23 Sep 2013 13:01:14 -0700 Subject: [PATCH 01/13] Fix building warnings --- .../scala/com/twitter/summingbird/store/ClientStoreLaws.scala | 2 +- .../com/twitter/summingbird/storm/spout/FixedTupleSpout.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/summingbird-client/src/test/scala/com/twitter/summingbird/store/ClientStoreLaws.scala b/summingbird-client/src/test/scala/com/twitter/summingbird/store/ClientStoreLaws.scala index fe60b1933..0fa077151 100644 --- a/summingbird-client/src/test/scala/com/twitter/summingbird/store/ClientStoreLaws.scala +++ b/summingbird-client/src/test/scala/com/twitter/summingbird/store/ClientStoreLaws.scala @@ -50,7 +50,7 @@ class ClientStoreLaws extends Specification { val retMap = clientStore.multiGet(keys) def assertPresent[T](f: Future[T], comparison: T) { - assert(f.isReturn && f.get == comparison) + assert(f.isReturn && Await.result(f) == comparison) } "ClientStore should return a map from multiGet of the same size as the input request" in { diff --git a/summingbird-storm/src/main/java/com/twitter/summingbird/storm/spout/FixedTupleSpout.java b/summingbird-storm/src/main/java/com/twitter/summingbird/storm/spout/FixedTupleSpout.java index a04c1db00..710f36d0e 100644 --- a/summingbird-storm/src/main/java/com/twitter/summingbird/storm/spout/FixedTupleSpout.java +++ b/summingbird-storm/src/main/java/com/twitter/summingbird/storm/spout/FixedTupleSpout.java @@ -55,6 +55,7 @@ public static void clear(String stormId) { private List _tuples; private SpoutOutputCollector _collector; + @SuppressWarnings("unused") private TopologyContext _context; private List _serveTuples; private Map _pending; From 81dccc9ddb97d3270effe200b2404561d68316dc Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 23 Sep 2013 13:01:34 -0700 Subject: [PATCH 02/13] Use new storm planner for building storm topologies --- 
.../summingbird/storm/StormPlatform.scala | 360 +++++------------- 1 file changed, 98 insertions(+), 262 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index f95f06247..2536273c4 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -77,7 +77,7 @@ object Storm { * operations that can potentially be optimized. */ sealed trait FMItem -case class OptionMap[T, U](op: T => Option[U]) extends FMItem +case class OptionMap[T, U]() extends FMItem case class FactoryCell(factory: StoreFactory[_, _]) extends FMItem case class FlatMap(op: FlatMapOperation[_, _]) extends FMItem @@ -86,6 +86,7 @@ object FMItem { FlatMap(FlatMapOperation.write(sinkSupplier)) } + abstract class Storm(options: Map[String, Options], updateConf: Config => Config) extends Platform[Storm] { type Source[+T] = Spout[(Long, T)] type Store[-K, V] = StormStore[K, V] @@ -94,280 +95,120 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config type Plan[T] = StormTopology private type Prod[T] = Producer[Storm, T] - private type JamfMap = Map[Prod[_], List[String]] - - val END_SUFFIX = "end" - val FM_CONSTANT = "flatMap-" - - /** - * Returns true if this producer's suffix indicates that it's the - * final flatMapper before the sumByKey call, false otherwise. 
- */ - def isFinalFlatMap(suffix: String) = !suffix.contains(FM_CONSTANT) private def getOrElse[T: Manifest](idOpt: Option[String], default: T): T = (for { id <- idOpt stormOpts <- options.get(id) option <- stormOpts.get[T] - } yield option).getOrElse(default) - - def buildTopology[T]( - topoBuilder: TopologyBuilder, - outerProducer: Prod[T], - forkedNodes: Set[Prod[_]], - jamfs: JamfMap, - toSchedule: List[FMItem], - path: List[Prod[_]], - suffix: String, - id: Option[String]) - (implicit config: Config): (List[String], JamfMap) = { - - /** - * Helper method for recursively calling this same function with - * all of its arguments set, by default, to the current call's - * supplied parameters. (Note that this recursion will loop - * infinitely if called directly with no changed parameters.) - */ - def recurse[U]( - producer: Prod[U], - topoBuilder: TopologyBuilder = topoBuilder, - outerProducer: Prod[T] = outerProducer, - jamfs: JamfMap = jamfs, - toSchedule: List[FMItem] = toSchedule, - path: List[Prod[_]] = path, - suffix: String = suffix, - id: Option[String] = id) = - buildTopology( - topoBuilder, producer, forkedNodes, jamfs, - toSchedule, outerProducer :: path, suffix, id - ) - - /** - * TODO: This internal check is duplicating the internal isEmpty - * check in scheduleFlatMapper. The idea is that we should only - * tag a flatMap- prefix on if there are operations to - * schedule. The path is calculated in a separate spot from the - * actual scheduling. - * - * Remove the duplication and do everything in one spot by - * changing the return type of scheduleFlatMapper. 
- */ - def suffixOf(xs: List[_], suffix: String): String = - if (xs.isEmpty) suffix else FM_CONSTANT + suffix - - def flatMap(parents: List[String], ops: List[FMItem]) = - scheduleFlatMapper(topoBuilder, parents, path, suffix, id, ops) + } yield option).getOrElse(default) + private def scheduleFlatMapper(stormDag: StormDag, node: StormNode, topologyName: Option[String])(implicit topologyBuilder: TopologyBuilder) = { /** - * This method is called by any nodes that contain Storm - * FlatMapOperations. If the calling node is a "forked node" (ie, - * has multiple consumers), we can't do any optimization across - * the boundary, so we schedule a flatMap operation and push the - * contained "op" down into the recursion. If it's not a forked - * node, we can continue to optimize the graph by pushing the - * current operation onto the toSchedule stack. - */ - def perhapsSchedule[A](parent: Prod[A], op: FMItem) = { - val newOps = op :: toSchedule - if (forkedNodes.contains(parent)) { - val (s, m) = recurse( - parent, - toSchedule = List.empty, - suffix = "fork-" + suffixOf(newOps, suffix) - ) - (flatMap(s, newOps), m) - } else recurse(parent, toSchedule = newOps) - } - - jamfs.get(outerProducer) match { - case Some(s) => (s, jamfs) - case None => - val (strings, m): (List[String], JamfMap) = outerProducer match { - case Summer(producer, _, _) => - assert(path.isEmpty, "Only a single Summer is supported at this time.") - recurse(producer) - - case IdentityKeyedProducer(producer) => recurse(producer) - - case NamedProducer(producer, newId) => recurse(producer, id = Some(newId)) - - case Source(spout) => - // The current planner requires a layer of flatMapBolts, even - // if calling sumByKey directly on a source. 
- val (optionMaps, remaining) = toSchedule.span { - case OptionMap(_) => true - case _ => false - } - - val operations = - if (remaining.isEmpty) - List(FlatMap(FlatMapOperation.identity)) - else remaining - - val spoutName = "spout-" + suffixOf(operations, suffix) - - val stormSpout = optionMaps.foldLeft(spout.asInstanceOf[Spout[(Long, Any)]]) { - case (spout, OptionMap(op)) => - spout.flatMap { case (time, t) => - op.asInstanceOf[Any => Option[_]].apply(t) - .map { x => (time, x) } } - case _ => sys.error("not possible, given the above call to span.") - }.getSpout - val parallelism = getOrElse(id, DEFAULT_SPOUT_PARALLELISM).parHint - topoBuilder.setSpout(spoutName, stormSpout, parallelism) - val parents = List(spoutName) - - // Attach a FlatMapBolt after this source. - (flatMap(parents, operations), jamfs) - - case OptionMappedProducer(producer, op) => - perhapsSchedule(producer, OptionMap(op)) - - case FlatMappedProducer(producer, op) => - perhapsSchedule(producer, FlatMap(FlatMapOperation(op))) - - case WrittenProducer(producer, sinkSupplier) => - perhapsSchedule(producer, FMItem.sink(sinkSupplier)) - - case LeftJoinedProducer(producer, StoreWrapper(newService)) => - perhapsSchedule(producer, FactoryCell(newService)) - - case MergedProducer(l, r) => - val leftSuffix = "L-" + suffixOf(toSchedule, suffix) - val rightSuffix = "R-" + suffixOf(toSchedule, suffix) - val (leftNodes, leftM) = - recurse(l, toSchedule = List.empty, suffix = leftSuffix) - val (rightNodes, rightM) = - recurse(r, toSchedule = List.empty, suffix = rightSuffix, jamfs = leftM) - val parents = leftNodes ++ rightNodes - (flatMap(parents, toSchedule), rightM) - - /** - * Very similar to Merge, just ignore the left. - * TODO: - * https://github.com/twitter/summingbird/issues/241 - * if no one consumes the output of a node on storm, - * we should not emit. 
- */ - case AlsoProducer(l, r) => - val leftSuffix = "L-" + suffixOf(toSchedule, suffix) - val rightSuffix = "R-" + suffixOf(toSchedule, suffix) - val (_, leftM) = - recurse(l, toSchedule = List.empty, suffix = leftSuffix) - val (rightNodes, rightM) = - recurse(r, toSchedule = List.empty, suffix = rightSuffix, jamfs = leftM) - (flatMap(rightNodes, toSchedule), rightM) - } - (strings, m + (outerProducer -> strings)) - } - } - - /** * Only exists because of the crazy casts we needed. */ - private def serviceOperation[K, V, W](store: StoreFactory[_, _]) = + def serviceOperation[K, V, W](store: StoreFactory[_, _]) = FlatMapOperation.combine( FlatMapOperation.identity[(K, V)], store.asInstanceOf[StoreFactory[K, W]] ) - - private def foldOperations(head: FMItem, tail: List[FMItem]) = { - val operation = head match { - case OptionMap(op) => FlatMapOperation(op.andThen(_.iterator).asInstanceOf[Any => TraversableOnce[Any]]) - case FactoryCell(store) => serviceOperation(store) - case FlatMap(op) => op - } - tail.foldLeft(operation.asInstanceOf[FlatMapOperation[Any, Any]]) { - case (acc, FactoryCell(store)) => FlatMapOperation.combine( - acc.asInstanceOf[FlatMapOperation[Any, (Any, Any)]], - store.asInstanceOf[StoreFactory[Any, Any]] - ).asInstanceOf[FlatMapOperation[Any, Any]] - case (acc, OptionMap(op)) => acc.andThen(FlatMapOperation[Any, Any](op.andThen(_.iterator).asInstanceOf[Any => TraversableOnce[Any]])) - case (acc, FlatMap(op)) => acc.andThen(op.asInstanceOf[FlatMapOperation[Any, Any]]) - } - } - - // TODO https://github.com/twitter/summingbird/issues/84: This - // function is returning the Node ID; replace string programming - // with a world where the "id" is actually the path to that node - // from the root. 
- private def scheduleFlatMapper( - topoBuilder: TopologyBuilder, - parents: List[String], - path: List[Prod[_]], - suffix: String, - id: Option[String], - toSchedule: List[FMItem]) - : List[String] = { - toSchedule match { - case Nil => parents - case head :: tail => - val operation = foldOperations(head, tail) - val metrics = getOrElse(id, DEFAULT_FM_STORM_METRICS) - val anchorTuples = getOrElse(id, AnchorTuples.default) - - val bolt = if (isFinalFlatMap(suffix)) { - val summer = Producer.retrieveSummer(path) - .getOrElse(sys.error("A Summer is required.")) - new FinalFlatMapBolt( - operation.asInstanceOf[FlatMapOperation[Any, (Any, Any)]], - getOrElse(id, DEFAULT_FM_CACHE), - getOrElse(id, DEFAULT_FM_STORM_METRICS), - anchorTuples - )(summer.monoid.asInstanceOf[Monoid[Any]], summer.store.batcher) + def producerToHeadOperation(producer: Producer[Storm, _]): FlatMapOperation[Any, Any] = { + producer match { + case OptionMappedProducer(_, op) => FlatMapOperation(op.andThen(_.iterator)) + case FlatMappedProducer(_, op) => FlatMapOperation(op) + case WrittenProducer(_, sinkSupplier) => FlatMapOperation.write(sinkSupplier.asInstanceOf[() => (Any => Future[Unit])]) + case LeftJoinedProducer(_, StoreWrapper(newService)) => serviceOperation(newService.asInstanceOf[StoreFactory[Any, Any]]).asInstanceOf[FlatMapOperation[Any, Any]] + case _ => throw new Exception("Invalid producer: " + producer) + } + } + + def foldOperations(producers: List[Producer[Storm, _]]):FlatMapOperation[Any, Any] = { + val initialOperation = producerToHeadOperation(producers.head) + producers.tail.foldLeft(initialOperation.asInstanceOf[FlatMapOperation[Any, Any]]) { + + case (acc, LeftJoinedProducer(_, StoreWrapper(newService))) => + FlatMapOperation.combine( + acc.asInstanceOf[FlatMapOperation[Any, (Any, Any)]], + newService.asInstanceOf[StoreFactory[Any, Any]] + ).asInstanceOf[FlatMapOperation[Any, Any]] + case (acc, OptionMappedProducer(_, op)) => acc.andThen(FlatMapOperation[Any, 
Any](op.andThen(_.iterator).asInstanceOf[Any => TraversableOnce[Any]])) + case (acc, FlatMappedProducer(_, op)) => acc.andThen(FlatMapOperation(op).asInstanceOf[FlatMapOperation[Any, Any]]) + case (acc, WrittenProducer(_, sinkSupplier)) => acc.andThen(FlatMapOperation.write(sinkSupplier.asInstanceOf[() => (Any => Future[Unit])])) + } + } + val nodeName = stormDag.getNodeName(node) + val operation = foldOperations(node.members) + val metrics = getOrElse(topologyName, DEFAULT_FM_STORM_METRICS) + val anchorTuples = getOrElse(topologyName, AnchorTuples.default) + + val bolt = node match { + case _: FinalFlatMapStormBolt => + val summer = stormDag.dependantsOf(node).head.members.collect{case s: Summer[Storm, _, _] => s}.head.asInstanceOf[Summer[Storm, _ , _]] + new FinalFlatMapBolt( + operation.asInstanceOf[FlatMapOperation[Any, (Any, Any)]], + getOrElse(topologyName, DEFAULT_FM_CACHE), + getOrElse(topologyName, DEFAULT_FM_STORM_METRICS), + anchorTuples + )(summer.monoid.asInstanceOf[Monoid[Any]], summer.store.batcher) + case _: IntermediateFlatMapStormBolt => + new IntermediateFlatMapBolt(operation, metrics, anchorTuples) + case _ => throw new Exception("Non-flatmap node passed to the flatmap function") } - else - new IntermediateFlatMapBolt(operation, metrics, anchorTuples) - val parallelism = getOrElse(id, DEFAULT_FM_PARALLELISM) - val boltName = FM_CONSTANT + suffix - val declarer = topoBuilder.setBolt(boltName, bolt, parallelism.parHint) + - parents.foreach { declarer.shuffleGrouping(_) } - List(boltName) - } + val parallelism = getOrElse(topologyName, DEFAULT_FM_PARALLELISM) + val declarer = topologyBuilder.setBolt(nodeName, bolt, parallelism.parHint) + + val dependsOnNames = stormDag.dependsOn(node).collect{case x:StormNode => stormDag.getNodeName(x)} + dependsOnNames.foreach { declarer.shuffleGrouping(_) } + } + + private def scheduleSpout[K](stormDag: StormDag, node: StormSpout, topologyName: Option[String])(implicit topologyBuilder: TopologyBuilder) = { + val 
spout = node.members.collect{case Source(s) => s}.head + val nodeName = stormDag.getNodeName(node) + def opFM(op: Any => Option[Any], spout: Spout[(Long, Any)]): Spout[(Long, Any)] = spout.flatMap { case (time, t) => op.apply(t).map {x => (time, x)}} + val stormSpout = node.members.foldLeft(spout.asInstanceOf[Spout[(Long, Any)]]) { + case (spout, Source(_)) => spout // The source is still in the members list so drop it + case (spout, OptionMappedProducer(_, op)) => opFM(op, spout) + case _ => sys.error("not possible, given the above call to span.") + }.getSpout + + val parallelism = getOrElse(topologyName, DEFAULT_SPOUT_PARALLELISM).parHint + topologyBuilder.setSpout(nodeName, stormSpout, parallelism) } - private def populate[K, V]( - topologyBuilder: TopologyBuilder, - summer: Summer[Storm, K, V], - name: Option[String])(implicit config: Config) = { + + private def scheduleSinkBolt[K,V] (stormDag: StormDag, node: StormNode, topologyName: Option[String])(implicit topologyBuilder: TopologyBuilder) = { + val summer: Summer[Storm, K, V] = node.members.collect{case c: Summer[Storm, K, V] => c}.head implicit val monoid = summer.monoid - - val dep = Dependants(summer) - val fanOutSet = - Producer.transitiveDependenciesOf(summer) - .filter(dep.fanOut(_).exists(_ > 1)).toSet - - val (parents, _) = buildTopology( - topologyBuilder, summer, fanOutSet, Map.empty, - List.empty, List.empty, - END_SUFFIX, name) + val nodeName = stormDag.getNodeName(node) + val supplier = summer.store match { case MergeableStoreSupplier(contained, _) => contained } val sinkBolt = new SinkBolt[K, V]( supplier, - getOrElse(name, DEFAULT_ONLINE_SUCCESS_HANDLER), - getOrElse(name, DEFAULT_ONLINE_EXCEPTION_HANDLER), - getOrElse(name, DEFAULT_SINK_CACHE), - getOrElse(name, DEFAULT_SINK_STORM_METRICS), - getOrElse(name, DEFAULT_MAX_WAITING_FUTURES), - getOrElse(name, IncludeSuccessHandler.default) + getOrElse(topologyName: Option[String], DEFAULT_ONLINE_SUCCESS_HANDLER), + getOrElse(topologyName: 
Option[String], DEFAULT_ONLINE_EXCEPTION_HANDLER), + getOrElse(topologyName: Option[String], DEFAULT_SINK_CACHE), + getOrElse(topologyName: Option[String], DEFAULT_SINK_STORM_METRICS), + getOrElse(topologyName: Option[String], DEFAULT_MAX_WAITING_FUTURES), + getOrElse(topologyName: Option[String], IncludeSuccessHandler.default) ) - - val declarer = + + val declarer = topologyBuilder.setBolt( - GROUP_BY_SUM, + nodeName, sinkBolt, - getOrElse(name, DEFAULT_SINK_PARALLELISM).parHint) + getOrElse(topologyName, DEFAULT_SINK_PARALLELISM).parHint) + + val dependsOnNames = stormDag.dependsOn(node).collect{case x:StormNode => stormDag.getNodeName(x)} + dependsOnNames.foreach { parentName => + declarer.fieldsGrouping(parentName, new Fields(AGG_KEY)) + } - parents.foreach { parentName => - declarer.fieldsGrouping(parentName, new Fields(AGG_KEY)) - } - List(GROUP_BY_SUM) } /** @@ -390,26 +231,21 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config def transformConfig(base: Config): Config = updateConf(base) def withConfigUpdater(fn: Config => Config): Storm - def plan[T](node: Producer[Storm, T]): StormTopology = { - val topologyBuilder = new TopologyBuilder + def plan[T](tail: Producer[Storm, T]): StormTopology = { + implicit val topologyBuilder = new TopologyBuilder implicit val config = baseConfig - /** - * This crippled version of the StormPlatform only supports a - * Summer or any number of NamedProducers stacked onto the end of - * the DAG. 
- */ - @tailrec def retrieve(p: Producer[Storm, _], id: Option[String] = None): (Summer[Storm, Any, Any], Option[String]) = - p match { - case s: Summer[Storm, Any, Any] => (s, id) - case NamedProducer(inner, name) => retrieve(inner, Some(name)) - case _ => sys.error("A Summer is required.") + val stormDag = DagBuilder(tail) + val name: Option[String] = tail match {case NamedProducer(_, n) => Some(n) case _ => None} + + stormDag.nodes.map{node => + node match { + case _: SummerStormBolt => scheduleSinkBolt(stormDag, node, name) + case _: FinalFlatMapStormBolt => scheduleFlatMapper(stormDag, node, name) + case _: IntermediateFlatMapStormBolt => scheduleFlatMapper(stormDag, node, name) + case s: StormSpout => scheduleSpout(stormDag, s, name) } - val (summer, name) = retrieve(node) - - // TODO (https://github.com/twitter/summingbird/issues/86): - // support topologies that don't end with a sum - populate(topologyBuilder, summer, name) + } topologyBuilder.createTopology } def run(summer: Producer[Storm, _], jobName: String): Unit = run(plan(summer), jobName) From eb3c911176df80c8b79775e5b52a686f34257bd9 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 23 Sep 2013 13:01:46 -0700 Subject: [PATCH 03/13] Extend .gitignore for eclipse files --- .gitignore | 4 + .../summingbird/storm/StormPlatform.scala | 192 +++++++++--------- 2 files changed, 98 insertions(+), 98 deletions(-) diff --git a/.gitignore b/.gitignore index 4040a7705..a8990a326 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,7 @@ project/plugins/src_managed/ /.idea/ /.idea_modules/ *.iml +*/.project +*/.settings +*/.cache +*/.classpath diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index 2536273c4..bcd0ed9b2 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ 
b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -21,7 +21,7 @@ import backtype.storm.Testing import backtype.storm.testing.CompleteTopologyParam import backtype.storm.testing.MockedSources import backtype.storm.tuple.Fields -import backtype.storm.{Config, StormSubmitter} +import backtype.storm.{Config, StormSubmitter } import backtype.storm.generated.StormTopology import backtype.storm.topology.{ BoltDeclarer, TopologyBuilder } import com.twitter.algebird.Monoid @@ -61,8 +61,7 @@ object Storm { def remote(options: Map[String, Options] = Map.empty): RemoteStorm = new RemoteStorm(options, identity) - def timedSpout[T](spout: Spout[T]) - (implicit timeOf: TimeExtractor[T]): Spout[(Long, T)] = + def timedSpout[T](spout: Spout[T])(implicit timeOf: TimeExtractor[T]): Spout[(Long, T)] = spout.map(t => (timeOf(t), t)) def store[K, V](store: => MergeableStore[(K, BatchID), V])(implicit batcher: Batcher): MergeableStoreSupplier[K, V] = @@ -73,9 +72,9 @@ object Storm { } /** - * Object containing helper functions to build up the list of storm - * operations that can potentially be optimized. - */ + * Object containing helper functions to build up the list of storm + * operations that can potentially be optimized. 
+ */ sealed trait FMItem case class OptionMap[T, U]() extends FMItem case class FactoryCell(factory: StoreFactory[_, _]) extends FMItem @@ -86,7 +85,6 @@ object FMItem { FlatMap(FlatMapOperation.write(sinkSupplier)) } - abstract class Storm(options: Map[String, Options], updateConf: Config => Config) extends Platform[Storm] { type Source[+T] = Spout[(Long, T)] type Store[-K, V] = StormStore[K, V] @@ -101,89 +99,88 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config id <- idOpt stormOpts <- options.get(id) option <- stormOpts.get[T] - } yield option).getOrElse(default) + } yield option).getOrElse(default) private def scheduleFlatMapper(stormDag: StormDag, node: StormNode, topologyName: Option[String])(implicit topologyBuilder: TopologyBuilder) = { /** - * Only exists because of the crazy casts we needed. - */ - def serviceOperation[K, V, W](store: StoreFactory[_, _]) = - FlatMapOperation.combine( - FlatMapOperation.identity[(K, V)], - store.asInstanceOf[StoreFactory[K, W]] - ) - def producerToHeadOperation(producer: Producer[Storm, _]): FlatMapOperation[Any, Any] = { - producer match { - case OptionMappedProducer(_, op) => FlatMapOperation(op.andThen(_.iterator)) - case FlatMappedProducer(_, op) => FlatMapOperation(op) - case WrittenProducer(_, sinkSupplier) => FlatMapOperation.write(sinkSupplier.asInstanceOf[() => (Any => Future[Unit])]) - case LeftJoinedProducer(_, StoreWrapper(newService)) => serviceOperation(newService.asInstanceOf[StoreFactory[Any, Any]]).asInstanceOf[FlatMapOperation[Any, Any]] - case _ => throw new Exception("Invalid producer: " + producer) - } - } - - def foldOperations(producers: List[Producer[Storm, _]]):FlatMapOperation[Any, Any] = { - val initialOperation = producerToHeadOperation(producers.head) - producers.tail.foldLeft(initialOperation.asInstanceOf[FlatMapOperation[Any, Any]]) { - - case (acc, LeftJoinedProducer(_, StoreWrapper(newService))) => - FlatMapOperation.combine( - 
acc.asInstanceOf[FlatMapOperation[Any, (Any, Any)]], - newService.asInstanceOf[StoreFactory[Any, Any]] - ).asInstanceOf[FlatMapOperation[Any, Any]] - case (acc, OptionMappedProducer(_, op)) => acc.andThen(FlatMapOperation[Any, Any](op.andThen(_.iterator).asInstanceOf[Any => TraversableOnce[Any]])) - case (acc, FlatMappedProducer(_, op)) => acc.andThen(FlatMapOperation(op).asInstanceOf[FlatMapOperation[Any, Any]]) - case (acc, WrittenProducer(_, sinkSupplier)) => acc.andThen(FlatMapOperation.write(sinkSupplier.asInstanceOf[() => (Any => Future[Unit])])) - } - } - val nodeName = stormDag.getNodeName(node) - val operation = foldOperations(node.members) - val metrics = getOrElse(topologyName, DEFAULT_FM_STORM_METRICS) - val anchorTuples = getOrElse(topologyName, AnchorTuples.default) - - val bolt = node match { - case _: FinalFlatMapStormBolt => - val summer = stormDag.dependantsOf(node).head.members.collect{case s: Summer[Storm, _, _] => s}.head.asInstanceOf[Summer[Storm, _ , _]] - new FinalFlatMapBolt( - operation.asInstanceOf[FlatMapOperation[Any, (Any, Any)]], - getOrElse(topologyName, DEFAULT_FM_CACHE), - getOrElse(topologyName, DEFAULT_FM_STORM_METRICS), - anchorTuples - )(summer.monoid.asInstanceOf[Monoid[Any]], summer.store.batcher) - case _: IntermediateFlatMapStormBolt => - new IntermediateFlatMapBolt(operation, metrics, anchorTuples) - case _ => throw new Exception("Non-flatmap node passed to the flatmap function") - } - - - - val parallelism = getOrElse(topologyName, DEFAULT_FM_PARALLELISM) - val declarer = topologyBuilder.setBolt(nodeName, bolt, parallelism.parHint) - - val dependsOnNames = stormDag.dependsOn(node).collect{case x:StormNode => stormDag.getNodeName(x)} - dependsOnNames.foreach { declarer.shuffleGrouping(_) } + * Only exists because of the crazy casts we needed. 
+ */ + def serviceOperation[K, V, W](store: StoreFactory[_, _]) = + FlatMapOperation.combine( + FlatMapOperation.identity[(K, V)], + store.asInstanceOf[StoreFactory[K, W]]) + def producerToHeadOperation(producer: Producer[Storm, _]): FlatMapOperation[Any, Any] = { + producer match { + case OptionMappedProducer(_, op) => FlatMapOperation(op.andThen(_.iterator)) + case FlatMappedProducer(_, op) => FlatMapOperation(op) + case WrittenProducer(_, sinkSupplier) => FlatMapOperation.write(sinkSupplier.asInstanceOf[() => (Any => Future[Unit])]) + case LeftJoinedProducer(_, StoreWrapper(newService)) => serviceOperation(newService.asInstanceOf[StoreFactory[Any, Any]]).asInstanceOf[FlatMapOperation[Any, Any]] + case IdentityKeyedProducer(_) => FlatMapOperation((x:Any) => List((x,x))) + case _ => throw new Exception("Invalid producer: " + producer) + } + } + + def foldOperations(producers: List[Producer[Storm, _]]): FlatMapOperation[Any, Any] = { + val initialOperation = producerToHeadOperation(producers.head) + producers.foldLeft(FlatMapOperation.identity[Any]) { + case (acc, p) => + p match { + case LeftJoinedProducer(_, StoreWrapper(newService)) => + FlatMapOperation.combine( + acc.asInstanceOf[FlatMapOperation[Any, (Any, Any)]], + newService.asInstanceOf[StoreFactory[Any, Any]]).asInstanceOf[FlatMapOperation[Any, Any]] + case OptionMappedProducer(_, op) => acc.andThen(FlatMapOperation[Any, Any](op.andThen(_.iterator).asInstanceOf[Any => TraversableOnce[Any]])) + case FlatMappedProducer(_, op) => acc.andThen(FlatMapOperation(op).asInstanceOf[FlatMapOperation[Any, Any]]) + case WrittenProducer(_, sinkSupplier) => acc.andThen(FlatMapOperation.write(sinkSupplier.asInstanceOf[() => (Any => Future[Unit])])) + case IdentityKeyedProducer(_) => acc // acc.andThen(FlatMapOperation((x:Any) => List((x,x)))) + case _ => throw new Exception("Not found! 
: " + p) + } + } + } + val nodeName = stormDag.getNodeName(node) + val operation = foldOperations(node.members.reverse) + val metrics = getOrElse(topologyName, DEFAULT_FM_STORM_METRICS) + val anchorTuples = getOrElse(topologyName, AnchorTuples.default) + + val bolt = node match { + case _: FinalFlatMapStormBolt => + val summer = stormDag.dependantsOf(node).head.members.collect { case s: Summer[Storm, _, _] => s }.head.asInstanceOf[Summer[Storm, _, _]] + new FinalFlatMapBolt( + operation.asInstanceOf[FlatMapOperation[Any, (Any, Any)]], + getOrElse(topologyName, DEFAULT_FM_CACHE), + getOrElse(topologyName, DEFAULT_FM_STORM_METRICS), + anchorTuples)(summer.monoid.asInstanceOf[Monoid[Any]], summer.store.batcher) + case _: IntermediateFlatMapStormBolt => + new IntermediateFlatMapBolt(operation, metrics, anchorTuples) + case _ => throw new Exception("Non-flatmap node passed to the flatmap function") + } + + val parallelism = getOrElse(topologyName, DEFAULT_FM_PARALLELISM) + val declarer = topologyBuilder.setBolt(nodeName, bolt, parallelism.parHint) + + val dependsOnNames = stormDag.dependsOn(node).collect { case x: StormNode => stormDag.getNodeName(x) } + dependsOnNames.foreach { declarer.shuffleGrouping(_) } } - + private def scheduleSpout[K](stormDag: StormDag, node: StormSpout, topologyName: Option[String])(implicit topologyBuilder: TopologyBuilder) = { - val spout = node.members.collect{case Source(s) => s}.head - val nodeName = stormDag.getNodeName(node) - def opFM(op: Any => Option[Any], spout: Spout[(Long, Any)]): Spout[(Long, Any)] = spout.flatMap { case (time, t) => op.apply(t).map {x => (time, x)}} - val stormSpout = node.members.foldLeft(spout.asInstanceOf[Spout[(Long, Any)]]) { - case (spout, Source(_)) => spout // The source is still in the members list so drop it - case (spout, OptionMappedProducer(_, op)) => opFM(op, spout) - case _ => sys.error("not possible, given the above call to span.") - }.getSpout - - val parallelism = getOrElse(topologyName, 
DEFAULT_SPOUT_PARALLELISM).parHint - topologyBuilder.setSpout(nodeName, stormSpout, parallelism) + val spout = node.members.collect { case Source(s) => s }.head + val nodeName = stormDag.getNodeName(node) + + val stormSpout = node.members.reverse.foldLeft(spout.asInstanceOf[Spout[(Long, Any)]]) { + case (spout, Source(_)) => spout // The source is still in the members list so drop it + case (spout, OptionMappedProducer(_, op)) => spout.flatMap {case (time, t) => op.apply(t).map { x => (time, x) }} + case _ => sys.error("not possible, given the above call to span.") + }.getSpout + + val parallelism = getOrElse(topologyName, DEFAULT_SPOUT_PARALLELISM).parHint + topologyBuilder.setSpout(nodeName, stormSpout, parallelism) } - - private def scheduleSinkBolt[K,V] (stormDag: StormDag, node: StormNode, topologyName: Option[String])(implicit topologyBuilder: TopologyBuilder) = { - val summer: Summer[Storm, K, V] = node.members.collect{case c: Summer[Storm, K, V] => c}.head + private def scheduleSinkBolt[K, V](stormDag: StormDag, node: StormNode, topologyName: Option[String])(implicit topologyBuilder: TopologyBuilder) = { + val summer: Summer[Storm, K, V] = node.members.collect { case c: Summer[Storm, K, V] => c }.head implicit val monoid = summer.monoid val nodeName = stormDag.getNodeName(node) - + val supplier = summer.store match { case MergeableStoreSupplier(contained, _) => contained } @@ -195,29 +192,28 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config getOrElse(topologyName: Option[String], DEFAULT_SINK_CACHE), getOrElse(topologyName: Option[String], DEFAULT_SINK_STORM_METRICS), getOrElse(topologyName: Option[String], DEFAULT_MAX_WAITING_FUTURES), - getOrElse(topologyName: Option[String], IncludeSuccessHandler.default) - ) - - val declarer = + getOrElse(topologyName: Option[String], IncludeSuccessHandler.default)) + + val declarer = topologyBuilder.setBolt( nodeName, sinkBolt, getOrElse(topologyName, DEFAULT_SINK_PARALLELISM).parHint) 
- - val dependsOnNames = stormDag.dependsOn(node).collect{case x:StormNode => stormDag.getNodeName(x)} - dependsOnNames.foreach { parentName => - declarer.fieldsGrouping(parentName, new Fields(AGG_KEY)) - } + + val dependsOnNames = stormDag.dependsOn(node).collect { case x: StormNode => stormDag.getNodeName(x) } + dependsOnNames.foreach { parentName => + declarer.fieldsGrouping(parentName, new Fields(AGG_KEY)) + } } /** - * The following operations are public. - */ + * The following operations are public. + */ /** - * Base storm config instances used by the Storm platform. - */ + * Base storm config instances used by the Storm platform. + */ def baseConfig = { val config = new Config config.setFallBackOnJavaSerialization(false) @@ -236,9 +232,9 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config implicit val config = baseConfig val stormDag = DagBuilder(tail) - val name: Option[String] = tail match {case NamedProducer(_, n) => Some(n) case _ => None} - - stormDag.nodes.map{node => + val name: Option[String] = tail match { case NamedProducer(_, n) => Some(n) case _ => None } + + stormDag.nodes.map { node => node match { case _: SummerStormBolt => scheduleSinkBolt(stormDag, node, name) case _: FinalFlatMapStormBolt => scheduleFlatMapper(stormDag, node, name) @@ -264,7 +260,7 @@ class RemoteStorm(options: Map[String, Options], updateConf: Config => Config) e } class LocalStorm(options: Map[String, Options], updateConf: Config => Config) - extends Storm(options, updateConf) { + extends Storm(options, updateConf) { lazy val localCluster = new LocalCluster override def withConfigUpdater(fn: Config => Config) = From 59dcf8b365f0b32cc1b5bcc9cac80373bc599dda Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 23 Sep 2013 16:56:52 -0700 Subject: [PATCH 04/13] Rename the StormNodes to just Nodes. 
Clean up a bit more, remove the Intermediate/Final flat map distinction --- .../summingbird/storm/PlannedTopology.scala | 122 +++++++++--------- .../summingbird/storm/StormPlatform.scala | 44 +++---- .../twitter/summingbird/storm/viz/viz.scala | 4 +- .../storm/TopologyPlannerLaws.scala | 19 +-- 4 files changed, 81 insertions(+), 108 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/PlannedTopology.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/PlannedTopology.scala index 1eaa7077b..1d8e83db3 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/PlannedTopology.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/PlannedTopology.scala @@ -18,8 +18,9 @@ package com.twitter.summingbird.storm import com.twitter.summingbird._ +case class NodeIdentifier(identifier: String) -sealed trait StormNode { +sealed trait Node { val members: List[Producer[Storm, _]] = List() private val dependantStateOpt = members.headOption.map(h => Dependants(h)) @@ -33,9 +34,9 @@ sealed trait StormNode { def localDependantsOf(p: Producer[Storm, _]): List[Producer[Storm, _]] = dependantsOf(p).filter(members.contains(_)) - def toSpout: StormSpout = StormSpout(this.members) + def toSpout: SourceNode = SourceNode(this.members) - def toSummer: SummerStormBolt = SummerStormBolt(this.members) + def toSummer: SummerNode = SummerNode(this.members) def contains(p: Producer[Storm, _]): Boolean = members.contains(p) @@ -43,11 +44,11 @@ sealed trait StormNode { def getName(dag: StormDag): String = dag.getNodeName(this) - def shortName(): String + def shortName(): NodeIdentifier - def add(node: Producer[Storm, _]): StormNode + def add(node: Producer[Storm, _]): Node - def reverse: StormNode + def reverse: Node def toStringWithPrefix(prefix: String): String = { @@ -62,67 +63,62 @@ sealed trait StormNode { } -// This is the default state for StormNodes if there is nothing special about them. 
+// This is the default state for Nodes if there is nothing special about them. // There can be an unbounded number of these and there is no hard restrictions on ordering/where. Other than // locations which must be one of the others -case class IntermediateFlatMapStormBolt(override val members: List[Producer[Storm, _]] = List()) extends StormNode { - def add(node: Producer[Storm, _]): StormNode = if(members.contains(node)) this else this.copy(members=node :: members) +case class FlatMapNode(override val members: List[Producer[Storm, _]] = List()) extends Node { + def add(node: Producer[Storm, _]): Node = if(members.contains(node)) this else this.copy(members=node :: members) def reverse = this.copy(members.reverse) - override def shortName(): String = "IF" + override def shortName(): NodeIdentifier = NodeIdentifier("FlatMap") } -case class FinalFlatMapStormBolt(override val members: List[Producer[Storm, _]] = List()) extends StormNode { - def add(node: Producer[Storm, _]): StormNode = if(members.contains(node)) this else this.copy(members=node :: members) - def reverse = this.copy(members.reverse) - override def shortName(): String = "FF" -} -case class SummerStormBolt(override val members: List[Producer[Storm, _]] = List()) extends StormNode { - def add(node: Producer[Storm, _]): StormNode = if(members.contains(node)) this else this.copy(members=node :: members) +case class SummerNode(override val members: List[Producer[Storm, _]] = List()) extends Node { + def add(node: Producer[Storm, _]): Node = if(members.contains(node)) this else this.copy(members=node :: members) def reverse = this.copy(members.reverse) - override def shortName(): String = "SU" + override def shortName(): NodeIdentifier = NodeIdentifier("Summer") } -case class StormSpout(override val members: List[Producer[Storm, _]] = List()) extends StormNode { - def add(node: Producer[Storm, _]): StormNode = if(members.contains(node)) this else this.copy(members=node :: members) +case class 
SourceNode(override val members: List[Producer[Storm, _]] = List()) extends Node { + def add(node: Producer[Storm, _]): Node = if(members.contains(node)) this else this.copy(members=node :: members) def reverse = this.copy(members.reverse) - override def shortName(): String = "SP" + override def shortName(): NodeIdentifier = NodeIdentifier("Source") } -case class StormDag(tail: Producer[Storm, _], producerToNode: Map[Producer[Storm, _], StormNode], - nodes: List[StormNode], - nodeToName: Map[StormNode, String] = Map(), - nameToNode: Map[String, StormNode] = Map(), - dependsOnM: Map[StormNode, List[StormNode]] = Map[StormNode, List[StormNode]](), - dependantOfM: Map[StormNode, List[StormNode]] = Map[StormNode, List[StormNode]]()) { - def connect(src: StormNode, dest: StormNode): StormDag = { +case class StormDag(tail: Producer[Storm, _], producerToNode: Map[Producer[Storm, _], Node], + nodes: List[Node], + nodeToName: Map[Node, String] = Map(), + nameToNode: Map[String, Node] = Map(), + dependsOnM: Map[Node, List[Node]] = Map[Node, List[Node]](), + dependantOfM: Map[Node, List[Node]] = Map[Node, List[Node]]()) { + def connect(src: Node, dest: Node): StormDag = { if (src == dest) { this } else { - assert(!dest.isInstanceOf[StormSpout]) + assert(!dest.isInstanceOf[SourceNode]) // We build/maintain two maps, // Nodes to which each node depends on // and nodes on which each node depends - val oldDependsOnTargets = dependsOnM.getOrElse(src, List[StormNode]()) + val oldDependsOnTargets = dependsOnM.getOrElse(src, List[Node]()) val dependsOnTargets = if(oldDependsOnTargets.contains(dest)) oldDependsOnTargets else (dest :: oldDependsOnTargets) - val oldDependantOfTargets = dependantOfM.getOrElse(dest, List[StormNode]()) + val oldDependantOfTargets = dependantOfM.getOrElse(dest, List[Node]()) val dependantOfTargets = if(oldDependantOfTargets.contains(src)) oldDependantOfTargets else (src :: oldDependantOfTargets) copy(dependsOnM = dependsOnM + (src -> dependsOnTargets) , 
dependantOfM = dependantOfM + (dest -> dependantOfTargets)) } } - def locateOpt(p: Producer[Storm, _]): Option[StormNode] = producerToNode.get(p) - def locate(p: Producer[Storm, _]): StormNode = locateOpt(p).get + def locateOpt(p: Producer[Storm, _]): Option[Node] = producerToNode.get(p) + def locate(p: Producer[Storm, _]): Node = locateOpt(p).get def connect(src: Producer[Storm, _], dest: Producer[Storm, _]): StormDag = connect(locate(src), locate(dest)) - def getNodeName(n: StormNode): String = nodeToName(n) - def tailN: StormNode = producerToNode(tail) + def getNodeName(n: Node): String = nodeToName(n) + def tailN: Node = producerToNode(tail) - def dependantsOf(n: StormNode): List[StormNode] = dependsOnM.get(n).getOrElse(List()) - def dependsOn(n: StormNode): List[StormNode] = dependantOfM.get(n).getOrElse(List()) + def dependantsOf(n: Node): List[Node] = dependsOnM.get(n).getOrElse(List()) + def dependsOn(n: Node): List[Node] = dependantOfM.get(n).getOrElse(List()) def toStringWithPrefix(prefix: String): String = { prefix + "StormDag\n" + nodes.foldLeft(""){ case (str, node) => @@ -134,20 +130,20 @@ case class StormDag(tail: Producer[Storm, _], producerToNode: Map[Producer[Storm } object StormDag { - def buildProducerToNodeLookUp(stormNodeSet: List[StormNode]) : Map[Producer[Storm, _], StormNode] = { - stormNodeSet.foldLeft(Map[Producer[Storm, _], StormNode]()){ (curRegistry, stormNode) => + def buildProducerToNodeLookUp(stormNodeSet: List[Node]) : Map[Producer[Storm, _], Node] = { + stormNodeSet.foldLeft(Map[Producer[Storm, _], Node]()){ (curRegistry, stormNode) => stormNode.members.foldLeft(curRegistry) { (innerRegistry, producer) => (innerRegistry + (producer -> stormNode)) } } } - def build(tail: Producer[Storm, _], registry: List[StormNode]) : StormDag = { + def build(tail: Producer[Storm, _], registry: List[Node]) : StormDag = { - val seenNames = Set[StormNode]() + val seenNames = Set[Node]() val producerToNode = buildProducerToNodeLookUp(registry) val 
dag = registry.foldLeft(StormDag(tail, producerToNode, registry)){ (curDag, stormNode) => // Here we are building the StormDag's connection topology. - // We visit every producer and connect the StormNode's represented by its dependant and dependancies. + // We visit every producer and connect the Node's represented by its dependant and dependancies. // Producers which live in the same node will result in a NOP in connect. stormNode.members.foldLeft(curDag) { (innerDag, dependantProducer) => @@ -163,7 +159,7 @@ object StormDag { } } - def genNames(dep: StormNode, dag: StormDag, outerNodeToName: Map[StormNode, String], usedNames: Set[String]): (Map[StormNode, String], Set[String]) = { + def genNames(dep: Node, dag: StormDag, outerNodeToName: Map[Node, String], usedNames: Set[String]): (Map[Node, String], Set[String]) = { dag.dependsOn(dep).foldLeft((outerNodeToName, usedNames)) {case ((nodeToName, taken), n) => val name = tryGetName(nodeToName(dep) + "-" + n.shortName, taken) val useName = nodeToName.get(n) match { @@ -185,25 +181,25 @@ object DagBuilder { private type VisitedStore = Set[Prod[_]] def apply[P](tail: Producer[Storm, P]): StormDag = { - val stormNodeSet = buildStormNodesSet(tail) + val stormNodeSet = buildNodesSet(tail) // The nodes are added in a source -> summer way with how we do list prepends // but its easier to look at laws in a summer -> source manner - // We also drop all StormNodes with no members(may occur when we visit a node already seen and its the first in that Node) - val revsersedNodeSet = stormNodeSet.filter(_.members.size > 0).foldLeft(List[StormNode]()){(nodes, n) => n.reverse :: nodes} + // We also drop all Nodes with no members(may occur when we visit a node already seen and its the first in that Node) + val revsersedNodeSet = stormNodeSet.filter(_.members.size > 0).foldLeft(List[Node]()){(nodes, n) => n.reverse :: nodes} StormDag.build(tail, revsersedNodeSet) } - // This takes an initial pass through all of the Producers, 
assigning them to StormNodes - private def buildStormNodesSet[P](tail: Producer[Storm, P]): List[StormNode] = { + // This takes an initial pass through all of the Producers, assigning them to Nodes + private def buildNodesSet[P](tail: Producer[Storm, P]): List[Node] = { val dep = Dependants(tail) val forkedNodes = Producer.transitiveDependenciesOf(tail) .filter(dep.fanOut(_).exists(_ > 1)).toSet def distinctAddToList[T](l : List[T], n : T): List[T] = if(l.contains(n)) l else (n :: l) - // Add the dependentProducer to a StormNode along with each of its dependencies in turn. - def addWithDependencies[T](dependantProducer: Prod[T], previousBolt: StormNode, - stormRegistry: List[StormNode], visited: VisitedStore) : (List[StormNode], VisitedStore) = { + // Add the dependentProducer to a Node along with each of its dependencies in turn. + def addWithDependencies[T](dependantProducer: Prod[T], previousBolt: Node, + stormRegistry: List[Node], visited: VisitedStore) : (List[Node], VisitedStore) = { if (visited.contains(dependantProducer)) { (distinctAddToList(stormRegistry, previousBolt), visited) } else { @@ -212,10 +208,10 @@ object DagBuilder { def recurse[U]( producer: Prod[U], - updatedBolt: StormNode = currentBolt, - updatedRegistry: List[StormNode] = stormRegistry, + updatedBolt: Node = currentBolt, + updatedRegistry: List[Node] = stormRegistry, visited: VisitedStore = visitedWithN) - : (List[StormNode], VisitedStore) = { + : (List[Node], VisitedStore) = { addWithDependencies(producer, updatedBolt, updatedRegistry, visited) } @@ -236,28 +232,28 @@ object DagBuilder { * or if we can continue by adding this producer to the current physical node. * * This function acts as a look ahead, rather than depending on the state of the current node it depends - * on the nodes further along in the dag. That is conditions for spliting into multiple StormNodes are based on as yet + * on the nodes further along in the dag. 
That is conditions for spliting into multiple Nodes are based on as yet * unvisisted Producers. */ - def maybeSplitThenRecurse[U, A](currentProducer: Prod[U], dep: Prod[A]): (List[StormNode], VisitedStore) = { + def maybeSplitThenRecurse[U, A](currentProducer: Prod[U], dep: Prod[A]): (List[Node], VisitedStore) = { val doSplit = dep match { case _ if (forkedNodes.contains(dep)) => true - case _ if (currentBolt.isInstanceOf[FinalFlatMapStormBolt] && allDepsMergeableWithSource(dep)) => true + case _ if (currentBolt.isInstanceOf[FlatMapNode] && allDepsMergeableWithSource(dep)) => true case _ if ((!mergableWithSource(currentProducer)) && allDepsMergeableWithSource(dep)) => true case _ => false } if (doSplit) { - recurse(dep, updatedBolt = IntermediateFlatMapStormBolt(), updatedRegistry = distinctAddToList(stormRegistry, currentBolt)) + recurse(dep, updatedBolt = FlatMapNode(), updatedRegistry = distinctAddToList(stormRegistry, currentBolt)) } else { recurse(dep) } } /* - * This is a peek ahead when we meet a MergedProducer. We pull the directly depended on MergedProducer's into the same StormNode, + * This is a peek ahead when we meet a MergedProducer. We pull the directly depended on MergedProducer's into the same Node, * only if that MergedProducer is not a fan out node. * This has the effect of pulling all of the merged streams in as siblings rather than just the two. - * From this we return a list of the MergedProducers which should be combined into the current StormNode, and the list of nodes + * From this we return a list of the MergedProducers which should be combined into the current Node, and the list of nodes * on which these nodes depends (the producers passing data into these MergedProducer). 
*/ @@ -274,7 +270,7 @@ object DagBuilder { } dependantProducer match { - case Summer(producer, _, _) => recurse(producer, updatedBolt = FinalFlatMapStormBolt(), updatedRegistry = distinctAddToList(stormRegistry, currentBolt.toSummer)) + case Summer(producer, _, _) => recurse(producer, updatedBolt = FlatMapNode(), updatedRegistry = distinctAddToList(stormRegistry, currentBolt.toSummer)) case IdentityKeyedProducer(producer) => maybeSplitThenRecurse(dependantProducer, producer) case NamedProducer(producer, newId) => maybeSplitThenRecurse(dependantProducer, producer) case Source(spout) => (distinctAddToList(stormRegistry, currentBolt.toSpout), visitedWithN) @@ -291,12 +287,12 @@ object DagBuilder { // Recurse down all the newly generated dependencies dependencies.foldLeft((distinctAddToList(stormRegistry, newCurrentBolt), visitedWithOther)) { case ((newStormReg, newVisited), n) => - recurse(n, IntermediateFlatMapStormBolt(), newStormReg, newVisited) + recurse(n, FlatMapNode(), newStormReg, newVisited) } } } } - val (stormRegistry, _) = addWithDependencies(tail, IntermediateFlatMapStormBolt(), List[StormNode](), Set()) + val (stormRegistry, _) = addWithDependencies(tail, FlatMapNode(), List[Node](), Set()) stormRegistry } } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index bcd0ed9b2..890d784dd 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -101,7 +101,7 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config option <- stormOpts.get[T] } yield option).getOrElse(default) - private def scheduleFlatMapper(stormDag: StormDag, node: StormNode, topologyName: Option[String])(implicit topologyBuilder: TopologyBuilder) = { + private def scheduleFlatMapper(stormDag: StormDag, 
node: Node, topologyName: Option[String])(implicit topologyBuilder: TopologyBuilder) = { /** * Only exists because of the crazy casts we needed. */ @@ -109,19 +109,8 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config FlatMapOperation.combine( FlatMapOperation.identity[(K, V)], store.asInstanceOf[StoreFactory[K, W]]) - def producerToHeadOperation(producer: Producer[Storm, _]): FlatMapOperation[Any, Any] = { - producer match { - case OptionMappedProducer(_, op) => FlatMapOperation(op.andThen(_.iterator)) - case FlatMappedProducer(_, op) => FlatMapOperation(op) - case WrittenProducer(_, sinkSupplier) => FlatMapOperation.write(sinkSupplier.asInstanceOf[() => (Any => Future[Unit])]) - case LeftJoinedProducer(_, StoreWrapper(newService)) => serviceOperation(newService.asInstanceOf[StoreFactory[Any, Any]]).asInstanceOf[FlatMapOperation[Any, Any]] - case IdentityKeyedProducer(_) => FlatMapOperation((x:Any) => List((x,x))) - case _ => throw new Exception("Invalid producer: " + producer) - } - } def foldOperations(producers: List[Producer[Storm, _]]): FlatMapOperation[Any, Any] = { - val initialOperation = producerToHeadOperation(producers.head) producers.foldLeft(FlatMapOperation.identity[Any]) { case (acc, p) => p match { @@ -132,7 +121,7 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config case OptionMappedProducer(_, op) => acc.andThen(FlatMapOperation[Any, Any](op.andThen(_.iterator).asInstanceOf[Any => TraversableOnce[Any]])) case FlatMappedProducer(_, op) => acc.andThen(FlatMapOperation(op).asInstanceOf[FlatMapOperation[Any, Any]]) case WrittenProducer(_, sinkSupplier) => acc.andThen(FlatMapOperation.write(sinkSupplier.asInstanceOf[() => (Any => Future[Unit])])) - case IdentityKeyedProducer(_) => acc // acc.andThen(FlatMapOperation((x:Any) => List((x,x)))) + case IdentityKeyedProducer(_) => acc case _ => throw new Exception("Not found! 
: " + p) } } @@ -142,27 +131,27 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config val metrics = getOrElse(topologyName, DEFAULT_FM_STORM_METRICS) val anchorTuples = getOrElse(topologyName, AnchorTuples.default) - val bolt = node match { - case _: FinalFlatMapStormBolt => - val summer = stormDag.dependantsOf(node).head.members.collect { case s: Summer[Storm, _, _] => s }.head.asInstanceOf[Summer[Storm, _, _]] + val summerOpt:Option[SummerNode] = stormDag.dependantsOf(node).collect{case s: SummerNode => s}.headOption + + val bolt = summerOpt match { + case Some(s) => + val summerProducer = s.members.collect { case s: Summer[Storm, _, _] => s }.head.asInstanceOf[Summer[Storm, _, _]] new FinalFlatMapBolt( operation.asInstanceOf[FlatMapOperation[Any, (Any, Any)]], getOrElse(topologyName, DEFAULT_FM_CACHE), getOrElse(topologyName, DEFAULT_FM_STORM_METRICS), - anchorTuples)(summer.monoid.asInstanceOf[Monoid[Any]], summer.store.batcher) - case _: IntermediateFlatMapStormBolt => + anchorTuples)(summerProducer.monoid.asInstanceOf[Monoid[Any]], summerProducer.store.batcher) + case None => new IntermediateFlatMapBolt(operation, metrics, anchorTuples) - case _ => throw new Exception("Non-flatmap node passed to the flatmap function") } - val parallelism = getOrElse(topologyName, DEFAULT_FM_PARALLELISM) val declarer = topologyBuilder.setBolt(nodeName, bolt, parallelism.parHint) - val dependsOnNames = stormDag.dependsOn(node).collect { case x: StormNode => stormDag.getNodeName(x) } + val dependsOnNames = stormDag.dependsOn(node).collect { case x: Node => stormDag.getNodeName(x) } dependsOnNames.foreach { declarer.shuffleGrouping(_) } } - private def scheduleSpout[K](stormDag: StormDag, node: StormSpout, topologyName: Option[String])(implicit topologyBuilder: TopologyBuilder) = { + private def scheduleSpout[K](stormDag: StormDag, node: Node, topologyName: Option[String])(implicit topologyBuilder: TopologyBuilder) = { val spout = 
node.members.collect { case Source(s) => s }.head val nodeName = stormDag.getNodeName(node) @@ -176,7 +165,7 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config topologyBuilder.setSpout(nodeName, stormSpout, parallelism) } - private def scheduleSinkBolt[K, V](stormDag: StormDag, node: StormNode, topologyName: Option[String])(implicit topologyBuilder: TopologyBuilder) = { + private def scheduleSinkBolt[K, V](stormDag: StormDag, node: Node, topologyName: Option[String])(implicit topologyBuilder: TopologyBuilder) = { val summer: Summer[Storm, K, V] = node.members.collect { case c: Summer[Storm, K, V] => c }.head implicit val monoid = summer.monoid val nodeName = stormDag.getNodeName(node) @@ -200,7 +189,7 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config sinkBolt, getOrElse(topologyName, DEFAULT_SINK_PARALLELISM).parHint) - val dependsOnNames = stormDag.dependsOn(node).collect { case x: StormNode => stormDag.getNodeName(x) } + val dependsOnNames = stormDag.dependsOn(node).collect { case x: Node => stormDag.getNodeName(x) } dependsOnNames.foreach { parentName => declarer.fieldsGrouping(parentName, new Fields(AGG_KEY)) } @@ -236,10 +225,9 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config stormDag.nodes.map { node => node match { - case _: SummerStormBolt => scheduleSinkBolt(stormDag, node, name) - case _: FinalFlatMapStormBolt => scheduleFlatMapper(stormDag, node, name) - case _: IntermediateFlatMapStormBolt => scheduleFlatMapper(stormDag, node, name) - case s: StormSpout => scheduleSpout(stormDag, s, name) + case _: SummerNode => scheduleSinkBolt(stormDag, node, name) + case _: FlatMapNode => scheduleFlatMapper(stormDag, node, name) + case _: SourceNode => scheduleSpout(stormDag, node, name) } } topologyBuilder.createTopology diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/viz/viz.scala 
b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/viz/viz.scala index 678199ffe..fd4f04159 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/viz/viz.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/viz/viz.scala @@ -50,7 +50,7 @@ case class VizGraph(dag: StormDag) { } } - def getSubGraphStr(nameLookupTable: NameLookupTable, node: StormNode): (List[String], List[String], NameLookupTable) = { + def getSubGraphStr(nameLookupTable: NameLookupTable, node: Node): (List[String], List[String], NameLookupTable) = { node.members.foldLeft((List[String](), List[String](), nameLookupTable)) { case ((definitions, mappings, nameLookupTable), nextNode) => val dependants = dependantState.dependantsOf(nextNode).getOrElse(Set()) @@ -69,7 +69,7 @@ case class VizGraph(dag: StormDag) { } } def genClusters(): String = { - val (clusters, producerMappings, producerNames, nodeToShortLookupTable) = dag.nodes.foldLeft((List[String](), List[String](), emptyNameLookupTable(), Map[StormNode, String]())) { + val (clusters, producerMappings, producerNames, nodeToShortLookupTable) = dag.nodes.foldLeft((List[String](), List[String](), emptyNameLookupTable(), Map[Node, String]())) { case ((clusters, producerMappings, nameLookupTable, nodeShortName), node) => val (nodeDefinitions, mappings, newNameLookupTable) = getSubGraphStr(nameLookupTable, node) diff --git a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala index 3adc15aec..6e7d03d48 100644 --- a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala +++ b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala @@ -159,7 +159,7 @@ object TopologyPlannerLaws extends Properties("StormDag") { property("Only spouts can have no incoming dependencies") = forAll { (dag: StormDag) => 
dag.nodes.forall{n => n match { - case _: StormSpout => true + case _: SourceNode => true case _ => dag.dependsOn(n).size > 0 } } @@ -169,7 +169,7 @@ object TopologyPlannerLaws extends Properties("StormDag") { property("Spouts must have no incoming dependencies, and they must have dependants") = forAll { (dag: StormDag) => dag.nodes.forall{n => n match { - case _: StormSpout => + case _: SourceNode => dag.dependsOn(n).size == 0 && dag.dependantsOf(n).size > 0 case _ => true } @@ -183,7 +183,7 @@ object TopologyPlannerLaws extends Properties("StormDag") { val success = firstP match { case Summer(_, _, _) => dag.dependsOn(n).size > 0 && dag.dependsOn(n).forall {otherN => - otherN.isInstanceOf[FinalFlatMapStormBolt] + otherN.isInstanceOf[FlatMapNode] } case _ => true } @@ -192,21 +192,10 @@ object TopologyPlannerLaws extends Properties("StormDag") { } } - property("There should only be one final flatmap bolt") = forAll { (dag: StormDag) => - val numFinalFlatmapBolts = dag.nodes.foldLeft(0){(sum, n) => - n match { - case _: FinalFlatMapStormBolt => sum + 1 - case _ => sum - } - } - numFinalFlatmapBolts == 1 - } - - property("There should be no flatmap producers in the source node") = forAll { (dag: StormDag) => dag.nodes.forall{n => val success = n match { - case n: StormSpout => n.members.forall{p => !p.isInstanceOf[FlatMappedProducer[_, _, _]]} + case n: SourceNode => n.members.forall{p => !p.isInstanceOf[FlatMappedProducer[_, _, _]]} case _ => true } if(!success) dumpGraph(dag) From 71b28d221e6b321957a93f39d954049401b419ff Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 23 Sep 2013 17:36:45 -0700 Subject: [PATCH 05/13] Name nodes better, include child producer info --- .../summingbird/TestGraphGenerators.scala | 10 +++++++++- .../summingbird/storm/PlannedTopology.scala | 17 ++++++++++++----- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphGenerators.scala 
b/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphGenerators.scala index a845dbe20..fcb2cf98c 100644 --- a/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphGenerators.scala +++ b/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphGenerators.scala @@ -67,6 +67,14 @@ object TestGraphGenerators { fn <- arbitrary[(Int) => List[Int]] in <- genProd1 } yield FlatMappedProducer(in, fn) + + + def genNamedProducer11[P <: Platform[P]](implicit genSource1 : Gen[Producer[P, Int]], genSource2 : Gen[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { + _ <- Gen.choose(0, 1) + fn <- arbitrary[(Int) => List[Int]] + in <- genProd1 + name <- Gen.alphaStr + } yield NamedProducer(FlatMappedProducer(in, fn), name) def genMerged1[P <: Platform[P]](implicit genSource1 : Gen[Producer[P, Int]], genSource2 : Gen[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { _ <- Gen.choose(0,1) @@ -116,7 +124,7 @@ object TestGraphGenerators { def genProd1[P <: Platform[P]](implicit genSource1 : Gen[Producer[P, Int]], genSource2 : Gen[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]): Gen[Producer[P, Int]] = - frequency((25, genSource1), (3, genOptMap11), (3, genOptMap21), (1, genMerged1), (3, genFlatMap11), + frequency((25, genSource1), (3, genOptMap11), (8, genNamedProducer11), (3, genOptMap21), (1, genMerged1), (3, genFlatMap11), (0, also1), (3, genFlatMap21)) } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/PlannedTopology.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/PlannedTopology.scala index 1d8e83db3..e609b0ab6 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/PlannedTopology.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/PlannedTopology.scala @@ -18,7 +18,9 @@ 
package com.twitter.summingbird.storm import com.twitter.summingbird._ -case class NodeIdentifier(identifier: String) +case class NodeIdentifier(identifier: String) { + override def toString:String = identifier +} sealed trait Node { val members: List[Producer[Storm, _]] = List() @@ -43,6 +45,11 @@ sealed trait Node { def getNameFallback: String = getClass.getName.replaceFirst("com.twitter.summingbird.storm.","") def getName(dag: StormDag): String = dag.getNodeName(this) + + def collapseNamedNodes:String = { + val membersCombined = members.reverse.collect{case NamedProducer(_, n) => n.replace("-","=")}.mkString(",") + if(membersCombined.size > 0 ) "(" + membersCombined + ")" else "" + } def shortName(): NodeIdentifier @@ -69,20 +76,20 @@ sealed trait Node { case class FlatMapNode(override val members: List[Producer[Storm, _]] = List()) extends Node { def add(node: Producer[Storm, _]): Node = if(members.contains(node)) this else this.copy(members=node :: members) def reverse = this.copy(members.reverse) - override def shortName(): NodeIdentifier = NodeIdentifier("FlatMap") + override def shortName(): NodeIdentifier = NodeIdentifier("FlatMap" + collapseNamedNodes ) } case class SummerNode(override val members: List[Producer[Storm, _]] = List()) extends Node { def add(node: Producer[Storm, _]): Node = if(members.contains(node)) this else this.copy(members=node :: members) def reverse = this.copy(members.reverse) - override def shortName(): NodeIdentifier = NodeIdentifier("Summer") + override def shortName(): NodeIdentifier = NodeIdentifier("Summer" + collapseNamedNodes) } case class SourceNode(override val members: List[Producer[Storm, _]] = List()) extends Node { def add(node: Producer[Storm, _]): Node = if(members.contains(node)) this else this.copy(members=node :: members) def reverse = this.copy(members.reverse) - override def shortName(): NodeIdentifier = NodeIdentifier("Source") + override def shortName(): NodeIdentifier = NodeIdentifier("Source" + 
collapseNamedNodes) } @@ -170,7 +177,7 @@ object StormDag { } } - val (nodeToName, _) = genNames(dag.tailN, dag, Map(dag.tailN -> "T"), Set("T")) + val (nodeToName, _) = genNames(dag.tailN, dag, Map(dag.tailN -> "Tail"), Set("Tail")) val nameToNode = nodeToName.map((t) => (t._2,t._1)) dag.copy(nodeToName = nodeToName, nameToNode = nameToNode) } From 68a8b2df38e602d6185e5b5120df98a35ba3f1e4 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 23 Sep 2013 17:44:25 -0700 Subject: [PATCH 06/13] Fixed bug introduced by rename --- .../scala/com/twitter/summingbird/storm/PlannedTopology.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/PlannedTopology.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/PlannedTopology.scala index e609b0ab6..d2de6eda0 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/PlannedTopology.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/PlannedTopology.scala @@ -245,7 +245,9 @@ object DagBuilder { def maybeSplitThenRecurse[U, A](currentProducer: Prod[U], dep: Prod[A]): (List[Node], VisitedStore) = { val doSplit = dep match { case _ if (forkedNodes.contains(dep)) => true - case _ if (currentBolt.isInstanceOf[FlatMapNode] && allDepsMergeableWithSource(dep)) => true + // If we are a flatmap, but there haven't been any other flatmaps yet(i.e. the registry is of size 1, the summer). 
+ // Then we must split to avoid a 2-level hierarchy + case _ if (currentBolt.isInstanceOf[FlatMapNode] && stormRegistry.size == 1 && allDepsMergeableWithSource(dep)) => true case _ if ((!mergableWithSource(currentProducer)) && allDepsMergeableWithSource(dep)) => true case _ => false } From 9253c2c035c09c0540d843371d3404908afc9ec7 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 23 Sep 2013 18:22:44 -0700 Subject: [PATCH 07/13] Look up the config in the first found transitive dependency NamedProducer --- .../summingbird/storm/StormPlatform.scala | 48 ++++++++++--------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index 890d784dd..e48ec22b8 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -94,14 +94,17 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config private type Prod[T] = Producer[Storm, T] - private def getOrElse[T: Manifest](idOpt: Option[String], default: T): T = + private def getOrElse[T: Manifest](node: Node, default: T): T = { + val producer = node.members.last + val namedNodes = Producer.transitiveDependenciesOf(producer).collect{case NamedProducer(_, n) => n} (for { - id <- idOpt + id <- namedNodes stormOpts <- options.get(id) option <- stormOpts.get[T] - } yield option).getOrElse(default) + } yield option).headOption.getOrElse(default) + } - private def scheduleFlatMapper(stormDag: StormDag, node: Node, topologyName: Option[String])(implicit topologyBuilder: TopologyBuilder) = { + private def scheduleFlatMapper(stormDag: StormDag, node: Node)(implicit topologyBuilder: TopologyBuilder) = { /** * Only exists because of the crazy casts we needed.
*/ @@ -128,8 +131,8 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config } val nodeName = stormDag.getNodeName(node) val operation = foldOperations(node.members.reverse) - val metrics = getOrElse(topologyName, DEFAULT_FM_STORM_METRICS) - val anchorTuples = getOrElse(topologyName, AnchorTuples.default) + val metrics = getOrElse(node, DEFAULT_FM_STORM_METRICS) + val anchorTuples = getOrElse(node, AnchorTuples.default) val summerOpt:Option[SummerNode] = stormDag.dependantsOf(node).collect{case s: SummerNode => s}.headOption @@ -138,20 +141,20 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config val summerProducer = s.members.collect { case s: Summer[Storm, _, _] => s }.head.asInstanceOf[Summer[Storm, _, _]] new FinalFlatMapBolt( operation.asInstanceOf[FlatMapOperation[Any, (Any, Any)]], - getOrElse(topologyName, DEFAULT_FM_CACHE), - getOrElse(topologyName, DEFAULT_FM_STORM_METRICS), + getOrElse(node, DEFAULT_FM_CACHE), + getOrElse(node, DEFAULT_FM_STORM_METRICS), anchorTuples)(summerProducer.monoid.asInstanceOf[Monoid[Any]], summerProducer.store.batcher) case None => new IntermediateFlatMapBolt(operation, metrics, anchorTuples) } - val parallelism = getOrElse(topologyName, DEFAULT_FM_PARALLELISM) + val parallelism = getOrElse(node, DEFAULT_FM_PARALLELISM) val declarer = topologyBuilder.setBolt(nodeName, bolt, parallelism.parHint) val dependsOnNames = stormDag.dependsOn(node).collect { case x: Node => stormDag.getNodeName(x) } dependsOnNames.foreach { declarer.shuffleGrouping(_) } } - private def scheduleSpout[K](stormDag: StormDag, node: Node, topologyName: Option[String])(implicit topologyBuilder: TopologyBuilder) = { + private def scheduleSpout[K](stormDag: StormDag, node: Node)(implicit topologyBuilder: TopologyBuilder) = { val spout = node.members.collect { case Source(s) => s }.head val nodeName = stormDag.getNodeName(node) @@ -161,11 +164,11 @@ abstract class Storm(options: Map[String, Options], 
updateConf: Config => Config case _ => sys.error("not possible, given the above call to span.") }.getSpout - val parallelism = getOrElse(topologyName, DEFAULT_SPOUT_PARALLELISM).parHint + val parallelism = getOrElse(node, DEFAULT_SPOUT_PARALLELISM).parHint topologyBuilder.setSpout(nodeName, stormSpout, parallelism) } - private def scheduleSinkBolt[K, V](stormDag: StormDag, node: Node, topologyName: Option[String])(implicit topologyBuilder: TopologyBuilder) = { + private def scheduleSinkBolt[K, V](stormDag: StormDag, node: Node)(implicit topologyBuilder: TopologyBuilder) = { val summer: Summer[Storm, K, V] = node.members.collect { case c: Summer[Storm, K, V] => c }.head implicit val monoid = summer.monoid val nodeName = stormDag.getNodeName(node) @@ -176,18 +179,18 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config val sinkBolt = new SinkBolt[K, V]( supplier, - getOrElse(topologyName: Option[String], DEFAULT_ONLINE_SUCCESS_HANDLER), - getOrElse(topologyName: Option[String], DEFAULT_ONLINE_EXCEPTION_HANDLER), - getOrElse(topologyName: Option[String], DEFAULT_SINK_CACHE), - getOrElse(topologyName: Option[String], DEFAULT_SINK_STORM_METRICS), - getOrElse(topologyName: Option[String], DEFAULT_MAX_WAITING_FUTURES), - getOrElse(topologyName: Option[String], IncludeSuccessHandler.default)) + getOrElse(node, DEFAULT_ONLINE_SUCCESS_HANDLER), + getOrElse(node, DEFAULT_ONLINE_EXCEPTION_HANDLER), + getOrElse(node, DEFAULT_SINK_CACHE), + getOrElse(node, DEFAULT_SINK_STORM_METRICS), + getOrElse(node, DEFAULT_MAX_WAITING_FUTURES), + getOrElse(node, IncludeSuccessHandler.default)) val declarer = topologyBuilder.setBolt( nodeName, sinkBolt, - getOrElse(topologyName, DEFAULT_SINK_PARALLELISM).parHint) + getOrElse(node, DEFAULT_SINK_PARALLELISM).parHint) val dependsOnNames = stormDag.dependsOn(node).collect { case x: Node => stormDag.getNodeName(x) } dependsOnNames.foreach { parentName => @@ -221,13 +224,12 @@ abstract class Storm(options: Map[String, 
Options], updateConf: Config => Config implicit val config = baseConfig val stormDag = DagBuilder(tail) - val name: Option[String] = tail match { case NamedProducer(_, n) => Some(n) case _ => None } stormDag.nodes.map { node => node match { - case _: SummerNode => scheduleSinkBolt(stormDag, node, name) - case _: FlatMapNode => scheduleFlatMapper(stormDag, node, name) - case _: SourceNode => scheduleSpout(stormDag, node, name) + case _: SummerNode => scheduleSinkBolt(stormDag, node) + case _: FlatMapNode => scheduleFlatMapper(stormDag, node) + case _: SourceNode => scheduleSpout(stormDag, node) } } topologyBuilder.createTopology From 0b48de9f5a282a7868c238098c373982e7b50e5b Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 23 Sep 2013 18:30:03 -0700 Subject: [PATCH 08/13] Add named producer to test cases and code to handle --- .../src/test/scala/com/twitter/summingbird/TestGraphs.scala | 2 ++ .../scala/com/twitter/summingbird/storm/StormPlatform.scala | 2 ++ 2 files changed, 4 insertions(+) diff --git a/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphs.scala b/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphs.scala index 3593ad76b..a5958b595 100644 --- a/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphs.scala +++ b/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphs.scala @@ -80,8 +80,10 @@ object TestGraphs { (preJoinFn: T => TraversableOnce[(K, U)]) (postJoinFn: ((K, (U, Option[JoinedU]))) => TraversableOnce[(K, V)]): Summer[P, K, V] = source + .name("My named source") .flatMap(preJoinFn) .leftJoin(service) + .name("My named flatmap") .flatMap(postJoinFn) .sumByKey(store) } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index e48ec22b8..3a94e3084 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ 
b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -125,6 +125,7 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config case FlatMappedProducer(_, op) => acc.andThen(FlatMapOperation(op).asInstanceOf[FlatMapOperation[Any, Any]]) case WrittenProducer(_, sinkSupplier) => acc.andThen(FlatMapOperation.write(sinkSupplier.asInstanceOf[() => (Any => Future[Unit])])) case IdentityKeyedProducer(_) => acc + case NamedProducer(_, _) => acc case _ => throw new Exception("Not found! : " + p) } } @@ -161,6 +162,7 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config val stormSpout = node.members.reverse.foldLeft(spout.asInstanceOf[Spout[(Long, Any)]]) { case (spout, Source(_)) => spout // The source is still in the members list so drop it case (spout, OptionMappedProducer(_, op)) => spout.flatMap {case (time, t) => op.apply(t).map { x => (time, x) }} + case (spout, NamedProducer(_, _)) => spout case _ => sys.error("not possible, given the above call to span.") }.getSpout From 1d5284f316335254573160046e901b0e5e3b3b87 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 24 Sep 2013 11:33:39 -0700 Subject: [PATCH 09/13] Address comments, make the IdentityOperation a sub class --- .../summingbird/storm/FlatMapOperation.scala | 10 ++++++++- .../summingbird/storm/StormPlatform.scala | 22 ++----------------- 2 files changed, 11 insertions(+), 21 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapOperation.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapOperation.scala index 05f68dcf2..1629af370 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapOperation.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapOperation.scala @@ -53,8 +53,16 @@ class FunctionFlatMapOperation[T, U](@transient fm: T => TraversableOnce[U]) def apply(t: T) = 
Future.value(boxed.get(t)) } +class IdentityFlatMapOperation[T] extends FlatMapOperation[T, T] { + // By default we do the identity function + def apply(t: T): Future[TraversableOnce[T]] = Future.value(Some(t)) + + // But if we are composed with something else, just become it + override def andThen[V](fmo: FlatMapOperation[T, V]): FlatMapOperation[T, V] = fmo +} + object FlatMapOperation { - def identity[T] = FlatMapOperation { t: T => Some(t) } + def identity[T]: FlatMapOperation[T, T] = new IdentityFlatMapOperation() def apply[T, U](fm: T => TraversableOnce[U]): FlatMapOperation[T, U] = new FunctionFlatMapOperation(fm) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index 3a94e3084..ac9553176 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -71,20 +71,6 @@ object Storm { Producer.source[Storm, T](timedSpout(spout)) } -/** - * Object containing helper functions to build up the list of storm - * operations that can potentially be optimized. 
- */ -sealed trait FMItem -case class OptionMap[T, U]() extends FMItem -case class FactoryCell(factory: StoreFactory[_, _]) extends FMItem -case class FlatMap(op: FlatMapOperation[_, _]) extends FMItem - -object FMItem { - def sink[T](sinkSupplier: () => (T => Future[Unit])): FMItem = - FlatMap(FlatMapOperation.write(sinkSupplier)) -} - abstract class Storm(options: Map[String, Options], updateConf: Config => Config) extends Platform[Storm] { type Source[+T] = Spout[(Long, T)] type Store[-K, V] = StormStore[K, V] @@ -96,7 +82,8 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config private def getOrElse[T: Manifest](node: Node, default: T): T = { val producer = node.members.last - val namedNodes = Producer.transitiveDependenciesOf(producer).collect{case NamedProducer(_, n) => n} + + val namedNodes = Dependants(producer).transitiveDependantsOf(producer).collect{case NamedProducer(_, n) => n} (for { id <- namedNodes stormOpts <- options.get(id) @@ -108,11 +95,6 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config /** * Only exists because of the crazy casts we needed. 
*/ - def serviceOperation[K, V, W](store: StoreFactory[_, _]) = - FlatMapOperation.combine( - FlatMapOperation.identity[(K, V)], - store.asInstanceOf[StoreFactory[K, W]]) - def foldOperations(producers: List[Producer[Storm, _]]): FlatMapOperation[Any, Any] = { producers.foldLeft(FlatMapOperation.identity[Any]) { case (acc, p) => From ec3128bb472c74b556e81c45ba3117fbf0ac78a0 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Thu, 26 Sep 2013 10:57:04 -0700 Subject: [PATCH 10/13] Address naming issues to avoid confusion --- .../summingbird/storm/PlannedTopology.scala | 25 +++++++++++-------- .../summingbird/storm/StormPlatform.scala | 6 ++--- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/PlannedTopology.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/PlannedTopology.scala index d2de6eda0..6aa1e13b1 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/PlannedTopology.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/PlannedTopology.scala @@ -98,8 +98,8 @@ case class StormDag(tail: Producer[Storm, _], producerToNode: Map[Producer[Storm nodes: List[Node], nodeToName: Map[Node, String] = Map(), nameToNode: Map[String, Node] = Map(), - dependsOnM: Map[Node, List[Node]] = Map[Node, List[Node]](), - dependantOfM: Map[Node, List[Node]] = Map[Node, List[Node]]()) { + dependenciesOfM: Map[Node, List[Node]] = Map[Node, List[Node]](), + dependantsOfM: Map[Node, List[Node]] = Map[Node, List[Node]]()) { def connect(src: Node, dest: Node): StormDag = { if (src == dest) { this @@ -108,12 +108,15 @@ case class StormDag(tail: Producer[Storm, _], producerToNode: Map[Producer[Storm // We build/maintain two maps, // Nodes to which each node depends on // and nodes on which each node depends - val oldDependsOnTargets = dependsOnM.getOrElse(src, List[Node]()) - val dependsOnTargets = if(oldDependsOnTargets.contains(dest)) 
oldDependsOnTargets else (dest :: oldDependsOnTargets) - val oldDependantOfTargets = dependantOfM.getOrElse(dest, List[Node]()) - val dependantOfTargets = if(oldDependantOfTargets.contains(src)) oldDependantOfTargets else (src :: oldDependantOfTargets) - - copy(dependsOnM = dependsOnM + (src -> dependsOnTargets) , dependantOfM = dependantOfM + (dest -> dependantOfTargets)) + val oldSrcDependants = dependantsOfM.getOrElse(src, List[Node]()) + val newSrcDependants = if(oldSrcDependants.contains(dest)) oldSrcDependants else (dest :: oldSrcDependants) + val newDependantsOfM = dependantsOfM + (src -> newSrcDependants) + + val oldDestDependencies = dependenciesOfM.getOrElse(dest, List[Node]()) + val newDestDependencies = if(oldDestDependencies.contains(src)) oldDestDependencies else (src :: oldDestDependencies) + val newDependenciesOfM = dependenciesOfM + (dest -> newDestDependencies) + + copy(dependenciesOfM = newDependenciesOfM, dependantsOfM = newDependantsOfM) } } @@ -124,8 +127,8 @@ case class StormDag(tail: Producer[Storm, _], producerToNode: Map[Producer[Storm def getNodeName(n: Node): String = nodeToName(n) def tailN: Node = producerToNode(tail) - def dependantsOf(n: Node): List[Node] = dependsOnM.get(n).getOrElse(List()) - def dependsOn(n: Node): List[Node] = dependantOfM.get(n).getOrElse(List()) + def dependantsOf(n: Node): List[Node] = dependantsOfM.get(n).getOrElse(List()) + def dependenciesOf(n: Node): List[Node] = dependenciesOfM.get(n).getOrElse(List()) def toStringWithPrefix(prefix: String): String = { prefix + "StormDag\n" + nodes.foldLeft(""){ case (str, node) => @@ -167,7 +170,7 @@ object StormDag { } def genNames(dep: Node, dag: StormDag, outerNodeToName: Map[Node, String], usedNames: Set[String]): (Map[Node, String], Set[String]) = { - dag.dependsOn(dep).foldLeft((outerNodeToName, usedNames)) {case ((nodeToName, taken), n) => + dag.dependenciesOf(dep).foldLeft((outerNodeToName, usedNames)) {case ((nodeToName, taken), n) => val name = 
tryGetName(nodeToName(dep) + "-" + n.shortName, taken) val useName = nodeToName.get(n) match { case None => name diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index ac9553176..eb7e7a44a 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -133,7 +133,7 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config val parallelism = getOrElse(node, DEFAULT_FM_PARALLELISM) val declarer = topologyBuilder.setBolt(nodeName, bolt, parallelism.parHint) - val dependsOnNames = stormDag.dependsOn(node).collect { case x: Node => stormDag.getNodeName(x) } + val dependsOnNames = stormDag.dependantsOf(node).collect { case x: Node => stormDag.getNodeName(x) } dependsOnNames.foreach { declarer.shuffleGrouping(_) } } @@ -176,8 +176,8 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config sinkBolt, getOrElse(node, DEFAULT_SINK_PARALLELISM).parHint) - val dependsOnNames = stormDag.dependsOn(node).collect { case x: Node => stormDag.getNodeName(x) } - dependsOnNames.foreach { parentName => + val dependantsOfNames = stormDag.dependantsOf(node).collect { case x: Node => stormDag.getNodeName(x) } + dependantsOfNames.foreach { parentName => declarer.fieldsGrouping(parentName, new Fields(AGG_KEY)) } From 0281d523bd53d0ba61e729e1071a419408833ea5 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Thu, 26 Sep 2013 11:02:15 -0700 Subject: [PATCH 11/13] Fix tests to match --- .../twitter/summingbird/storm/TopologyPlannerLaws.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala 
index 4f6e23166..3078f5e17 100644 --- a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala +++ b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala @@ -114,7 +114,7 @@ object TopologyPlannerLaws extends Properties("StormDag") { val firstP = n.members.last firstP match { case Summer(_, _, _) => - dag.dependsOn(n).forall {otherN => + dag.dependantsOf(n).forall {otherN => otherN.members.head.isInstanceOf[KeyedProducer[_, _, _]] } case _ => true @@ -139,7 +139,7 @@ object TopologyPlannerLaws extends Properties("StormDag") { dag.nodes.forall{n => n match { case _: SourceNode => true - case _ => dag.dependsOn(n).size > 0 + case _ => dag.dependenciesOf(n).size > 0 } } } @@ -149,7 +149,7 @@ object TopologyPlannerLaws extends Properties("StormDag") { dag.nodes.forall{n => n match { case _: SourceNode => - dag.dependsOn(n).size == 0 && dag.dependantsOf(n).size > 0 + dag.dependenciesOf(n).size == 0 && dag.dependantsOf(n).size > 0 case _ => true } } @@ -161,7 +161,7 @@ object TopologyPlannerLaws extends Properties("StormDag") { val firstP = n.members.last val success = firstP match { case Summer(_, _, _) => - dag.dependsOn(n).size > 0 && dag.dependsOn(n).forall {otherN => + dag.dependenciesOf(n).size > 0 && dag.dependenciesOf(n).forall {otherN => otherN.isInstanceOf[FlatMapNode] } case _ => true From 56b0b2567d251dc9ea97350b16b0b40e71987f10 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Thu, 26 Sep 2013 13:25:24 -0700 Subject: [PATCH 12/13] Fix platform bugs --- .../com/twitter/summingbird/storm/StormPlatform.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index eb7e7a44a..afedebaff 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ 
b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -133,8 +133,8 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config val parallelism = getOrElse(node, DEFAULT_FM_PARALLELISM) val declarer = topologyBuilder.setBolt(nodeName, bolt, parallelism.parHint) - val dependsOnNames = stormDag.dependantsOf(node).collect { case x: Node => stormDag.getNodeName(x) } - dependsOnNames.foreach { declarer.shuffleGrouping(_) } + val dependenciesNames = stormDag.dependenciesOf(node).collect { case x: Node => stormDag.getNodeName(x) } + dependenciesNames.foreach { declarer.shuffleGrouping(_) } } private def scheduleSpout[K](stormDag: StormDag, node: Node)(implicit topologyBuilder: TopologyBuilder) = { @@ -176,8 +176,8 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config sinkBolt, getOrElse(node, DEFAULT_SINK_PARALLELISM).parHint) - val dependantsOfNames = stormDag.dependantsOf(node).collect { case x: Node => stormDag.getNodeName(x) } - dependantsOfNames.foreach { parentName => + val dependenciesNames = stormDag.dependenciesOf(node).collect { case x: Node => stormDag.getNodeName(x) } + dependenciesNames.foreach { parentName => declarer.fieldsGrouping(parentName, new Fields(AGG_KEY)) } From a0569157060f494d0e32fcffbfbd96e528b552ee Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Thu, 26 Sep 2013 13:25:46 -0700 Subject: [PATCH 13/13] Add information about how we map between clusters to the output graph --- .../main/scala/com/twitter/summingbird/storm/viz/viz.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/viz/viz.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/viz/viz.scala index fd4f04159..54f55f56c 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/viz/viz.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/viz/viz.scala @@ 
-78,8 +78,12 @@ case class VizGraph(dag: StormDag) { val nextCluster = "subgraph %s {\n\tlabel=\"%s\"\n%s\n}\n".format(shortName, dag.getNodeName(node), nodeDefinitions.mkString("\n")) (nextCluster :: clusters, mappings ++ producerMappings, newNameLookupTable, newNodeShortName) } + + val clusterMappings = dag.nodes.flatMap{case node => + dag.dependantsOf(node).collect{case n => "cluster_%s -> cluster_%s [style=dashed]".format(node.hashCode.toHexString, n.hashCode.toHexString)} + } - "digraph summingbirdGraph {\n" + (clusters ++ producerMappings).mkString("\n") + "\n}" + "digraph summingbirdGraph {\n" + (clusters ++ producerMappings ++ clusterMappings).mkString("\n") + "\n}" } override def toString() : String = genClusters