Skip to content

Commit

Permalink
Adding rejected state to invalid streams.
Browse files Browse the repository at this point in the history
  • Loading branch information
fcofdez committed Apr 28, 2015
1 parent 2820353 commit 9006e5d
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 7 deletions.
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ scalacOptions += "-feature"
libraryDependencies ++= {
val akkaStreamV = "1.0-M3"
val sprayV = "1.3.3"
val fingerV = "1.5.2+"
Seq(
"de.sciss" %% "fingertree" % fingerV,
"org.fusesource" % "sigar" % "1.6.4",
"io.spray" %% "spray-can" % sprayV,
"io.spray" %% "spray-client" % sprayV,
Expand Down
4 changes: 2 additions & 2 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ akka {

log-dead-letters = 1
log-dead-letters-during-shutdown = on
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
# loggers = ["akka.event.slf4j.Slf4jLogger"]
# loglevel = "DEBUG"

cluster {
seed-nodes = [
Expand Down
1 change: 0 additions & 1 deletion src/main/scala/worker/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ object UpcloseService extends Protocols {
implicit val system = ActorSystem()
implicit val executor = system.dispatcher

// def config: Config
val logger = Logging(system, getClass)

lazy val config = ConfigFactory.load()
Expand Down
13 changes: 13 additions & 0 deletions src/main/scala/worker/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ class Master(workTimeout: FiniteDuration) extends PersistentActor with ActorLogg
}
}

case MasterWorkerProtocol.WorkRejected(workerId, workId) =>
if (workState.isDone(workId)) {
sender() ! MasterWorkerProtocol.Ack(workId)
} else if (!workState.isInProgress(workId)) {
log.info("Work {} not in progress, reported as done by worker {}", workId, workerId)
} else {
log.info("Work {} is done by worker {}", workId, workerId)
changeWorkerToIdle(workerId, workId)
persist(WorkRejected(workId)) { event
workState = workState.updated(event)
sender ! MasterWorkerProtocol.Ack(workId)
}
}
case MasterWorkerProtocol.WorkFailed(workerId, workId) =>
if (workState.isInProgress(workId)) {
log.info("Work {} failed by worker {}", workId, workerId)
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/worker/MasterWorkerProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ object MasterWorkerProtocol {
case class WorkerRequestsWork(workerId: String)
case class WorkIsDone(workerId: String, workId: String, result: EncodedVideo)
case class WorkFailed(workerId: String, workId: String)
case class WorkRejected(workerId: String, workId: String)

// Master -> Workers
case object WorkIsReady
Expand Down
9 changes: 6 additions & 3 deletions src/main/scala/worker/WorkExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import com.typesafe.config.ConfigFactory
import upkoder.models.FFProbeProtocols._
import upkoder.models._
import upkoder.upclose.models.UpcloseBroadcast

import scala.language.postfixOps


class WorkExecutor extends Actor with ActorLogging{
Expand All @@ -26,8 +26,11 @@ class WorkExecutor extends Actor with ActorLogging{
val bucket = "upclose-dev-thumbnails"
log.info("Downloading {}", url)
val srcMedia = downloadMedia(url)
val duration = getDuration(srcMedia.getPath)
log.info("Media duration {}", duration)
if (duration <= 1) { sender ! Worker.WorkerRejected(upcloseBroadcast.id) }
log.info("Generating thumbnails {}", srcMedia.getPath)
val thumbnails = generateThumbnails(srcMedia, upcloseBroadcast.duration)
val thumbnails = generateThumbnails(srcMedia, duration)
log.info("Generated thumbnails {}", srcMedia.getPath)
val thumbsInfo = thumbnails map { x => getMediaInfo(x).transformToEncodeMedia(x, uploadToS3(x, bucket)) }
log.info("thumbnails {}", thumbsInfo)
Expand Down Expand Up @@ -74,7 +77,7 @@ class WorkExecutor extends Actor with ActorLogging{
}

def generateThumbnails(srcMedia: File, duration: Int): Seq[File] = {
Seq.fill(3)(nextInt(getDuration(srcMedia.getPath))).map{ generateThumbnail(srcMedia.getPath, _) }
Seq.fill(4)(nextInt(duration)) map { generateThumbnail(srcMedia.getPath, _) }
}

def downloadMedia(url: String): File = {
Expand Down
7 changes: 7 additions & 0 deletions src/main/scala/worker/WorkState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ object WorkState {
pendingWork = PriorityQueue(),
workInProgress = Map.empty,
acceptedWorkIds = Set.empty,
rejectedWorkIds = Set.empty,
doneWorkIds = Set.empty)

trait WorkDomainEvent
case class WorkAccepted(work: Work) extends WorkDomainEvent
case class WorkStarted(workId: String) extends WorkDomainEvent
case class WorkRejected(workId: String) extends WorkDomainEvent
case class WorkCompleted(workId: String, result: EncodedVideo) extends WorkDomainEvent
case class WorkerFailed(workId: String) extends WorkDomainEvent
case class WorkerTimedOut(workId: String) extends WorkDomainEvent
Expand All @@ -24,6 +26,7 @@ case class WorkState private (
private val pendingWork: PriorityQueue[Work],
private val workInProgress: Map[String, Work],
private val acceptedWorkIds: Set[String],
private val rejectedWorkIds: Set[String],
private val doneWorkIds: Set[String]) {

import WorkState._
Expand All @@ -48,6 +51,10 @@ case class WorkState private (
copy(
workInProgress = workInProgress - workId,
doneWorkIds = doneWorkIds + workId)
case WorkRejected(workId) =>
copy(
workInProgress = workInProgress - workId,
rejectedWorkIds = rejectedWorkIds + workId)
case WorkerFailed(workId) =>
pendingWork enqueue workInProgress(workId)
copy(
Expand Down
16 changes: 15 additions & 1 deletion src/main/scala/worker/Worker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ object Worker {
Props(classOf[Worker], clusterClient, workExecutorProps, registerInterval)

case class WorkComplete(result: EncodedVideo)
case class WorkerRejected(workId: Int)
}

class Worker(clusterClient: ActorRef, workExecutorProps: Props, registerInterval: FiniteDuration)
Expand Down Expand Up @@ -71,6 +72,16 @@ class Worker(clusterClient: ActorRef, workExecutorProps: Props, registerInterval
context.become(working)
}

def waitForWorkRejectedAck() : Receive = {
case Ack(id) if id == workId =>
sendToMaster(WorkerRequestsWork(workerId))
context.setReceiveTimeout(Duration.Undefined)
context.become(idle)
case ReceiveTimeout =>
log.info("No ack from master, retrying")
sendToMaster(WorkRejected(workerId, workId))
}


def waitForWorkIsDoneAck(result: EncodedVideo): Receive = {
case Ack(id) if id == workId =>
Expand All @@ -87,13 +98,16 @@ class Worker(clusterClient: ActorRef, workExecutorProps: Props, registerInterval
clusterClient ! SendToAll("/user/master/active", msg)
}


def working: Receive = {
case WorkComplete(result) =>
log.info("Work is complete. Result {}.", result)
sendToMaster(WorkIsDone(workerId, workId, result))
context.setReceiveTimeout(5.seconds)
context.become(waitForWorkIsDoneAck(result))
case WorkerRejected(workId) =>
sendToMaster(WorkRejected(workerId, workId.toString))
context.setReceiveTimeout(5.seconds)
context.become(waitForWorkRejectedAck())
case _: Work =>
log.info("Worker {} is already working", workerId)
}
Expand Down

0 comments on commit 9006e5d

Please sign in to comment.