Skip to content

Commit

Permalink
More work.
Browse files Browse the repository at this point in the history
  • Loading branch information
fcofdez committed May 5, 2015
1 parent 3cf489f commit 0bb2cb6
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 25 deletions.
7 changes: 3 additions & 4 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ akka {

log-dead-letters = 1
log-dead-letters-during-shutdown = on
loggers = ["akka.event.slf4j.Slf4jLogger"] #
loglevel = "INFO"
# loggers = ["akka.event.slf4j.Slf4jLogger"] #
# loglevel = "INFO"

cluster {
seed-nodes = [
"akka.tcp://[email protected]:2551",
"akka.tcp://[email protected]:2552"]
"akka.tcp://[email protected]:2551"]

auto-down-unreachable-after = 10s
}
Expand Down
3 changes: 1 addition & 2 deletions src/main/resources/worker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,4 @@ akka {
}

contact-points = [
"akka.tcp://[email protected]:2551",
"akka.tcp://[email protected]:2552"]
"akka.tcp://[email protected]:2551"]
35 changes: 32 additions & 3 deletions src/main/scala/services/Upclose.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@ import spray.httpx.marshalling.ToResponseMarshallable
import spray.routing._
import spray.routing.{RoutingSettings, RejectionHandler, ExceptionHandler, HttpService}
import upkoder.upclose.models._
import upkoder.models._
import worker.Master
import worker.Worker
import scala.util.{Success, Failure}


