From 18c1db8323b98d00769fdf71bd72e60028762850 Mon Sep 17 00:00:00 2001
From: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Date: Wed, 25 Feb 2026 22:02:56 +0000
Subject: [PATCH] sessiond: replace legacy kernel management thread with hotplug handler

In order to get rid of the legacy kernel tracer management data
structures, the manage-kernel thread must be transitioned to use
the modules domain orchestrator API.

As the manage-kernel thread is relatively simple, it is essentially
re-written in this change to use the modern facilities of the project.

The manage-kernel thread used a global pipe (the_kernel_poll_pipe)
written to by various command handlers to signal that its poll set
needed rebuilding. On every notification, it iterated all sessions
and all channels to rebuild the entire epoll set, then matched
hotplug events by scanning the global session list for a matching
raw fd.

Replace this with a hotplug handler thread that uses an explicit
command queue and delegates hotplug handling to the kernel domain
orchestrator:

- Introduce a reusable, templated command_queue<T> utility in
  src/common/ built on lttng::eventfd and lttng::synchro::waiter.
  It provides send() (fire-and-forget) and send_and_wait()
  (synchronous) methods. The waiter/waker plumbing is internal to
  command_base and command_queue (friend relationship).

- Rename manage-kernel.{cpp,hpp} to hotplug-handler.{cpp,hpp}.
  The new thread uses lttng::poller and processes typed commands
  (ADD_CHANNEL, REMOVE_CHANNEL, QUIT) carrying stream_group
  pointers directly, eliminating raw fd lookups.

- Add handle_channel_hotplug() to the domain_orchestrator. The
  orchestrator receives the command queue reference at construction
  and sends ADD_CHANNEL commands from start() and REMOVE_CHANNEL
  commands from its destructor, fully encapsulating the interaction.

- Remove the_kernel_poll_pipe global variable and all pipe writes
  from cmd_enable_channel, cmd_add_context, cmd_enable_event, and
  session_release. Remove the now-unused wpipe/kwpipe parameters
  from these functions' signatures.

Change-Id: I0b24b6cc941d0e9f9557b769af252a0260f401d0
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
---

diff --git a/src/bin/lttng-sessiond/Makefile.am b/src/bin/lttng-sessiond/Makefile.am
index e28cb10..33b4767 100644
--- a/src/bin/lttng-sessiond/Makefile.am
+++ b/src/bin/lttng-sessiond/Makefile.am
@@ -54,7 +54,7 @@
                        dispatch.cpp dispatch.hpp \
                        register.cpp register.hpp \
                        manage-apps.cpp manage-apps.hpp \
-                       manage-kernel.cpp manage-kernel.hpp \
+                       hotplug-handler.cpp hotplug-handler.hpp \
                        manage-consumer.cpp manage-consumer.hpp \
                        clear.cpp clear.hpp \
                        tracker.cpp tracker.hpp \
diff --git a/src/bin/lttng-sessiond/client.cpp b/src/bin/lttng-sessiond/client.cpp
index 78d6cea..8f7f0ac 100644
--- a/src/bin/lttng-sessiond/client.cpp
+++ b/src/bin/lttng-sessiond/client.cpp
@@ -701,6 +701,8 @@
 			lttng::file_descriptor(session->kernel_session->fd),
 			session->kernel_space_domain,
 			*session->kernel_session->consumer,
+			session->id,
+			*the_hotplug_handler_queue,
 			session->kernel_session);
 
 	return LTTNG_OK;
@@ -1653,8 +1655,7 @@
 			goto error;
 		}
 
-		ret = cmd_add_context(
-			cmd_ctx, *target_session, event_context, the_kernel_poll_pipe[1]);
+		ret = cmd_add_context(cmd_ctx, *target_session, event_context);
 		lttng_event_context_destroy(event_context);
 		break;
 	}
@@ -1679,7 +1680,7 @@
 	}
 	case LTTCOMM_SESSIOND_COMMAND_ENABLE_CHANNEL:
 	{
-		ret = cmd_enable_channel(cmd_ctx, *target_session, *sock, the_kernel_poll_pipe[1]);
+		ret = cmd_enable_channel(cmd_ctx, *target_session, *sock);
 		break;
 	}
 	case LTTCOMM_SESSIOND_COMMAND_PROCESS_ATTR_TRACKER_ADD_INCLUDE_VALUE:
@@ -1878,7 +1879,6 @@
 					 filter_expression,
 					 exclusions,
 					 bytecode,
-					 the_kernel_poll_pipe[1],
 					 std::move(event_rule)) :
 			cmd_disable_event(cmd_ctx,
 					  *target_session,
diff --git a/src/bin/lttng-sessiond/cmd.cpp b/src/bin/lttng-sessiond/cmd.cpp
index bce8c3f..9a7619f 100644
--- a/src/bin/lttng-sessiond/cmd.cpp
+++ b/src/bin/lttng-sessiond/cmd.cpp
@@ -643,12 +643,10 @@
 				     char *filter_expression,
 				     struct lttng_bytecode *filter,
 				     struct lttng_event_exclusion *exclusion,
-				     int wpipe,
 				     lttng::ctl::event_rule_uptr event_rule);
 static enum lttng_error_code cmd_enable_channel_internal(ltt_session::locked_ref& session,
 							 const struct lttng_domain *domain,
-							 const struct lttng_channel& channel_attr,
-							 int wpipe);
+							 const struct lttng_channel& channel_attr);
 
 /*
  * Create a session path used by list_lttng_sessions for the case that the
@@ -1398,10 +1396,8 @@
 
 /*
  * Command LTTNG_ENABLE_CHANNEL processed by the client thread.
- *
- * The wpipe arguments is used as a notifier for the kernel thread.
  */
-int cmd_enable_channel(command_ctx *cmd_ctx, ltt_session::locked_ref& session, int sock, int wpipe)
+int cmd_enable_channel(command_ctx *cmd_ctx, ltt_session::locked_ref& session, int sock)
 {
 	const struct lttng_domain command_domain = cmd_ctx->lsm.domain;
 
@@ -1446,7 +1442,7 @@
 		return raw_channel;
 	}());
 
-	const auto cmd_ret = cmd_enable_channel_internal(session, &command_domain, *channel, wpipe);
+	const auto cmd_ret = cmd_enable_channel_internal(session, &command_domain, *channel);
 	if (cmd_ret != LTTNG_OK) {
 		return cmd_ret;
 	}
