Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Fix Storm Planner Bug #217

Merged
merged 3 commits into from
Sep 17, 2013
Merged

Fix Storm Planner Bug #217

merged 3 commits into from
Sep 17, 2013

Conversation

sritchie
Copy link
Collaborator

Verified the fix with some println debugging plus this:

object Test {
  import com.twitter.storehaus._
  import com.twitter.storehaus.algebra.MergeableStore
  import com.twitter.summingbird.storm._
  import com.twitter.summingbird._
  import com.twitter.summingbird.batch.{BatchID, Batcher}
  import com.twitter.tormenta.spout.Spout

  implicit val te = TimeExtractor[Int](i => i.toLong)
  implicit val batcher = Batcher.ofHours(1)
  val node = Storm.source(Spout.fromTraversable(Seq(1,2,3))).flatMap { i => List(i, i, i) }

  val newNode = node.flatMap(i => Some(i)).write(() => _ => com.twitter.util.Future.Unit)

  try {
    Storm.local("face").plan {
      node.either(newNode).flatMap {
        case Left(t) => Some(t)
        case Right(r) => None
      }.map(_ -> 1L).sumByKey {
        MergeableStoreSupplier.from {
          MergeableStore.fromStore[(Int, BatchID), Long](new JMapStore())
        }
      }
    }
  } catch {
    case _ => null
  }
}


class WriteOperation[T](sinkSupplier: () => (T => Future[Unit])) extends FlatMapOperation[T, T] {
lazy val sink = sinkSupplier()
override def apply(t: T) = sink(t).map { _ => Some(t) }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could .recover here to make sure a write doesn't drop the downstream.

Is that the right thing or let it fail, and see the restarts in nimbus?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I say we rescue. We need to do the same thing after service joins, I think. To do this right, I think we need default handlers all over.

johnynek added a commit that referenced this pull request Sep 17, 2013
@johnynek johnynek merged commit ed7d4d7 into develop Sep 17, 2013
@johnynek johnynek deleted the feature/storm_planner_change branch September 17, 2013 17:27
snoble pushed a commit to snoble/summingbird that referenced this pull request Sep 8, 2017
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants