Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Feature/add storm dot graph #236

Merged
merged 17 commits into from
Sep 22, 2013
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Updates to reflect comments from github
  • Loading branch information
ianoc committed Sep 21, 2013
commit 4bef8e9c016ba38316f3535a247a58856acd05f5
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,8 @@

package com.twitter.summingbird.storm

import backtype.storm.LocalCluster
import backtype.storm.Testing
import backtype.storm.testing.CompleteTopologyParam
import backtype.storm.testing.MockedSources
import backtype.storm.tuple.Fields
import backtype.storm.{Config, StormSubmitter}
import backtype.storm.generated.StormTopology
import backtype.storm.topology.{ BoltDeclarer, TopologyBuilder }
import com.twitter.algebird.Monoid
import com.twitter.bijection.Injection
import com.twitter.chill.InjectionPair
import com.twitter.storehaus.algebra.MergeableStore
import com.twitter.storehaus.algebra.MergeableStore.enrich
import com.twitter.summingbird.batch.{ BatchID, Batcher }
import com.twitter.summingbird.storm.option.{ AnchorTuples, IncludeSuccessHandler }
import com.twitter.summingbird.util.CacheSize
import com.twitter.summingbird.kryo.KryoRegistrationHelper
import com.twitter.tormenta.spout.Spout
import com.twitter.summingbird._
import com.twitter.util.Future

import Constants._
import scala.annotation.tailrec