object UpcloseService extends Protocols {
implicit val system = ActorSystem("ServiceSystem")
object UpcloseService extends Protocols with MediaJsonProtocols{
implicit val system = ActorSystem("ClusterSystem")
implicit val executor = system.dispatcher

val logger = Logging(system, getClass)
Expand All @@ -54,9 +55,25 @@ object UpcloseService extends Protocols {
val env = sys.env.get("ENV").getOrElse("dev")
val apiUrl = config.getString(s"upclose.$env.api.url")
val apiEndpoint = config.getString(s"upclose.$env.api.endpoint")
val apiPostEndpoint = config.getString(s"upclose.$env.api.post_endpoint")

lazy val pipeline = addHeader("Authorization", "") ~> sendReceive ~> unmarshal[UpcloseCollection]

lazy val postPipeline = (
addHeader("Authorization", "Client 25638abf-fa27-44c8-9a41-2a65ec39ddf") ~> sendReceive
)

def upclosePostRequest(request: HttpRequest): Future[HttpResponse] = postPipeline{request}

def mediaEndpoint(broadcast_id: String): Uri = {
val auth = Authority(host = Host(apiUrl))
Uri(scheme = "https", authority = auth, path = Path(apiPostEndpoint.replace("$id", broadcast_id)))
}

def postMediaInfo(encMedia: EncodedMedia, br_id: Int): Future[HttpResponse] = {
upclosePostRequest(Post(mediaEndpoint(br_id.toString), encMedia))
}

def upcloseRequest(request: HttpRequest): Future[UpcloseCollection] = pipeline{request}

def uplcloseUri(archive_id: String): Uri = {
Expand Down Expand Up @@ -114,7 +131,19 @@ trait UpcoderService extends HttpService with Protocols {
complete {
if(tokboxInfoRequest.status == "uploaded"){
fetchBroadcastInfo(tokboxInfoRequest.id).map[ToResponseMarshallable]{ uc =>
scheduleWork(uc.collection.head)
val broadcast = uc.collection.head
scheduleWork(broadcast)
val encMedia = EncodedMedia(url=broadcast.video_url,
width=640,
height=480,
size=broadcast.size,
mime_type="video/mp4",
broadcast_id=broadcast.id)
val futurePost = postMediaInfo(encMedia, broadcast.id)
futurePost onComplete {
case Success(x) => println("Success first Post {}", x)
case Failure(e) => println("Error first post {}", e.getMessage)
}
uc.collection.head
}
}else{
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/worker/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ trait Backend {
def startMaster(): Unit = {
startBackend(2551, "backend")
Thread.sleep(5000)
startBackend(2552, "backend")
Thread.sleep(5000)
// startBackend(2552, "backend")
// Thread.sleep(5000)
startHttpService()
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/scala/worker/Models.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ case class UpcoderJob(id: String)
case class UpcloseBroadcast(id: Int, account_id: Int, duration: Int, cumulative_participant_count: Int, created_at: DateTime, tokbox_api_key: String, tokbox_archive_id: String) extends Ordered[UpcloseBroadcast] {
import scala.math.Ordered.orderingToOrdered

def size: Int = {
this.duration * 141356
}

def video_url: String = {
val tokbox_api_key = this.tokbox_api_key
val tokbox_archive_id = this.tokbox_archive_id
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/worker/WorkConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ class WorkResultConsumer extends Actor with ActorLogging {
result.mediaInfo.foreach { y =>
val a = postMediaInfo(y, result.broadcast_id)
a onComplete {
case Success(x) => log.info("oleeeeeeeeeeeeee {}", x)
case Failure(e) => log.info("noooooooooooooo {}", e.getMessage)
case Success(x) => log.info("Success Post {}", x)
case Failure(e) => log.info("Error post {}", e.getMessage)
}
}
log.info("Consumed result: {}", result.broadcast_id)
Expand Down
29 changes: 17 additions & 12 deletions src/main/scala/worker/WorkExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class WorkExecutor extends Actor with ActorLogging{
lazy val config = ConfigFactory.load()
val env = sys.env.get("ENV").getOrElse("dev")
val region_conf = config.getString(s"upclose.$env.s3.region")
val thumbBucket = config.getString(s"upclose.$env.s3.region")
val videoBucket = config.getString(s"upclose.$env.s3.region")
implicit val s3 = S3()
s3.setRegion(Region(region_conf))

Expand All @@ -26,18 +28,21 @@ class WorkExecutor extends Actor with ActorLogging{
val bucket = "upclose-dev-thumbnails"
val srcMedia = downloadMedia(url)
val duration = getDuration(srcMedia.getPath)
if (duration <= 1) { sender ! Worker.WorkerRejected(upcloseBroadcast.id) }
val thumbnails = generateThumbnails(srcMedia, duration)
val thumbsInfo = thumbnails map { x => getMediaInfo(x).transformToEncodeMedia(x, uploadToS3(x, bucket)) }
val finalThumsInfo = thumbsInfo map { _.copy(broadcast_id = upcloseBroadcast.id) }
val encodedMedia = encode(srcMedia.getPath)
val buckett = "upclose-dev-videos"
val encodedVideoInfo = getMediaInfo(encodedMedia).transformToEncodeMedia(encodedMedia, uploadToS3(encodedMedia, buckett))
val finalEncodedMediaInfo = encodedVideoInfo.copy(broadcast_id = upcloseBroadcast.id)
val x = finalThumsInfo :+ finalEncodedMediaInfo
thumbnails.foreach { _.delete }
encodedMedia.delete
sender() ! Worker.WorkComplete(EncodedVideo(upcloseBroadcast.id, x))
if (duration <= 1)
sender ! Worker.WorkerRejected(upcloseBroadcast.id)
else {
val thumbnails = generateThumbnails(srcMedia, duration)
val thumbsInfo = thumbnails map { x => getMediaInfo(x).transformToEncodeMedia(x, uploadToS3(x, bucket)) }
val finalThumsInfo = thumbsInfo map { _.copy(broadcast_id = upcloseBroadcast.id) }
val encodedMedia = encode(srcMedia.getPath)
val buckett = "upclose-dev-videos"
val encodedVideoInfo = getMediaInfo(encodedMedia).transformToEncodeMedia(encodedMedia, uploadToS3(encodedMedia, buckett))
val finalEncodedMediaInfo = encodedVideoInfo.copy(broadcast_id = upcloseBroadcast.id)
val x = finalThumsInfo :+ finalEncodedMediaInfo
thumbnails.foreach { _.delete }
encodedMedia.delete
sender() ! Worker.WorkComplete(EncodedVideo(upcloseBroadcast.id, x))
}
}

def uploadToS3(mediaFile: File, bucket: String): Option[String] = {
Expand Down

0 comments on commit 0bb2cb6

Please sign in to comment.