Skip to content
This repository has been archived by the owner on Oct 4, 2019. It is now read-only.

Commit

Permalink
Merge branch 'ignore-unknown-lost-tasks'
Browse files Browse the repository at this point in the history
* ignore-unknown-lost-tasks:
  ignore TASK_LOST on non-running tasks
  • Loading branch information
Rob-Johnson committed Feb 2, 2017
2 parents fb6d43c + 6b05628 commit 2341c7d
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,17 @@ class MesosJobFramework @Inject()(
log.info("Task with id '%s' FAILED".format(taskId))
scheduler.handleFailedTask(taskStatus)
case TaskState.TASK_LOST =>
log.info("Task with id '%s' LOST".format(taskId))
scheduler.handleFailedTask(taskStatus)
if (taskManager.persistenceStore.getTasks.contains(taskId)) {
//if we know about this, then it's the real deal and the task is lost.
//we treat lost tasks as failed.
log.info("TASK_LOST for task %s. Treating as a failed run.".format(taskId))
scheduler.handleFailedTask(taskStatus)
} else {
//if we don't have record about the lost task running, then ignore the message.
//explicitly, we're *not* marking tasks as failed if we don't think they're running
//anymore, since it may have finished between reconcile time and response time.
log.info("Ignoring TASK_LOST for task %s, since we have no record that it's running.".format(taskId))
}
case TaskState.TASK_RUNNING =>
log.info("Task with id '%s' RUNNING. Removing persistence task.".format(taskId))
taskManager.removeTask(taskStatus.getTaskId.getValue)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package org.apache.mesos.chronos.scheduler.mesos

import java.nio.charset.StandardCharsets
import mesosphere.mesos.protos._
import mesosphere.mesos.util.FrameworkIdUtil
import com.google.common.cache.Cache
import org.apache.mesos.Protos.Offer
import org.apache.mesos.chronos.ChronosTestHelper._
import org.apache.mesos.chronos.scheduler.jobs.{ BaseJob, JobScheduler, TaskManager }
import org.apache.mesos.chronos.scheduler.state.PersistenceStore
import org.apache.mesos.{ Protos, SchedulerDriver }
import org.mockito.Mockito._
import org.specs2.mock.Mockito
Expand Down Expand Up @@ -180,4 +182,81 @@ class MesosJobFrameworkSpec extends SpecificationWithJUnit with Mockito {
val status = Protos.TaskStatus.newBuilder().setTaskId(Protos.TaskID.newBuilder().setValue("BLAHBLAHBLAH")).setState(Protos.TaskState.TASK_RUNNING).build()
mesosJobFramework.statusUpdate(schedulerDriver, status) must not(throwA[MatchError])
}

"Ignore TASK_LOST for jobs that we don't believe to be running" in {
import mesosphere.mesos.protos.Implicits._
import scala.collection.JavaConverters._

val taskManager = mock[TaskManager]
val persistenceStore = mock[PersistenceStore]
val fakeTasks = collection.immutable.HashMap[String, Array[Byte]]()
persistenceStore.getTasks returns fakeTasks
taskManager.persistenceStore returns persistenceStore
val jobScheduler = mock[JobScheduler]

val taskCache = mock[Cache[String, Protos.TaskState]]
doNothing.when(taskCache).put(any, any)

taskManager.taskCache returns taskCache

val mesosDriverFactory = mock[MesosDriverFactory]
val schedulerDriver = mock[SchedulerDriver]
mesosDriverFactory.get().returns(schedulerDriver)

val mesosJobFramework = spy(
new MesosJobFramework(
mesosDriverFactory,
jobScheduler,
taskManager,
makeConfig("--decline_offer_duration", "3000"),
mock[FrameworkIdUtil],
mock[MesosTaskBuilder],
mock[MesosOfferReviver]))
val status = Protos.TaskStatus.newBuilder()
.setTaskId(Protos.TaskID.newBuilder()
.setValue("mytask"))
.setState(Protos.TaskState.TASK_LOST)
.build()

mesosJobFramework.statusUpdate(schedulerDriver, status)
there was no(jobScheduler).handleFailedTask(status)
}

"Treat TASK_LOST as failed when the task is in task manager" in {
import mesosphere.mesos.protos.Implicits._
import scala.collection.JavaConverters._

val taskManager = mock[TaskManager]
val persistenceStore = mock[PersistenceStore]
val fakeTasks = collection.immutable.HashMap("ct:0000:1:foo" -> "foo".getBytes(StandardCharsets.UTF_8) )
persistenceStore.getTasks returns fakeTasks
taskManager.persistenceStore returns persistenceStore
val jobScheduler = mock[JobScheduler]

val taskCache = mock[Cache[String, Protos.TaskState]]
doNothing.when(taskCache).put(any, any)

taskManager.taskCache returns taskCache

val mesosDriverFactory = mock[MesosDriverFactory]
val schedulerDriver = mock[SchedulerDriver]
mesosDriverFactory.get().returns(schedulerDriver)

val mesosJobFramework = spy(
new MesosJobFramework(
mesosDriverFactory,
jobScheduler,
taskManager,
makeConfig("--decline_offer_duration", "3000"),
mock[FrameworkIdUtil],
mock[MesosTaskBuilder],
mock[MesosOfferReviver]))
val status = Protos.TaskStatus.newBuilder()
.setTaskId(Protos.TaskID.newBuilder()
.setValue("ct:0000:1:foo"))
.setState(Protos.TaskState.TASK_LOST)
.build()
mesosJobFramework.statusUpdate(schedulerDriver, status)
there was one(jobScheduler).handleFailedTask(status)
}
}

0 comments on commit 2341c7d

Please sign in to comment.