TEC
A lightweight C++ library enabling safe, efficient execution in multithreaded and concurrent systems.
Loading...
Searching...
No Matches
tec_worker.hpp
Go to the documentation of this file.
1// Time-stamp: <Last changed 2026-02-25 16:21:42 by magnolia>
2/*----------------------------------------------------------------------
3------------------------------------------------------------------------
4Copyright (c) 2020-2026 The Emacs Cat (https://github.com/olddeuteronomy/tec).
5
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17------------------------------------------------------------------------
18----------------------------------------------------------------------*/
26#pragma once
27
28#include <atomic>
29#include <cmath>
30#include <memory>
31#include <mutex>
32#include <typeindex>
33#include <unordered_map>
34#include <functional>
35#include <thread>
36
37#include "tec/tec_def.hpp" // IWYU pragma: keep
38#include "tec/tec_trace.hpp"
39#include "tec/tec_status.hpp"
40#include "tec/tec_queue.hpp"
41#include "tec/tec_message.hpp"
42#include "tec/tec_daemon.hpp"
43
44
45namespace tec {
46
69template <typename TParams>
70class Worker : public Daemon {
71public:
72 using Params = TParams;
73 using id_t = std::thread::id;
74 using Lock = std::lock_guard<std::mutex>;
75
80 using CallbackFunc = std::function<void(Worker<Params>*, const Message&)>;
81
82protected:
84
85private:
91 struct Slot {
92 Worker<Params>* worker;
93 CallbackFunc callback;
94
100 explicit Slot(Worker<Params>* w, CallbackFunc cb)
101 : worker{w}
102 , callback{cb}
103 {}
104
109 ~Slot() = default;
110 };
111
112 std::unordered_map<std::type_index, std::unique_ptr<Slot>> slots_;
113 std::mutex mtx_slots_;
114 Signal sig_running_;
115 Signal sig_inited_;
116 Signal sig_terminated_;
118 Status status_;
119 std::atomic_bool flag_running_;
120 std::atomic_bool flag_exited_;
121 std::mutex mtx_thread_proc_;
122 std::thread thread_;
123 id_t thread_id_;
124
125public:
133 explicit Worker(const Params& params)
134 : Daemon()
135 , params_{params}
136 , flag_running_{false}
137 , flag_exited_{false}
138 , thread_id_{0}
139 {}
140
145 virtual ~Worker() {
146 if (thread_.joinable()) {
147 terminate();
148 }
149 }
150
155 id_t id() const { return thread_id_; }
156
161 constexpr const Params& params() const { return params_; }
162
168 const Signal& sig_terminated() const override { return sig_terminated_; }
169
177 void send(Message&& msg) override {
178 TEC_ENTER("Worker::send");
179 TEC_TRACE("Message [{}] sent.", name(msg));
180 mq_.enqueue(std::move(msg));
181 }
182
195 Status make_request(Request&& req, Reply&& rep) override {
196 Signal ready;
197 Status status;
198 Payload payload{
199 &ready,
200 &status,
201 std::move(req),
202 std::move(rep)};
203 send({&payload});
204 ready.wait();
205 return status;
206 }
207
219 template <typename Derived, typename T>
220 void register_callback(Derived* worker, void (Derived::*callback)(const Message& msg)) {
221 Lock lk{mtx_slots_};
222 TEC_ENTER("Worker::register_callback");
223 //
224 // Ensure Derived is actually derived from Worker<Params>.
225 //
226 static_assert(std::is_base_of_v<Worker<Params>, Derived>,
227 "Derived must inherit from tec::Worker");
228 std::type_index ndx = std::type_index(typeid(T));
229 //
230 // Remove existing handler.
231 //
232 if (auto slot = slots_.find(ndx); slot != slots_.end()) {
233 slots_.erase(ndx);
234 }
235 //
236 // Set the slot.
237 //
238 slots_[ndx] = std::make_unique<Slot>(
239 worker,
240 [callback](Worker<Params>* worker, const Message& msg) {
241 // Safely downcast to Derived.
242 auto derived = dynamic_cast<Derived*>(worker);
243 (derived->*callback)(msg);
244 });
245 TEC_TRACE("Callback {} registered.", typeid(T).name());
246 }
247
248protected:
256 virtual void dispatch(const Message& msg) {
257 auto ndx = std::type_index(msg.type());
258 if (auto slot = slots_.find(ndx); slot != slots_.end()) {
259 slot->second->callback(slot->second->worker, std::cref(msg));
260 }
261 }
262
263private:
264
272 template <typename Params>
273 struct details {
281 static void thread_proc(Worker<Params>& wt) {
282 TEC_ENTER("Worker::thread_proc");
283 // Signal termination on exit.
284 Signal::OnExit on_terminating{&wt.sig_terminated_};
285 //
286 // Obtains thread ID and waits for the running signal.
287 //
288 wt.thread_id_ = std::this_thread::get_id();
289 TEC_TRACE("thread {} created.", wt.id());
290 wt.sig_running_.wait();
291 TEC_TRACE("`sig_running' received.");
292 //
293 // Exit immediately if flagged.
294 //
295 if (wt.flag_exited_) {
296 return;
297 }
298 //
299 // Initialize the worker.
300 //
301 TEC_TRACE("on_init() called ...");
302 wt.status_ = wt.on_init();
303 TEC_TRACE("on_init() returned {}.", wt.status_);
304 //
305 // Signal initialization complete.
306 //
307 wt.sig_inited_.set();
308 TEC_TRACE("`sig_inited' signalled.");
309
310 if (wt.status_) {
311 //
312 // Process messages if initialized successfully.
313 //
314 TEC_TRACE("entering message loop.");
315 bool stop = false;
316 do {
317 auto msg = wt.mq_.dequeue();
318 TEC_TRACE("received Message [{}].", name(msg));
319 if (is_null(msg)) {
320 stop = true;
321 wt.flag_exited_ = true;
322 } else {
323 wt.dispatch(std::cref(msg));
324 }
325 } while (!stop);
326 TEC_TRACE("leaving message loop, {} message(s) left in queue...", wt.mq_.size());
327 }
328 //
329 // Finalize if it was initialized successfully.
330 //
331 if (wt.status_) {
332 TEC_TRACE("on_exit() called ...");
333 wt.status_ = wt.on_exit();
334 TEC_TRACE("on_exit() returned {}.", wt.status_);
335 }
336 }
337 };
338
339protected:
346 virtual Status on_init() { return {}; }
347
353 virtual Status on_exit() { return {}; }
354
355protected:
360 thread_ = std::thread(details<Params>::thread_proc, std::ref(*this));
361 return {};
362 }
363
364public:
372 Status run() override {
373 Lock lk{mtx_thread_proc_};
374 TEC_ENTER("Worker::run");
375 // Create the Daemon's thread in suspended state.
376 auto status = create_thread();
377 if (!status) {
378 return status;
379 }
380 //
381 // Resume the thread.
382 //
383 flag_running_ = true;
384 sig_running_.set();
385 TEC_TRACE("`sig_running' signalled.");
386 //
387 // Wait for thread initialization completed.
388 //
389 TEC_TRACE("waiting for `sig_inited' signalled ...");
390 sig_inited_.wait();
391 // Return a result of thread initialization.
392 return status_;
393 }
394
403 Status terminate() override {
404 Lock lk{mtx_thread_proc_};
405 TEC_ENTER("Worker::terminate");
406 if (!thread_.joinable()) {
407 TEC_TRACE("WARNING: no thread exists!");
408 return status_;
409 }
410 if (!flag_running_) {
411 TEC_TRACE("Exiting the suspended thread...");
412 flag_exited_ = true;
413 sig_running_.set();
414 }
415 //
416 // Send the null message on normal exiting.
417 //
418 if (status_ && !flag_exited_) {
419 send(nullmsg());
420 TEC_TRACE("QUIT sent.");
421 }
422 //
423 // Wait for the thread to finish its execution.
424 //
425 TEC_TRACE("waiting for thread {} to finish ...", id());
426 thread_.join();
427 TEC_TRACE("thread {} finished OK.", id());
428
429 return status_;
430 }
431
432}; // class Worker
433
434} // namespace tec
Abstract interface for a daemon that runs in a separate thread.
Definition tec_daemon.hpp:47
A thread-safe queue implementation for storing and retrieving elements of type T.
Definition tec_queue.hpp:52
void enqueue(T &&t)
Adds an element to the back of the queue.
Definition tec_queue.hpp:83
T dequeue(void)
Retrieves and removes the front element from the queue.
Definition tec_queue.hpp:95
std::size_t size() const
Returns the current number of elements in the queue.
Definition tec_queue.hpp:111
A thread-safe signal mechanism for inter-thread synchronization.
Definition tec_signal.hpp:44
void set()
Sets the signal to the signaled state and notifies all waiting threads.
Definition tec_signal.hpp:72
void wait() const
Waits indefinitely until the signal is set.
Definition tec_signal.hpp:85
A class implementing message processing as a daemon.
Definition tec_worker.hpp:70
constexpr const Params & params() const
Retrieves the worker's configuration parameters.
Definition tec_worker.hpp:161
id_t id() const
Retrieves the worker thread's ID.
Definition tec_worker.hpp:155
TParams Params
Type alias for worker parameters.
Definition tec_worker.hpp:72
virtual void dispatch(const Message &msg)
Dispatches a message to its registered callback.
Definition tec_worker.hpp:256
std::lock_guard< std::mutex > Lock
Type alias for mutex lock guard.
Definition tec_worker.hpp:74
std::thread::id id_t
Type alias for thread ID.
Definition tec_worker.hpp:73
virtual Status on_init()
Default callback invoked during worker thread initialization.
Definition tec_worker.hpp:346
const Signal & sig_terminated() const override
Retrieves the signal indicating the worker has terminated.
Definition tec_worker.hpp:168
Status terminate() override
Terminates the worker thread.
Definition tec_worker.hpp:403
Status run() override
Starts the worker thread's message polling.
Definition tec_worker.hpp:372
std::function< void(Worker< Params > *, const Message &)> CallbackFunc
Type alias for a callback function to process messages.
Definition tec_worker.hpp:80
virtual ~Worker()
Destructor that ensures proper thread termination.
Definition tec_worker.hpp:145
Params params_
Configuration parameters for the worker.
Definition tec_worker.hpp:83
void send(Message &&msg) override
Sends a message to the worker's queue.
Definition tec_worker.hpp:177
Worker(const Params &params)
Constructs a worker.
Definition tec_worker.hpp:133
virtual Status create_thread()
Creates the daemon's thread in a suspended state.
Definition tec_worker.hpp:359
void register_callback(Derived *worker, void(Derived::*callback)(const Message &msg))
Registers a callback for a specific message type.
Definition tec_worker.hpp:220
virtual Status on_exit()
Default callback invoked when exiting the worker thread.
Definition tec_worker.hpp:353
Status make_request(Request &&req, Reply &&rep) override
Sends a request and waits for a reply in a daemon thread.
Definition tec_worker.hpp:195
#define TEC_ENTER(name)
Logs an entry message for a named context (e.g., function).
Definition tec_trace.hpp:211
#define TEC_TRACE(...)
Logs a formatted trace message.
Definition tec_trace.hpp:222
A message used for RPC-style calls.
Definition tec_message.hpp:89
Helper struct to signal termination on exit.
Definition tec_signal.hpp:110
Defines the minimal contract for any long-lived service or processing component.
Common definitions and utilities for the tec namespace.
Defines a flexible message type and helper functions for the tec namespace.
Message nullmsg() noexcept
Creates a null message.
Definition tec_message.hpp:67
bool is_null(const Message &msg) noexcept
Checks if a message is null.
Definition tec_message.hpp:75
std::any Reply
Type alias for a reply object that can hold any object.
Definition tec_message.hpp:55
std::any Message
Type alias for a message that can hold any object.
Definition tec_message.hpp:43
std::any Request
Type alias for a request object that can hold any object.
Definition tec_message.hpp:49
auto name(const Message &msg) noexcept
Retrieves the type name of a message's content for registering the corresponding message handler.
Definition tec_message.hpp:84
Thread-safe queue implementation.
Defines error handling types and utilities for the tec namespace.
Provides a thread-safe tracing utility for debugging in the tec namespace.