sealed trait StormNode {
val members: List[Producer[Storm, _]] = List()
Expand Down Expand Up @@ -79,6 +58,9 @@ sealed trait StormNode {

}

// This is the default state for StormNodes if there is nothing special about them.
// There can be an unbounded number of these and there is no hard restrictions on ordering/where. Other than
// locations which must be one of the others
case class IntermediateFlatMapStormBolt(override val members: List[Producer[Storm, _]] = List()) extends StormNode {
def add(node: Producer[Storm, _]): StormNode = if(members.contains(node)) this else this.copy(members=node :: members)
override def getName = "Intermediate Flatmap Bolt"
Expand All @@ -105,9 +87,10 @@ case class StormSpout(override val members: List[Producer[Storm, _]] = List()) e



case class StormDag(tail: Producer[Storm, _], nodeLut: Map[Producer[Storm, _], StormNode],
dependsOnM: Map[StormNode, Set[StormNode]], dependantOfM: Map[StormNode, Set[StormNode]],
allNodes: Set[StormNode]) {
case class StormDag(tail: Producer[Storm, _], producerToNode: Map[Producer[Storm, _], StormNode],
nodes: Set[StormNode],
dependsOnM: Map[StormNode, Set[StormNode]] = Map[StormNode, Set[StormNode]](),
dependantOfM: Map[StormNode, Set[StormNode]] = Map[StormNode, Set[StormNode]]()) {
def connect(src: StormNode, dest: StormNode): StormDag = {
if (src == dest) {
this
Expand All @@ -118,154 +101,126 @@ case class StormDag(tail: Producer[Storm, _], nodeLut: Map[Producer[Storm, _], S
// and nodes on which each node depends
val dependsOnTargets = dependsOnM.getOrElse(src, Set[StormNode]())
val dependantOfTargets = dependantOfM.getOrElse(dest, Set[StormNode]())

StormDag(tail, nodeLut, dependsOnM + (src -> (dependsOnTargets + dest) ), dependantOfM + (dest -> (dependantOfTargets + src)), allNodes)
copy(dependsOnM = dependsOnM + (src -> (dependsOnTargets + dest) ), dependantOfM = dependantOfM + (dest -> (dependantOfTargets + src)))
}
}

def nodes = allNodes

def locate(p: Producer[Storm, _]) = nodeLut.get(p)

def connect(src: Producer[Storm, _], dest: Producer[Storm, _]): StormDag = {
val newDag = for {
lNode <- locate(src)
rNode <- locate(dest)
} yield connect(lNode, rNode)
newDag.getOrElse(this)
}
def locateOpt(p: Producer[Storm, _]): Option[StormNode] = producerToNode.get(p)
def locate(p: Producer[Storm, _]): StormNode = locateOpt(p).get
def connect(src: Producer[Storm, _], dest: Producer[Storm, _]): StormDag = connect(locate(src), locate(dest))

def dependantsOf(n: StormNode): Set[StormNode] = dependsOnM.get(n).getOrElse(Set())
def dependsOn(n: StormNode): Set[StormNode] = dependantOfM.get(n).getOrElse(Set())

def toStringWithPrefix(prefix: String): String = {
prefix + "StormDag\n" + allNodes.foldLeft(""){ case (str, node) =>
prefix + "StormDag\n" + nodes.foldLeft(""){ case (str, node) =>
str + node.toStringWithPrefix(prefix + "\t") + "\n"
}
}
override def toString(): String = {
val res = toStringWithPrefix("\t")
var dumpNumber = 1
def dumpFailingGraph(dag: StormDag) = {
import java.io._
import com.twitter.summingbird.viz.BaseViz
val writer2 = new PrintWriter(new File("/tmp/failingBaseGraph" + dumpNumber + ".dot"))
BaseViz(dag.tail, writer2)
writer2.close()
dumpNumber = dumpNumber + 1
}
def dumpSFailingGraph(dag: StormDag) = {
import java.io._
import com.twitter.summingbird.storm.viz.StormViz
val writer2 = new PrintWriter(new File("/tmp/failingSGraph" + dumpNumber + ".dot"))
StormViz(dag.tail, writer2)
writer2.close()
dumpNumber = dumpNumber + 1
}
dumpSFailingGraph(this)
dumpFailingGraph(this)
res
}

override def toString(): String = toStringWithPrefix("\t")
}

object StormDag {
def build(registry: StormRegistry) : StormDag = {
val nodeLut = registry.buildLut
registry.registry.foldLeft(StormDag(registry.tail, nodeLut, Map[StormNode, Set[StormNode]](), Map[StormNode, Set[StormNode]](), registry.registry)){ (curDag, stormNode) =>
stormNode.members.foldLeft(curDag) { (innerDag, outerProducer) =>
outerProducer match {
case Summer(producer, _, _) => innerDag.connect(producer, outerProducer)
case IdentityKeyedProducer(producer) => innerDag.connect(producer, outerProducer)
case NamedProducer(producer, newId) => innerDag.connect(producer, outerProducer)
case OptionMappedProducer(producer, op, manifest) => innerDag.connect(producer, outerProducer)
case FlatMappedProducer(producer, op) => innerDag.connect(producer, outerProducer)
case WrittenProducer(producer, sinkSupplier) => innerDag.connect(producer, outerProducer)
case LeftJoinedProducer(producer, StoreWrapper(newService)) => innerDag.connect(producer, outerProducer)
case MergedProducer(l, r) => innerDag.connect(l, outerProducer).connect(r, outerProducer)
case Source(_, _) => innerDag
}
def buildProducerToNodeLookUp(stormNodeSet: Set[StormNode]) : Map[Producer[Storm, _], StormNode] = {
stormNodeSet.foldLeft(Map[Producer[Storm, _], StormNode]()){ (curRegistry, stormNode) =>
stormNode.members.foldLeft(curRegistry) { (innerRegistry, producer) =>
(innerRegistry + (producer -> stormNode))
}
}
}
}

case class StormRegistry(tail: Producer[Storm, _], registry: Set[StormNode] = Set[StormNode]()) {

def register(n: StormNode): StormRegistry = {
if(n.members.size > 0) {
StormRegistry(tail, registry + n.reverse )
} else this
}

def buildLut() : Map[Producer[Storm, _], StormNode] = {
registry.foldLeft(Map[Producer[Storm, _], StormNode]()){ (curRegistry, stormNode) =>
stormNode.members.foldLeft(curRegistry) { (innerRegistry, producer) =>
(innerRegistry + (producer -> stormNode))
def build(tail: Producer[Storm, _], registry: Set[StormNode]) : StormDag = {
val producerToNode = buildProducerToNodeLookUp(registry)

registry.foldLeft(StormDag(tail, producerToNode, registry)){ (curDag, stormNode) =>
// Here we are building the StormDag's connection topology.
// We visit every producer and connect the StormNode's represented by its dependant and dependancies.
// Producers which live in the same node will result in a NOP in connect.
stormNode.members.foldLeft(curDag) { (innerDag, dependantProducer) =>
dependantProducer match {
case Summer(producer, _, _) => innerDag.connect(producer, dependantProducer)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not:

Producer
  .dependenciesOf(dependantProducer)
  .foldLeft(innerDag) { {dag, dep) => dag.connect(dep, dependantProducer) }

Would that work and save some code duplication with dependenciesOf?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's address this, if you agree in a next PR.

case IdentityKeyedProducer(producer) => innerDag.connect(producer, dependantProducer)
case NamedProducer(producer, newId) => innerDag.connect(producer, dependantProducer)
case OptionMappedProducer(producer, op, manifest) => innerDag.connect(producer, dependantProducer)
case FlatMappedProducer(producer, op) => innerDag.connect(producer, dependantProducer)
case WrittenProducer(producer, sinkSupplier) => innerDag.connect(producer, dependantProducer)
case LeftJoinedProducer(producer, StoreWrapper(newService)) => innerDag.connect(producer, dependantProducer)
case MergedProducer(l, r) => innerDag.connect(l, dependantProducer).connect(r, dependantProducer)
case Source(_, _) => innerDag
}
}
}
}
}

object StormTopologyBuilder {
object DagBuilder {
private type Prod[T] = Producer[Storm, T]
private type VisitedStore = Set[Prod[_]]

def apply[P](tail: Producer[Storm, P]): StormDag = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is weird that you have two apply methods that return two different types. They should either be renamed, or return one type.

For apply, I think it works for Function-like objects (one return type), or constructor-like methods (return an instance of the trait or class that the object is a companion of).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, got cusfused by the inner class with a lowercase name.

val stormRegistry = new buildReg(tail)()
StormDag.build(stormRegistry)
val stormNodeSet = buildStormNodesSet(tail)

// The nodes are added in a summer -> source manner
// but its easier to look at laws in a source -> summer manner
// We also drop all StormNodes with no members(may occur when we visit a node already seen and its the first in that Node)
val revsersedNodeSet = stormNodeSet.filter(_.members.size > 0).foldLeft(Set[StormNode]()){(nodes, n) => nodes + n.reverse}
StormDag.build(tail, revsersedNodeSet)
}

private class buildReg[P](tail: Producer[Storm, P]) {
private val dep = Dependants(tail)
private val forkedNodes =
Producer.transitiveDependenciesOf(tail)
.filter(dep.fanOut(_).exists(_ > 1)).toSet
// This takes an initial pass through all of the Producers, assigning them to StormNodes
private def buildStormNodesSet[P](tail: Producer[Storm, P]): Set[StormNode] = {
val dep = Dependants(tail)
val forkedNodes = Producer.transitiveDependenciesOf(tail)
.filter(dep.fanOut(_).exists(_ > 1)).toSet

def apply(): StormRegistry = {
val (stormRegistry, _) = recursiveCollect(tail, IntermediateFlatMapStormBolt(), StormRegistry(tail), Set())
stormRegistry
}

private def recursiveCollect[T](outerProducer: Prod[T], previousBolt: StormNode, stormRegistry: StormRegistry, visited: VisitedStore) : (StormRegistry, VisitedStore) = {
if (visited.contains(outerProducer)) {
(stormRegistry.register(previousBolt), visited)
// Add the dependentProducer to a StormNode along with each of its dependencies in turn.
def addWithDependencies[T](dependantProducer: Prod[T], previousBolt: StormNode,
stormRegistry: Set[StormNode], visited: VisitedStore) : (Set[StormNode], VisitedStore) = {
if (visited.contains(dependantProducer)) {
(stormRegistry + previousBolt, visited)
} else {
val currentBolt = previousBolt.add(outerProducer)
val visitedWithN = visited + outerProducer
val currentBolt = previousBolt.add(dependantProducer)
val visitedWithN = visited + dependantProducer

def recurse[U](
producer: Prod[U],
updatedBolt: StormNode = currentBolt,
updatedDag: StormRegistry = stormRegistry,
updatedDag: Set[StormNode] = stormRegistry,
visited: VisitedStore = visitedWithN)
: (StormRegistry, VisitedStore) = {
recursiveCollect(producer, updatedBolt, updatedDag, visited)
: (Set[StormNode], VisitedStore) = {
addWithDependencies(producer, updatedBolt, updatedDag, visited)
}

def mergableWithSource(dependency: Prod[_]): Boolean = {
dependency match {
case NamedProducer(producer, _) => mergableWithSource(producer)
case IdentityKeyedProducer(producer) => mergableWithSource(producer)
case OptionMappedProducer(producer, _, _) => mergableWithSource(producer)
def mergableWithSource(dep: Prod[_]): Boolean = {
dep match {
case NamedProducer(producer, _) => true
case IdentityKeyedProducer(producer) => true
case OptionMappedProducer(producer, _, _) => true
case Source(_, _) => true
case _ => false
}
}

def allDepsMergeableWithSource(p: Prod[_]): Boolean = mergableWithSource(p) && Producer.dependenciesOf(p).forall(allDepsMergeableWithSource)

/*
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add something like:

The purpose of this method is to see if we need to add a new physical node to the graph, or if we can continue by adding this producer to the current physical node.

Is that right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, added

* The purpose of this method is to see if we need to add a new physical node to the graph,
* or if we can continue by adding this producer to the current physical node.
*
* This function acts as a look ahead, rather than depending on the state of the current node it depends
* on the nodes further along in the dag. That is conditions for spliting into multiple StormNodes are based on as yet
* unvisisted Producers.
*/
def maybeSplit[A](dep: Prod[A], outerProducerMergableWithSource: Boolean = true): (StormRegistry, VisitedStore) = {
def maybeSplitThenRecurse[U, A](currentProducer: Prod[U], dep: Prod[A]): (Set[StormNode], VisitedStore) = {
val doSplit = dep match {
case _ if (forkedNodes.contains(dep)) => true
case _ if (currentBolt.isInstanceOf[FinalFlatMapStormBolt] && mergableWithSource(dep)) => true
case _ if (!outerProducerMergableWithSource && mergableWithSource(dep)) => true
case _ if (currentBolt.isInstanceOf[FinalFlatMapStormBolt] && allDepsMergeableWithSource(dep)) => true
case _ if ((!mergableWithSource(currentProducer)) && allDepsMergeableWithSource(dep)) => true
case _ => false
}
if (doSplit) {
recurse(dep, updatedBolt = IntermediateFlatMapStormBolt(), updatedDag = stormRegistry.register(currentBolt))
recurse(dep, updatedBolt = IntermediateFlatMapStormBolt(), updatedDag = stormRegistry + currentBolt)
} else {
recurse(dep)
}
Expand All @@ -282,6 +237,7 @@ object StormTopologyBuilder {
def mergeCollapse[A](p: Prod[A]): (Set[Prod[A]], Set[Prod[A]]) = {
p match {
case MergedProducer(subL, subR) if !forkedNodes.contains(p) =>
// TODO support de-duping self merges https://github.com/twitter/summingbird/issues/237
if(subL == subR) throw new Exception("Storm doesn't support both the left and right sides of a join being the same node.")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same TODO. I think this is solvabale pretty easily: replace (x ++ x) with x.flatMap { t => Set(t, t) }. When we make the big list of merges at the head of a FlatMap node, all of them need to be distinct actually. If anything is repeated, we need to do the doubling trick I guess. Leave this as a TODO: de-duping self merges.

val (lMergeNodes, lSiblings) = mergeCollapse(subL)
val (rMergeNodes, rSiblings) = mergeCollapse(subR)
Expand All @@ -290,28 +246,30 @@ object StormTopologyBuilder {
}
}



outerProducer match {
case Summer(producer, _, _) => recurse(producer, updatedBolt = FinalFlatMapStormBolt(), updatedDag = stormRegistry.register(currentBolt.toSummer))
case IdentityKeyedProducer(producer) => maybeSplit(producer)
case NamedProducer(producer, newId) => maybeSplit(producer)
case Source(spout, manifest) => (stormRegistry.register(currentBolt.toSpout), visitedWithN)
case OptionMappedProducer(producer, op, manifest) => maybeSplit(producer)
case FlatMappedProducer(producer, op) => maybeSplit(producer, outerProducerMergableWithSource = false)
case WrittenProducer(producer, sinkSupplier) => maybeSplit(producer, outerProducerMergableWithSource = false)
case LeftJoinedProducer(producer, StoreWrapper(newService)) => maybeSplit(producer, outerProducerMergableWithSource = false)
dependantProducer match {
case Summer(producer, _, _) => recurse(producer, updatedBolt = FinalFlatMapStormBolt(), updatedDag = stormRegistry + currentBolt.toSummer)
case IdentityKeyedProducer(producer) => maybeSplitThenRecurse(dependantProducer, producer)
case NamedProducer(producer, newId) => maybeSplitThenRecurse(dependantProducer, producer)
case Source(spout, manifest) => (stormRegistry + currentBolt.toSpout, visitedWithN)
case OptionMappedProducer(producer, op, manifest) => maybeSplitThenRecurse(dependantProducer, producer)
case FlatMappedProducer(producer, op) => maybeSplitThenRecurse(dependantProducer, producer)
case WrittenProducer(producer, sinkSupplier) => maybeSplitThenRecurse(dependantProducer, producer)
case LeftJoinedProducer(producer, StoreWrapper(newService)) => maybeSplitThenRecurse(dependantProducer, producer)
case MergedProducer(l, r) =>
// TODO support de-duping self merges https://github.com/twitter/summingbird/issues/237
if(l == r) throw new Exception("Storm doesn't support both the left and right sides of a join being the same node.")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a TODO with a link to a github issue (support self merges in the Dot-planner).

val (otherMergeNodes, siblings) = mergeCollapse(outerProducer)
val (otherMergeNodes, dependencies) = mergeCollapse(dependantProducer)
val newCurrentBolt = otherMergeNodes.foldLeft(currentBolt)(_.add(_))
val visitedWithOther = otherMergeNodes.foldLeft(visitedWithN){ (visited, n) => visited + n }
val startingReg = stormRegistry.register(newCurrentBolt)
siblings.foldLeft((startingReg, visitedWithOther)) { case ((newStormReg, newVisited), n) =>

// Recurse down all the newly generated dependencies
dependencies.foldLeft((stormRegistry + newCurrentBolt, visitedWithOther)) { case ((newStormReg, newVisited), n) =>
recurse(n, IntermediateFlatMapStormBolt(), newStormReg, newVisited)
}
}
}
}
val (stormRegistry, _) = addWithDependencies(tail, IntermediateFlatMapStormBolt(), Set[StormNode](), Set())
stormRegistry
}
}
Loading