-
Notifications
You must be signed in to change notification settings - Fork 267
Conversation
…orm. Attempt to but some better types down on the stores/services we use in storm that aren't storm specific.
@@ -18,12 +18,12 @@ package com.twitter.summingbird.store | |||
|
|||
import com.twitter.summingbird.online.Externalizer | |||
import com.twitter.storehaus.ReadableStore | |||
import com.twitter.storehaus.algebra.MergeableStore | |||
import com.twitter.storehaus.algebra.Mergeable | |||
import com.twitter.summingbird.batch.BatchID |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why did you make it just a Mergeable? Aren't we dealing with Stores still?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had already done it in storm , so this brings it in line. We don't actually use more of the contract in our code than it being a Mergeable. I.e. we only actually call multiMerge, never get or put. So you can do stuff without providing those functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
got it
Looks good to me, added a few suggestions for clarifications. |
@@ -62,4 +62,17 @@ object Timestamp { | |||
def prev(old: Timestamp) = if (old.milliSinceEpoch != Long.MinValue) Some(old.prev) else None | |||
def ordering: Ordering[Timestamp] = Timestamp.orderingOnTimestamp | |||
} | |||
|
|||
// This is a right semigroup, that given any two Timestamps just take the one on the right. | |||
val rightSemigroup = new Semigroup[Timestamp] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting, I'm surprised we don't have this in algebird.
Is there a reason we prefer this to the Max semigroup?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its just a kind of known quantity. I don't believe the TS really makes that much sense in any case after this other than being within the bounds of some range usually. This is just a move here from elsewhere. The whole usage probably could do with more thought. (maybe Max, but then with aggregations you could go down also in a stream, so Right at least indicates its pretty arbitrary)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is strange to me. Where would we use this over Max, which is the default monoid for Timestamp.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was your choice Oscar ;)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. I remember the situation now. We should probably comment it better. The reason we did this is because we don't want to give a stronger contract to the semigroup than the store actually respects.
Then looks back up the timestamp handed from the stream and outputs with that. | ||
*/ | ||
def wrapOnlineFactory[K, V](supplier: MergeableStoreFactory[K, V]): MergeableStoreFactory[K, (Timestamp, V)] = | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
curly on previous line?
@@ -32,6 +32,14 @@ trait FlatMapOperation[-T, +U] extends Serializable with Closeable { | |||
*/ | |||
def maybeFlush: Future[TraversableOnce[U]] = Future.value(Seq.empty[U]) | |||
|
|||
// Helper to add a simple U => S operation at the end of a FlatMapOperation | |||
def map[S](fn: U => S): FlatMapOperation[T, S] = | |||
andThen(FlatMapOperation(fn.andThen(r => Seq(r)))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we do r => Iterator(r)
here because I am afraid of creating an in-memory representation needlessly by forcing this to Seq.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, but then as a map its output should be just 1 element I believe?
Summingbird Storm/Online refactor
* add Semigroup.maybePlus * Add new test * remove -adapted-args
based ontop of #539, so when we merge that I can rebase/merge to make the diff a little cleaner. This is much bigger though so didn't want to get stuck with merge conflicts.
This however does not effect behavior of jobs(though it is source code incompatible at this time) at all. Its mostly moving things out of storm into online where possible.
Attempts to put some nicer types around our Store Provider and Service Provider needs in online is probably the most useful cleanup to it. Adds a bunch of comments too where needed.