Skip to content

Commit

Permalink
iscsi: Assign connections to poll groups instead of lcores.
Browse files Browse the repository at this point in the history
This patch binds poll groups to SPDK thread through IO channel and
assigns connections to poll groups instead of cores.

The iSCSI subsystem registers the iSCSI global object as an IO device, and
creates poll groups as the context of the IO channels of that IO device.

Each portal gets and holds the poll group on which the corresponding acceptor
is running.

When a connection is constructed, iSCSI subsystem assigns a poll group
to the connection by getting it from the corresponding portal.

When a connection enters full-feature phase, iSCSI subsystem schedules
the connection to a poll group by round-robin.

As a result, each connection knows its owning SPDK thread directly and can
use the SPDK message-passing infrastructure instead of the SPDK event framework.

With this change, iSCSI connections are bound to an SPDK thread and are no
longer bound to a specific processor core.

Some other changes in this patch are
- core ID is removed from the output of get_iscsi_connections RPC. The
  upcoming patches will change the RPC to use spdk_for_each_channel and can
  access SPDK thread safely, and add SPDK thread ID instead.
- utilize the UT multithread framework added by the previous patch to
  unit-test iSCSI poll groups.

Change-Id: Iec73c778aa413bcabdb63141cc41d4160911ea0e
Signed-off-by: Ben Walker <[email protected]>
Signed-off-by: Shuhei Matsumoto <[email protected]>
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/463359
Reviewed-by: Broadcom SPDK FC-NVMe CI <[email protected]>
Reviewed-by: Jim Harris <[email protected]>
Tested-by: SPDK CI Jenkins <[email protected]>
  • Loading branch information
Ben Walker authored and jimharris committed Aug 12, 2019
1 parent 566aa11 commit fb641c4
Show file tree
Hide file tree
Showing 11 changed files with 134 additions and 88 deletions.
9 changes: 5 additions & 4 deletions app/iscsi_top/iscsi_top.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,15 @@ static void usage(void)
" (required)\n");
}