@@ -1456,8 +1452,7 @@
 
 static enum lttng_error_code cmd_enable_channel_internal(ltt_session::locked_ref& session,
 							 const struct lttng_domain *domain,
-							 const struct lttng_channel& channel_attr,
-							 int wpipe __attribute__((unused)))
+							 const struct lttng_channel& channel_attr)
 {
 	enum lttng_error_code ret_code = LTTNG_OK;
 	struct ltt_ust_session *usess = session->ust_session;
@@ -2568,8 +2563,7 @@
  */
 int cmd_add_context(struct command_ctx *cmd_ctx,
 		    ltt_session::locked_ref& locked_session,
-		    const struct lttng_event_context *event_context,
-		    int kwpipe)
+		    const struct lttng_event_context *event_context)
 {
 	int ret, chan_kern_created = 0, chan_ust_created = 0;
 	const enum lttng_domain_type domain_type = cmd_ctx->lsm.domain.type;
@@ -2599,8 +2593,7 @@
 			kern_domain.type = LTTNG_DOMAIN_KERNEL;
 			kern_domain.buf_type = LTTNG_BUFFER_GLOBAL;
 
-			ret = cmd_enable_channel_internal(
-				locked_session, &kern_domain, *attr, kwpipe);
+			ret = cmd_enable_channel_internal(locked_session, &kern_domain, *attr);
 			if (ret != LTTNG_OK) {
 				goto error;
 			}
@@ -2828,7 +2821,6 @@
 					  char *raw_filter_expression,
 					  struct lttng_bytecode *raw_bytecode,
 					  struct lttng_event_exclusion *raw_exclusion,
-					  int wpipe,
 					  bool internal_event,
 					  lttng::ctl::event_rule_uptr event_rule)
 {
@@ -2905,8 +2897,7 @@
 					return LTTNG_ERR_INVALID;
 				}
 
-				ret = cmd_enable_channel_internal(
-					locked_session, domain, *attr, wpipe);
+				ret = cmd_enable_channel_internal(locked_session, domain, *attr);
 				if (ret != LTTNG_OK) {
 					return static_cast<lttng_error_code>(ret);
 				}
@@ -2987,7 +2978,7 @@
 				return LTTNG_ERR_INVALID;
 			}
 
-			ret = cmd_enable_channel_internal(locked_session, domain, *attr, wpipe);
+			ret = cmd_enable_channel_internal(locked_session, domain, *attr);
 			if (ret != LTTNG_OK) {
 				return static_cast<lttng_error_code>(ret);
 			}
@@ -3136,7 +3127,6 @@
 							filter_expression_copy.release(),
 							bytecode_copy.release(),
 							nullptr,
-							wpipe,
 							std::move(internal_event_rule));
 		}
 
@@ -3225,7 +3215,6 @@
 		     char *filter_expression,
 		     struct lttng_event_exclusion *exclusion,
 		     struct lttng_bytecode *bytecode,
-		     int wpipe,
 		     lttng::ctl::event_rule_uptr event_rule)
 {
 	int ret;
@@ -3249,7 +3238,6 @@
 				filter_expression,
 				bytecode,
 				exclusion,
-				wpipe,
 				false,
 				std::move(event_rule));
 	filter_expression = nullptr;
@@ -3270,7 +3258,6 @@
 				     char *filter_expression,
 				     struct lttng_bytecode *filter,
 				     struct lttng_event_exclusion *exclusion,
-				     int wpipe,
 				     lttng::ctl::event_rule_uptr event_rule)
 {
 	return _cmd_enable_event(locked_session,
@@ -3280,7 +3267,6 @@
 				 filter_expression,
 				 filter,
 				 exclusion,
-				 wpipe,
 				 true,
 				 std::move(event_rule));
 }
diff --git a/src/bin/lttng-sessiond/cmd.hpp b/src/bin/lttng-sessiond/cmd.hpp
index 0151107..52ad5ad 100644
--- a/src/bin/lttng-sessiond/cmd.hpp
+++ b/src/bin/lttng-sessiond/cmd.hpp
@@ -54,10 +54,7 @@
 int cmd_disable_channel(const ltt_session::locked_ref& session,
 			enum lttng_domain_type domain,
 			char *channel_name);
-int cmd_enable_channel(struct command_ctx *cmd_ctx,
-		       ltt_session::locked_ref& session,
-		       int sock,
-		       int wpipe);
+int cmd_enable_channel(struct command_ctx *cmd_ctx, ltt_session::locked_ref& session, int sock);
 
 /* Process attribute tracker commands */
 enum lttng_tracking_policy
@@ -94,8 +91,7 @@
 				   lttng::ctl::event_rule_uptr event_rule);
 int cmd_add_context(struct command_ctx *cmd_ctx,
 		    ltt_session::locked_ref& locked_session,
-		    const struct lttng_event_context *event_context,
-		    int kwpipe);
+		    const struct lttng_event_context *event_context);
 int cmd_set_filter(struct ltt_session *session,
 		   enum lttng_domain_type domain,
 		   char *channel_name,
@@ -107,7 +103,6 @@
 		     char *filter_expression,
 		     struct lttng_event_exclusion *exclusion,
 		     struct lttng_bytecode *bytecode,
-		     int wpipe,
 		     lttng::ctl::event_rule_uptr event_rule);
 
 /* Trace session action commands */
diff --git a/src/bin/lttng-sessiond/globals.cpp b/src/bin/lttng-sessiond/globals.cpp
index b89399f..83c639e 100644
--- a/src/bin/lttng-sessiond/globals.cpp
+++ b/src/bin/lttng-sessiond/globals.cpp
@@ -7,10 +7,13 @@
  *
  */
 
+#include "hotplug-handler.hpp"
 #include "lttng-sessiond.hpp"
 
 #include <common/uuid.hpp>
 
+#include <memory>
+
 lttng_uuid the_sessiond_uuid;
 
 int the_ust_consumerd64_fd = -1;
@@ -29,7 +32,8 @@
 struct lttng_kernel_abi_tracer_version the_kernel_tracer_version;
 struct lttng_kernel_abi_tracer_abi_version the_kernel_tracer_abi_version;
 
-int the_kernel_poll_pipe[2] = { -1, -1 };
+std::unique_ptr<lttng::command_queue<lttng::sessiond::hotplug_handler::command>>
+	the_hotplug_handler_queue;
 
 pid_t the_ppid;
 pid_t the_child_ppid;
