Find better way to stop the Python Notifier Thread #10

Open
pentschev opened this issue Apr 3, 2023 · 1 comment
Labels: bug (Something isn't working)

Comments

@pentschev (Member):

In 22d42d6 we made sure all tests properly stop the notifier thread. Ideally this would happen automatically, but we need a way for the ApplicationContext to stop the thread at some point. It is not clear how we would identify that the application no longer needs the thread running; perhaps weakrefs could help with that.

@pentschev (Member Author):

Further discussion:

@wence- :

I guess that the problem is that you don't want to require manually calling ucp.finalize() or similar? You could do the cleanup in an atexit handler? If the thread lifetime is bound to the ApplicationContext, you might be able to clean up the thread by using weakref.finalize.
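For reference, a minimal standalone sketch of what those two suggestions could look like; this is an illustration only, not the real ucxx ApplicationContext, and the notifier loop body is a placeholder:

```python
# Sketch only: wiring both suggestions (an atexit handler and a
# weakref.finalize bound to the context's lifetime) around a notifier thread.
# Neither the thread target nor the finalize callback may hold a reference to
# the context itself, otherwise the context can never be garbage collected.
import atexit
import threading
import weakref


def _notifier_loop(stop_event):
    # Placeholder for the real notifier work (waking up waiting futures).
    while not stop_event.wait(timeout=0.01):
        pass


def _stop_notifier(stop_event, thread):
    stop_event.set()
    thread.join()


class ApplicationContext:
    def __init__(self):
        self._stop_event = threading.Event()
        self._notifier_thread = threading.Thread(
            target=_notifier_loop, args=(self._stop_event,), daemon=True
        )
        self._notifier_thread.start()

        # Option 1: run the cleanup at interpreter shutdown.
        atexit.register(_stop_notifier, self._stop_event, self._notifier_thread)
        # Option 2: run the cleanup when this context is garbage collected.
        weakref.finalize(self, _stop_notifier, self._stop_event, self._notifier_thread)
```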

@pentschev :

> I guess that the problem is that you don't want to require manually calling ucp.finalize() or similar?

Yes, that's right. Ideally the lifetime of objects would take care of that; forcing the user to shut down manually is not a good approach.

> You could do the cleanup in an atexit handler? If the thread lifetime is bound to the ApplicationContext, you might be able to clean up the thread by using weakref.finalize.

atexit will only execute when the Python process is shutting down, right? If that's the case, it still wouldn't shut down when the context does, for example during pytest runs, because the main process isn't destroyed until all tests complete.
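A tiny hypothetical demonstration of that atexit behavior (Context here is a stand-in, not the real ApplicationContext):

```python
# atexit callbacks run only at interpreter shutdown, not when the object that
# registered them goes away, so a per-test context would never be cleaned up
# between tests.
import atexit


class Context:
    def __init__(self, name):
        self.name = name
        atexit.register(self.cleanup)

    def cleanup(self):
        print(f"cleanup({self.name})")


for i in range(3):
    ctx = Context(f"test-{i}")
    del ctx  # nothing is printed here; the callback is still pending

print("all 'tests' finished")
# Only now, at interpreter exit, do the three cleanup() calls run.
```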

@wence- :

Perhaps just

```diff
diff --git a/python/ucxx/_lib_async/application_context.py b/python/ucxx/_lib_async/application_context.py
index 6c624a0..fba5158 100644
--- a/python/ucxx/_lib_async/application_context.py
+++ b/python/ucxx/_lib_async/application_context.py
@@ -60,6 +60,7 @@ class ApplicationContext:
         self.start_notifier_thread()
 
         weakref.finalize(self, self.progress_tasks.clear)
+        weakref.finalize(self, self.stop_notifier_thread)
 
         # Ensure progress even before Endpoints get created, for example to
         # receive messages directly on a worker after a remote endpoint
```

?

@pentschev :

The problem is that this will never occur: self.notifier_thread stays alive unless stop_notifier_thread() is called, and it thus keeps the ApplicationContext alive as well, so the finalizer never runs. I attempted to make all references to the notifier thread weakrefs but failed to do so; it's possible I overlooked something.
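A small standalone reproduction of that reference problem (Context is a simplified stand-in for ApplicationContext, not the actual ucxx code):

```python
# Sketch reproducing the problem described above: the notifier thread's target
# and the weakref.finalize callback are bound methods, so both hold a strong
# reference to the context. The context is therefore never garbage collected,
# and the finalizer (and with it stop_notifier_thread) never runs.
import gc
import threading
import weakref


class Context:
    def __init__(self):
        self._stop = threading.Event()
        # Bound-method target: the running thread keeps `self` alive.
        self.notifier_thread = threading.Thread(target=self._run, daemon=True)
        self.notifier_thread.start()
        # Bound-method callback: the finalizer also keeps `self` alive.
        weakref.finalize(self, self.stop_notifier_thread)

    def _run(self):
        while not self._stop.wait(timeout=0.01):
            pass

    def stop_notifier_thread(self):
        self._stop.set()
        self.notifier_thread.join()


ctx = Context()
ref = weakref.ref(ctx)
del ctx
gc.collect()
assert ref() is not None  # the context is still alive; cleanup never ran
```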

Anyway, after a conversation with Mads last week, I came up with a way to get rid of the notifier thread and move its object to the worker progress thread, which would greatly simplify things and make this issue go away if it succeeds. So we could wait a few more days before spending too much time on stop_notifier_thread(). So far I've managed to get it to work, but it's really slow, so now I have to figure out what causes the slowdown and whether it can be fixed.

@wence- :

Makes sense, thanks.

pentschev added the bug (Something isn't working) label on Apr 3, 2023
rapids-bot (bot) pushed a commit that referenced this issue on Dec 5, 2023:
It is unclear why, but `notify_all()` is causing futexes to never return in some situations. This occurs very frequently in CI and is also reproducible locally, though less frequently.

The typical stack trace for the blocked thread is shown below:

```cpp
Thread 6 (Thread 0x7f13ec84f700 (LWP 2823667) "pytest"):
#0  futex_wait (private=<optimized out>, expected=32765, futex_word=0x7ffd5186a874) at ../sysdeps/nptl/futex-internal.h:141
#1  futex_wait_simple (private=<optimized out>, expected=32765, futex_word=0x7ffd5186a874) at ../sysdeps/nptl/futex-internal.h:172
#2  __condvar_quiesce_and_switch_g1 (private=<optimized out>, g1index=<synthetic pointer>, wseq=<optimized out>, cond=0x7ffd5186a860) at pthread_cond_common.c:416
#3  __pthread_cond_broadcast (cond=0x7ffd5186a860) at pthread_cond_broadcast.c:73
#4  0x00007f140fe5f23c in ucxx::BaseDelayedSubmissionCollection<std::function<void ()> >::process() (this=0x560d0effafd0) at /repo/cpp/include/ucxx/delayed_submission.h:154
#5  0x00007f140fe5f399 in ucxx::DelayedSubmissionCollection::processPost (this=<optimized out>) at /repo/cpp/src/delayed_submission.cpp:84
#6  0x00007f140fe7ed71 in ucxx::WorkerProgressThread::progressUntilSync(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>) (progressFunction=..., stop=@0x560d0f6527f8: false, startCallback=..., startCallbackArg=<optimized out>, delayedSubmissionCollection=...) at /opt/conda/envs/test/x86_64-conda-linux-gnu/include/c++/11.4.0/bits/shared_ptr_base.h:1295
#7  0x00007f140fe7f3ee in std::__invoke_impl<void, void (*)(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>), std::function<bool ()>, std::reference_wrapper<bool>, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection> >(std::__invoke_other, void (*&&)(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>), std::function<bool ()>&&, std::reference_wrapper<bool>&&, std::function<void (void*)>&&, void*&&, std::shared_ptr<ucxx::DelayedSubmissionCollection>&&) (__f=<optimized out>, __f=<optimized out>) at /opt/conda/envs/test/x86_64-conda-linux-gnu/include/c++/11.4.0/bits/invoke.h:61
#8  std::__invoke<void (*)(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>), std::function<bool ()>, std::reference_wrapper<bool>, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection> >(void (*&&)(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>), std::function<bool ()>&&, std::reference_wrapper<bool>&&, std::function<void (void*)>&&, void*&&, std::shared_ptr<ucxx::DelayedSubmissionCollection>&&) (__fn=<optimized out>) at /opt/conda/envs/test/x86_64-conda-linux-gnu/include/c++/11.4.0/bits/invoke.h:96
#9  std::thread::_Invoker<std::tuple<void (*)(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>), std::function<bool ()>, std::reference_wrapper<bool>, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection> > >::_M_invoke<0ul, 1ul, 2ul, 3ul, 4ul, 5ul>(std::_Index_tuple<0ul, 1ul, 2ul, 3ul, 4ul, 5ul>) (this=<optimized out>) at /opt/conda/envs/test/x86_64-conda-linux-gnu/include/c++/11.4.0/bits/std_thread.h:259
#10 std::thread::_Invoker<std::tuple<void (*)(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>), std::function<bool ()>, std::reference_wrapper<bool>, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection> > >::operator()() (this=<optimized out>) at /opt/conda/envs/test/x86_64-conda-linux-gnu/include/c++/11.4.0/bits/std_thread.h:266
#11 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>), std::function<bool ()>, std::reference_wrapper<bool>, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection> > > >::_M_run() (this=<optimized out>) at /opt/conda/envs/test/x86_64-conda-linux-gnu/include/c++/11.4.0/bits/std_thread.h:211
#12 0x00007f140f92fe95 in std::execute_native_thread_routine (__p=<optimized out>) at ../../../../../libstdc++-v3/src/c++11/thread.cc:104
#13 0x00007f1412647609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#14 0x00007f1412412133 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
```

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #140
pentschev changed the title from "Find better wait to stop the Python Notifier Thread" to "Find better way to stop the Python Notifier Thread" on Jan 9, 2024