8 #ifndef WPIUTIL_WPI_WORKERTHREAD_H_
9 #define WPIUTIL_WPI_WORKERTHREAD_H_
17 #include "wpi/SafeThread.h"
18 #include "wpi/future.h"
19 #include "wpi/uv/Async.h"
// Fragment: the enclosing declaration's header and several interior lines are
// not visible here; the notes below describe only the statements shown.
// Completion callback type: a callable taking a value of type R, returning nothing.
27 using AfterWorkFunction = std::function<void(R)>;
// Registers a handler on async->wakeup that calls func(result) with the
// arguments it is handed.
33 async->wakeup.connect(
34 [](AfterWorkFunction func, R result) { func(result); });
// m_async is a std::weak_ptr (declared below); lock() only yields a usable
// handle while the referenced object is still alive.
39 if (
auto async = m_async.lock()) {
// Weak (non-owning) reference to a uv::Async parameterized on
// <AfterWorkFunction, R>.
45 std::weak_ptr<uv::Async<AfterWorkFunction, R>> m_async;
// Fragment: the enclosing declaration's header is not visible here.
// Presumably the variant used when the work produces no result (the callback
// takes no arguments) -- confirm against the full file.
// Completion callback type: a callable taking no arguments, returning nothing.
50 using AfterWorkFunction = std::function<void()>;
// Registers a handler on async->wakeup that simply calls the function it is
// handed.
56 async->wakeup.connect([](AfterWorkFunction func) { func(); });
// m_async is a std::weak_ptr (declared below); lock() only yields a usable
// handle while the referenced object is still alive.
61 if (
auto async = m_async.lock()) {
// Weak (non-owning) reference to a uv::Async parameterized on
// <AfterWorkFunction> only (no result value is carried).
67 std::weak_ptr<uv::Async<AfterWorkFunction>> m_async;
// Fragment of a class template over a result type R and argument types T...
// The class header is not visible here; presumably this is WorkerThreadRequest,
// whose work / afterWork / params / promiseId members are read by
// RunWorkerThreadRequest -- confirm against the full file.
70 template <
typename R,
typename... T>
// Work callable: takes T... and returns R.
72 using WorkFunction = std::function<R(T...)>;
// Completion callback type, taken from WorkerThreadAsync<R>.
73 using AfterWorkFunction =
typename WorkerThreadAsync<R>::AfterWorkFunction;
// Tail of a constructor (the start of its parameter list is not visible):
// stores promiseId_ and moves work_ and params_ into the members.
77 std::tuple<T...> params_)
78 : promiseId(promiseId_),
79 work(std::move(work_)),
80 params(std::move(params_)) {}
// Tail of a second constructor that additionally moves afterWork_ into place.
// NOTE(review): the first entry of its initializer list is not visible here.
82 std::tuple<T...> params_)
84 work(std::move(work_)),
85 afterWork(std::move(afterWork_)),
86 params(std::move(params_)) {}
// Completion callback; initialized by the second constructor above.
90 AfterWorkFunction afterWork;
// Tuple of T... values; both constructors move their params_ argument into it.
91 std::tuple<T...> params;
// Fragment of a class template over <R, T...>; the class header is not visible.
94 template <
typename R,
typename... T>
// Vector of Request objects. WorkerThreadThread<R, T...>::Main() swaps a
// member of this name into a local vector before processing it -- presumably
// this member (confirm against the full file).
101 std::vector<Request> m_requests;
// Fragment of a function template over <R, T...>; its signature is not visible.
// Presumably the result-returning RunWorkerThreadRequest overload -- confirm.
106 template <
typename R,
typename... T>
// Invoke the work callable with the stored argument tuple; the tuple is
// passed as an rvalue (std::move).
109 R result = std::apply(req.work, std::move(req.params));
// If the async handle reached through thr.m_async.m_async can be locked, hand
// the completion callback and the result to it via Send().
111 if (
auto async = thr.m_async.m_async.lock())
112 async->Send(std::move(req.afterWork), std::move(result));
// Set the value for req.promiseId on thr.m_promises.
// NOTE(review): the branching that chooses between Send() and SetValue() is
// not visible in this fragment.
114 thr.m_promises.
SetValue(req.promiseId, std::move(result));
// Overload for requests whose result type is void: runs the request's work
// callable and then signals completion (no result value is carried).
// thr: the worker thread object owning the async helper and promise store.
// req: the request to execute.
118 template <
typename... T>
119 void RunWorkerThreadRequest(WorkerThreadThread<void, T...>& thr,
120 WorkerThreadRequest<void, T...>& req) {
// Invoke the work callable with the stored argument tuple.
// NOTE(review): the result-returning variant passes std::move(req.params);
// here the tuple is passed as an lvalue -- confirm whether that is intended.
121 std::apply(req.work, req.params);
// If the async handle reached through thr.m_async.m_async can be locked, hand
// the completion callback to it via Send().
123 if (
auto async = thr.m_async.m_async.lock())
124 async->Send(std::move(req.afterWork));
// Mark req.promiseId as completed on thr.m_promises.
// NOTE(review): the branching that chooses between Send() and SetValue(), and
// the closing of this function, are not visible in this fragment.
126 thr.m_promises.SetValue(req.promiseId);
// Thread body: waits for queued requests and runs them in batches.
// NOTE(review): the enclosing loop header and closing braces are not visible
// in this fragment; the `break` statements imply a surrounding loop.
130 template <
typename R,
typename... T>
131 void WorkerThreadThread<R, T...>::Main() {
// Local batch that pending requests are swapped into.
132 std::vector<Request> requests;
// Take m_mutex, then block on m_cond until either the thread is no longer
// active or at least one request is pending.
134 std::unique_lock lock(m_mutex);
135 m_cond.wait(lock, [&] {
return !m_active || !m_requests.empty(); });
// Woken while inactive: leave the loop.
136 if (!m_active)
break;
// Exchange the pending queue with the local batch, so m_requests now holds
// whatever `requests` held before the swap.
139 requests.swap(m_requests);
// Run each request in the batch in order, stopping early once the thread is
// no longer active.
142 for (
auto&& req : requests) {
143 if (!m_active)
break;
144 RunWorkerThreadRequest(*
this, req);
// NOTE(review): the declaration that follows this template header is not
// visible in this fragment.
153 template <
typename T>
// Fragment of a class template over <R, T...>; the class header is not
// visible here.
156 template <
typename R,
typename... T>
// Work callable: takes T... and returns R.
161 using WorkFunction = std::function<R(T...)>;
// Completion callback type, taken from detail::WorkerThreadAsync<R>.
162 using AfterWorkFunction =
163 typename detail::WorkerThreadAsync<R>::AfterWorkFunction;
// If m_owner.GetThread() yields a thread object, forward `loop` to that
// thread's m_async helper via SetLoop(); otherwise do nothing.
// NOTE(review): the enclosing function signature is not visible here.
175 if (
auto thr = m_owner.GetThread()) thr->m_async.SetLoop(loop);
// If m_owner.GetThread() yields a thread object, call UnsetLoop() on that
// thread's m_async helper; otherwise do nothing.
// NOTE(review): the enclosing function signature is not visible here.
192 if (
auto thr = m_owner.GetThread()) thr->m_async.UnsetLoop();
// If m_owner.GetThread() yields a thread object, return the result of locking
// the async handle reached through thr->m_async.m_async.
// NOTE(review): the enclosing function signature and the fallback return for
// the no-thread case are not visible here.
203 if (
auto thr = m_owner.GetThread())
204 return thr->m_async.m_async.lock();
// Fragment of a member function template over forwarded argument types U...;
// its signature and the no-thread fallback are not visible here.
221 template <
typename... U>
// Proceed only if m_owner.GetThread() yields a thread object.
223 if (
auto thr = m_owner.GetThread()) {
// Obtain a request id from the thread's m_promises.
225 uint64_t req = thr->m_promises.CreateRequest();
// Append a request built from the id, the work callable, and the forwarded
// arguments. forward_as_tuple produces a tuple of references; presumably it is
// converted to the request's std::tuple<T...> when the element is constructed
// -- confirm against the request constructor.
228 thr->m_requests.emplace_back(
229 req, std::move(work), std::forward_as_tuple(std::forward<U>(u)...));
// Notify one waiter on the thread's condition variable (Main() waits on a
// member named m_cond).
232 thr->m_cond.notify_one();
// Return the future that m_promises creates for the request id.
235 return thr->m_promises.CreateFuture(req);
// Queues `work` to be run with arguments `u...`, together with a completion
// callback `afterWork`. Does nothing visible here when m_owner.GetThread()
// yields no thread object.
// work: callable to execute.
// afterWork: completion callback stored with the request.
// u: arguments forwarded into the request's parameter tuple.
// NOTE(review): the closing lines of this function are not visible here.
256 template <
typename... U>
257 void QueueWorkThen(WorkFunction work, AfterWorkFunction afterWork, U&&... u) {
258 if (
auto thr = m_owner.GetThread()) {
// Append a request built from the work callable, the completion callback, and
// the forwarded arguments (no promise id is passed on this path).
260 thr->m_requests.emplace_back(
261 std::move(work), std::move(afterWork),
262 std::forward_as_tuple(std::forward<U>(u)...));
// Notify one waiter on the thread's condition variable (Main() waits on a
// member named m_cond).
265 thr->m_cond.notify_one();
275 #endif // WPIUTIL_WPI_WORKERTHREAD_H_