diff --git a/src/bin/lttng-sessiond/hotplug-handler.cpp b/src/bin/lttng-sessiond/hotplug-handler.cpp
new file mode 100644
index 0000000..bbd3803
--- /dev/null
+++ b/src/bin/lttng-sessiond/hotplug-handler.cpp
@@ -0,0 +1,195 @@
+/*
+ * SPDX-FileCopyrightText: 2011 EfficiOS Inc.
+ * SPDX-FileCopyrightText: 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * SPDX-FileCopyrightText: 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * SPDX-License-Identifier: GPL-2.0-only
+ *
+ */
+
+#include "health-sessiond.hpp"
+#include "hotplug-handler.hpp"
+#include "modules-domain-orchestrator.hpp"
+#include "session.hpp"
+#include "thread.hpp"
+
+#include <common/error.hpp>
+#include <common/poller.hpp>
+
+#include <unordered_map>
+
+namespace ls = lttng::sessiond;
+namespace lhh = ls::hotplug_handler;
+
+namespace {
+
+struct tracked_channel {
+	ls::modules::stream_group *channel;
+	ltt_session::id_t session_id;
+};
+
+struct thread_context {
+	lttng::command_queue<lhh::command>& queue;
+};
+
+void process_commands(lttng::command_queue<lhh::command>& queue,
+		      lttng::poller& poller,
+		      std::unordered_map<int, tracked_channel>& tracked_channels,
+		      bool& quit_requested)
+{
+	while (auto cmd = queue.pop()) {
+		switch (cmd->type) {
+		case lhh::command_type::ADD_CHANNEL:
+		{
+			LTTNG_ASSERT(cmd->channel);
+			const auto channel_fd = cmd->channel->tracer_handle().fd();
+
+			DBG_FMT("Hotplug handler: tracking channel for hotplug: fd={}, session_id={}",
+				channel_fd,
+				cmd->session_id);
+
+			tracked_channels[channel_fd] = { cmd->channel, cmd->session_id };
+
+			poller.add(
+				cmd->channel->tracer_handle(),
+				lttng::poller::event_type::READABLE,
+				[channel_fd, &tracked_channels](lttng::poller::event_type events) {
+					if ((events & lttng::poller::event_type::READABLE) !=
+					    lttng::poller::event_type::READABLE) {
+						/*
+						 * Error or hangup on the channel fd.
+						 * The orchestrator's destructor will send a
+						 * REMOVE_CHANNEL command to clean this up.
+						 */
+						WARN_FMT(
+							"Hotplug handler: unexpected event on channel fd: fd={}, events='{}'",
+							channel_fd,
+							events);
+						return;
+					}
+
+					const auto it = tracked_channels.find(channel_fd);
+					LTTNG_ASSERT(it != tracked_channels.end());
+
+					const auto& channel = it->second;
+
+					DBG_FMT("Hotplug handler: hotplug event on channel: fd={}, session_id={}",
+						channel_fd,
+						channel.session_id);
+
+					try {
+						auto list_lock = ls::lock_session_list();
+						auto session = ltt_session::find_locked_session(
+							channel.session_id);
+
+						session->get_kernel_orchestrator()
+							.handle_channel_hotplug(*channel.channel);
+					} catch (const lttng::sessiond::exceptions::
+							 session_not_found_error&) {
+						ERR_FMT("Hotplug handler: session not found during hotplug: session_id={}",
+							channel.session_id);
+						std::abort();
+					}
+				});
+
+			cmd->_complete();
+			break;
+		}
+		case lhh::command_type::REMOVE_CHANNEL:
+		{
+			LTTNG_ASSERT(cmd->channel);
+			const auto channel_fd = cmd->channel->tracer_handle().fd();
+
+			DBG_FMT("Hotplug handler: untracking channel: fd={}", channel_fd);
+
+			const auto it = tracked_channels.find(channel_fd);
+			LTTNG_ASSERT(it != tracked_channels.end());
+
+			poller.remove(cmd->channel->tracer_handle());
+			tracked_channels.erase(it);
+
+			cmd->_complete();
+			break;
+		}
+		case lhh::command_type::QUIT:
+			DBG("Hotplug handler: quit command received");
+			quit_requested = true;
+			cmd->_complete();
+			return;
+		}
+	}
+}
+
+void *thread_hotplug_handler(void *data)
+{
+	auto *ctx = static_cast<thread_context *>(data);
+	auto& queue = ctx->queue;
+
+	DBG("Hotplug handler thread started");
+
+	health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_KERNEL);
+	health_code_update();
+
+	lttng::poller poller;
+	std::unordered_map<int, tracked_channel> tracked_channels;
+	bool quit_requested = false;
+
+	poller.add(
+		queue.wake_fd(),
+		lttng::poller::event_type::READABLE,
+		[&queue, &poller, &tracked_channels, &quit_requested](lttng::poller::event_type) {
+			process_commands(queue, poller, tracked_channels, quit_requested);
+		});
+
+	while (!quit_requested) {
+		health_code_update();
+
+		DBG("Hotplug handler thread polling");
+		health_poll_entry();
+		poller.poll(lttng::poller::timeout_type::WAIT_FOREVER);
+		health_poll_exit();
+
+		health_code_update();
+	}
+
+	health_unregister(the_health_sessiond);
+	DBG("Hotplug handler thread exiting");
+	return nullptr;
+}
+
+bool shutdown_hotplug_handler_thread(void *data)
+{
+	auto *ctx = static_cast<thread_context *>(data);
+
+	lhh::command quit_cmd(lhh::command_type::QUIT);
+	ctx->queue.send(std::move(quit_cmd));
+	return true;
+}
+
+void cleanup_hotplug_handler_thread(void *data)
+{
+	delete static_cast<thread_context *>(data);
+}
+
+} /* namespace */
+
+bool lhh::launch_hotplug_handler_thread(lttng::command_queue<lhh::command>& queue)
+{
+	auto *ctx = new (std::nothrow) thread_context{ queue };
+	if (!ctx) {
+		return false;
+	}
+
+	auto *thread = lttng_thread_create("Hotplug handler",
+					   thread_hotplug_handler,
+					   shutdown_hotplug_handler_thread,
+					   cleanup_hotplug_handler_thread,
+					   ctx);
+	if (!thread) {
+		delete ctx;
+		return false;
+	}
+
+	lttng_thread_put(thread);
+	return true;
+}
diff --git a/src/bin/lttng-sessiond/hotplug-handler.hpp b/src/bin/lttng-sessiond/hotplug-handler.hpp
new file mode 100644
index 0000000..df74858
--- /dev/null
+++ b/src/bin/lttng-sessiond/hotplug-handler.hpp
@@ -0,0 +1,62 @@
+/*
+ * SPDX-FileCopyrightText: 2026 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * SPDX-License-Identifier: GPL-2.0-only
+ *
+ */
+
+#ifndef LTTNG_SESSIOND_HOTPLUG_HANDLER_HPP
+#define LTTNG_SESSIOND_HOTPLUG_HANDLER_HPP
+
+#include <common/command-queue.hpp>
+
+#include <cstdint>
+
+namespace lttng {
+namespace sessiond {
+namespace modules {
+class stream_group;
+} /* namespace modules */
+
+namespace hotplug_handler {
+
+enum class command_type : std::uint8_t {
+	ADD_CHANNEL,
+	REMOVE_CHANNEL,
+	QUIT,
+};
+
+/* Session id type matching ltt_session::id_t. */
+using session_id_t = std::uint64_t;
+
+struct command : public lttng::command_base {
+	explicit command(command_type type_) noexcept : type(type_)
+	{
+	}
+
+	command(command_type type_,
+		modules::stream_group& channel_,
+		session_id_t session_id_) noexcept :
+		type(type_), channel(&channel_), session_id(session_id_)
+	{
+	}
+
+	~command() override = default;
+
+	command(command&&) noexcept = default;
+	command& operator=(command&&) noexcept = default;
+	command(const command&) = delete;
+	command& operator=(const command&) = delete;
+
+	command_type type;
+	modules::stream_group *channel = nullptr;
+	session_id_t session_id = 0;
+};
+
+bool launch_hotplug_handler_thread(lttng::command_queue<command>& queue);
+
+} /* namespace hotplug_handler */
+} /* namespace sessiond */
+} /* namespace lttng */
+
+#endif /* LTTNG_SESSIOND_HOTPLUG_HANDLER_HPP */
diff --git a/src/bin/lttng-sessiond/lttng-sessiond.hpp b/src/bin/lttng-sessiond/lttng-sessiond.hpp
index 8a63278..e7ebed0 100644
--- a/src/bin/lttng-sessiond/lttng-sessiond.hpp
+++ b/src/bin/lttng-sessiond/lttng-sessiond.hpp
@@ -21,6 +21,7 @@
 #include <common/sessiond-comm/sessiond-comm.hpp>
 #include <common/uuid.hpp>
 
