}
#endif
-return d_log_interface(s, sp, pp);
+s = d_log_interface(s, sp, pp);
+
+if (testflag(addr, af_tcp_fastopen))
+ s = string_catn(s, sp, pp, US" TFO", 4);
+
+return s;
}
addr->host_used
|| Ustrcmp(addr->transport->driver_name, "smtp") == 0
|| Ustrcmp(addr->transport->driver_name, "lmtp") == 0
- ? addr->message : NULL);
+ ? addr->message : NULL);
deliver_host_port = save_port;
deliver_host_address = save_address;
}
-static uschar *
+uschar *
string_timesince(struct timeval * then)
{
struct timeval diff;
}
#ifndef DISABLE_PRDR
- if (addr->flags & af_prdr_used)
+ if (testflag(addr, af_prdr_used))
s = string_catn(s, &size, &ptr, US" PRDR", 5);
#endif
- if (addr->flags & af_chunking_used)
+ if (testflag(addr, af_chunking_used))
s = string_catn(s, &size, &ptr, US" K", 2);
}
if (LOGGING(deliver_time))
{
- struct timeval diff = {addr->more_errno, addr->delivery_usec};
+ struct timeval diff = {.tv_sec = addr->more_errno, .tv_usec = addr->delivery_usec};
s = string_append(s, &size, &ptr, 2, US" DT=", string_timediff(&diff));
}
later (with a log entry). */
if (!*sender_address && message_age >= ignore_bounce_errors_after)
- setflag(addr, af_ignore_error);
+ addr->prop.ignore_error = TRUE;
/* Freeze the message if requested, or if this is a bounce message (or other
message with null sender) and this address does not have its own errors
to ignore occurs later, instead of sending a message. Logging of freezing
occurs later, just before writing the -H file. */
- if ( !testflag(addr, af_ignore_error)
+ if ( !addr->prop.ignore_error
&& ( addr->special_action == SPECIAL_FREEZE
|| (sender_address[0] == 0 && !addr->prop.errors_address)
) )
addr->return_filename =
spool_fname(US"msglog", message_subdir, message_id,
string_sprintf("-%d-%d", getpid(), return_count++));
-
+
if ((addr->return_file = open_msglog_file(addr->return_filename, 0400, &error)) < 0)
{
common_error(TRUE, addr, errno, US"Unable to %s file for %s transport "
BOOL ok =
tp == next->transport
&& !previously_transported(next, TRUE)
- && (addr->flags & (af_pfr|af_file)) == (next->flags & (af_pfr|af_file))
+ && testflag(addr, af_pfr) == testflag(next, af_pfr)
+ && testflag(addr, af_file) == testflag(next, af_file)
&& (!uses_lp || Ustrcmp(next->local_part, addr->local_part) == 0)
&& (!uses_dom || Ustrcmp(next->domain, addr->domain) == 0)
&& same_strings(next->prop.errors_address, addr->prop.errors_address)
often bigger) so even if we are reading while the subprocess is still going, we
should never have only a partial item in the buffer.
+hs12: This assumption is no longer true, since we now have quite large items
+(certificate information and such)
+
Argument:
poffset the offset of the parlist item
eop TRUE if the process has completed
address_item *addr = p->addr;
pid_t pid = p->pid;
int fd = p->fd;
-uschar *endptr = big_buffer;
-uschar *ptr = endptr;
+
uschar *msg = p->msg;
BOOL done = p->done;
-BOOL finished = FALSE;
-/* minimum size to read is header size including id, subid and length */
-int required = PIPE_HEADER_SIZE;
/* Loop through all items, reading from the pipe when necessary. The pipe
-is set up to be non-blocking, but there are two different Unix mechanisms in
-use. Exim uses O_NONBLOCK if it is defined. This returns 0 for end of file,
-and EAGAIN for no more data. If O_NONBLOCK is not defined, Exim uses O_NDELAY,
-which returns 0 for both end of file and no more data. We distinguish the
-two cases by taking 0 as end of file only when we know the process has
-completed.
-
-Each separate item is written to the pipe in a single write(), and as they are
-all short items, the writes will all be atomic and we should never find
-ourselves in the position of having read an incomplete item. "Short" in this
-case can mean up to about 1K in the case when there is a long error message
-associated with an address. */
+used to be non-blocking. But I do not see a reason for using non-blocking I/O
+here, as the preceding select() tells us, if data is available for reading.
-DEBUG(D_deliver) debug_printf("reading pipe for subprocess %d (%s)\n",
- (int)p->pid, eop? "ended" : "not ended");
+A read() on a "selected" handle should never block, but(!) it may return
+less data than we expected. (The buffer size we pass to read() shouldn't be
+understood as a "request", but as a "limit".)
-while (!done)
- {
- retry_item *r, **rp;
- int remaining = endptr - ptr;
- uschar header[PIPE_HEADER_SIZE + 1];
- uschar id, subid;
- uschar *endc;
-
- /* Read (first time) or top up the chars in the buffer if necessary.
- There will be only one read if we get all the available data (i.e. don't
- fill the buffer completely). */
-
- if (remaining < required && !finished)
- {
- int len;
- int available = big_buffer_size - remaining;
+Each separate item is written to the pipe in a timely manner. But, especially for
+larger items, the read(2) may already return partial data from the write(2).
- if (remaining > 0) memmove(big_buffer, ptr, remaining);
+The write is atomic mostly (depending on the amount written), but atomic does
+not imply "all or nothing", it just is "not intermixed" with other writes on the
+same channel (pipe).
- ptr = big_buffer;
- endptr = big_buffer + remaining;
- len = read(fd, endptr, available);
-
- DEBUG(D_deliver) debug_printf("read() yielded %d\n", len);
-
- /* If the result is EAGAIN and the process is not complete, just
- stop reading any more and process what we have already. */
-
- if (len < 0)
- {
- if (!eop && errno == EAGAIN) len = 0; else
- {
- msg = string_sprintf("failed to read pipe from transport process "
- "%d for transport %s: %s", pid, addr->transport->driver_name,
- strerror(errno));
- break;
- }
- }
+*/
- /* If the length is zero (eof or no-more-data), just process what we
- already have. Note that if the process is still running and we have
- read all the data in the pipe (but less that "available") then we
- won't read any more, as "finished" will get set. */
+DEBUG(D_deliver) debug_printf("reading pipe for subprocess %d (%s)\n",
+ (int)p->pid, eop? "ended" : "not ended yet");
- endptr += len;
- remaining += len;
- finished = len != available;
+while (!done)
+ {
+ retry_item *r, **rp;
+ uschar pipeheader[PIPE_HEADER_SIZE+1];
+ uschar *id = &pipeheader[0];
+ uschar *subid = &pipeheader[1];
+ uschar *ptr = big_buffer;
+ size_t required = PIPE_HEADER_SIZE; /* first the pipeheader, later the data */
+ ssize_t got;
+
+ DEBUG(D_deliver) debug_printf(
+ "expect %lu bytes (pipeheader) from tpt process %d\n", (u_long)required, pid);
+
+ /* We require(!) all the PIPE_HEADER_SIZE bytes here, as we know,
+ they're written in a timely manner, so waiting for the write shouldn't hurt a lot.
+ If we get less, we can assume the subprocess to be done and do not expect any further
+ information from it. */
+
+ got = readn(fd, pipeheader, required);
+ if (got != required)
+ {
+ msg = string_sprintf("got %d of %d bytes (pipeheader) "
+ "from transport process %d for transport %s",
+ got, PIPE_HEADER_SIZE, pid, addr->transport->driver_name);
+ done = TRUE;
+ break;
}
- /* If we are at the end of the available data, exit the loop. */
- if (ptr >= endptr) break;
+ pipeheader[PIPE_HEADER_SIZE] = '\0';
+ DEBUG(D_deliver)
+ debug_printf("got %ld bytes (pipeheader) from transport process %d\n",
+ (long) got, pid);
- /* copy and read header */
- memcpy(header, ptr, PIPE_HEADER_SIZE);
- header[PIPE_HEADER_SIZE] = '\0';
- id = header[0];
- subid = header[1];
- required = Ustrtol(header + 2, &endc, 10) + PIPE_HEADER_SIZE; /* header + data */
+ {
+ /* If we can't decode the pipeheader, the subprocess seems to have a
+ problem, we do not expect any further information from it. */
+ char *endc;
+ required = Ustrtol(pipeheader+2, &endc, 10);
if (*endc)
{
- msg = string_sprintf("failed to read pipe from transport process "
- "%d for transport %s: error reading size from header", pid, addr->transport->driver_name);
+ msg = string_sprintf("failed to read pipe "
+ "from transport process %d for transport %s: error decoding size from header",
+ pid, addr->transport->driver_name);
done = TRUE;
break;
}
+ }
DEBUG(D_deliver)
- debug_printf("header read id:%c,subid:%c,size:%s,required:%d,remaining:%d,finished:%d\n",
- id, subid, header+2, required, remaining, finished);
-
- /* is there room for the dataset we want to read ? */
- if (required > big_buffer_size - PIPE_HEADER_SIZE)
- {
- msg = string_sprintf("failed to read pipe from transport process "
- "%d for transport %s: big_buffer too small! required size=%d buffer size=%d", pid, addr->transport->driver_name,
- required, big_buffer_size - PIPE_HEADER_SIZE);
- done = TRUE;
- break;
- }
-
- /* We wrote all datasets with atomic write() calls. Remaining < required only
- happens if big_buffer was too small to get all available data from pipe;
- finished has to be false as well. */
-
- if (remaining < required)
- {
- if (!finished)
- continue;
- msg = string_sprintf("failed to read pipe from transport process "
- "%d for transport %s: required size=%d > remaining size=%d and finished=true",
- pid, addr->transport->driver_name, required, remaining);
- done = TRUE;
- break;
+ debug_printf("expect %lu bytes (pipedata) from transport process %d\n",
+ (u_long)required, pid);
+
+ /* Same as above, the transport process will write the bytes announced
+ in a timely manner, so we can just wait for the bytes, getting less than expected
+ is considered a problem of the subprocess, we do not expect anything else from it. */
+ got = readn(fd, big_buffer, required);
+ if (got != required)
+ {
+ msg = string_sprintf("got only %d of %d bytes (pipedata) "
+ "from transport process %d for transport %s",
+ got, required, pid, addr->transport->driver_name);
+ done = TRUE;
+ break;
}
- /* Step past the header */
- ptr += PIPE_HEADER_SIZE;
-
/* Handle each possible type of item, assuming the complete item is
available in store. */
- switch (id)
+ switch (*id)
{
/* Host items exist only if any hosts were marked unusable. Match
up by checking the IP address. */
#ifdef SUPPORT_TLS
case 'X':
if (!addr) goto ADDR_MISMATCH; /* Below, in 'A' handler */
- switch (subid)
+ switch (*subid)
{
case '1':
addr->cipher = NULL;
#endif /*SUPPORT_TLS*/
case 'C': /* client authenticator information */
- switch (subid)
+ switch (*subid)
{
case '1': addr->authenticator = *ptr ? string_copy(ptr) : NULL; break;
case '2': addr->auth_id = *ptr ? string_copy(ptr) : NULL; break;
#ifndef DISABLE_PRDR
case 'P':
- addr->flags |= af_prdr_used;
+ setflag(addr, af_prdr_used);
break;
#endif
case 'K':
- addr->flags |= af_chunking_used;
+ setflag(addr, af_chunking_used);
+ break;
+
+ case 'T':
+ setflag(addr, af_tcp_fastopen);
break;
case 'D':
break;
}
- switch (subid)
+ switch (*subid)
{
#ifdef SUPPORT_SOCKS
case '2': /* proxy information; must arrive before A0 and applies to that addr XXX oops*/
p->done = done;
/* If the process hadn't finished, and we haven't seen the end of the data
-or suffered a disaster, update the rest of the state, and return FALSE to
+or if we suffered a disaster, update the rest of the state, and return FALSE to
indicate "not finished". */
if (!eop && !done)
addr->transport_return = DEFER;
addr->special_action = SPECIAL_FREEZE;
addr->message = msg;
+ log_write(0, LOG_MAIN|LOG_PANIC, "Delivery status for %s: %s\n", addr->address, addr->message);
}
/* Return TRUE to indicate we have got all we need from this process, even
{
readycount--;
if (par_read_pipe(poffset, FALSE)) /* Finished with this pipe */
- {
for (;;) /* Loop for signals */
{
pid_t endedpid = waitpid(pid, &status, 0);
"%d (errno = %d) from waitpid() for process %d",
(int)endedpid, errno, (int)pid);
}
- }
}
}
}
}
-
-
-
+/* Write one transport-result item to a pipe. The item consists of a text
+header of PIPE_HEADER_SIZE bytes (the id character, the subid character and
+the zero-padded decimal payload length) followed by the payload itself. Both
+parts are handed to a single writev() call.
+
+Arguments:
+  fd      file descriptor of the pipe to write to
+  id      item type character, placed first in the header
+  subid   item subtype character, placed second in the header
+  buf     payload bytes; callers pass NULL together with size 0
+  size    payload length in bytes; must not exceed BIG_BUFFER_SIZE-1
+
+Returns:  nothing; an oversize payload, a header formatting failure and a
+          failed or short writev() are each logged with LOG_PANIC_DIE
+*/
static void
-rmt_dlv_checked_write(int fd, char id, char subid, void * buf, int size)
+rmt_dlv_checked_write(int fd, char id, char subid, void * buf, ssize_t size)
{
-uschar writebuffer[PIPE_HEADER_SIZE + BIG_BUFFER_SIZE];
-int header_length;
-int ret;
+uschar pipe_header[PIPE_HEADER_SIZE+1];
+size_t total_len = PIPE_HEADER_SIZE + size;
+
+struct iovec iov[2] = {
+ { pipe_header, PIPE_HEADER_SIZE }, /* indication about the data to expect */
+ { buf, size } /* *the* data */
+};
+
+ssize_t ret;
/* we assume that size can't get larger then BIG_BUFFER_SIZE which currently is set to 16k */
/* complain to log if someone tries with buffer sizes we can't handle*/
-if (size > 99999)
+if (size > BIG_BUFFER_SIZE-1)
{
log_write(0, LOG_MAIN|LOG_PANIC_DIE,
- "Failed writing transport result to pipe: can't handle buffers > 99999 bytes. truncating!\n");
- size = 99999;
+ "Failed writing transport result to pipe: can't handle buffers > %d bytes. truncating!\n",
+ BIG_BUFFER_SIZE-1);
+ /* Clamp to the limit that was just tested and logged, and carry the clamp
+ into the values captured above, so that the header and the writev() length
+ stay in agreement should this point ever be reached. */
+ size = BIG_BUFFER_SIZE-1;
+ iov[1].iov_len = size;
+ total_len = PIPE_HEADER_SIZE + size;
}
-/* to keep the write() atomic we build header in writebuffer and copy buf behind */
-/* two write() calls would increase the complexity of reading from pipe */
+/* Should we check that we do not write more than PIPE_BUF? What would
+that help? */
/* convert size to human readable string prepended by id and subid */
-header_length = snprintf(CS writebuffer, PIPE_HEADER_SIZE+1, "%c%c%05d", id, subid, size);
-if (header_length != PIPE_HEADER_SIZE)
- {
+if (PIPE_HEADER_SIZE != snprintf(CS pipe_header, PIPE_HEADER_SIZE+1, "%c%c%05ld",
+ id, subid, (long)size))
log_write(0, LOG_MAIN|LOG_PANIC_DIE, "header snprintf failed\n");
- writebuffer[0] = '\0';
- }
-
-DEBUG(D_deliver) debug_printf("header write id:%c,subid:%c,size:%d,final:%s\n",
- id, subid, size, writebuffer);
-if (buf && size > 0)
- memcpy(writebuffer + PIPE_HEADER_SIZE, buf, size);
+DEBUG(D_deliver) debug_printf("header write id:%c,subid:%c,size:%ld,final:%s\n",
+ id, subid, (long)size, pipe_header);
-size += PIPE_HEADER_SIZE;
-if ((ret = write(fd, writebuffer, size)) != size)
- log_write(0, LOG_MAIN|LOG_PANIC_DIE, "Failed writing transport result to pipe: %s\n",
- ret == -1 ? strerror(errno) : "short write");
+if ((ret = writev(fd, iov, 2)) != total_len)
+ log_write(0, LOG_MAIN|LOG_PANIC_DIE,
+ "Failed writing transport result to pipe (%ld of %ld bytes): %s",
+ (long)ret, (long)total_len, ret == -1 ? strerror(errno) : "short write");
}
/*************************************************
that it can use either of them, though it prefers O_NONBLOCK, which
distinguishes between EOF and no-more-data. */
+/* The data appears in a timely manner and we already did a select on
+all pipes, so I do not see a reason to use non-blocking IO here
+
#ifdef O_NONBLOCK
(void)fcntl(pfd[pipe_read], F_SETFL, O_NONBLOCK);
#else
(void)fcntl(pfd[pipe_read], F_SETFL, O_NDELAY);
#endif
+*/
/* If the maximum number of subprocesses already exist, wait for a process
to finish. If we ran out of file descriptors, parmax will have been reduced
search_tidyup();
+
if ((pid = fork()) == 0)
{
int fd = pfd[pipe_write];
host_item *h;
+ DEBUG(D_deliver) debug_selector |= D_pid; // hs12
/* Setting this global in the subprocess means we need never clear it */
transport_name = tp->name;
}
#ifndef DISABLE_PRDR
- if (addr->flags & af_prdr_used)
+ if (testflag(addr, af_prdr_used))
rmt_dlv_checked_write(fd, 'P', '0', NULL, 0);
#endif
- if (addr->flags & af_chunking_used)
+ if (testflag(addr, af_chunking_used))
rmt_dlv_checked_write(fd, 'K', '0', NULL, 0);
+ if (testflag(addr, af_tcp_fastopen))
+ rmt_dlv_checked_write(fd, 'T', '0', NULL, 0);
+
memcpy(big_buffer, &addr->dsn_aware, sizeof(addr->dsn_aware));
rmt_dlv_checked_write(fd, 'D', '0', big_buffer, sizeof(addr->dsn_aware));
/* Fork failed; defer with error message */
- if (pid < 0)
+ if (pid == -1)
{
(void)close(pfd[pipe_read]);
panicmsg = string_sprintf("fork failed for remote delivery to %s: %s",
uschar *type;
p->uid = uid;
p->gid = gid;
- setflag(p, af_uid_set |
- af_gid_set |
- af_allow_file |
- af_allow_pipe |
- af_allow_reply);
+ setflag(p, af_uid_set);
+ setflag(p, af_gid_set);
+ setflag(p, af_allow_file);
+ setflag(p, af_allow_pipe);
+ setflag(p, af_allow_reply);
/* Find the name of the system filter's appropriate pfr transport */
addr->local_part = addr->address;
addr->message =
US"filter autoreply generated syntactically invalid recipient";
- setflag(addr, af_ignore_error);
- (void)post_process_one(addr, FAIL, LOG_MAIN, DTYPE_ROUTER, 0);
+ addr->prop.ignore_error = TRUE;
+ (void) post_process_one(addr, FAIL, LOG_MAIN, DTYPE_ROUTER, 0);
continue; /* with the next new address */
}
addr2->host_list = addr->host_list;
addr2->fallback_hosts = addr->fallback_hosts;
addr2->prop.errors_address = addr->prop.errors_address;
- copyflag(addr2, addr, af_hide_child | af_local_host_removed);
+ copyflag(addr2, addr, af_hide_child);
+ copyflag(addr2, addr, af_local_host_removed);
DEBUG(D_deliver|D_route)
- {
debug_printf(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n"
"routing %s\n"
"Routing for %s copied from %s\n",
addr2->address, addr2->address, addr->address);
- }
}
}
} /* Continue with routing the next address. */
if (journal_fd < 0)
{
uschar * fname = spool_fname(US"input", message_subdir, id, US"-J");
-
+
if ((journal_fd = Uopen(fname,
#ifdef O_CLOEXEC
O_CLOEXEC |
FILE *f = fdopen(fd, "wb");
/* header only as required by RFC. only failure DSN needs to honor RET=FULL */
uschar * bound;
- transport_ctx tctx = {0};
+ transport_ctx tctx = {{0}};
DEBUG(D_deliver)
debug_printf("sending error message to: %s\n", sender_address);
if (sender_address[0] == 0 && !addr_failed->prop.errors_address)
{
if ( !testflag(addr_failed, af_retry_timedout)
- && !testflag(addr_failed, af_ignore_error))
- {
+ && !addr_failed->prop.ignore_error)
log_write(0, LOG_MAIN|LOG_PANIC, "internal error: bounce message "
"failure is neither frozen nor ignored (it's been ignored)");
- }
- setflag(addr_failed, af_ignore_error);
+
+ addr_failed->prop.ignore_error = TRUE;
}
/* If the first address on the list has af_ignore_error set, just remove
it from the list, throw away any saved message file, log it, and
mark the recipient done. */
- if ( testflag(addr_failed, af_ignore_error)
+ if ( addr_failed->prop.ignore_error
|| ( addr_failed->dsn_flags & rf_dsnflags
&& (addr_failed->dsn_flags & rf_notify_failure) != rf_notify_failure
) )
transport_filter_argv = NULL; /* Just in case */
return_path = sender_address; /* In case not previously set */
{ /* Dummy transport for headers add */
- transport_ctx tctx = {0};
+ transport_ctx tctx = {{0}};
transport_instance tb = {0};
tctx.u.fd = fileno(f);
FILE *wmf = NULL;
FILE *f = fdopen(fd, "wb");
uschar * bound;
- transport_ctx tctx = {0};
+ transport_ctx tctx = {{0}};
if (warn_message_file)
if (!(wmf = Ufopen(warn_message_file, "rb")))
void
delivery_re_exec(int exec_type)
{
-uschar * s;
+uschar * where;
if (cutthrough.fd >= 0 && cutthrough.callout_hold_only)
{
sending_ip_address = cutthrough.snd_ip;
sending_port = cutthrough.snd_port;
- s = US"socketpair";
+ where = US"socketpair";
if (socketpair(AF_UNIX, SOCK_STREAM, 0, pfd) != 0)
goto fail;
- s = US"fork";
+ where = US"fork";
if ((pid = fork()) < 0)
goto fail;
- else if (pid == 0) /* child: fork again to totally dosconnect */
+ else if (pid == 0) /* child: fork again to totally disconnect */
{
close(pfd[1]);
if ((pid = fork()))
cancel_cutthrough_connection(TRUE, US"non-continued delivery");
(void) child_exec_exim(exec_type, FALSE, NULL, FALSE, 2, US"-Mc", message_id);
}
-/* Control does not return here. */
+return; /* compiler quietening; control does not reach here. */
fail:
log_write(0,
LOG_MAIN | (exec_type == CEE_EXEC_EXIT ? LOG_PANIC : LOG_PANIC_DIE),
- "delivery re-exec failed: %s", strerror(errno));
+ "delivery re-exec %s failed: %s", where, strerror(errno));
/* Get here if exec_type == CEE_EXEC_EXIT.
Note: this must be _exit(), not exit(). */