Skip to content

Commit

Permalink
Adding endpoint to check system state.
Browse files Browse the repository at this point in the history
  • Loading branch information
fcofdez committed May 5, 2015
1 parent d50ee81 commit dcf0a26
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 6 deletions.
19 changes: 14 additions & 5 deletions src/main/scala/services/Upclose.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import spray.routing._
import spray.routing.{RoutingSettings, RejectionHandler, ExceptionHandler, HttpService}
import upkoder.upclose.models._
import upkoder.models._
import worker._
import worker.Master
import worker.Worker
import scala.util.{Success, Failure}
Expand Down Expand Up @@ -92,7 +93,7 @@ object UpcloseService extends Protocols with MediaJsonProtocols{


class UpcoderServiceActor extends Actor with UpcoderService with ActorLogging {
import worker.Work
import worker.{Work, RequestSystemInfo, SystemState}

// the HttpService trait defines only one abstract member, which
// connects the services environment to the enclosing actor or test
Expand All @@ -110,6 +111,11 @@ class UpcoderServiceActor extends Actor with UpcoderService with ActorLogging {
def receive = runRoute(routes)
import context.dispatcher

def getInfo(): Future[Any] = {
implicit val timeout = Timeout(5.seconds)
val req = RequestSystemInfo()
(masterProxy ? req)
}

def scheduleWork(upcloseBroadcast: UpcloseBroadcast): Unit = {
implicit val timeout = Timeout(5.seconds)
Expand All @@ -123,10 +129,13 @@ class UpcoderServiceActor extends Actor with UpcoderService with ActorLogging {
}


trait UpcoderService extends HttpService with Protocols {

trait UpcoderService extends HttpService with Protocols with SystemStateProtocols {
import UpcloseService._
import worker.SystemState

def scheduleWork(upcloseBroadcast: UpcloseBroadcast): Unit
def getInfo(): Future[Any]

val routes = {
pathPrefix("jobs") {
Expand All @@ -144,8 +153,8 @@ trait UpcoderService extends HttpService with Protocols {
broadcast_id=broadcast.id)
val futurePost = postMediaInfo(encMedia, broadcast.id)
futurePost onComplete {
case Success(x) => println("Success first Post {}", x)
case Failure(e) => println("Error first post {}", e.getMessage)
case Success(_) =>
case Failure(_) =>
}
uc.collection.head
}
Expand All @@ -156,7 +165,7 @@ trait UpcoderService extends HttpService with Protocols {
} ~
get {
complete {
"hola"
getInfo().map[ToResponseMarshallable]{_.asInstanceOf[SystemState]}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/main/scala/worker/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ class Master(workTimeout: FiniteDuration) extends PersistentActor with ActorLogg
}
}

case _: RequestSystemInfo =>
sender() ! workState.status

case CleanupTick =>
for ((workerId, s @ WorkerState(_, Busy(workId, timeout))) workers) {
if (timeout.isOverdue) {
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/worker/Work.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ case class Work(workId: String, broadcast: UpcloseBroadcast) extends Ordered[Wor
}

case class WorkResult(workId: String, result: EncodedVideo)

case class RequestSystemInfo()
14 changes: 13 additions & 1 deletion src/main/scala/worker/WorkState.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package worker

import scala.collection.mutable.PriorityQueue
import spray.json._
import upkoder.models.EncodedVideo

import spray.httpx.SprayJsonSupport

object WorkState {

Expand All @@ -22,6 +23,12 @@ object WorkState {
case class WorkerTimedOut(workId: String) extends WorkDomainEvent
}

case class SystemState(pendingWork: Int, doneWork: Int, inProgressWork: Int)

trait SystemStateProtocols extends DefaultJsonProtocol with SprayJsonSupport {
implicit val SystemStateFormat = jsonFormat3(SystemState.apply)
}

case class WorkState private (
private val pendingWork: PriorityQueue[Work],
private val workInProgress: Map[String, Work],
Expand All @@ -36,6 +43,11 @@ case class WorkState private (
def isAccepted(workId: String): Boolean = acceptedWorkIds.contains(workId)
def isInProgress(workId: String): Boolean = workInProgress.contains(workId)
def isDone(workId: String): Boolean = doneWorkIds.contains(workId)
def status(): SystemState = {
SystemState(pendingWork=pendingWork.size,
doneWork=doneWorkIds.size,
inProgressWork=workInProgress.size)
}

def updated(event: WorkDomainEvent): WorkState = event match {
case WorkAccepted(work)
Expand Down

0 comments on commit dcf0a26

Please sign in to comment.