+#include <memory>
 #include <urcu.h>
 #include <urcu/wfcqueue.h>
 
@@ -118,7 +119,19 @@
 	struct cds_list_head head;
 };
 
-extern int the_kernel_poll_pipe[2];
+/* Hotplug handler command queue for kernel hotplug monitoring. */
+namespace lttng {
+namespace sessiond {
+namespace hotplug_handler {
+struct command;
+} /* namespace hotplug_handler */
+} /* namespace sessiond */
+template <typename>
+class command_queue;
+} /* namespace lttng */
+
+extern std::unique_ptr<lttng::command_queue<lttng::sessiond::hotplug_handler::command>>
+	the_hotplug_handler_queue;
 
 /*
  * Populated when the daemon starts with the current page size of the system.
diff --git a/src/bin/lttng-sessiond/main.cpp b/src/bin/lttng-sessiond/main.cpp
index aac6c46..086b68d 100644
--- a/src/bin/lttng-sessiond/main.cpp
+++ b/src/bin/lttng-sessiond/main.cpp
@@ -21,12 +21,12 @@
 #include "event.hpp"
 #include "fd-limit.hpp"
 #include "health-sessiond.hpp"
+#include "hotplug-handler.hpp"
 #include "kernel-consumer.hpp"
 #include "kernel.hpp"
 #include "lttng-sessiond.hpp"
 #include "lttng-ust-ctl.hpp"
 #include "manage-apps.hpp"
-#include "manage-kernel.hpp"
 #include "modprobe.hpp"
 #include "notification-thread-commands.hpp"
 #include "notification-thread.hpp"
@@ -57,6 +57,7 @@
 #include <common/lockfile.hpp>
 #include <common/logging-utils.hpp>
 #include <common/make-unique-wrapper.hpp>
+#include <common/make-unique.hpp>
 #include <common/path.hpp>
 #include <common/relayd/relayd.hpp>
 #include <common/systemd-utils.hpp>
@@ -308,7 +309,6 @@
 	/* Close all other pipes. */
 	utils_close_pipe(apps_cmd_pipe);
 	utils_close_pipe(apps_cmd_notify_pipe);
-	utils_close_pipe(the_kernel_poll_pipe);
 
 	ret = remove(the_config.pid_file_path.value);
 	if (ret < 0) {
@@ -1836,14 +1836,6 @@
 		the_ppid = getppid();
 	}
 
