Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure journald never blocks on sd_notify() to PID 1 #1745

Merged
merged 11 commits into from Nov 2, 2015
11 changes: 11 additions & 0 deletions src/basic/env-util.c
Expand Up @@ -25,6 +25,7 @@
#include "alloc-util.h"
#include "def.h"
#include "env-util.h"
#include "parse-util.h"
#include "string-util.h"
#include "strv.h"
#include "utf8.h"
Expand Down Expand Up @@ -594,3 +595,13 @@ char **replace_env_argv(char **argv, char **env) {
ret[k] = NULL;
return ret;
}

int getenv_bool(const char *p) {
const char *e;

e = getenv(p);
if (!e)
return -ENXIO;

return parse_boolean(e);
}
2 changes: 2 additions & 0 deletions src/basic/env-util.h
Expand Up @@ -47,3 +47,5 @@ char **strv_env_unset_many(char **l, ...) _sentinel_;

char *strv_env_get_n(char **l, const char *name, size_t k) _pure_;
char *strv_env_get(char **x, const char *n) _pure_;

int getenv_bool(const char *p);
2 changes: 1 addition & 1 deletion src/basic/fdset.c
Expand Up @@ -44,7 +44,7 @@ FDSet *fdset_new(void) {
return MAKE_FDSET(set_new(NULL));
}

