Flowable.firstOrError interrupts the thread downstream. #7688

Closed
2 tasks done
joshallenit opened this issue Apr 11, 2024 · 4 comments

Comments

@joshallenit

When using a new thread scheduler, Flowable.firstOrError interrupts the thread downstream. We use a scheduler created from a ThreadPoolExecutor that also has this issue with firstOrError.

  • library version number

3.1.8

  • If you think you found a bug, please include a code sample that reproduces the problem
import com.google.common.truth.Truth.assertThat
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Scheduler
import io.reactivex.rxjava3.functions.Supplier
import io.reactivex.rxjava3.internal.schedulers.NewThreadWorker
import io.reactivex.rxjava3.plugins.RxJavaPlugins
import java.util.concurrent.ThreadFactory
import kotlin.test.Test
import kotlinx.coroutines.rx3.await
import kotlinx.coroutines.test.runTest

class RxInterruptedTest {

  @Test
  fun `Rx firstOrError - when using new thread scheduler - thread in map is not interrupted`() =
    runTest {
      // Use a new thread scheduler with a custom thread factory so that we can log what calls interrupt on the thread.
      // The same result occurs with Schedulers.newThread().
      val scheduler =
        RxJavaPlugins.onNewThreadScheduler(RxJavaPlugins.initNewThreadScheduler(NewThreadTask()))

      Flowable.just(Unit)
        .observeOn(scheduler)
        .map { assertThat(Thread.currentThread().isInterrupted).isFalse() }
        .firstOrError()
        .doOnSuccess {
          // this fails
          assertThat(Thread.currentThread().isInterrupted).isFalse()
        }
        .await()
    }
}

class NewThreadTask : Supplier<Scheduler> {
  override fun get(): Scheduler {
    return NewThreadScheduler()
  }
}

class NewThreadScheduler
@JvmOverloads
constructor(val threadFactory: ThreadFactory? = THREAD_FACTORY) : Scheduler() {
  override fun createWorker(): Worker {
    return NewThreadWorker(threadFactory)
  }

  companion object {
    private val THREAD_FACTORY: ThreadFactory =
      object : ThreadFactory {
        override fun newThread(r: Runnable?): Thread {
          return object : Thread(r) {
            override fun interrupt() {
              Exception("Interrupting here").printStackTrace()
              super.interrupt()
            }
          }
        }
      }
  }
}

The logs show that NewThreadWorker is disposed, which interrupts the thread.

java.lang.Exception: Interrupting here
	at xxx.NewThreadScheduler$Companion$THREAD_FACTORY$1$newThread$1.interrupt(RxInterruptedTest.kt:53)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.interruptIfStarted(ThreadPoolExecutor.java:677)
	at java.base/java.util.concurrent.ThreadPoolExecutor.interruptWorkers(ThreadPoolExecutor.java:777)
	at java.base/java.util.concurrent.ThreadPoolExecutor.shutdownNow(ThreadPoolExecutor.java:1428)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.shutdownNow(ScheduledThreadPoolExecutor.java:870)
	at io.reactivex.rxjava3.internal.schedulers.NewThreadWorker.dispose(NewThreadWorker.java:161)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.cancel(FlowableObserveOn.java:157)
	at io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber.cancel(BasicFuseableSubscriber.java:158)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableElementAtSingle$ElementAtSubscriber.onNext(FlowableElementAtSingle.java:85)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:69)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runSync(FlowableObserveOn.java:338)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176)
	at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:65)
	at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:56)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
@akarnokd
Member

akarnokd commented Apr 11, 2024

Yes, that can happen because the new thread worker has to be shut down in some way so that it does not leak. Operators cancel eagerly, so the interrupt issued by shutdownNow can arrive while execution is still on the worker's own thread.

Could you use a different scheduler instead, such as Schedulers.io() or Schedulers.from() with a custom executor? Those would not call shutdownNow like the vanilla newThread scheduler.
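
The mechanism can be reproduced without RxJava. What follows is a minimal JDK-only sketch, not the library's code: a task running on a ScheduledThreadPoolExecutor calls shutdownNow on its own executor, which is roughly what the stack trace above shows NewThreadWorker.dispose doing. The function name interruptFlagAfterSelfShutdown is made up for this illustration.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical helper: returns the worker thread's interrupt flag as seen
// immediately before and immediately after the task shuts down its own executor.
fun interruptFlagAfterSelfShutdown(): Pair<Boolean, Boolean> {
    val executor = Executors.newScheduledThreadPool(1)
    val task = Callable {
        val before = Thread.currentThread().isInterrupted
        // shutdownNow interrupts every started worker thread,
        // including the one currently executing this task.
        executor.shutdownNow()
        val after = Thread.currentThread().isInterrupted
        before to after
    }
    // The running task is not cancelled by shutdownNow, so get() still returns its result.
    return executor.submit(task).get()
}
```

Calling interruptFlagAfterSelfShutdown() should return (false, true): the flag is clear when the task starts and set once shutdownNow returns, so any code that runs later on the same thread sees the interrupt.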

@joshallenit
Author

joshallenit commented Apr 11, 2024

Thanks for the tip! Using Schedulers.from(executor) directly avoids this issue. We are using executor.asCoroutineDispatcher().asScheduler() so that we can share threads between coroutines and Rx using limitedParallelism. That leads to the same interrupt issue, but through a different mechanism than shutdownNow.

 	at xxx.RxIoThread.interrupt(MonotonicThreadFactory.kt:25)
 	at kotlinx.coroutines.ThreadState.invoke(Interruptible.kt:145)
 	at kotlinx.coroutines.ThreadState.invoke(Interruptible.kt:62)
 	at kotlinx.coroutines.InvokeOnCancelling.invoke(JobSupport.kt:1428)
 	at kotlinx.coroutines.JobSupport.notifyCancelling(JobSupport.kt:1473)
 	at kotlinx.coroutines.JobSupport.tryMakeCancelling(JobSupport.kt:796)
 	at kotlinx.coroutines.JobSupport.makeCancelling(JobSupport.kt:756)
 	at kotlinx.coroutines.JobSupport.cancelImpl$kotlinx_coroutines_core(JobSupport.kt:672)
 	at kotlinx.coroutines.JobSupport.parentCancelled(JobSupport.kt:638)
 	at kotlinx.coroutines.ChildHandleNode.invoke(JobSupport.kt:1436)
 	at kotlinx.coroutines.JobSupport.notifyCancelling(JobSupport.kt:1473)
 	at kotlinx.coroutines.JobSupport.tryMakeCancelling(JobSupport.kt:796)
 	at kotlinx.coroutines.JobSupport.makeCancelling(JobSupport.kt:756)
 	at kotlinx.coroutines.JobSupport.cancelImpl$kotlinx_coroutines_core(JobSupport.kt:672)
 	at kotlinx.coroutines.JobSupport.parentCancelled(JobSupport.kt:638)
 	at kotlinx.coroutines.ChildHandleNode.invoke(JobSupport.kt:1436)
 	at kotlinx.coroutines.JobSupport.notifyCancelling(JobSupport.kt:1473)
 	at kotlinx.coroutines.JobSupport.tryMakeCompletingSlowPath(JobSupport.kt:901)
 	at kotlinx.coroutines.JobSupport.tryMakeCompleting(JobSupport.kt:864)
 	at kotlinx.coroutines.JobSupport.cancelMakeCompleting(JobSupport.kt:697)
 	at kotlinx.coroutines.JobSupport.cancelImpl$kotlinx_coroutines_core(JobSupport.kt:668)
 	at kotlinx.coroutines.JobSupport.cancelInternal(JobSupport.kt:633)
 	at kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:618)
 	at kotlinx.coroutines.Job$DefaultImpls.cancel$default(Job.kt:195)
 	at kotlinx.coroutines.rx3.DispatcherScheduler$DispatcherWorker.dispose(RxScheduler.kt:92)
 	at io.reactivex.rxjava3.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.cancel(FlowableSubscribeOn.java:142)

@akarnokd
Member

Oh, that's a hefty kotlinx stacktrace.

I don't know what they do there or why they do it that way. It's outside Rx's control, as we only issue a dispose to the worker.

@joshallenit
Author

Okay, closing this then, thanks for the help!