-	/* Setup the kernel pipe for waking up the kernel thread */
-	if (is_root && !the_config.no_kernel) {
-		if (utils_create_pipe_cloexec(the_kernel_poll_pipe)) {
-			retval = -1;
-			goto stop_threads;
-		}
-	}
-
 	/* Setup the thread apps communication pipe. */
 	if (utils_create_pipe_cloexec(apps_cmd_pipe)) {
 		retval = -1;
@@ -1977,8 +1969,17 @@
 
 	/* Don't start this thread if kernel tracing is not requested nor root */
 	if (is_root && !the_config.no_kernel) {
-		/* Create kernel thread to manage kernel event */
-		if (!launch_kernel_management_thread(the_kernel_poll_pipe[0])) {
+		/*
+		 * Allocate the hotplug handler command queue here rather than in
+		 * globals to avoid it being present in the runas worker (which
+		 * is forked earlier in main()).
+		 */
+		the_hotplug_handler_queue = lttng::make_unique<
+			lttng::command_queue<lttng::sessiond::hotplug_handler::command>>();
+
+		/* Create hotplug handler thread for kernel CPU hotplug monitoring. */
+		if (!lttng::sessiond::hotplug_handler::launch_hotplug_handler_thread(
+			    *the_hotplug_handler_queue)) {
 			retval = -1;
 			goto stop_threads;
 		}
diff --git a/src/bin/lttng-sessiond/manage-kernel.cpp b/src/bin/lttng-sessiond/manage-kernel.cpp
deleted file mode 100644
index d20bc3d..0000000
--- a/src/bin/lttng-sessiond/manage-kernel.cpp
+++ /dev/null
@@ -1,353 +0,0 @@
-/*
- * SPDX-FileCopyrightText: 2011 EfficiOS Inc.
- * SPDX-FileCopyrightText: 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- * SPDX-FileCopyrightText: 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
- *
- * SPDX-License-Identifier: GPL-2.0-only
- *
- */
-
-#include "health-sessiond.hpp"
-#include "kernel-consumer.hpp"
-#include "kernel.hpp"
-#include "manage-kernel.hpp"
-#include "testpoint.hpp"
-#include "thread.hpp"
-#include "utils.hpp"
-
-#include <common/make-unique-wrapper.hpp>
-#include <common/pipe.hpp>
-#include <common/pthread-lock.hpp>
-#include <common/urcu.hpp>
-#include <common/utils.hpp>
-
-#include <fcntl.h>
-
-namespace {
-struct thread_notifiers {
-	struct lttng_pipe *quit_pipe;
-	int kernel_poll_pipe_read_fd;
-};
-} /* namespace */
-
-/*
- * Update the kernel poll set of all channel fd available over all tracing
- * session. Add the wakeup pipe at the end of the set.
- */
-static int update_kernel_poll(struct lttng_poll_event *events)
-{
-	int ret;
-
-	DBG("Updating kernel poll set");
-
-	const auto list_lock = lttng::sessiond::lock_session_list();
-	const struct ltt_session_list *session_list = session_get_list();
-
-	for (auto *session : lttng::urcu::list_iteration_adapter<ltt_session, &ltt_session::list>(
-		     session_list->head)) {
-		if (!session_get(session)) {
-			continue;
-		}
-
-		session_lock(session);
-		if (session->kernel_session == nullptr) {
-			session_unlock(session);
-			session_put(session);
-			continue;
-		}
-
-		for (auto *channel : lttng::urcu::list_iteration_adapter<ltt_kernel_channel,
-									 &ltt_kernel_channel::list>(
-			     session->kernel_session->channel_list.head)) {
-			/* Add channel fd to the kernel poll set */
-			ret = lttng_poll_add(events, channel->fd, LPOLLIN | LPOLLRDNORM);
-			if (ret < 0) {
-				session_unlock(session);
-				session_put(session);
-				return -1;
-			}
-
-			DBG("Channel fd %d added to kernel set", channel->fd);
-		}
-
-		session_unlock(session);
-		session_put(session);
-	}
-
-	return 0;
-}
-
-/*
- * Find the channel fd from 'fd' over all tracing session. When found, check
- * for new channel stream and send those stream fds to the kernel consumer.
- *
- * Useful for CPU hotplug feature.
- */
-static int update_kernel_stream(int fd)
-{
-	int ret = 0;
-
-	DBG("Updating kernel streams for channel fd %d", fd);
-
-	const auto list_lock = lttng::sessiond::lock_session_list();
-	const struct ltt_session_list *session_list = session_get_list();
-
-	for (auto *raw_session_ptr :
-	     lttng::urcu::list_iteration_adapter<ltt_session, &ltt_session::list>(
-		     session_list->head)) {
-		ltt_kernel_session *ksess;
-
-		const auto session = [raw_session_ptr]() {
-			session_get(raw_session_ptr);
-			raw_session_ptr->lock();
-			return ltt_session::make_locked_ref(*raw_session_ptr);
-		}();
-
-		if (session->kernel_session == nullptr) {
-			continue;
-		}
-
-		ksess = session->kernel_session;
-
-		for (auto *channel : lttng::urcu::list_iteration_adapter<ltt_kernel_channel,
-									 &ltt_kernel_channel::list>(
-			     ksess->channel_list.head)) {
-			if (channel->fd != fd) {
-				continue;
-			}
-			DBG("Channel found, updating kernel streams");
-			ret = kernel_open_channel_stream(channel);
-			if (ret < 0) {
-				return ret;
-			}
-			/* Update the stream global counter */
-			ksess->stream_count_global += ret;
-
-			/*
-			 * Have we already sent fds to the consumer? If yes, it
-			 * means that tracing is started so it is safe to send
-			 * our updated stream fds.
-			 */
-			if (ksess->consumer_fds_sent != 1 || ksess->consumer == nullptr) {
-				return -1;
-			}
-
-			for (auto *socket :
-			     lttng::urcu::lfht_iteration_adapter<consumer_socket,
-								 decltype(consumer_socket::node),
-								 &consumer_socket::node>(
-				     *ksess->consumer->socks->ht)) {
-				const lttng::pthread::lock_guard socket_lock(*socket->lock);
-
-				ret = kernel_consumer_send_channel_streams(
-					socket, channel, ksess, session->output_traces ? 1 : 0);
-				if (ret < 0) {
-					return ret;
-				}
-			}
-		}
-	}
-
-	return ret;
-}
-
-/*
- * This thread manage event coming from the kernel.
- *
- * Features supported in this thread:
- *    -) CPU Hotplug
- */
-static void *thread_kernel_management(void *data)
-{
-	int ret, i, update_poll_flag = 1, err = -1;
-	uint32_t nb_fd;
-	char tmp;
-	struct lttng_poll_event events;
-	struct thread_notifiers *notifiers = (thread_notifiers *) data;
-	const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
-
-	DBG("[thread] Thread manage kernel started");
-
-	health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_KERNEL);
-
-	/*
-	 * This first step of the while is to clean this structure which could free
-	 * non NULL pointers so initialize it before the loop.
-	 */
-	lttng_poll_init(&events);
-
-	if (testpoint(sessiond_thread_manage_kernel)) {
-		goto error_testpoint;
-	}
-
-	health_code_update();
-
-	if (testpoint(sessiond_thread_manage_kernel_before_loop)) {
-		goto error_testpoint;
-	}
-
-	while (true) {
-		health_code_update();
-
-		if (update_poll_flag == 1) {
-			/* Clean events object. We are about to populate it again. */
-			lttng_poll_clean(&events);
-
-			ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
-			if (ret < 0) {
-				goto error_poll_create;
-			}
-
-			ret = lttng_poll_add(&events, notifiers->kernel_poll_pipe_read_fd, LPOLLIN);
-			if (ret < 0) {
-				goto error;
-			}
-
-			ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN);
-			if (ret < 0) {
-				goto error;
-			}
-
-			/* This will add the available kernel channel if any. */
-			ret = update_kernel_poll(&events);
-			if (ret < 0) {
-				goto error;
-			}
-			update_poll_flag = 0;
-		}
-
-		DBG("Thread kernel polling");
-
-		/* Poll infinite value of time */
-	restart:
-		health_poll_entry();
-		ret = lttng_poll_wait(&events, -1);
-		DBG("Thread kernel return from poll on %d fds", LTTNG_POLL_GETNB(&events));
-		health_poll_exit();
-		if (ret < 0) {
-			/*
-			 * Restart interrupted system call.
-			 */
-			if (errno == EINTR) {
-				goto restart;
-			}
-			goto error;
-		} else if (ret == 0) {
-			/* Should not happen since timeout is infinite */
-			ERR("Return value of poll is 0 with an infinite timeout.\n"
-			    "This should not have happened! Continuing...");
-			continue;
-		}
-
-		nb_fd = ret;
-
-		for (i = 0; i < nb_fd; i++) {
-			/* Fetch once the poll data */
-			const auto revents = LTTNG_POLL_GETEV(&events, i);
-			const auto pollfd = LTTNG_POLL_GETFD(&events, i);
-
-			health_code_update();
-
-			/* Activity on thread quit pipe, exiting. */
-			if (pollfd == thread_quit_pipe_fd) {
-				DBG("Activity on thread quit pipe");
-				err = 0;
-				goto exit;
-			}
-
-			/* Check for data on kernel pipe */
-			if (revents & LPOLLIN) {
-				if (pollfd == notifiers->kernel_poll_pipe_read_fd) {
-					(void) lttng_read(
-						notifiers->kernel_poll_pipe_read_fd, &tmp, 1);
-					/*
-					 * Ret value is useless here, if this pipe gets any actions
-					 * an update is required anyway.
-					 */
-					update_poll_flag = 1;
-					continue;
-				} else {
-					/*
-					 * New CPU detected by the kernel. Adding kernel stream to
-					 * kernel session and updating the kernel consumer
-					 */
-					ret = update_kernel_stream(pollfd);
-					if (ret < 0) {
-						continue;
-					}
-					break;
-				}
-			} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-				update_poll_flag = 1;
-				continue;
-			} else {
-				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
-				goto error;
-			}
-		}
-	}
-
-exit:
-error:
-	lttng_poll_clean(&events);
-error_poll_create:
-error_testpoint:
-	if (err) {
-		health_error();
-		ERR("Health error occurred in %s", __func__);
-		WARN("Kernel thread died unexpectedly. "
-		     "Kernel tracing can continue but CPU hotplug is disabled.");
-	}
-	health_unregister(the_health_sessiond);
-	DBG("Kernel thread dying");
-	return nullptr;
-}
-
-static bool shutdown_kernel_management_thread(void *data)
-{
-	struct thread_notifiers *notifiers = (thread_notifiers *) data;
-	const int write_fd = lttng_pipe_get_writefd(notifiers->quit_pipe);
-
-	return notify_thread_pipe(write_fd) == 1;
-}
-
-static void cleanup_kernel_management_thread(void *data)
-{
-	struct thread_notifiers *notifiers = (thread_notifiers *) data;
-
-	lttng_pipe_destroy(notifiers->quit_pipe);
-	free(notifiers);
-}
-
-bool launch_kernel_management_thread(int kernel_poll_pipe_read_fd)
-{
-	struct lttng_pipe *quit_pipe;
-	struct thread_notifiers *notifiers = nullptr;
-	struct lttng_thread *thread;
-
-	notifiers = zmalloc<thread_notifiers>();
-	if (!notifiers) {
-		goto error_alloc;
-	}
-	quit_pipe = lttng_pipe_open(FD_CLOEXEC);
-	if (!quit_pipe) {
-		goto error;
-	}
-	notifiers->quit_pipe = quit_pipe;
-	notifiers->kernel_poll_pipe_read_fd = kernel_poll_pipe_read_fd;
-
-	thread = lttng_thread_create("Kernel management",
-				     thread_kernel_management,
-				     shutdown_kernel_management_thread,
-				     cleanup_kernel_management_thread,
-				     notifiers);
-	if (!thread) {
-		goto error;
-	}
-	lttng_thread_put(thread);
-	return true;
-error:
-	cleanup_kernel_management_thread(notifiers);
-error_alloc:
-	return false;
-}
diff --git a/src/bin/lttng-sessiond/manage-kernel.hpp b/src/bin/lttng-sessiond/manage-kernel.hpp
deleted file mode 100644
index 466ad78..0000000
--- a/src/bin/lttng-sessiond/manage-kernel.hpp
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * SPDX-FileCopyrightText: 2011 EfficiOS Inc.
- * SPDX-FileCopyrightText: 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- * SPDX-FileCopyrightText: 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
- *
- * SPDX-License-Identifier: GPL-2.0-only
- *
- */
-
-#ifndef SESSIOND_KERNEL_MANAGEMENT_THREAD_H
-#define SESSIOND_KERNEL_MANAGEMENT_THREAD_H
-
-#include "lttng-sessiond.hpp"
-
-#include <stdbool.h>
-
-bool launch_kernel_management_thread(int kernel_poll_pipe_read_fd);
-
-#endif /* SESSIOND_KERNEL_MANAGEMENT_THREAD_H */
diff --git a/src/bin/lttng-sessiond/modules-domain-orchestrator.cpp b/src/bin/lttng-sessiond/modules-domain-orchestrator.cpp
index 052bd57..1f38c8c 100644
--- a/src/bin/lttng-sessiond/modules-domain-orchestrator.cpp
+++ b/src/bin/lttng-sessiond/modules-domain-orchestrator.cpp
@@ -259,6 +259,23 @@
 ls::modules::domain_orchestrator::~domain_orchestrator()
 {
 	/*
+	 * Unregister all channels from the hotplug handler thread so their
+	 * fds are removed from the poller before being closed.
+	 */
+	for (auto& channel_entry : _channels) {
+		auto& channel = *channel_entry.second;
+
+		if (!channel.is_sent_to_consumer()) {
+			/* Channel was never registered for hotplug monitoring. */
+			continue;
+		}
+
+		hotplug_handler::command cmd(
+			hotplug_handler::command_type::REMOVE_CHANNEL, channel, _session_id);
+		_hotplug_queue.send_and_wait(std::move(cmd));
+	}
+
+	/*
 	 * Unregister all channels that were published to the notification
 	 * thread. This mirrors the add performed by _send_channel_to_consumer()
 	 * and ensures the notification thread doesn't hold stale references to
@@ -850,7 +867,7 @@
 
 unsigned int ls::modules::domain_orchestrator::_open_channel_streams(stream_group& channel)
 {
-	unsigned int stream_count = 0;
+	unsigned int streams_opened = 0;
 
 	while (true) {
 		const auto raw_stream_fd = kernctl_create_stream(channel.tracer_handle().fd());
@@ -873,7 +890,7 @@
 				"Failed to set FD_CLOEXEC on kernel stream file descriptor", errno);
 		}
 
-		const auto cpu = stream_count;
+		const auto cpu = channel.stream_count();
 		DBG_FMT("Kernel stream created: channel=`{}`, fd={}, cpu={}",
 			channel.configuration().name,
 			stream_fd.fd(),
@@ -881,7 +898,7 @@
 
 		/*
 		 * Also add the stream to the legacy struct for downstream code that
-		 * still iterates ltt_kernel_channel::stream_list (manage-kernel
+		 * still iterates ltt_kernel_channel::stream_list (hotplug handler
 		 * thread, consumer send code).
 		 */
 		auto& legacy_channel =
@@ -905,10 +922,10 @@
 		legacy_channel.stream_count++;
 
 		channel.add_stream(cpu, std::move(stream_fd));
-		stream_count++;
+		streams_opened++;
 	}
 
