X-Git-Url: https://git.exim.org/exim.git/blobdiff_plain/7eb6c37c5084760c1d1469bd4be652b479a8df55..a39bd74d3e942db1b465cae643e1a4766ffb304e:/src/src/transport.c diff --git a/src/src/transport.c b/src/src/transport.c index fea914646..7b3d46908 100644 --- a/src/src/transport.c +++ b/src/src/transport.c @@ -1625,13 +1625,24 @@ Arguments: as set by the caller transport new_message_id set to the message id of a waiting message more set TRUE if there are yet more messages waiting + oicf_func function to call to validate if it is ok to send + to this message_id from the current instance. + oicf_data opaque data for oicf_func Returns: TRUE if new_message_id set; FALSE otherwise */ +typedef struct msgq_s msgq_t; + +typedef struct msgq_s +{ + uschar message_id [MESSAGE_ID_LENGTH + 1]; + BOOL bKeep; +} msgq_t; + BOOL transport_check_waiting(const uschar *transport_name, const uschar *hostname, - int local_message_max, uschar *new_message_id, BOOL *more) + int local_message_max, uschar *new_message_id, BOOL *more, oicf oicf_func, void *oicf_data) { dbdata_wait *host_record; int host_length, path_len; @@ -1639,6 +1650,16 @@ open_db dbblock; open_db *dbm_file; uschar buffer[256]; +msgq_t *msgq = NULL; +int msgq_count = 0; +int msgq_actual = 0; +int i; +BOOL bFound = FALSE; +uschar spool_dir [PATH_MAX]; +uschar spool_file [PATH_MAX]; +struct stat statbuf; +BOOL bContinuation = FALSE; + *more = FALSE; DEBUG(D_transport) @@ -1691,58 +1712,107 @@ until one is found for which a spool file actually exists. If the record gets emptied, delete it and continue with any continuation records that may exist. */ -host_length = host_record->count * MESSAGE_ID_LENGTH; +/* For Bug 1141, I refactored this major portion of the routine, it is risky +but the 1 off will remain without it. This code now allows me to SKIP over +a message I do not want to send out on this run. */ -/* Loop to handle continuation host records in the database */ +sprintf(CS spool_dir, "%s/input/", spool_directory); -for (;;) +host_length = host_record->count * MESSAGE_ID_LENGTH; + +while (1) { - BOOL found = FALSE; + /* create an array to read entire message queue into memory for processing */ - sprintf(CS buffer, "%s/input/", spool_directory); - path_len = Ustrlen(buffer); + msgq = (msgq_t*) malloc(sizeof(msgq_t) * host_record->count); + msgq_count = host_record->count; + msgq_actual = msgq_count; - for (host_length -= MESSAGE_ID_LENGTH; host_length >= 0; - host_length -= MESSAGE_ID_LENGTH) + for (i = 0; i < host_record->count; ++i) { - struct stat statbuf; - Ustrncpy(new_message_id, host_record->text + host_length, + msgq[i].bKeep = TRUE; + + Ustrncpy(msgq[i].message_id, host_record->text + (i * MESSAGE_ID_LENGTH), MESSAGE_ID_LENGTH); - new_message_id[MESSAGE_ID_LENGTH] = 0; + msgq[i].message_id[MESSAGE_ID_LENGTH] = 0; + } + + /* first thing remove current message id if it exists */ + for (i = 0; i < msgq_count; ++i) + if (Ustrcmp(msgq[i].message_id, message_id) == 0) + { + msgq[i].bKeep = FALSE; + break; + } + + /* now find the next acceptable message_id */ + + bFound = FALSE; + + for (i = msgq_count - 1; i >= 0; --i) if (msgq[i].bKeep) + { if (split_spool_directory) - sprintf(CS(buffer + path_len), "%c/%s-D", new_message_id[5], new_message_id); + sprintf(CS spool_file, "%s%c/%s-D", + spool_dir, new_message_id[5], msgq[i].message_id); else - sprintf(CS(buffer + path_len), "%s-D", new_message_id); + sprintf(CS spool_file, "%s%s-D", spool_dir, msgq[i].message_id); - /* The listed message may be the one we are currently processing. If - so, we want to remove it from the list without doing anything else. - If not, do a stat to see if it is an existing message. If it is, break - the loop to handle it. No need to bother about locks; as this is all - "hint" processing, it won't matter if it doesn't exist by the time exim - actually tries to deliver it. */ - - if (Ustrcmp(new_message_id, message_id) != 0 && - Ustat(buffer, &statbuf) == 0) + if (Ustat(spool_file, &statbuf) != 0) + msgq[i].bKeep = FALSE; + else if (!oicf_func || oicf_func(msgq[i].message_id, oicf_data)) { - found = TRUE; + Ustrcpy(new_message_id, msgq[i].message_id); + msgq[i].bKeep = FALSE; + bFound = TRUE; break; } } - /* If we have removed all the message ids from the record delete the record. - If there is a continuation record, fetch it and remove it from the file, - as it will be rewritten as the main record. Repeat in the case of an - empty continuation. */ + /* re-count */ + for (msgq_actual = 0, i = 0; i < msgq_count; ++i) + if (msgq[i].bKeep) + msgq_actual++; + + /* reassemble the host record, based on removed message ids, from in + * memory queue. + */ + + if (msgq_actual <= 0) + { + host_length = 0; + host_record->count = 0; + } + else + { + host_length = msgq_actual * MESSAGE_ID_LENGTH; + host_record->count = msgq_actual; + + if (msgq_actual < msgq_count) + { + int new_count; + for (new_count = 0, i = 0; i < msgq_count; ++i) + if (msgq[i].bKeep) + Ustrncpy(&host_record->text[new_count++ * MESSAGE_ID_LENGTH], + msgq[i].message_id, MESSAGE_ID_LENGTH); + + host_record->text[new_count * MESSAGE_ID_LENGTH] = 0; + } + } + +/* Jeremy: check for a continuation record, this code I do not know how to +test but the code should work */ + + bContinuation = FALSE; while (host_length <= 0) { int i; - dbdata_wait *newr = NULL; + dbdata_wait * newr = NULL; /* Search for a continuation */ - for (i = host_record->sequence - 1; i >= 0 && newr == NULL; i--) + for (i = host_record->sequence - 1; i >= 0 && !newr; i--) { sprintf(CS buffer, "%.200s:%d", hostname, i); newr = dbfn_read(dbm_file, buffer); @@ -1750,7 +1820,7 @@ for (;;) /* If no continuation, delete the current and break the loop */ - if (newr == NULL) + if (!newr) { dbfn_delete(dbm_file, hostname); break; @@ -1761,11 +1831,12 @@ for (;;) dbfn_delete(dbm_file, buffer); host_record = newr; host_length = host_record->count * MESSAGE_ID_LENGTH; - } - /* If we found an existing message, break the continuation loop. */ + bContinuation = TRUE; + } - if (found) break; + if (bFound) + break; /* If host_length <= 0 we have emptied a record and not found a good message, and there are no continuation records. Otherwise there is a continuation @@ -1777,6 +1848,26 @@ for (;;) DEBUG(D_transport) debug_printf("waiting messages already delivered\n"); return FALSE; } + + /* we were not able to find an acceptable message, nor was there a + * continuation record. So bug out, outer logic will clean this up. + */ + + if (!bContinuation) + { + Ustrcpy (new_message_id, message_id); + dbfn_close(dbm_file); + return FALSE; + } + } /* we need to process a continuation record */ + +/* clean up in memory queue */ +if (msgq) + { + free (msgq); + msgq = NULL; + msgq_count = 0; + msgq_actual = 0; } /* Control gets here when an existing message has been encountered; its @@ -1786,7 +1877,19 @@ record if required, close the database, and return TRUE. */ if (host_length > 0) { + uschar msg [MESSAGE_ID_LENGTH + 1]; + int i; + host_record->count = host_length/MESSAGE_ID_LENGTH; + + /* rebuild the host_record->text */ + + for (i = 0; i < host_record->count; ++i) + { + Ustrncpy(msg, host_record->text + (i*MESSAGE_ID_LENGTH), MESSAGE_ID_LENGTH); + msg[MESSAGE_ID_LENGTH] = 0; + } + dbfn_write(dbm_file, hostname, host_record, (int)sizeof(dbdata_wait) + host_length); *more = TRUE; } @@ -1795,8 +1898,6 @@ dbfn_close(dbm_file); return TRUE; } - - /************************************************* * Deliver waiting message down same socket * *************************************************/