TEC
A lightweight C++ library enabling safe, efficient execution in multithreaded and concurrent systems.
Loading...
Searching...
No Matches
tec_socket_server.hpp
Go to the documentation of this file.
1// Time-stamp: <Last changed 2026-02-19 15:52:21 by magnolia>
2/*----------------------------------------------------------------------
3------------------------------------------------------------------------
4Copyright (c) 2020-2026 The Emacs Cat (https://github.com/olddeuteronomy/tec).
5
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17------------------------------------------------------------------------
18----------------------------------------------------------------------*/
27#pragma once
28
29#include <cstddef>
30#include <memory>
31#include <vector>
32#ifndef _POSIX_C_SOURCE
33// This line fixes the "storage size of 'hints' isn't known" issue.
34#define _POSIX_C_SOURCE 200809L
35#endif
36
37#include <cstdio>
38#include <unistd.h>
39#include <memory.h>
40#include <netdb.h>
41#include <arpa/inet.h>
42#include <sys/types.h>
43#include <sys/socket.h>
44
45#include <atomic>
46#include <cerrno>
47#include <csignal>
48#include <string>
49#include <thread>
50
51#include "tec/tec_def.hpp" // IWYU pragma: keep
52#include "tec/tec_signal.hpp"
53#include "tec/tec_trace.hpp"
54#include "tec/tec_status.hpp"
55#include "tec/tec_memfile.hpp"
56#include "tec/tec_actor.hpp"
59
60
61namespace tec {
62
// Socket server class template. TParams is the configuration type; the
// constructor statically asserts that it derives from SocketServerParams.
81template <typename TParams>
82class SocketServer : public Actor {
83
84public:
// Alias for the configuration type the server was instantiated with.
85 using Params = TParams;
86
87protected:
88
// NOTE(review): listing lines 89, 90 and 92 are absent from this extract. The
// symbol index at the end of the page places `Params params_`, `int listenfd_`
// and `Signal polling_stopped_` on those lines -- confirm against the header.
//
// Stop flag for the accept loop: poll() keeps running while it is false and
// shutdown() sets it to true.
91 std::atomic_bool stop_polling_;
93
94private:
95
// Worker pool; created in start() only when params_.use_thread_pool is set and
// reset in shutdown().
96 std::unique_ptr<SocketThreadPool> pool_;
// Receive buffer for the single-threaded mode; start() resizes it to
// get_buffer_size() when the thread pool is not used.
97 std::vector<char> buffer_;
98
100 template <typename Params>
101 struct Task {
102 SocketServer<Params>* server;
103 Socket sock;
104 };
105
107 struct details {
112 static void socket_proc(Task<Params> task) {
113 task.server->dispatch_socket(task.sock);
114 }
115 };
116
117public:
118
// Constructs the server from a copy of `params`. The listening descriptor
// starts at -1 and the stop flag at false; no socket is created here
// (binding and listening happen in start()).
125 explicit SocketServer(const Params& params)
126 : Actor()
127 , params_{params}
128 , listenfd_{-1}
129 , stop_polling_{false}
// NOTE(review): listing line 130 is absent from this extract (possibly one more
// member initialiser) -- confirm against the original header.
131 {
// Reject, at compile time, parameter types that do not derive from
// SocketServerParams.
132 static_assert(
133 std::is_base_of_v<SocketServerParams, Params>,
134 "Not derived from tec::SocketServerParams class");
135 }
136
138 virtual ~SocketServer() = default;
139
147 void start(Signal* sig_started, Status* status) override {
148 TEC_ENTER("SocketServer::start");
149 //
150 // Resolve the server address and bind the host.
151 //
152 *status = resolve_and_bind_host();
153 if (!status->ok()) {
154 sig_started->set();
155 return;
156 }
157 //
158 // Start listening on the host.
159 //
160 *status = start_listening();
161 if (!status->ok()) {
162 sig_started->set();
163 return;
164 }
165 //
166 // Prepare a single-thread buffer or a multi-thread pool.
167 //
168 if (params_.use_thread_pool) {
169 // Multiple-threaded server.
170 pool_ = std::make_unique<SocketThreadPool>(get_buffer_size(), params_.thread_pool_size);
171 } else {
172 // Single-threaded server.
173 buffer_.resize(get_buffer_size());
174 }
175 TEC_TRACE("Buffer size is {} bytes.", get_buffer_size());
176 //
177 // Start polling.
178 //
179 TEC_TRACE("Thread pool is {}.", (params_.use_thread_pool ? "ON" : "OFF"));
180 poll(sig_started);
181 }
182
// Stops the server: raises the stop flag, shuts down and closes the listening
// socket, releases the thread pool (if any) and finally sets `sig_stopped`.
190 void shutdown(Signal* sig_stopped) override {
191 TEC_ENTER("SocketServer::shutdown");
192 //
193 // Stop polling in a separate thread
194 //
195 TEC_TRACE("Stopping server polling...");
// The helper thread is joined immediately below, so the three statements in
// the lambda have completed before this function continues.
196 std::thread polling_thread([this] {
197 stop_polling_ = true;
// Shutting down and closing the listening descriptor presumably makes the
// accept() call blocked in poll() return with an error; accept_connection()
// reports EINVAL/EINTR/EBADF as an interruption.
198 ::shutdown(listenfd_, SHUT_RDWR);
199 ::close(listenfd_);
// NOTE(review): listenfd_ keeps its old numeric value after close(); no
// reset to -1 is visible in this extract.
200 });
201 polling_thread.join();
202 TEC_TRACE("Closing server socket...");
// NOTE(review): listing line 203 is absent from this extract. The symbol index
// lists Signal::wait() and a `polling_stopped_` signal, so this is probably
// where shutdown waits for poll() to exit -- confirm against the header.
204 //
205 // Wait until all tasks in the thread pool finished.
206 //
// Resetting the unique_ptr destroys the pool; whether that waits for queued
// tasks depends on SocketThreadPool's destructor, which is not visible here.
207 if (params_.use_thread_pool) {
208 pool_.reset(nullptr);
209 }
210 TEC_TRACE("Server stopped.");
211 sig_stopped->set();
212 }
213
// Actor request hook. The symbol index describes it as the "default request
// processor (not used in socket server)".
// NOTE(review): listing line 219 -- the only statement of the body, presumably
// the return -- is absent from this extract; as shown, the function has no
// return statement. Confirm against the original header.
218 Status process_request(Request request, Reply reply) override {
220 }
221
222protected:
224 constexpr char* get_buffer() {
225 return buffer_.data();
226 }
227
229 constexpr size_t get_buffer_size() const {
230 return params_.buffer_size;
231 }
232
239 virtual Status set_socket_options(int fd) {
240 TEC_ENTER("SocketServer::set_socket_options");
241
242 // Avoid "Address already in use" error.
243 if( ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
244 &params_.opt_reuse_addr, sizeof(int)) < 0 ) {
245#ifdef _TEC_TRACE_ON
246 Status status{errno, "setsockopt SO_REUSEADDR failed", Error::Kind::NetErr};
247 TEC_TRACE("{}.", status);
248#endif
249 }
250 TEC_TRACE("SO_REUSEADDR is {}.", params_.opt_reuse_addr);
251
252 // Reuse the port.
253 if (::setsockopt(fd, SOL_SOCKET, SO_REUSEPORT,
254 &params_.opt_reuse_port, sizeof(int)) < 0) {
255#ifdef _TEC_TRACE_ON
256 Status status{errno, "setsockopt SO_REUSEPORT failed", Error::Kind::NetErr};
257 TEC_TRACE("{}.", status);
258#endif
259 }
260 TEC_TRACE("SO_REUSEPORT is {}.", params_.opt_reuse_port);
261
262 return {};
263 }
264
273 virtual Socket get_socket_info(int client_fd, sockaddr_storage* client_addr) {
274 char client_ip[INET6_ADDRSTRLEN];
275 client_ip[0] = '\0';
276 int client_port{-1};
277
278 if (client_addr->ss_family == AF_INET) {
279 // IPv4
280 struct sockaddr_in *s = (struct sockaddr_in*)&client_addr;
281 ::inet_ntop(AF_INET, &s->sin_addr, client_ip, sizeof(client_ip));
282 client_port = ::ntohs(s->sin_port);
283 }
284 else if (client_addr->ss_family == AF_INET6) {
285 // IPv6
286 struct sockaddr_in6 *s = (struct sockaddr_in6*)&client_addr;
287 ::inet_ntop(AF_INET6, &s->sin6_addr, client_ip, sizeof(client_ip));
288 client_port = ::ntohs(s->sin6_port);
289 }
290
291 // Uses the single thread buffer by default.
292 return {client_fd, client_ip, client_port, get_buffer(), get_buffer_size()};
293 }
294
300 TEC_ENTER("SocketServer::resolve_and_bind_host");
301
302 // Resolve the server address.
303 TEC_TRACE("Resolving address {}:{}...", params_.addr, params_.port);
304 addrinfo hints;
305 ::memset(&hints, 0, sizeof(hints));
306 hints.ai_family = params_.family;
307 hints.ai_socktype = params_.socktype;
308 hints.ai_protocol = params_.protocol;
309
310 // getaddrinfo() returns a list of address structures.
311 // Try each address until we successfully bind().
312 addrinfo* servinfo{NULL};
313 char port_str[16];
314 ::snprintf(port_str, 15, "%d", params_.port);
315 int ecode = ::getaddrinfo(params_.addr.c_str(), port_str,
316 &hints, &servinfo);
317 if( ecode != 0 ) {
318 std::string emsg{::gai_strerror(ecode)};
319 TEC_TRACE("Address resolving error: {}", emsg);
320 return {ecode, emsg, Error::Kind::NetErr};
321 }
322 TEC_TRACE("Address resolved OK.");
323
324 // If socket() or bind() fails, we close the socket
325 // and try the next address.
326 addrinfo* p{NULL};
327 int fd{-1};
328 TEC_TRACE("Binding...");
329 for (p = servinfo ; p != NULL ; p = p->ai_next) {
330 fd = ::socket(p->ai_family, p->ai_socktype, p->ai_protocol);
331 if( fd == -1 ) {
332 // Try next socket.
333 continue;
334 }
335
336 // Set options.
337 auto option_status = set_socket_options(fd);
338 if (!option_status) {
339 // On failure.
340 ::close(fd);
341 ::freeaddrinfo(servinfo);
342 return option_status;
343 }
344
345 if (::bind(fd, p->ai_addr, p->ai_addrlen) != -1) {
346 // Success.
347 break;
348 }
349
350 // Try next socket.
351 ::close(fd);
352 }
353
354 // No longer needed.
355 ::freeaddrinfo(servinfo);
356
357 if (p == NULL) {
358 auto emsg = format("Failed to bind to {}:{}", params_.addr, params_.port);
359 TEC_TRACE(emsg);
360 return {EAFNOSUPPORT, emsg, Error::Kind::NetErr};
361 }
362
363 // Success.
364 listenfd_ = fd;
365 return {};
366 }
367
373 TEC_ENTER("SocketServer::start_listening");
374 if (::listen(listenfd_, params_.queue_size) == -1) {
375 auto emsg = format("Failed to listen on {}:{}.", params_.addr, params_.port);
376 ::close(listenfd_);
377 listenfd_ = EOF;
378 return {errno, emsg, Error::Kind::NetErr};
379 }
380 TEC_TRACE("Server listening on {}:{}.", params_.addr, params_.port);
381 return {};
382 }
383
390 virtual Status accept_connection(int* clientfd, sockaddr_storage* client_addr) {
391 TEC_ENTER("SocketServer::accept_connection");
392 // Wait for incoming connection.
393 socklen_t sin_size = sizeof(sockaddr_storage);
394 auto fd = ::accept(listenfd_,
395 (sockaddr *)client_addr,
396 &sin_size);
397 // Check result.
398 if (fd == EOF) {
399 std::string err_msg;
400 if( errno == EINVAL || errno == EINTR || errno == EBADF ) {
401 err_msg = format("Polling interrupted by signal {}.", errno);
402 }
403 else {
404 err_msg = format("accept() failed with errno={}.", errno);
405 }
406 TEC_TRACE(err_msg);
407 return {errno, err_msg, Error::Kind::NetErr};
408 }
409 *clientfd = fd;
410 return {};
411 }
412
// Accept loop. Sets `sig_started` first, then keeps accepting connections and
// passing them to process_socket() until stop_polling_ becomes true.
417 virtual void poll(Signal* sig_started) {
418 TEC_ENTER("SocketServer::poll");
// Start-up is reported as complete before the first accept call.
419 sig_started->set();
420
421 while (!stop_polling_) {
422 int clientfd{-1};
423 //
424 // Waiting for incoming connection.
425 //
426 sockaddr_storage client_addr;
427 TEC_TRACE("Waiting for incoming connection...");
// Any accept failure just re-checks the stop flag and retries.
// NOTE(review): there is no delay on this path, so an error that persists
// while stop_polling_ is false makes the loop spin.
428 if (!accept_connection(&clientfd, &client_addr)) {
429 continue;
430 }
431 //
432 // Obtain socket info.
433 //
434 Socket sock = get_socket_info(clientfd, &client_addr);
// The default get_socket_info() leaves port at -1 when the peer address family
// is neither AF_INET nor AF_INET6.
435 if (sock.port == -1) {
436 // Failed -- close input connection and continue polling.
437 ::close(clientfd);
438 continue;
439 } else {
440 TEC_TRACE("Accepted connection from {}:{}.", sock.addr, sock.port);
441 }
442 //
443 // Process incoming connection.
444 //
445 process_socket(sock);
446 }
// NOTE(review): listing line 447 is absent from this extract. The symbol index
// describes `polling_stopped_` as "set when polling loop has fully exited", so
// this is probably where it is set -- confirm against the header.
448 }
449
456 virtual void process_socket(Socket sock) {
457 TEC_ENTER("process_socket");
458 if (pool_) {
459 // The thread pool uses the *round-robin* algorithm to assign a buffer for a socket.
460 size_t idx = pool_->get_next_worker_index();
461 TEC_TRACE("Pool worker IDX={}.", idx);
462 // Trick -- substitute `sock.buffer` with a per-thread buffer.
463 sock.buffer = pool_->get_buffer(idx);
464 sock.buffer_size = pool_->get_buffer_size();
465 // Async processing -- enqueue a client handling task to the thread pool.
466 Task<Params> task{this, sock};
467 pool_->enqueue([task] {
468 details::socket_proc(task);
469 });
470 }
471 else {
472 // Synchronous processing.
473 dispatch_socket(sock);
474 }
475 }
476
// Handles one client connection. It works on a local copy of the socket and,
// from the visible lines, calls either on_string() or on_net_data() with it.
// NOTE(review): listing lines 487, 490 and 493 are absent from this extract,
// which is why the braces below do not balance. The symbol index mentions
// kModeCharStream / kModeNetData and close_client_connection(), so the missing
// lines are presumably the two mode checks and the final close -- confirm
// against the original header before editing this function.
484 virtual void dispatch_socket(Socket _sock) {
485 TEC_ENTER("SocketServer::dispatch_socket");
// Local, mutable copy: the handlers receive a pointer to it.
486 Socket sock{_sock};
488 on_string(&sock);
489 }
491 on_net_data(&sock);
492 }
494 }
495
500 virtual void close_client_connection(Socket* sock) {
501 TEC_ENTER("SocketServer::close_client_connection");
502 TEC_TRACE("Closing connection with {}:{}...", sock->addr, sock->port);
503 if (sock->fd != -1) {
504 ::shutdown(sock->fd, SHUT_RDWR);
505 ::close(sock->fd);
506 sock->fd = -1;
507 }
508 }
509
517 virtual void on_string(const Socket* sock) {
518 TEC_ENTER("SocketServer::on_char_stream");
519 // Default implementation just echoes received data.
520 Bytes data;
521 auto status = Socket::recv(data, sock, 0);
522 if (status) {
523 Socket::send(data, sock);
524 }
525 }
526
533 virtual void on_net_data(const Socket* sock) {
534 }
535
536}; // class SocketServer
537
538} // namespace tec
Abstract base class defining the actor lifecycle and request handling interface.
Definition tec_actor.hpp:63
A byte buffer class with stream-like read/write semantics.
Definition tec_memfile.hpp:50
A thread-safe signal mechanism for inter-thread synchronization.
Definition tec_signal.hpp:44
void set()
Sets the signal to the signaled state and notifies all waiting threads.
Definition tec_signal.hpp:72
void wait() const
Waits indefinitely until the signal is set.
Definition tec_signal.hpp:85
Generic BSD socket server template using actor pattern with configurable parameters.
Definition tec_socket_server.hpp:82
virtual void process_socket(Socket sock)
Decides how to handle newly accepted client socket.
Definition tec_socket_server.hpp:456
void shutdown(Signal *sig_stopped) override
Gracefully shuts down the server.
Definition tec_socket_server.hpp:190
virtual Status accept_connection(int *clientfd, sockaddr_storage *client_addr)
Accepts one incoming connection (blocking)
Definition tec_socket_server.hpp:390
virtual void on_net_data(const Socket *sock)
Default handler for binary / structured network protocols.
Definition tec_socket_server.hpp:533
virtual ~SocketServer()=default
Virtual destructor (default implementation)
TParams Params
Parameter type — must inherit from tec::SocketServerParams
Definition tec_socket_server.hpp:85
constexpr char * get_buffer()
Definition tec_socket_server.hpp:224
Status process_request(Request request, Reply reply) override
Default request processor (not used in socket server)
Definition tec_socket_server.hpp:218
virtual void dispatch_socket(Socket _sock)
Executes client connection handling logic.
Definition tec_socket_server.hpp:484
Signal polling_stopped_
Signal object set when polling loop has fully exited.
Definition tec_socket_server.hpp:92
constexpr size_t get_buffer_size() const
Definition tec_socket_server.hpp:229
SocketServer(const Params &params)
Constructs server instance with given parameters.
Definition tec_socket_server.hpp:125
void start(Signal *sig_started, Status *status) override
Starts the server: bind → listen → accept loop.
Definition tec_socket_server.hpp:147
virtual Socket get_socket_info(int client_fd, sockaddr_storage *client_addr)
Creates Socket object from raw file descriptor and peer address.
Definition tec_socket_server.hpp:273
virtual Status set_socket_options(int fd)
Sets common socket options (SO_REUSEADDR, SO_REUSEPORT)
Definition tec_socket_server.hpp:239
virtual Status resolve_and_bind_host()
Resolves address/port and binds listening socket.
Definition tec_socket_server.hpp:299
std::atomic_bool stop_polling_
Atomic flag used to signal the acceptor/polling loop to exit.
Definition tec_socket_server.hpp:91
int listenfd_
Listening socket file descriptor (-1 when not bound/listening)
Definition tec_socket_server.hpp:90
virtual void close_client_connection(Socket *sock)
Closes client connection cleanly.
Definition tec_socket_server.hpp:500
Params params_
Server configuration parameters (address, port, buffer size, threading mode, etc.)
Definition tec_socket_server.hpp:89
virtual void on_string(const Socket *sock)
Default handler for character-stream / line-based protocols.
Definition tec_socket_server.hpp:517
virtual void poll(Signal *sig_started)
Main acceptor loop — runs until stop_polling_ is set.
Definition tec_socket_server.hpp:417
virtual Status start_listening()
Starts listening on the bound socket.
Definition tec_socket_server.hpp:372
#define TEC_ENTER(name)
Logs an entry message for a named context (e.g., function).
Definition tec_trace.hpp:211
#define TEC_TRACE(...)
Logs a formatted trace message.
Definition tec_trace.hpp:222
@ NetErr
Network-related error.
@ NotImplemented
Not implemented.
static constexpr int kModeNetData
Treat incoming data as length-prefixed binary network messages.
Definition tec_socket.hpp:182
static constexpr int kModeCharStream
Treat incoming data as null-terminated character streams.
Definition tec_socket.hpp:179
Lightweight wrapper around a connected socket file descriptor.
Definition tec_socket.hpp:272
static Status recv(Bytes &data, const Socket *sock, size_t length)
Receive data from a socket into a MemFile (Bytes).
Definition tec_socket.hpp:329
static Status send(const Bytes &data, const Socket *sock)
Send the entire contents of a MemFile (Bytes) through a socket.
Definition tec_socket.hpp:395
char * buffer
Buffer used in send/recv operations.
Definition tec_socket.hpp:276
int port
Peer port number.
Definition tec_socket.hpp:275
size_t buffer_size
Size of the buffer.
Definition tec_socket.hpp:277
int fd
Underlying socket file descriptor.
Definition tec_socket.hpp:273
char addr[INET6_ADDRSTRLEN]
Peer address as a null-terminated string (IPv4 or IPv6).
Definition tec_socket.hpp:274
constexpr bool ok() const
Checks if the status indicates success.
Definition tec_status.hpp:120
Core interface for TEC actors with lifecycle management and request processing.
Common definitions and utilities for the tec namespace.
A byte buffer class with stream-like read/write semantics.
std::any Reply
Type alias for a reply object that can hold any object.
Definition tec_message.hpp:55
std::any Request
Type alias for a request object that can hold any object.
Definition tec_message.hpp:49
std::string format(const T &arg)
Formats a single argument into a string.
Definition tec_print.hpp:171
Defines a thread-safe signal implementation using mutex and condition variable.
Generic BSD socket parameters and helpers.
Specialized thread pool that maintains one pre-allocated fixed-size buffer per worker thread.
Defines error handling types and utilities for the tec namespace.
Provides a thread-safe tracing utility for debugging in the tec namespace.