-	return stream_count;
+	return streams_opened;
 }
 
 void ls::modules::domain_orchestrator::_flush_channel_streams(const stream_group& channel) const
@@ -1216,6 +1233,24 @@
 		_send_channels_to_consumer(kconsumer_socket);
 	}
 
+	_legacy_kernel_session->active = true;
+	_active = true;
+
+	/* Register the channels for hotplug monitoring. */
+	for (auto& channel_entry : _channels) {
+		auto& channel = *channel_entry.second;
+
+		if (channel.is_monitored_for_hotplug()) {
+			continue;
+		}
+
+		hotplug_handler::command cmd(
+			hotplug_handler::command_type::ADD_CHANNEL, channel, _session_id);
+		_hotplug_queue.send_and_wait(std::move(cmd));
+
+		channel.mark_monitored_for_hotplug();
+	}
+
 	/* Start kernel tracing. */
 	const auto start_ret = kernel_start_session(_legacy_kernel_session);
 	if (start_ret < 0) {
@@ -1223,9 +1258,6 @@
 	}
 
 	kernel_wait_quiescent();
-
-	_legacy_kernel_session->active = true;
-	_active = true;
 }
 
 void ls::modules::domain_orchestrator::stop()
@@ -1530,3 +1562,60 @@
 
 	return it->second->stream_count();
 }