/* Group by poll group */
static bool
conns_compare(struct spdk_iscsi_conn *first, struct spdk_iscsi_conn *second)
{
if (first->lcore < second->lcore) {
if ((uintptr_t)first->pg < (uintptr_t)second->pg) {
return true;
}

if (first->lcore > second->lcore) {
if ((uintptr_t)first->pg > (uintptr_t)second->pg) {
return false;
}

Expand Down Expand Up @@ -110,8 +111,8 @@ print_connections(void)
stable_sort(v.begin(), v.end(), conns_compare);
for (iter = v.begin(); iter != v.end(); iter++) {
conn = *iter;
printf("lcore %2d conn %3d T:%-8s I:%s (%s)\n",
conn->lcore, conn->id,
printf("pg %p conn %3d T:%-8s I:%s (%s)\n",
conn->pg, conn->id,
conn->target_short_name, conn->initiator_name,
conn->initiator_addr);
}
Expand Down
91 changes: 40 additions & 51 deletions lib/iscsi/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ static pthread_mutex_t g_conns_mutex = PTHREAD_MUTEX_INITIALIZER;

static struct spdk_poller *g_shutdown_timer = NULL;

static void iscsi_conn_full_feature_migrate(void *arg1, void *arg2);
static void iscsi_conn_full_feature_migrate(void *arg);
static void iscsi_conn_stop(struct spdk_iscsi_conn *conn);
static void iscsi_conn_sock_cb(void *arg, struct spdk_sock_group *group,
struct spdk_sock *sock);
Expand Down Expand Up @@ -296,9 +296,12 @@ spdk_iscsi_conn_construct(struct spdk_iscsi_portal *portal,
SPDK_DEBUGLOG(SPDK_LOG_ISCSI, "Launching connection on acceptor thread\n");
conn->pending_task_cnt = 0;

conn->lcore = spdk_env_get_current_core();
pg = &g_spdk_iscsi.poll_group[conn->lcore];
/* Get the acceptor poll group */
pg = portal->acceptor_pg;

assert(spdk_io_channel_get_thread(spdk_io_channel_from_ctx(pg)) == spdk_get_thread());

conn->pg = pg;
iscsi_poll_group_add_conn(pg, conn);
return 0;

Expand Down Expand Up @@ -477,15 +480,11 @@ _iscsi_conn_check_shutdown(void *arg)
static void
_iscsi_conn_destruct(struct spdk_iscsi_conn *conn)
{
struct spdk_iscsi_poll_group *pg;
int rc;

spdk_clear_all_transfer_task(conn, NULL, NULL);

assert(conn->lcore == spdk_env_get_current_core());
pg = &g_spdk_iscsi.poll_group[conn->lcore];

iscsi_poll_group_remove_conn(pg, conn);
iscsi_poll_group_remove_conn(conn->pg, conn);
spdk_sock_close(&conn->sock);
spdk_poller_unregister(&conn->logout_timer);

Expand Down Expand Up @@ -610,7 +609,8 @@ iscsi_conn_remove_lun(struct spdk_scsi_lun *lun, void *remove_ctx)
struct spdk_iscsi_pdu *pdu, *tmp_pdu;
struct spdk_iscsi_task *iscsi_task, *tmp_iscsi_task;

assert(conn->lcore == spdk_env_get_current_core());
assert(spdk_io_channel_get_thread(spdk_io_channel_from_ctx(conn->pg)) ==
spdk_get_thread());

/* If a connection is already in stating status, just return */
if (conn->state >= ISCSI_CONN_STATE_EXITING) {
Expand Down Expand Up @@ -703,7 +703,8 @@ iscsi_conn_stop(struct spdk_iscsi_conn *conn)
iscsi_conn_close_luns(conn);
}

assert(conn->lcore == spdk_env_get_current_core());
assert(spdk_io_channel_get_thread(spdk_io_channel_from_ctx(conn->pg)) ==
spdk_get_thread());
}

void
Expand Down Expand Up @@ -1393,84 +1394,72 @@ iscsi_conn_sock_cb(void *arg, struct spdk_sock_group *group, struct spdk_sock *s
}

static void
iscsi_conn_full_feature_migrate(void *arg1, void *arg2)
iscsi_conn_full_feature_migrate(void *arg)
{
struct spdk_iscsi_conn *conn = arg1;
struct spdk_iscsi_poll_group *pg;
struct spdk_iscsi_conn *conn = arg;

if (conn->sess->session_type == SESSION_TYPE_NORMAL) {
iscsi_conn_open_luns(conn);
}

/* The poller has been unregistered, so now we can re-register it on the new core. */
conn->lcore = spdk_env_get_current_core();
pg = &g_spdk_iscsi.poll_group[conn->lcore];
iscsi_poll_group_add_conn(pg, conn);
/* Add this connection to the assigned poll group. */
iscsi_poll_group_add_conn(conn->pg, conn);
}

static uint32_t g_next_core = SPDK_ENV_LCORE_ID_ANY;
static struct spdk_iscsi_poll_group *g_next_pg = NULL;

void
spdk_iscsi_conn_schedule(struct spdk_iscsi_conn *conn)
{
struct spdk_iscsi_poll_group *pg;
struct spdk_event *event;
struct spdk_iscsi_tgt_node *target;
uint32_t i;
uint32_t lcore;

lcore = SPDK_ENV_LCORE_ID_ANY;

if (conn->sess->session_type != SESSION_TYPE_NORMAL) {
/* Leave all non-normal sessions on the acceptor
* thread. */
return;
}

for (i = 0; i < spdk_env_get_core_count(); i++) {
if (g_next_core > spdk_env_get_last_core()) {
g_next_core = spdk_env_get_first_core();
}

lcore = g_next_core;
g_next_core = spdk_env_get_next_core(g_next_core);
break;
}

if (i >= spdk_env_get_core_count()) {
SPDK_ERRLOG("Unable to schedule connection on allowed CPU core. Scheduling on first core instead.\n");
lcore = spdk_env_get_first_core();
}
pthread_mutex_lock(&g_spdk_iscsi.mutex);

target = conn->sess->target;
pthread_mutex_lock(&target->mutex);
target->num_active_conns++;
if (target->num_active_conns == 1) {
/**
* This is the only active connection for this target node.
* Save the lcore in the target node so it can be used for
* any other connections to this target node.
* Pick a poll group using round-robin.
*/
target->lcore = lcore;
if (g_next_pg == NULL) {
g_next_pg = TAILQ_FIRST(&g_spdk_iscsi.poll_group_head);
assert(g_next_pg != NULL);
}

pg = g_next_pg;
g_next_pg = TAILQ_NEXT(g_next_pg, link);

/* Save the pg in the target node so it can be used for any other connections to this target node. */
target->pg = pg;
} else {
/**
* There are other active connections for this target node.
* Ignore the lcore specified by the allocator and use the
* the target node's lcore to ensure this connection runs on
* the same lcore as other connections for this target node.
*/
lcore = target->lcore;
pg = target->pg;
}

pthread_mutex_unlock(&target->mutex);
pthread_mutex_unlock(&g_spdk_iscsi.mutex);

assert(conn->lcore == spdk_env_get_current_core());
pg = &g_spdk_iscsi.poll_group[conn->lcore];
iscsi_poll_group_remove_conn(pg, conn);
assert(spdk_io_channel_get_thread(spdk_io_channel_from_ctx(conn->pg)) ==
spdk_get_thread());

/* Remove this connection from the previous poll group */
iscsi_poll_group_remove_conn(conn->pg, conn);

conn->last_nopin = spdk_get_ticks();
event = spdk_event_allocate(lcore, iscsi_conn_full_feature_migrate,
conn, NULL);
spdk_event_call(event);
conn->pg = pg;

spdk_thread_send_msg(spdk_io_channel_get_thread(spdk_io_channel_from_ctx(pg)),
iscsi_conn_full_feature_migrate, conn);
}

static int
Expand Down
2 changes: 1 addition & 1 deletion lib/iscsi/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ struct spdk_iscsi_conn {
int pg_tag;
char *portal_host;
char *portal_port;
uint32_t lcore;
struct spdk_iscsi_poll_group *pg;
struct spdk_sock *sock;
struct spdk_iscsi_sess *sess;

Expand Down
1 change: 1 addition & 0 deletions lib/iscsi/iscsi.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ struct spdk_iscsi_globals g_spdk_iscsi = {
.ig_head = TAILQ_HEAD_INITIALIZER(g_spdk_iscsi.ig_head),
.target_head = TAILQ_HEAD_INITIALIZER(g_spdk_iscsi.target_head),
.auth_group_head = TAILQ_HEAD_INITIALIZER(g_spdk_iscsi.auth_group_head),
.poll_group_head = TAILQ_HEAD_INITIALIZER(g_spdk_iscsi.poll_group_head),
};

/* random value generation */
Expand Down
4 changes: 2 additions & 2 deletions lib/iscsi/iscsi.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,11 +280,11 @@ struct spdk_iscsi_sess {
};

struct spdk_iscsi_poll_group {
uint32_t core;
struct spdk_poller *poller;
struct spdk_poller *nop_poller;
STAILQ_HEAD(connections, spdk_iscsi_conn) connections;
struct spdk_sock_group *sock_group;
TAILQ_ENTRY(spdk_iscsi_poll_group) link;
};

struct spdk_iscsi_opts {
Expand Down Expand Up @@ -318,6 +318,7 @@ struct spdk_iscsi_globals {
TAILQ_HEAD(, spdk_iscsi_init_grp) ig_head;
TAILQ_HEAD(, spdk_iscsi_tgt_node) target_head;
TAILQ_HEAD(, spdk_iscsi_auth_group) auth_group_head;
TAILQ_HEAD(, spdk_iscsi_poll_group) poll_group_head;

int32_t timeout;
int32_t nopininterval;
Expand All @@ -344,7 +345,6 @@ struct spdk_iscsi_globals {
struct spdk_mempool *task_pool;

struct spdk_iscsi_sess **session;
struct spdk_iscsi_poll_group *poll_group;
};

#define ISCSI_SECURITY_NEGOTIATION_PHASE 0
Expand Down
2 changes: 0 additions & 2 deletions lib/iscsi/iscsi_rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -921,8 +921,6 @@ spdk_rpc_get_iscsi_connections(struct spdk_jsonrpc_request *request,
}
spdk_json_write_named_int32(w, "tsih", tsih);

spdk_json_write_named_int32(w, "lcore_id", c->lcore);

spdk_json_write_named_string(w, "initiator_addr", c->initiator_addr);

spdk_json_write_named_string(w, "target_addr", c->target_addr);
Expand Down
73 changes: 46 additions & 27 deletions lib/iscsi/iscsi_subsystem.c
Original file line number Diff line number Diff line change
Expand Up @@ -1193,14 +1193,10 @@ iscsi_poll_group_handle_nop(void *ctx)
return -1;
}

static void
iscsi_poll_group_create(void *ctx)
static int
iscsi_poll_group_create(void *io_device, void *ctx_buf)
{
struct spdk_iscsi_poll_group *pg;

assert(g_spdk_iscsi.poll_group != NULL);
pg = &g_spdk_iscsi.poll_group[spdk_env_get_current_core()];
pg->core = spdk_env_get_current_core();
struct spdk_iscsi_poll_group *pg = ctx_buf;

STAILQ_INIT(&pg->connections);
pg->sock_group = spdk_sock_group_create(NULL);
Expand All @@ -1209,15 +1205,15 @@ iscsi_poll_group_create(void *ctx)
pg->poller = spdk_poller_register(iscsi_poll_group_poll, pg, 0);
/* set the period to 1 sec */
pg->nop_poller = spdk_poller_register(iscsi_poll_group_handle_nop, pg, 1000000);

return 0;
}

static void
iscsi_poll_group_destroy(void *ctx)
iscsi_poll_group_destroy(void *io_device, void *ctx_buf)
{
struct spdk_iscsi_poll_group *pg;
struct spdk_iscsi_poll_group *pg = ctx_buf;

assert(g_spdk_iscsi.poll_group != NULL);
pg = &g_spdk_iscsi.poll_group[spdk_env_get_current_core()];
assert(pg->poller != NULL);
assert(pg->sock_group != NULL);

Expand All @@ -1227,19 +1223,27 @@ iscsi_poll_group_destroy(void *ctx)
}

static void
initialize_iscsi_poll_group(spdk_msg_fn cpl)
_iscsi_init_thread(void *ctx)
{
size_t g_num_poll_groups = spdk_env_get_last_core() + 1;
struct spdk_io_channel *ch;
struct spdk_iscsi_poll_group *pg;

g_spdk_iscsi.poll_group = calloc(g_num_poll_groups, sizeof(struct spdk_iscsi_poll_group));
if (!g_spdk_iscsi.poll_group) {
SPDK_ERRLOG("Failed to allocated iscsi poll group\n");
iscsi_init_complete(-1);
return;
}
ch = spdk_get_io_channel(&g_spdk_iscsi);
pg = spdk_io_channel_get_ctx(ch);

pthread_mutex_lock(&g_spdk_iscsi.mutex);
TAILQ_INSERT_TAIL(&g_spdk_iscsi.poll_group_head, pg, link);
pthread_mutex_unlock(&g_spdk_iscsi.mutex);
}

static void
initialize_iscsi_poll_group(spdk_msg_fn cpl)
{
spdk_io_device_register(&g_spdk_iscsi, iscsi_poll_group_create, iscsi_poll_group_destroy,
sizeof(struct spdk_iscsi_poll_group), "iscsi_tgt");

/* Send a message to each thread and create a poll group */
spdk_for_each_thread(iscsi_poll_group_create, NULL, cpl);
spdk_for_each_thread(_iscsi_init_thread, NULL, cpl);
}

static void
Expand Down Expand Up @@ -1359,31 +1363,46 @@ spdk_iscsi_fini(spdk_iscsi_fini_cb cb_fn, void *cb_arg)
}

static void
iscsi_fini_done(void *arg)
iscsi_fini_done(struct spdk_io_channel_iter *i, int status)
{
iscsi_check_pools();
iscsi_free_pools();

assert(TAILQ_EMPTY(&g_spdk_iscsi.poll_group_head));

spdk_iscsi_shutdown_tgt_nodes();
spdk_iscsi_init_grps_destroy();
spdk_iscsi_portal_grps_destroy();
iscsi_auth_groups_destroy();
free(g_spdk_iscsi.authfile);
free(g_spdk_iscsi.nodebase);
free(g_spdk_iscsi.poll_group);

pthread_mutex_destroy(&g_spdk_iscsi.mutex);
g_fini_cb_fn(g_fini_cb_arg);
}

static void
_iscsi_fini_thread(struct spdk_io_channel_iter *i)
{
struct spdk_io_channel *ch;
struct spdk_iscsi_poll_group *pg;

ch = spdk_io_channel_iter_get_channel(i);
pg = spdk_io_channel_get_ctx(ch);

pthread_mutex_lock(&g_spdk_iscsi.mutex);
TAILQ_REMOVE(&g_spdk_iscsi.poll_group_head, pg, link);
pthread_mutex_unlock(&g_spdk_iscsi.mutex);

spdk_put_io_channel(ch);

spdk_for_each_channel_continue(i, 0);
}

void
spdk_shutdown_iscsi_conns_done(void)
{
if (g_spdk_iscsi.poll_group) {
spdk_for_each_thread(iscsi_poll_group_destroy, NULL, iscsi_fini_done);
} else {
iscsi_fini_done(NULL);
}
spdk_for_each_channel(&g_spdk_iscsi, _iscsi_fini_thread, NULL, iscsi_fini_done);
}

void
Expand Down
Loading

0 comments on commit fb641c4

Please sign in to comment.