From: Jeremy Harris Date: Thu, 27 Jun 2024 10:52:37 +0000 (+0100) Subject: Hintsdb transactions X-Git-Url: https://git.exim.org/exim.git/commitdiff_plain/600dc06981df5a906125f8442c36056a117412d4 Hintsdb transactions --- diff --git a/doc/doc-txt/ChangeLog b/doc/doc-txt/ChangeLog index 4c2412b9c..833ac7d69 100644 --- a/doc/doc-txt/ChangeLog +++ b/doc/doc-txt/ChangeLog @@ -10,6 +10,12 @@ JH/01 Use fewer forks & execs for sending many messages to a single host. process, we can loop there. A two-phase queue run will benefit, particularly for mailinglist and smarthost cases. +JH/02 Add transaction support for hintsdbs. The sole initial provider is + sqlite, and is used for the wait-transport DB. Transactions imply + locking internal to the DB. We no longer need a separate lockfile, can + keep the DB handle open for extended periods, and still potentially + benefit from concurrency on non-conflicting record uses. + Exim version 4.98 ----------------- diff --git a/src/src/dbfn.c b/src/src/dbfn.c index 2b5ec908b..e12871cdd 100644 --- a/src/src/dbfn.c +++ b/src/src/dbfn.c @@ -237,6 +237,78 @@ return dbblock; +/* Only for transaction-capable DB types. Open without locking or +starting a transaction. "lof" and "panic" always true; read/write mode. 
+*/ + +open_db * +dbfn_open_multi(const uschar * name, open_db * dbblock) +{ +int rc, save_errno; +flock_t lock_data; +uschar dirname[PATHLEN], filename[PATHLEN]; + +DEBUG(D_hints_lookup) acl_level++; + +dbblock->lockfd = -1; +db_dir_make(TRUE); + +snprintf(CS dirname, sizeof(dirname), "%s/db", spool_directory); +snprintf(CS filename, sizeof(filename), "%s/%s", dirname, name); + +priv_drop_temp(exim_uid, exim_gid); +dbblock->dbptr = exim_dbopen_multi(filename, dirname, O_RDWR, EXIMDB_MODE); +if (!dbblock->dbptr && errno == ENOENT) + { + DEBUG(D_hints_lookup) + debug_printf_indent("%s appears not to exist: trying to create\n", filename); + dbblock->dbptr = exim_dbopen_multi(filename, dirname, O_RDWR|O_CREAT, EXIMDB_MODE); + } +save_errno = errno; +priv_restore(); + +/* If the open has failed, return NULL, leaving errno set. If lof is TRUE, +log the event - also for debugging - but debug only if the file just doesn't +exist. */ + +if (!dbblock->dbptr) + { + errno = save_errno; + if (save_errno != ENOENT) + log_write(0, LOG_MAIN, "%s", string_open_failed("DB file %s", + filename)); + else + DEBUG(D_hints_lookup) + debug_printf_indent("%s\n", CS string_open_failed("DB file %s", + filename)); + errno = save_errno; + DEBUG(D_hints_lookup) acl_level--; + return NULL; + } + +DEBUG(D_hints_lookup) debug_printf_indent( + "opened hints database %s for transactions: NOLOCK flags=O_RDWR\n", filename); + +/* Pass back the block containing the opened database handle */ + +return dbblock; +} + + +BOOL +dbfn_transaction_start(open_db * dbp) +{ +DEBUG(D_hints_lookup) debug_printf_indent("dbfn_transaction_start\n"); +return exim_dbtransaction_start(dbp->dbptr); +} +void +dbfn_transaction_commit(open_db * dbp) +{ +DEBUG(D_hints_lookup) debug_printf_indent("dbfn_transaction_commit\n"); +return exim_dbtransaction_commit(dbp->dbptr); +} + + /************************************************* * Unlock and close a database file * @@ -250,11 +322,11 @@ Returns: nothing */ void 
-dbfn_close(open_db *dbblock) +dbfn_close(open_db * dbp) { -int * fdp = &dbblock->lockfd; +int * fdp = &dbp->lockfd; -exim_dbclose(dbblock->dbptr); +exim_dbclose(dbp->dbptr); if (*fdp >= 0) (void)close(*fdp); DEBUG(D_hints_lookup) { @@ -266,6 +338,18 @@ DEBUG(D_hints_lookup) } +void +dbfn_close_multi(open_db * dbp) +{ +exim_dbclose_multi(dbp->dbptr); +DEBUG(D_hints_lookup) + { + debug_printf_indent("closed hints database\n"); + acl_level--; + } +} + + /************************************************* diff --git a/src/src/dbfunctions.h b/src/src/dbfunctions.h index 0b0bcab22..cc4c4f655 100644 --- a/src/src/dbfunctions.h +++ b/src/src/dbfunctions.h @@ -13,12 +13,16 @@ /* Functions for reading/writing exim database files */ void dbfn_close(open_db *); +void dbfn_close_multi(open_db *); int dbfn_delete(open_db *, const uschar *); open_db *dbfn_open(const uschar *, int, open_db *, BOOL, BOOL); +open_db *dbfn_open_multi(const uschar *, open_db *); void *dbfn_read_with_length(open_db *, const uschar *, int *); void *dbfn_read_enforce_length(open_db *, const uschar *, size_t); uschar *dbfn_scan(open_db *, BOOL, EXIM_CURSOR **); int dbfn_write(open_db *, const uschar *, void *, int); +BOOL dbfn_transaction_start(open_db *); +void dbfn_transaction_commit(open_db *); /* Macro for the common call to read without wanting to know the length. */ diff --git a/src/src/deliver.c b/src/src/deliver.c index d2b2c3ab2..24be14982 100644 --- a/src/src/deliver.c +++ b/src/src/deliver.c @@ -4700,6 +4700,32 @@ all pipes, so I do not see a reason to use non-blocking IO here search_tidyup(); +/* +A continued-tpt will, in the tpt parent here, call par_reduce for +the one child. But we are hoping to never do continued-transport... +SO.... we may have called par_reduce for a single child, above when we'd +hit the limit on child-count. Possibly multiple times with different +transports and target hosts. Does it matter if several return a suggested +next-id, and we lose all but the last? Hmm. 
Less parallel working would +happen. Perhaps still do continued-tpt once one has been set? No, that won't +work for all cases. +BAH. +Could take the initial continued-tpt hit, and then do the next-id thing? + +do_remote_deliveries par_reduce par_wait par_read_pipe +*/ + +if ( continue_transport + && !continue_wait_db + && !exim_lockfile_needed() + ) + { + open_db * dbp = store_get(sizeof(open_db), GET_UNTAINTED); + continue_wait_db = dbfn_open_multi( + string_sprintf("wait-%.200s", continue_transport), dbp); + continue_next_id[0] = '\0'; + } + if ((pid = exim_fork(US"transport")) == 0) { int fd = pfd[pipe_write]; @@ -5038,6 +5064,7 @@ all pipes, so I do not see a reason to use non-blocking IO here rmt_dlv_checked_write(fd, 'I', '0', big_buffer, ptr - big_buffer); } +/*XXX new code*/ /* Continuation message-id */ if (*continue_next_id) rmt_dlv_checked_write(fd, 'Z', '1', continue_next_id, MESSAGE_ID_LENGTH); @@ -5103,14 +5130,20 @@ all pipes, so I do not see a reason to use non-blocking IO here (continue_transport gets set to NULL) before we consider any other addresses in this message. */ - if (continue_transport) par_reduce(0, fallback); + if (continue_transport) + { + par_reduce(0, fallback); + if (continue_wait_db && !continue_next_id) + { dbfn_close_multi(continue_wait_db); continue_wait_db = NULL; } + } /* Otherwise, if we are running in the test harness, wait a bit, to let the newly created process get going before we create another process. This should ensure repeatability in the tests. Wait long enough for most cases to complete the transport. 
*/ - else testharness_pause_ms(600); + else + testharness_pause_ms(600); continue; @@ -6462,8 +6495,8 @@ address_item * addr_last; uschar * filter_message, * info; open_db dbblock, * dbm_file; extern int acl_where; -CONTINUED_ID: +CONTINUED_ID: final_yield = DELIVER_ATTEMPTED_NORMAL; now = time(NULL); addr_last = NULL; diff --git a/src/src/globals.c b/src/src/globals.c index 0f9d5b54f..7c7395022 100644 --- a/src/src/globals.c +++ b/src/src/globals.c @@ -749,6 +749,9 @@ uschar *continue_host_address = NULL; uschar continue_next_id[MESSAGE_ID_LENGTH +1] = {[0]='\0'}; int continue_sequence = 1; uschar *continue_transport = NULL; +#ifndef COMPILE_UTILITY +open_db *continue_wait_db = NULL; +#endif #ifndef DISABLE_ESMTP_LIMITS unsigned continue_limit_mail = 0; unsigned continue_limit_rcpt = 0; diff --git a/src/src/globals.h b/src/src/globals.h index a82d529c0..57b930fd4 100644 --- a/src/src/globals.h +++ b/src/src/globals.h @@ -453,6 +453,9 @@ extern uschar *continue_host_address; /* IP address for ditto */ extern uschar continue_next_id[]; /* Next message_id from hintsdb */ extern int continue_sequence; /* Sequence num for continued delivery */ extern uschar *continue_transport; /* Transport for continued delivery */ +#ifndef COMPILE_UTILITY +extern open_db *continue_wait_db; /* Hintsdb for wait-transport */ +#endif #ifndef DISABLE_ESMTP_LIMITS extern unsigned continue_limit_mail; /* Peer advertised limit */ extern unsigned continue_limit_rcpt; diff --git a/src/src/hintsdb.h b/src/src/hintsdb.h index c5a856abc..c19bb039a 100644 --- a/src/src/hintsdb.h +++ b/src/src/hintsdb.h @@ -97,7 +97,7 @@ return FALSE; /* We do transaction; no extra locking needed */ /* EXIM_DBOPEN - return pointer to an EXIM_DB, NULL if failed */ static inline EXIM_DB * -exim_dbopen__(const uschar * name, const uschar * dirname, int flags, +exim_dbopen_multi(const uschar * name, const uschar * dirname, int flags, unsigned mode) { EXIM_DB * dbp; @@ -106,8 +106,7 @@ if (flags & O_CREAT) sflags |= 
SQLITE_OPEN_CREATE; if ((ret = sqlite3_open_v2(CCS name, &dbp, sflags, NULL)) == SQLITE_OK) { sqlite3_busy_timeout(dbp, 5000); - ret = sqlite3_exec(dbp, "BEGIN TRANSACTION;", NULL, NULL, NULL); - if (ret == SQLITE_OK && flags & O_CREAT) + if (flags & O_CREAT) ret = sqlite3_exec(dbp, "CREATE TABLE IF NOT EXISTS tbl (ky TEXT PRIMARY KEY, dat BLOB);", NULL, NULL, NULL); @@ -119,6 +118,23 @@ if ((ret = sqlite3_open_v2(CCS name, &dbp, sflags, NULL)) == SQLITE_OK) return ret == SQLITE_OK ? dbp : NULL; } +static inline BOOL +exim_dbtransaction_start(EXIM_DB * dbp) +{ +return sqlite3_exec(dbp, "BEGIN TRANSACTION;", NULL, NULL, NULL) == SQLITE_OK; +} + +static inline EXIM_DB * +exim_dbopen__(const uschar * name, const uschar * dirname, int flags, + unsigned mode) +{ +EXIM_DB * dbp = exim_dbopen_multi(name, dirname, flags, mode); +if (!dbp || exim_dbtransaction_start(dbp)) + return dbp; +sqlite3_close(dbp); +return NULL; +} + /* EXIM_DBGET - returns TRUE if successful, FALSE otherwise */ /* note we alloc'n'copy - the caller need not do so */ /* result has a NUL appended, but the length is as per the DB */ @@ -322,12 +338,22 @@ store_free(cursor); /* EXIM_DBCLOSE */ -static void -exim_dbclose__(EXIM_DB * dbp) +static inline void +exim_dbclose_multi(EXIM_DB * dbp) { -(void) sqlite3_exec(dbp, "COMMIT TRANSACTION;", NULL, NULL, NULL); sqlite3_close(dbp); } +static inline void +exim_dbtransaction_commit(EXIM_DB * dbp) +{ +(void) sqlite3_exec(dbp, "COMMIT TRANSACTION;", NULL, NULL, NULL); +} +static inline void +exim_dbclose__(EXIM_DB * dbp) +{ +exim_dbtransaction_commit(dbp); +exim_dbclose_multi(dbp); +} /* Datum access */ @@ -400,6 +426,13 @@ exim_lockfile_needed(void) return TRUE; } +static inline EXIM_DB * +exim_dbopen_multi(const uschar * name, const uschar * dirname, int flags, + unsigned mode) { return NULL; } +static inline void exim_dbclose_multi(EXIM_DB * dbp) {} +static inline BOOL exim_dbtransaction_start(EXIM_DB * dbp) { return FALSE; } +static inline void 
exim_dbtransaction_commit(EXIM_DB * dbp) {} + /* EXIM_DBOPEN - return pointer to an EXIM_DB, NULL if failed */ static inline EXIM_DB * exim_dbopen__(const uschar * name, const uschar * dirname, int flags, @@ -583,6 +616,13 @@ exim_lockfile_needed(void) return TRUE; } +static inline EXIM_DB * +exim_dbopen_multi(const uschar * name, const uschar * dirname, int flags, + unsigned mode) { return NULL; } +static inline void exim_dbclose_multi(EXIM_DB * dbp) {} +static inline BOOL exim_dbtransaction_start(EXIM_DB * dbp) { return FALSE; } +static inline void exim_dbtransaction_commit(EXIM_DB * dbp) {} + /* EXIM_DBOPEN - return pointer to an EXIM_DB, NULL if failed */ /* The API changed for DB 4.1. - and we also starting using the "env" with a specified working dir, to avoid the DBCONFIG file trap. */ @@ -736,6 +776,13 @@ exim_lockfile_needed(void) return TRUE; } +static inline EXIM_DB * +exim_dbopen_multi(const uschar * name, const uschar * dirname, int flags, + unsigned mode) { return NULL; } +static inline void exim_dbclose_multi(EXIM_DB * dbp) {} +static inline BOOL exim_dbtransaction_start(EXIM_DB * dbp) { return FALSE; } +static inline void exim_dbtransaction_commit(EXIM_DB * dbp) {} + /* EXIM_DBOPEN - return pointer to an EXIM_DB, NULL if failed */ static inline EXIM_DB * exim_dbopen__(const uschar * name, const uschar * dirname, int flags, @@ -889,6 +936,13 @@ exim_lockfile_needed(void) return TRUE; } +static inline EXIM_DB * +exim_dbopen_multi(const uschar * name, const uschar * dirname, int flags, + unsigned mode) { return NULL; } +static inline void exim_dbclose_multi(EXIM_DB * dbp) {} +static inline BOOL exim_dbtransaction_start(EXIM_DB * dbp) { return FALSE; } +static inline void exim_dbtransaction_commit(EXIM_DB * dbp) {} + /* EXIM_DBOPEN - return pointer to an EXIM_DB, NULL if failed */ static inline EXIM_DB * exim_dbopen__(const uschar * name, const uschar * dirname, int flags, @@ -1035,6 +1089,13 @@ exim_lockfile_needed(void) return TRUE; } +static inline 
EXIM_DB * +exim_dbopen_multi(const uschar * name, const uschar * dirname, int flags, + unsigned mode) { return NULL; } +static inline void exim_dbclose_multi(EXIM_DB * dbp) {} +static inline BOOL exim_dbtransaction_start(EXIM_DB * dbp) { return FALSE; } +static inline void exim_dbtransaction_commit(EXIM_DB * dbp) {} + /* EXIM_DBOPEN - returns a EXIM_DB *, NULL if failed */ /* Check that the name given is not present. This catches a directory name; otherwise we would create the name.pag and diff --git a/src/src/queue.c b/src/src/queue.c index b811a53bd..3073ee780 100644 --- a/src/src/queue.c +++ b/src/src/queue.c @@ -341,8 +341,10 @@ so force the first one. The selecting string can optionally be a regex, or refer to the sender instead of recipients. If queue_2stage is set, the queue is scanned twice. The first time, queue_smtp -is set so that routing is done for all messages. Thus in the second run those -that are routed to the same host should go down the same SMTP connection. +is set so that routing is done for all messages. A call of the transport adds +each message_id in turn to a list for the resulting host. +Then in the second run those that are routed to the same host should all go down +a single SMTP connection. 
Arguments: q queue-runner descriptor @@ -794,7 +796,7 @@ if (q->queue_2stage) else break; #ifdef MEASURE_TIMING - report_time_since(&timestamp_startup, US"queue_run 1st phase done"); + report_time_since(&timestamp_startup, US"queue_run phase 1 done"); #endif q->queue_2stage = f.queue_2stage = FALSE; DEBUG(D_queue_run) debug_printf("queue_run phase 2 start\n"); diff --git a/src/src/transport.c b/src/src/transport.c index 3245b2cae..6468f9f23 100644 --- a/src/src/transport.c +++ b/src/src/transport.c @@ -1511,10 +1511,13 @@ if (!is_new_message_id(message_id)) DEBUG(D_transport) debug_printf("updating wait-%s database\n", tpname); -/* Open the database for this transport */ +/* Open the database (or transaction) for this transport */ -if (!(dbp = dbfn_open(string_sprintf("wait-%.200s", tpname), - O_RDWR, &dbblock, TRUE, TRUE))) +if ( continue_wait_db + ? !dbfn_transaction_start(dbp = continue_wait_db) + : !(dbp = dbfn_open(string_sprintf("wait-%.200s", tpname), + O_RDWR, &dbblock, TRUE, TRUE)) + ) return; /* Scan the list of hosts for which this message is waiting, and ensure @@ -1640,7 +1643,10 @@ for (host_item * host = hostlist; host; host = host->next) /* All now done */ -dbfn_close(dbp); +if (continue_wait_db) + dbfn_transaction_commit(dbp); +else + dbfn_close(dbp); } @@ -1713,8 +1719,11 @@ if (local_message_max > 0 && continue_sequence >= local_message_max) /* Open the waiting information database. */ -if (!(dbp = dbfn_open(string_sprintf("wait-%.200s", transport_name), - O_RDWR, &dbblock, TRUE, TRUE))) +if ( continue_wait_db + ? !dbfn_transaction_start(dbp = continue_wait_db) + : !(dbp = dbfn_open(string_sprintf("wait-%.200s", transport_name), + O_RDWR, &dbblock, TRUE, TRUE)) + ) goto retfalse; /* See if there is a record for this host; if not, there's nothing to do. 
*/ @@ -1907,7 +1916,10 @@ if (host_length > 0) dbfn_write(dbp, hostname, host_record, (int)sizeof(dbdata_wait) + host_length); } -dbfn_close(dbp); +if (continue_wait_db) + dbfn_transaction_commit(dbp); +else + dbfn_close(dbp); DEBUG(D_transport) { @@ -1917,7 +1929,10 @@ DEBUG(D_transport) return TRUE; dbclose_false: - dbfn_close(dbp); + if (continue_wait_db) + dbfn_transaction_commit(dbp); + else + dbfn_close(dbp); retfalse: DEBUG(D_transport) @@ -2064,6 +2079,11 @@ int status; DEBUG(D_transport) debug_printf("transport_pass_socket entered\n"); +/*XXX we'd prefer this never happens, by not calling here for this +case (instead, passing back the next-id). But in case it does... */ +if (continue_wait_db) + { dbfn_close_multi(continue_wait_db); continue_wait_db = NULL; } + #ifndef DISABLE_ESMTP_LIMITS continue_limit_mail = peer_limit_mail; continue_limit_rcpt = peer_limit_rcpt; @@ -2075,6 +2095,46 @@ if ((pid = exim_fork(US"continued-transport")) == 0) /* If we are running in the test harness, wait for a bit to allow the previous process time to finish, write the log, etc., so that the output is always in the same order for automatic comparison. */ + /* The double-fork goes back at least as far as 0.53 (June 1996). As of + 2024 I'm unclear why it is needed, especially given the following exec. + I suppose it means that the parent [caller of transport_pass_socket()] + [ that would be the "transport" proc ] + has no direct extant child procs, from this operation. Does it wait + for children? Not obviously so far, and a quick test has it working + apparently ok with a single fork. Further: The "transport" proc goes + on to only send results back up the pipe to its parent, the "delivery" + proc. It'd be kinda nice to swap the sequence around: send the results back, + omit the forking entirely, and exec the new transport. But the code + is all in the wrong place (the pipe-write in deliver.c and here we're + down in the transport). 
Perhaps we could pass the suggested next + msgid back up the pipe? + + How would this interact with the existing TLS proxy process? + Probably the way continue_more does at present. That feature is + for the case where a single message has more (recip) addrs than + can be handled in a single call to the transport. The continue-more + flag is set; the transport avoids doing a continue-transport fork/exec, + closes TLS and passes back to the deliver proc and exits. The deliver proc + makes a further call to the transport. An RSET is sent on the conn; + and the still-open conn is left for the deliver proc to make another + call to the transport with it open. That only works because it was + originally a continued-conn, also, so the deliver proc has the conn. + - So if already a continued-conn, could pass back the next-message-id + rather than doing a further continued-conn - but we'd have to re-establish + TLS. + [ Which is a pity, and should also be worked on. ] + We do not need to pass the wait-tpt hintsdb handle across an exec-for- + continued-conn because the deliver proc can know what tpt will be used, + so the name is predictable and it can open it. May as well do that for + any remote tpt, and skip the open/close code in the tpt. Perhaps local + tpts also for consistency. But... only for transaction-capable DB providers + (and we will be assuming they are sequential-fork-safe). + + Architecture. The transport is a separate proc so that it can + - go badly wrong and die, being monitored from a safer parent + - manipulate privs, to deliver to local files. But here, we're smtp + and don't do that! 
+ */ testharness_pause_ms(500); transport_do_pass_socket(transport_name, hostname, hostaddress, diff --git a/src/src/transports/smtp.c b/src/src/transports/smtp.c index 30983984a..172ee3445 100644 --- a/src/src/transports/smtp.c +++ b/src/src/transports/smtp.c @@ -4799,17 +4799,17 @@ if (sx->completed_addr && sx->ok && sx->send_quit) been used, which we do under TLSv1.3 for the gsasl SCRAM*PLUS methods. But we were always doing it anyway. */ - tls_close(sx->cctx.tls_ctx, - sx->send_tlsclose ? TLS_SHUTDOWN_WAIT : TLS_SHUTDOWN_WONLY); - sx->send_tlsclose = FALSE; - sx->cctx.tls_ctx = NULL; - tls_out.active.sock = -1; - smtp_peer_options = smtp_peer_options_wrap; - sx->ok = !sx->smtps - && smtp_write_command(sx, SCMD_FLUSH, "EHLO %s\r\n", sx->helo_data) - >= 0 - && smtp_read_response(sx, sx->buffer, sizeof(sx->buffer), - '2', ob->command_timeout); + tls_close(sx->cctx.tls_ctx, + sx->send_tlsclose ? TLS_SHUTDOWN_WAIT : TLS_SHUTDOWN_WONLY); + sx->send_tlsclose = FALSE; + sx->cctx.tls_ctx = NULL; + tls_out.active.sock = -1; + smtp_peer_options = smtp_peer_options_wrap; + sx->ok = !sx->smtps + && smtp_write_command(sx, SCMD_FLUSH, "EHLO %s\r\n", sx->helo_data) + >= 0 + && smtp_read_response(sx, sx->buffer, sizeof(sx->buffer), + '2', ob->command_timeout); if (sx->ok && f.continue_more) goto TIDYUP; /* More addresses for another run */