+
+void ls::modules::domain_orchestrator::handle_channel_hotplug(stream_group& channel)
+{
+	LTTNG_ASSERT(_legacy_kernel_session);
+
+	const auto new_stream_count = _open_channel_streams(channel);
+	if (new_stream_count == 0) {
+		WARN_FMT(
+			"Kernel channel hotplug event, but no new streams opened for channel: channel_name=`{}`",
+			channel.configuration().name);
+		return;
+	}
+
+	DBG_FMT("Kernel channel hotplug opened new streams: channel_name=`{}`, additional_streams_count={}, total_stream_count={}",
+		channel.configuration().name,
+		new_stream_count,
+		channel.stream_count());
+
+	_legacy_kernel_session->stream_count_global += new_stream_count;
+
+	LTTNG_ASSERT(channel.is_sent_to_consumer());
+
+	/* Send only the newly-opened (unsent) streams to the consumer daemon. */
+	auto& kconsumer_socket = _get_consumer_socket();
+	const lttng::pthread::lock_guard socket_lock(*kconsumer_socket.lock);
+	const auto& channel_config = channel.configuration();
+
+	for (const auto& stream : channel.streams()) {
+		auto& kstream = static_cast<stream_group::kernel_stream&>(*stream);
+
+		if (kstream.sent_to_consumer) {
+			continue;
+		}
+
+		DBG_FMT("Sending hotplug kernel stream to consumer: channel=`{}`, fd={}, cpu={}",
+			channel_config.name,
+			kstream.handle.fd(),
+			kstream.cpu);
+
+		lttcomm_consumer_msg lkm = {};
+		consumer_init_add_stream_comm_msg(
+			&lkm, channel.consumer_key(), kstream.handle.fd(), kstream.cpu);
+
+		auto stream_fd = kstream.handle.fd();
+		const auto monitor = _legacy_kernel_session->output_traces ? 1 : 0;
+		const auto send_ret = consumer_send_stream(
+			&kconsumer_socket, &_consumer, &lkm, &stream_fd, monitor);
+		if (send_ret < 0) {
+			LTTNG_THROW_KERNEL_CONSUMER_SEND_FAILURE(lttng::format(
+				"Failed to send hotplug kernel stream to consumer: channel=`{}`, cpu={}",
+				channel_config.name,
+				kstream.cpu));
+		}
+
+		kstream.sent_to_consumer = true;
+	}
+}
diff --git a/src/bin/lttng-sessiond/modules-domain-orchestrator.hpp b/src/bin/lttng-sessiond/modules-domain-orchestrator.hpp
index 8bb2760..b33d2e2 100644
--- a/src/bin/lttng-sessiond/modules-domain-orchestrator.hpp
+++ b/src/bin/lttng-sessiond/modules-domain-orchestrator.hpp
@@ -12,6 +12,7 @@
 #include "domain-orchestrator.hpp"
 #include "domain.hpp"
 #include "event-rule-configuration.hpp"
+#include "hotplug-handler.hpp"
 #include "recording-channel-configuration.hpp"
 #include "stream-group.hpp"
 
@@ -27,6 +28,7 @@
 struct consumer_output;
 struct consumer_socket;
 struct ltt_kernel_session;