int fdset_new_array(FDSet **ret, int *fds, unsigned n_fds) {
int fdset_new_array(FDSet **ret, const int *fds, unsigned n_fds) {
unsigned i;
FDSet *s;
int r;
Expand Down
2 changes: 1 addition & 1 deletion src/basic/fdset.h
Expand Up @@ -35,7 +35,7 @@ int fdset_consume(FDSet *s, int fd);
bool fdset_contains(FDSet *s, int fd);
int fdset_remove(FDSet *s, int fd);

int fdset_new_array(FDSet **ret, int *fds, unsigned n_fds);
int fdset_new_array(FDSet **ret, const int *fds, unsigned n_fds);
int fdset_new_fill(FDSet **ret);
int fdset_new_listen_fds(FDSet **ret, bool unset);

Expand Down
20 changes: 11 additions & 9 deletions src/core/manager.c
Expand Up @@ -86,6 +86,8 @@
#include "virt.h"
#include "watchdog.h"

#define NOTIFY_RCVBUF_SIZE (8*1024*1024)

/* Initial delay and the interval for printing status messages about running jobs */
#define JOBS_IN_PROGRESS_WAIT_USEC (5*USEC_PER_SEC)
#define JOBS_IN_PROGRESS_PERIOD_USEC (USEC_PER_SEC / 3)
Expand Down Expand Up @@ -689,6 +691,8 @@ static int manager_setup_notify(Manager *m) {
if (fd < 0)
return log_error_errno(errno, "Failed to allocate notification socket: %m");

fd_inc_rcvbuf(fd, NOTIFY_RCVBUF_SIZE);

if (m->running_as == MANAGER_SYSTEM)
m->notify_socket = strdup("/run/systemd/notify");
else {
Expand Down Expand Up @@ -1488,7 +1492,7 @@ static unsigned manager_dispatch_dbus_queue(Manager *m) {
return n;
}

static void manager_invoke_notify_message(Manager *m, Unit *u, pid_t pid, char *buf, size_t n, FDSet *fds) {
static void manager_invoke_notify_message(Manager *m, Unit *u, pid_t pid, const char *buf, size_t n, FDSet *fds) {
_cleanup_strv_free_ char **tags = NULL;

assert(m);
Expand Down Expand Up @@ -1618,7 +1622,7 @@ static int manager_dispatch_notify_fd(sd_event_source *source, int fd, uint32_t
return 0;
}

static void invoke_sigchld_event(Manager *m, Unit *u, siginfo_t *si) {
static void invoke_sigchld_event(Manager *m, Unit *u, const siginfo_t *si) {
assert(m);
assert(u);
assert(si);
Expand Down Expand Up @@ -2000,8 +2004,7 @@ int manager_loop(Manager *m) {
m->exit_code = MANAGER_OK;

/* Release the path cache */
set_free_free(m->unit_path_cache);
m->unit_path_cache = NULL;
m->unit_path_cache = set_free_free(m->unit_path_cache);

manager_check_finished(m);

Expand Down Expand Up @@ -2111,6 +2114,9 @@ void manager_send_unit_audit(Manager *m, Unit *u, int type, bool success) {
const char *msg;
int audit_fd, r;

if (m->running_as != MANAGER_SYSTEM)
return;

audit_fd = get_audit_fd();
if (audit_fd < 0)
return;
Expand All @@ -2120,9 +2126,6 @@ void manager_send_unit_audit(Manager *m, Unit *u, int type, bool success) {
if (m->n_reloading > 0)
return;

if (m->running_as != MANAGER_SYSTEM)
return;

if (u->type != UNIT_SERVICE)
return;

Expand Down Expand Up @@ -2771,8 +2774,7 @@ static int create_generator_dir(Manager *m, char **generator, const char *name)
return log_oom();

if (!mkdtemp(p)) {
log_error_errno(errno, "Failed to create generator directory %s: %m",
p);
log_error_errno(errno, "Failed to create generator directory %s: %m", p);
free(p);
return -errno;
}
Expand Down
3 changes: 1 addition & 2 deletions src/journal-remote/journal-remote.c
Expand Up @@ -1256,7 +1256,6 @@ static int parse_argv(int argc, char *argv[]) {
};

int c, r;
const char *p;
bool type_a, type_b;

assert(argc >= 0);
Expand Down Expand Up @@ -1417,7 +1416,7 @@ static int parse_argv(int argc, char *argv[]) {

case ARG_GNUTLS_LOG: {
#ifdef HAVE_GNUTLS
p = optarg;
const char* p = optarg;
for (;;) {
_cleanup_free_ char *word = NULL;

Expand Down
130 changes: 127 additions & 3 deletions src/journal/journald-server.c
Expand Up @@ -78,6 +78,8 @@

#define RECHECK_SPACE_USEC (30*USEC_PER_SEC)

#define NOTIFY_SNDBUF_SIZE (8*1024*1024)

static int determine_space_for(
Server *s,
JournalMetrics *metrics,
Expand Down Expand Up @@ -1457,6 +1459,126 @@ static int server_open_hostname(Server *s) {
return 0;
}

static int dispatch_notify_event(sd_event_source *es, int fd, uint32_t revents, void *userdata) {
Server *s = userdata;
int r;

assert(s);
assert(s->notify_event_source == es);
assert(s->notify_fd == fd);

if (revents != EPOLLOUT) {
log_error("Invalid events on notify file descriptor.");
return -EINVAL;
}

/* The $NOTIFY_SOCKET is writable again, now send exactly one
* message on it. Either it's the initial READY=1 event or an
* stdout stream event. If there's nothing to write anymore,
* turn our event source off. The next time there's something
* to send it will be turned on again. */

if (!s->sent_notify_ready) {
static const char p[] =
"READY=1\n"
"STATUS=Processing requests...";
ssize_t l;

l = send(s->notify_fd, p, strlen(p), MSG_DONTWAIT);
if (l < 0) {
if (errno == EAGAIN)
return 0;

return log_error_errno(errno, "Failed to send READY=1 notification message: %m");
}

s->sent_notify_ready = true;
log_debug("Sent READY=1 notification.");

} else if (s->stdout_streams_notify_queue)
/* Dispatch one stream notification event */
stdout_stream_send_notify(s->stdout_streams_notify_queue);

/* Leave us enabled if there's still more to to do. */
if (s->stdout_streams_notify_queue)
return 0;

/* There was nothing to do anymore, let's turn ourselves off. */
r = sd_event_source_set_enabled(es, SD_EVENT_OFF);
if (r < 0)
return log_error_errno(r, "Failed to turn off notify event source: %m");

return 0;
}

static int server_connect_notify(Server *s) {
union sockaddr_union sa = {
.un.sun_family = AF_UNIX,
};
const char *e;
int r;

assert(s);
assert(s->notify_fd < 0);
assert(!s->notify_event_source);

/*
So here's the problem: we'd like to send notification
messages to PID 1, but we cannot do that via sd_notify(),
since that's synchronous, and we might end up blocking on
it. Specifically: given that PID 1 might block on
dbus-daemon during IPC, and dbus-daemon is logging to us,
and might hence block on us, we might end up in a deadlock
if we block on sending PID 1 notification messages -- by
generating a full blocking circle. To avoid this, let's
create a non-blocking socket, and connect it to the
notification socket, and then wait for POLLOUT before we
send anything. This should efficiently avoid any deadlocks,
as we'll never block on PID 1, hence PID 1 can safely block
on dbus-daemon which can safely block on us again.

Don't think that this issue is real? It is, see:
https://github.com/systemd/systemd/issues/1505
*/

e = getenv("NOTIFY_SOCKET");
if (!e)
return 0;

if ((e[0] != '@' && e[0] != '/') || e[1] == 0) {
log_error("NOTIFY_SOCKET set to an invalid value: %s", e);
return -EINVAL;
}

if (strlen(e) > sizeof(sa.un.sun_path)) {
log_error("NOTIFY_SOCKET path too long: %s", e);
return -EINVAL;
}

s->notify_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
if (s->notify_fd < 0)
return log_error_errno(errno, "Failed to create notify socket: %m");

(void) fd_inc_sndbuf(s->notify_fd, NOTIFY_SNDBUF_SIZE);

strncpy(sa.un.sun_path, e, sizeof(sa.un.sun_path));
if (sa.un.sun_path[0] == '@')
sa.un.sun_path[0] = 0;

r = connect(s->notify_fd, &sa.sa, offsetof(struct sockaddr_un, sun_path) + strlen(e));
if (r < 0)
return log_error_errno(errno, "Failed to connect to notify socket: %m");

r = sd_event_add_io(s->event, &s->notify_event_source, s->notify_fd, EPOLLOUT, dispatch_notify_event, s);
if (r < 0)
return log_error_errno(r, "Failed to watch notification socket: %m");

/* This should fire pretty soon, which we'll use to send the
* READY=1 event. */

return 0;
}

int server_init(Server *s) {
_cleanup_fdset_free_ FDSet *fds = NULL;
int n, r, fd;
Expand All @@ -1465,7 +1587,7 @@ int server_init(Server *s) {
assert(s);

zero(*s);
s->syslog_fd = s->native_fd = s->stdout_fd = s->dev_kmsg_fd = s->audit_fd = s->hostname_fd = -1;
s->syslog_fd = s->native_fd = s->stdout_fd = s->dev_kmsg_fd = s->audit_fd = s->hostname_fd = s->notify_fd = -1;
s->compress = true;
s->seal = true;

Expand Down Expand Up @@ -1511,8 +1633,6 @@ int server_init(Server *s) {
if (r < 0)
return log_error_errno(r, "Failed to create event loop: %m");

sd_event_set_watchdog(s->event, true);

n = sd_listen_fds(true);
if (n < 0)
return log_error_errno(n, "Failed to read listening file descriptors from environment: %m");
Expand Down Expand Up @@ -1637,6 +1757,8 @@ int server_init(Server *s) {
server_cache_boot_id(s);
server_cache_machine_id(s);

(void) server_connect_notify(s);

return system_journal_open(s, false);
}

Expand Down Expand Up @@ -1685,6 +1807,7 @@ void server_done(Server *s) {
sd_event_source_unref(s->sigterm_event_source);
sd_event_source_unref(s->sigint_event_source);
sd_event_source_unref(s->hostname_event_source);
sd_event_source_unref(s->notify_event_source);
sd_event_unref(s->event);

safe_close(s->syslog_fd);
Expand All @@ -1693,6 +1816,7 @@ void server_done(Server *s) {
safe_close(s->dev_kmsg_fd);
safe_close(s->audit_fd);
safe_close(s->hostname_fd);
safe_close(s->notify_fd);

if (s->rate_limit)
journal_rate_limit_free(s->rate_limit);
Expand Down
13 changes: 9 additions & 4 deletions src/journal/journald-server.h
Expand Up @@ -26,9 +26,12 @@

#include "sd-event.h"

typedef struct Server Server;

#include "hashmap.h"
#include "journal-file.h"
#include "journald-rate-limit.h"
#include "journald-stream.h"
#include "list.h"

typedef enum Storage {
Expand All @@ -48,15 +51,14 @@ typedef enum SplitMode {
_SPLIT_INVALID = -1
} SplitMode;

typedef struct StdoutStream StdoutStream;

typedef struct Server {
struct Server {
int syslog_fd;
int native_fd;
int stdout_fd;
int dev_kmsg_fd;
int audit_fd;
int hostname_fd;
int notify_fd;

sd_event *event;

Expand All @@ -71,6 +73,7 @@ typedef struct Server {
sd_event_source *sigterm_event_source;
sd_event_source *sigint_event_source;
sd_event_source *hostname_event_source;
sd_event_source *notify_event_source;

JournalFile *runtime_journal;
JournalFile *system_journal;
Expand Down Expand Up @@ -111,6 +114,7 @@ typedef struct Server {
usec_t oldest_file_usec;

LIST_HEAD(StdoutStream, stdout_streams);
LIST_HEAD(StdoutStream, stdout_streams_notify_queue);
unsigned n_stdout_streams;

char *tty_path;
Expand All @@ -132,6 +136,7 @@ typedef struct Server {

struct udev *udev;

bool sent_notify_ready;
bool sync_scheduled;

char machine_id_field[sizeof("_MACHINE_ID=") + 32];
Expand All @@ -140,7 +145,7 @@ typedef struct Server {

/* Cached cgroup root, so that we don't have to query that all the time */
char *cgroup_root;
} Server;
};

#define SERVER_MACHINE_ID(s) ((s)->machine_id_field + strlen("_MACHINE_ID="))

Expand Down