Skip to content

Commit

Permalink
SERVER-15424 Age out old connections used by replication for communic…
Browse files Browse the repository at this point in the history
…ating with other members.

By capping the lifetime of connections used for heartbeats at 30 seconds, we
ensure that DNS and firewall changes that only affect new connections eventually
impact nodes' belief about the reachability of other nodes in the replica set.
  • Loading branch information
amschwerin committed Dec 8, 2014
1 parent a9c88df commit 2d79068
Showing 1 changed file with 91 additions and 57 deletions.
148 changes: 91 additions & 57 deletions src/mongo/db/repl/network_interface_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/timer.h"
#include "mongo/util/time_support.h"

namespace mongo {
namespace repl {

namespace {
const size_t kNumThreads = 8;
Seconds kDefaultMaxIdleConnectionAge(60);
Seconds kDefaultMaxConnectionAge(30);

} // namespace

Expand All @@ -73,11 +73,11 @@ namespace {
* Options for configuring the pool.
*/
struct Options {
Options() : maxIdleConnectionAge(kDefaultMaxIdleConnectionAge) {}
Options() : maxConnectionAge(kDefaultMaxConnectionAge) {
}

// Maximum age of an idle connection, as measured since last operation completion,
// before it is reaped.
Seconds maxIdleConnectionAge;
// Maximum age of a connection, before it is reaped.
Seconds maxConnectionAge;
};

typedef stdx::list<ConnectionInfo> ConnectionList;
Expand All @@ -101,8 +101,9 @@ namespace {
*/
ConnectionPtr(ConnectionPool* pool,
const HostAndPort& target,
Date_t now,
Seconds timeout) :
_pool(pool), _connInfo(pool->acquireConnection(target, timeout)) {}
_pool(pool), _connInfo(pool->acquireConnection(target, now, timeout)) {}

/**
* Destructor reaps the connection if it wasn't already returned to the pool by calling
Expand All @@ -113,7 +114,7 @@ namespace {
/**
* Releases the connection back to the pool from which it was drawn.
*/
void done() { _pool->releaseConnection(_connInfo); _pool = NULL; }
void done(Date_t now) { _pool->releaseConnection(_connInfo, now); _pool = NULL; }

DBClientConnection& operator*();
DBClientConnection* operator->();
Expand All @@ -134,14 +135,15 @@ namespace {
* Acquires a connection to "target" with the given "timeout", or throws a DBException.
* Intended for use by ConnectionPtr.
*/
ConnectionList::iterator acquireConnection(const HostAndPort& target, Seconds timeout);
ConnectionList::iterator acquireConnection(
const HostAndPort& target, Date_t now, Seconds timeout);

/**
* Releases a connection back into the pool.
* Intended for use by ConnectionPtr.
* Call this for connections that can safely be reused.
*/
void releaseConnection(ConnectionList::iterator);
void releaseConnection(ConnectionList::iterator iter, Date_t now);

/**
* Destroys a connection previously acquired from the pool.
Expand All @@ -157,28 +159,40 @@ namespace {
void closeAllInUseConnections();

/**
* Reap all connections in the pool that have been there continuously since before
* "date".
* Reaps all connections in the pool that are too old as of "now".
*/
void cleanUpOlderThan(Date_t date);
void cleanUpOlderThan(Date_t now);

private:
/**
* Returns true if the given connection is young enough to keep in the pool.
*/
bool shouldKeepConnection(Date_t now, const ConnectionInfo& connInfo) const;

/**
* Implementation of cleanUpOlderThan which assumes that _mutex is already held.
*/
void cleanUpOlderThan_inlock(Date_t date);
void cleanUpOlderThan_inlock(Date_t now);

/**
* Reaps connections in "hostConns" that are too old or have been in the pool too long as of
* "now". Expects _mutex to be held.
*/
void cleanUpOlderThan_inlock(Date_t now, ConnectionList* hostConns);

/**
* Reaps connections in "hostConns" that were already in the pool as of "date". Expects
* _mutex to be held.
* Destroys the connection associated with "iter" and removes "iter" fron connList.
*/
void cleanUpOlderThan_inlock(Date_t date, ConnectionList* hostConns);
static void destroyConnection_inlock(ConnectionList* connList,
ConnectionList::iterator iter);

// Mutex guarding members of the connection pool
boost::mutex _mutex;

// Map from HostAndPort to free connections.
// Map from HostAndPort to idle connections.
HostConnectionMap _connections;

// List of non-idle connections.
ConnectionList _inUseConnections;

// Options with which this pool was configured.
Expand All @@ -189,21 +203,16 @@ namespace {
* Information about a connection in the pool.
*/
struct NetworkInterfaceImpl::ConnectionPool::ConnectionInfo {
ConnectionInfo() : conn(NULL), lastEnteredPoolDate(0ULL) {}
ConnectionInfo() : conn(NULL), creationDate(0ULL) {}
ConnectionInfo(DBClientConnection* theConn, Date_t date) :
conn(theConn),
lastEnteredPoolDate(date) {}

/**
* Returns true if the connection entered the pool "date".
*/
bool isNotNewerThan(Date_t date) { return lastEnteredPoolDate <= date; }
creationDate(date) {}

// A connection in the pool.
DBClientConnection* conn;

// The date at which the connection "conn" was last put into the pool.
Date_t lastEnteredPoolDate;
// The date at which the connection was created.
Date_t creationDate;
};

DBClientConnection& NetworkInterfaceImpl::ConnectionPool::ConnectionPtr::operator*() {
Expand All @@ -223,15 +232,15 @@ namespace {
invariant(_inUseConnections.empty());
}

void NetworkInterfaceImpl::ConnectionPool::cleanUpOlderThan(Date_t date) {
void NetworkInterfaceImpl::ConnectionPool::cleanUpOlderThan(Date_t now) {
boost::lock_guard<boost::mutex> lk(_mutex);
cleanUpOlderThan_inlock(date);
cleanUpOlderThan_inlock(now);
}

void NetworkInterfaceImpl::ConnectionPool::cleanUpOlderThan_inlock(Date_t date) {
void NetworkInterfaceImpl::ConnectionPool::cleanUpOlderThan_inlock(Date_t now) {
HostConnectionMap::iterator hostConns = _connections.begin();
while (hostConns != _connections.end()) {
cleanUpOlderThan_inlock(date, &hostConns->second);
cleanUpOlderThan_inlock(now, &hostConns->second);
if (hostConns->second.empty()) {
_connections.erase(hostConns++);
}
Expand All @@ -242,21 +251,31 @@ namespace {
}

void NetworkInterfaceImpl::ConnectionPool::cleanUpOlderThan_inlock(
Date_t date,
Date_t now,
ConnectionList* hostConns) {
ConnectionList::iterator iter = hostConns->begin();
while (iter != hostConns->end()) {
if (iter->isNotNewerThan(date)) {
ConnectionList::iterator toDelete = iter++;
delete toDelete->conn;
hostConns->erase(toDelete);
if (shouldKeepConnection(now, *iter)) {
++iter;
}
else {
++iter;
destroyConnection_inlock(hostConns, iter++);
}
}
}

bool NetworkInterfaceImpl::ConnectionPool::shouldKeepConnection(
const Date_t now,
const ConnectionInfo& connInfo) const {

const Date_t expirationDate =
connInfo.creationDate + _options.maxConnectionAge.total_milliseconds();
if (expirationDate <= now) {
return false;
}
return true;
}

void NetworkInterfaceImpl::ConnectionPool::closeAllInUseConnections() {
boost::lock_guard<boost::mutex> lk(_mutex);
for (ConnectionList::iterator iter = _inUseConnections.begin();
Expand All @@ -270,12 +289,16 @@ namespace {
NetworkInterfaceImpl::ConnectionPool::ConnectionList::iterator
NetworkInterfaceImpl::ConnectionPool::acquireConnection(
const HostAndPort& target,
Date_t now,
Seconds timeout) {
boost::unique_lock<boost::mutex> lk(_mutex);
for (HostConnectionMap::iterator hostConns;
((hostConns = _connections.find(target)) != _connections.end()) &&
!hostConns->second.empty();) {
((hostConns = _connections.find(target)) != _connections.end());) {

cleanUpOlderThan_inlock(now, &hostConns->second);
if (hostConns->second.empty()) {
break;
}
_inUseConnections.splice(_inUseConnections.begin(),
hostConns->second,
hostConns->second.begin());
Expand All @@ -289,14 +312,14 @@ namespace {
}
catch (...) {
lk.lock();
_inUseConnections.erase(candidate);
destroyConnection_inlock(&_inUseConnections, candidate);
throw;
}
lk.lock();
_inUseConnections.erase(candidate);
destroyConnection_inlock(&_inUseConnections, candidate);
}

// No free connection in the pool; make a new one.
// No idle connection in the pool; make a new one.
lk.unlock();
std::auto_ptr<DBClientConnection> conn(new DBClientConnection);
conn->setSoTimeout(timeout.total_seconds());
Expand All @@ -313,26 +336,31 @@ namespace {
conn->auth(getInternalUserAuthParamsWithFallback());
}
lk.lock();
return _inUseConnections.insert(
_inUseConnections.begin(),
ConnectionInfo(conn.release(), Date_t(0)));
return _inUseConnections.insert(_inUseConnections.begin(),
ConnectionInfo(conn.release(), now));
}

void NetworkInterfaceImpl::ConnectionPool::releaseConnection(ConnectionList::iterator iter) {
const Date_t now(curTimeMillis64());
void NetworkInterfaceImpl::ConnectionPool::releaseConnection(ConnectionList::iterator iter,
const Date_t now) {
boost::lock_guard<boost::mutex> lk(_mutex);
if (!shouldKeepConnection(now, *iter)) {
destroyConnection_inlock(&_inUseConnections, iter);
return;
}
ConnectionList& hostConns = _connections[iter->conn->getServerHostAndPort()];
cleanUpOlderThan_inlock(
now - _options.maxIdleConnectionAge.total_seconds(),
&hostConns);
iter->lastEnteredPoolDate = now;
cleanUpOlderThan_inlock(now, &hostConns);
hostConns.splice(hostConns.begin(), _inUseConnections, iter);
}

void NetworkInterfaceImpl::ConnectionPool::destroyConnection(ConnectionList::iterator iter) {
boost::lock_guard<boost::mutex> lk(_mutex);
destroyConnection_inlock(&_inUseConnections, iter);
}

void NetworkInterfaceImpl::ConnectionPool::destroyConnection_inlock(
ConnectionList* connList, ConnectionList::iterator iter) {
delete iter->conn;
_inUseConnections.erase(iter);
connList->erase(iter);
}

NetworkInterfaceImpl::NetworkInterfaceImpl() :
Expand Down Expand Up @@ -507,16 +535,22 @@ namespace {
try {
BSONObj output;

StatusWith<int> timeoutStatus = getTimeoutMillis(request.expirationDate, now());
const Date_t requestStartDate = now();
StatusWith<int> timeoutStatus = getTimeoutMillis(request.expirationDate,
requestStartDate);
if (!timeoutStatus.isOK())
return ResponseStatus(timeoutStatus.getStatus());

Seconds timeout(timeoutStatus.getValue());
Timer timer;
ConnectionPool::ConnectionPtr conn(_connPool.get(), request.target, timeout);
ConnectionPool::ConnectionPtr conn(_connPool.get(),
request.target,
requestStartDate,
timeout);
conn->runCommand(request.dbname, request.cmdObj, output);
conn.done();
return ResponseStatus(Response(output, Milliseconds(timer.millis())));
const Date_t requestFinishDate = now();
conn.done(requestFinishDate);
return ResponseStatus(Response(output,
Milliseconds(requestFinishDate - requestStartDate)));
}
catch (const DBException& ex) {
return ResponseStatus(ex.toStatus());
Expand Down

0 comments on commit 2d79068

Please sign in to comment.