+struct ltt_session;
 
 namespace lttng {
 namespace sessiond {
@@ -335,6 +337,16 @@
 		_published_to_notification_thread = true;
 	}
 
+	bool is_monitored_for_hotplug() const noexcept
+	{
+		return _monitored_for_hotplug;
+	}
+
+	void mark_monitored_for_hotplug() noexcept
+	{
+		_monitored_for_hotplug = true;
+	}
+
 	void add_event_rule(const config::event_rule_configuration& event_rule_config,
 			    lttng::file_descriptor tracer_event_fd)
 	{
@@ -374,6 +386,7 @@
 	const config::recording_channel_configuration& _configuration;
 	bool _sent_to_consumer = false;
 	bool _published_to_notification_thread = false;
+	bool _monitored_for_hotplug = false;
 	std::vector<event_rule> _event_rules;
 };
 
@@ -454,13 +467,19 @@
  */
 class domain_orchestrator final : public sessiond::domain_orchestrator {
 public:
+	using hotplug_command = lttng::sessiond::hotplug_handler::command;
+
 	explicit domain_orchestrator(lttng::file_descriptor tracer_session_fd,
 				     config::domain& domain_configuration,
 				     struct consumer_output& consumer,
+				     hotplug_handler::session_id_t session_id,
+				     lttng::command_queue<hotplug_command>& hotplug_queue,
 				     struct ltt_kernel_session *legacy_kernel_session) :
 		_tracer_session_fd(std::move(tracer_session_fd)),
 		_domain_configuration(domain_configuration),
 		_consumer(consumer),
+		_session_id(session_id),
+		_hotplug_queue(hotplug_queue),
 		_legacy_kernel_session(legacy_kernel_session)
 	{
 	}
@@ -522,6 +541,15 @@
 	unsigned int get_stream_count_for_channel(
 		const config::recording_channel_configuration& channel_config) const;
 
+	/*
+	 * Handle a CPU hotplug event on the given channel.
+	 *
+	 * Opens newly-available streams via kernctl_create_stream() and,
+	 * if tracing is active and consumer fds have already been sent,
+	 * sends the new streams to the consumer daemon.
+	 */
+	void handle_channel_hotplug(stream_group& channel);
+
 private:
 	/*
 	 * Look up a runtime channel by its configuration object.
@@ -582,6 +610,8 @@
 	lttng::file_descriptor _tracer_session_fd;
 	config::domain& _domain_configuration;
 	struct consumer_output& _consumer;
+	hotplug_handler::session_id_t _session_id;
+	lttng::command_queue<hotplug_command>& _hotplug_queue;
 	std::unordered_map<const config::recording_channel_configuration *,
 			   std::unique_ptr<stream_group>>
 		_channels;
diff --git a/src/bin/lttng-sessiond/session.cpp b/src/bin/lttng-sessiond/session.cpp
index f8c6557..267d9b6 100644
--- a/src/bin/lttng-sessiond/session.cpp
+++ b/src/bin/lttng-sessiond/session.cpp
@@ -1008,15 +1008,6 @@
 		trace_ust_destroy_session(usess);
 	}
 
-	/*
-	 * Must notify the kernel thread here to update it's poll set in order to
-	 * remove the channel(s)' fd just destroyed.
-	 */
-	ret = notify_thread_pipe(the_kernel_poll_pipe[1]);
-	if (ret < 0) {
-		PERROR("write kernel poll pipe");
-	}
-
 	DBG("Destroying session %s (id %" PRIu64 ")", session->name, session->id);
 
 	snapshot_destroy(&session->snapshot);
diff --git a/src/common/Makefile.am b/src/common/Makefile.am
index 4a58a35..b5e85e2 100644
--- a/src/common/Makefile.am
+++ b/src/common/Makefile.am
@@ -164,6 +164,7 @@
 # meant to be used by GPL executables.
 noinst_LTLIBRARIES += libcommon-gpl.la
 libcommon_gpl_la_SOURCES = \
+	command-queue.hpp \
 	common.hpp \
 	context.cpp context.hpp \
 	daemonize.cpp daemonize.hpp \
diff --git a/src/common/command-queue.hpp b/src/common/command-queue.hpp
new file mode 100644
index 0000000..8fc76bb
--- /dev/null
+++ b/src/common/command-queue.hpp
@@ -0,0 +1,148 @@
+/*
+ * SPDX-FileCopyrightText: 2026 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * SPDX-License-Identifier: GPL-2.0-only
+ *
+ */
+
+#ifndef LTTNG_COMMAND_QUEUE_HPP
+#define LTTNG_COMMAND_QUEUE_HPP
+
+#include <common/eventfd.hpp>
+#include <common/waiter.hpp>
+
+#include <vendor/optional.hpp>
+
+#include <deque>
+#include <mutex>
+
+namespace lttng {
+
+/*
+ * Base class that users should derive from when defining their command type.
+ *
+ * Provides optional synchronous completion support: command_queue's
+ * send_and_wait() method uses the private waker to block the sender
+ * until the consumer thread signals completion via _complete().
+ *
+ * Users who need a "result" from command processing should define
+ * their own set_result()/get_result() methods on their derived type.
+ */
+class command_base {
+public:
+	command_base() = default;
+	virtual ~command_base() = default;
+
+	command_base(command_base&& other) noexcept = default;
+	command_base& operator=(command_base&& other) noexcept = default;
+	command_base(const command_base&) = delete;
+	command_base& operator=(const command_base&) = delete;
+
+	/*
+	 * Signal completion of this command. If a sender is blocked in
+	 * send_and_wait(), this wakes it up. Safe to call even if no
+	 * waker was set (fire-and-forget commands).
+	 *
+	 * Must be called by the consumer thread after processing the command.
+	 */
+	void _complete() noexcept
+	{
+		if (_completed_waker) {
+			_completed_waker->wake();
+		}
+	}
+
+private:
+	template <typename>
+	friend class command_queue;
+
+	void _set_waker(lttng::synchro::waker waker) noexcept
+	{
+		_completed_waker = waker;
+	}
+
+	nonstd::optional<lttng::synchro::waker> _completed_waker;
+};
+
+/*
+ * Thread-safe command queue with eventfd-based wakeup.
+ *
+ * CommandType should derive from command_base and define move semantics
+ * if copies are costly. The queue accepts commands by value (callers
+ * can std::move into it).
+ *
+ * send()          — fire-and-forget: enqueue + wake eventfd.
+ * send_and_wait() — enqueue + wake + block until the consumer thread
+ *                   calls _complete() on the command.
+ *
+ * The waiter/waker plumbing is entirely internal; callers never touch it.
+ */
+template <typename CommandType>
+class command_queue {
+public:
+	/*
+	 * Non-semaphore semantics: decrement() returns the accumulated
+	 * counter and resets it to zero, which is what drain() needs.
+	 */
+	command_queue() : _wake_fd(false)
+	{
+	}
+	~command_queue() = default;
+
+	command_queue(const command_queue&) = delete;
+	command_queue(command_queue&&) = delete;
+	command_queue& operator=(const command_queue&) = delete;
+	command_queue& operator=(command_queue&&) = delete;
+
+	void send(CommandType cmd)
+	{
+		const std::lock_guard<std::mutex> guard(_lock);
+
+		_queue.emplace_back(std::move(cmd));
+		_wake_fd.increment();
+	}
+
+	void send_and_wait(CommandType cmd)
+	{
+		lttng::synchro::waiter completion_waiter;
+
+		cmd._set_waker(completion_waiter.get_waker());
+
+		{
+			const std::lock_guard<std::mutex> guard(_lock);
+
+			_queue.emplace_back(std::move(cmd));
+			_wake_fd.increment();
+		}
+
+		completion_waiter.wait();
+	}
+
+	nonstd::optional<CommandType> pop()
+	{
+		const std::lock_guard<std::mutex> guard(_lock);
+
+		if (_queue.empty()) {
+			_wake_fd.decrement();
+			return nonstd::nullopt;
+		}
+
+		auto cmd = nonstd::optional<CommandType>(std::move(_queue.front()));
+		_queue.pop_front();
+		return cmd;
+	}
+
+	const lttng::eventfd& wake_fd() const noexcept
+	{
+		return _wake_fd;
+	}
+
+private:
+	lttng::eventfd _wake_fd;
+	std::mutex _lock;
+	std::deque<CommandType> _queue;
+};
+
+} /* namespace lttng */
+
+#endif /* LTTNG_COMMAND_QUEUE_HPP */
diff --git a/src/common/file-descriptor.cpp b/src/common/file-descriptor.cpp
index 92d27da..a2d0ea4 100644
--- a/src/common/file-descriptor.cpp
+++ b/src/common/file-descriptor.cpp
@@ -71,8 +71,8 @@
 
 	const auto ret = ::close(_raw_fd);
 
-	_raw_fd = -1;
 	if (ret) {
 		PERROR("Failed to close file descriptor: fd=%i", _raw_fd);
 	}
+	_raw_fd = -1;
 }
diff --git a/tests/regression/tools/lttng-ctl/liblttng-ust-ctl-fuzz.cpp b/tests/regression/tools/lttng-ctl/liblttng-ust-ctl-fuzz.cpp
index 1e91670..cb05a48 100644
--- a/tests/regression/tools/lttng-ctl/liblttng-ust-ctl-fuzz.cpp
+++ b/tests/regression/tools/lttng-ctl/liblttng-ust-ctl-fuzz.cpp
@@ -7,8 +7,8 @@
 #define _GNU_SOURCE
 #endif
 
-#include <lttng/ust-ctl.h>
 #include <lttng/lttng-export.h>
+#include <lttng/ust-ctl.h>
 
 #include <assert.h>
 #include <dlfcn.h>
