diff --git a/00-RELEASENOTES b/00-RELEASENOTES
index 13ab2f635..5c649ac05 100644
--- a/00-RELEASENOTES
+++ b/00-RELEASENOTES
@@ -11,6 +11,104 @@ CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP.
 SECURITY: There are security fixes in the release.
 --------------------------------------------------------------------------------
+================================================================================
+Redis 5.0.8 Released Thu Mar 12 16:05:41 CET 2020
+================================================================================
+
+Upgrade urgency HIGH: This release fixes security issues.
+
+This is a list of fixes in this release:
+
+Salvatore Sanfilippo in commit 2bea502d:
+ Merge pull request #6975 from dustinmm80/add-arm-latomic-linking
+Dustin Collins in commit b5931405:
+ Fix Pi building needing -latomic, backport
+ 1 file changed, 9 insertions(+)
+
+srzhao in commit fd441300:
+ fix impl of aof-child whitelist SIGUSR1 feature.
+ 1 file changed, 5 insertions(+), 4 deletions(-)
+
+Ariel in commit 77ff332b:
+ fix ThreadSafeContext lock/unlock function names
+ 1 file changed, 2 insertions(+), 2 deletions(-)
+
+Guy Benoish in commit 4f0f799c:
+ XREADGROUP should propagate XCLAIM/SETID in MULTI/EXEC
+ 1 file changed, 2 insertions(+), 2 deletions(-)
+
+Oran Agra in commit 0c1273c3:
+ Fix client flags to be int64 in module.c
+ 1 file changed, 3 insertions(+), 3 deletions(-)
+
+Guy Benoish in commit 708a4e8a:
+ Fix small bugs related to replica and monitor ambiguity
+ 2 files changed, 8 insertions(+), 6 deletions(-)
+
+WuYunlong in commit eac4115d:
+ Fix lua related memory leak.
+ 1 file changed, 1 insertion(+)
+
+antirez in commit d075df17:
+ Simplify #6379 changes.
+ 2 files changed, 4 insertions(+), 9 deletions(-)
+
+WuYunlong in commit 80a49c37:
+ Free allocated sds in pfdebugCommand() to avoid memory leak.
+ 1 file changed, 1 insertion(+)
+
+antirez in commit 60870d3a:
+ Jump to right label on AOF parsing error.
+ 1 file changed, 6 insertions(+), 4 deletions(-)
+
+antirez in commit d90f599b:
+ Free fakeclient argv on AOF error.
+ 1 file changed, 11 insertions(+), 3 deletions(-)
+
+WuYunlong in commit 8ee3bddf:
+ Fix potential memory leak of rioWriteBulkStreamID().
+ 1 file changed, 4 insertions(+), 1 deletion(-)
+
+WuYunlong in commit 4780fe78:
+ Fix potential memory leak of clusterLoadConfig().
+ 1 file changed, 20 insertions(+), 5 deletions(-)
+
+Leo Murillo in commit f3b77510:
+ Fix bug on KEYS command where pattern starts with * followed by \x00 (null char).
+ 1 file changed, 1 insertion(+), 1 deletion(-)
+
+Guy Benoish in commit 7f3fcedb:
+ Blocking XREAD[GROUP] should always reply with valid data (or timeout)
+ 3 files changed, 44 insertions(+), 10 deletions(-)
+
+antirez in commit f93b2fa5:
+ XCLAIM: Create the consumer only on successful claims.
+ 1 file changed, 4 insertions(+), 2 deletions(-)
+
+Guy Benoish in commit 89682d96:
+ Stream: Handle streamID-related edge cases
+ 4 files changed, 54 insertions(+), 4 deletions(-)
+
+antirez in commit 920e108f:
+ Fix ip and missing mode in RM_GetClusterNodeInfo().
+ 1 file changed, 5 insertions(+), 2 deletions(-)
+
+antirez in commit 7569b210:
+ Inline protocol: handle empty strings well.
+ 1 file changed, 2 insertions(+), 6 deletions(-)
+
+Khem Raj in commit 3c610b4e:
+ Mark extern definition of SDS_NOINIT in sds.h
+ 1 file changed, 1 insertion(+), 1 deletion(-)
+
+Seunghoon Woo in commit 16b2d07f:
+ [FIX] revisit CVE-2015-8080 vulnerability
+ 1 file changed, 6 insertions(+), 4 deletions(-)
+
+yz1509 in commit 19f33585:
+ avoid sentinel changes promoted_slave to be its own replica.
+ 1 file changed, 1 insertion(+), 1 deletion(-)
+
 ================================================================================
 Redis 5.0.7 Released Tue Nov 19 17:52:44 CET 2019
 ================================================================================
diff --git a/deps/Makefile b/deps/Makefile
index 1c10bce9e..eb35c1e1f 100644
--- a/deps/Makefile
+++ b/deps/Makefile
@@ -36,7 +36,6 @@ distclean:
 	-(cd hiredis && $(MAKE) clean) > /dev/null || true
 	-(cd linenoise && $(MAKE) clean) > /dev/null || true
 	-(cd lua && $(MAKE) clean) > /dev/null || true
-	-(cd geohash-int && $(MAKE) clean) > /dev/null || true
 	-(cd jemalloc && [ -f Makefile ] && $(MAKE) distclean) > /dev/null || true
 	-(rm -f .make-*)
 
@@ -78,13 +77,7 @@ JEMALLOC_LDFLAGS= $(LDFLAGS)
 jemalloc: .make-prerequisites
 	@printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR)
-	cd jemalloc && ./configure --with-lg-quantum=3 --with-jemalloc-prefix=je_ --enable-cc-silence CFLAGS="$(JEMALLOC_CFLAGS)" LDFLAGS="$(JEMALLOC_LDFLAGS)"
+	cd jemalloc && ./configure --with-version=5.1.0-0-g0 --with-lg-quantum=3 --with-jemalloc-prefix=je_ --enable-cc-silence CFLAGS="$(JEMALLOC_CFLAGS)" LDFLAGS="$(JEMALLOC_LDFLAGS)"
 	cd jemalloc && $(MAKE) CFLAGS="$(JEMALLOC_CFLAGS)" LDFLAGS="$(JEMALLOC_LDFLAGS)" lib/libjemalloc.a
 
 .PHONY: jemalloc
-
-geohash-int: .make-prerequisites
-	@printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR)
-	cd geohash-int && $(MAKE)
-
-.PHONY: geohash-int
 
diff --git a/deps/README.md b/deps/README.md
index 0ce480046..367ee1627 100644
--- a/deps/README.md
+++ b/deps/README.md
@@ -13,12 +13,34 @@ How to upgrade the above dependencies
 Jemalloc
 ---
 
-Jemalloc is unmodified. We only change settings via the `configure` script of Jemalloc using the `--with-lg-quantum` option, setting it to the value of 3 instead of 4. This provides us with more size classes that better suit the Redis data structures, in order to gain memory efficiency.
-
-So in order to upgrade jemalloc:
+Jemalloc is modified with changes that allow us to implement the Redis
+active defragmentation logic. However this feature of Redis is not mandatory
+and Redis is able to understand if the Jemalloc version it is compiled
+against supports such Redis-specific modifications. So in theory, if you
+are not interested in the active defragmentation, you can replace Jemalloc
+just following these steps:
 
 1. Remove the jemalloc directory.
 2. Substitute it with the new jemalloc source tree.
+3. Edit the Makefile located in the same directory as the README you are
+   reading, and change the --with-version in the Jemalloc configure script
+   options with the version you are using. This is required because otherwise
+   the Jemalloc configuration script is broken and will not work nested in
+   another git repository.
+
+However note that we change Jemalloc settings via the `configure` script of Jemalloc using the `--with-lg-quantum` option, setting it to the value of 3 instead of 4. This provides us with more size classes that better suit the Redis data structures, in order to gain memory efficiency.
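The Jemalloc notes above say that Redis can detect whether the Jemalloc copy it is compiled against carries the Redis-specific defragmentation changes. A minimal sketch of how such a compile-time capability check can look, assuming the `JEMALLOC_FRAG_HINT` define introduced in the additional steps below; the `HAVE_DEFRAG` name and the prototype are illustrative, not a quote of the Redis headers:

    /* Sketch only: compile the active-defrag code paths solely when the
     * bundled jemalloc was patched as described in the additional steps
     * below.  USE_JEMALLOC is the usual allocator switch; HAVE_DEFRAG is
     * an illustrative name for the resulting feature flag. */
    #if defined(USE_JEMALLOC) && defined(JEMALLOC_FRAG_HINT)
    #define HAVE_DEFRAG
    #endif

    #ifdef HAVE_DEFRAG
    /* je_get_defrag_hint() is the helper that has to be re-implemented
     * when upgrading jemalloc; check src/jemalloc.c in the bundled copy
     * for the authoritative signature before relying on this prototype. */
    int je_get_defrag_hint(void *ptr, int *bin_util, int *run_util);
    #endif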
+
+If you want to upgrade Jemalloc while also providing support for
+active defragmentation, in addition to the above steps you need to perform
+the following additional steps:
+
+4. In the Jemalloc tree, file `include/jemalloc/jemalloc_macros.h.in`, make
+   sure to add `#define JEMALLOC_FRAG_HINT`.
+5. Implement the function `je_get_defrag_hint()` inside `src/jemalloc.c`. You
+   can see how it is implemented in the current Jemalloc source tree shipped
+   with Redis, and rewrite it according to the new Jemalloc internals, if they
+   changed, otherwise you could just copy the old implementation if you are
+   upgrading just to a similar version of Jemalloc.
 
 Geohash
 ---
@@ -28,7 +50,7 @@ This is never upgraded since it's part of the Redis project. If there are change
 Hiredis
 ---
 
-Hiredis uses the SDS string library, that must be the same version used inside Redis itself. Hiredis is also very critical for Sentinel. Historically Redis often used forked versions of hiredis in a way or the other. In order to upgrade it is adviced to take a lot of care:
+Hiredis uses the SDS string library, that must be the same version used inside Redis itself. Hiredis is also very critical for Sentinel. Historically Redis often used forked versions of hiredis in a way or the other. In order to upgrade it is advised to take a lot of care:
 
 1. Check with diff if hiredis API changed and what impact it could have in Redis.
 2. Make sure thet the SDS library inside Hiredis and inside Redis are compatible.
@@ -61,6 +83,6 @@ and our version:
 
 1. Makefile is modified to allow a different compiler than GCC.
 2. We have the implementation source code, and directly link to the following external libraries: `lua_cjson.o`, `lua_struct.o`, `lua_cmsgpack.o` and `lua_bit.o`.
-3. There is a security fix in `ldo.c`, line 498: The check for `LUA_SIGNATURE[0]` is removed in order toa void direct bytecode exectuion.
+3. There is a security fix in `ldo.c`, line 498: The check for `LUA_SIGNATURE[0]` is removed in order to avoid direct bytecode execution.
 
diff --git a/deps/lua/src/lua_cmsgpack.c b/deps/lua/src/lua_cmsgpack.c
index b5cc191f2..abeab6dc2 100644
--- a/deps/lua/src/lua_cmsgpack.c
+++ b/deps/lua/src/lua_cmsgpack.c
@@ -190,9 +190,9 @@ void mp_encode_bytes(lua_State *L, mp_buf *buf, const unsigned char *s, size_t l
     } else {
         hdr[0] = 0xdb;
         hdr[1] = (unsigned char)((len&0xff000000)>>24);
-        hdr[2] = (unsigned char)((len & 0xff0000) >> 16);
-        hdr[3] = (unsigned char)((len & 0xff00) >> 8);
-        hdr[4] = (unsigned char)(len & 0xff);
+        hdr[2] = (unsigned char)((len&0xff0000)>>16);
+        hdr[3] = (unsigned char)((len&0xff00)>>8);
+        hdr[4] = (unsigned char)(len&0xff);
         hdrlen = 5;
     }
     mp_buf_append(L,buf,hdr,hdrlen);
@@ -385,6 +385,7 @@ void mp_encode_lua_table_as_array(lua_State *L, mp_buf *buf, int level) {
 #endif
 
     mp_encode_array(L,buf,len);
+    luaL_checkstack(L, 1, "in function mp_encode_lua_table_as_array");
     for (j = 1; j <= len; j++) {
         lua_pushnumber(L,j);
         lua_gettable(L,-2);
@@ -400,6 +401,7 @@ void mp_encode_lua_table_as_map(lua_State *L, mp_buf *buf, int level) {
      * Lua API, we need to iterate a first time. Note that an alternative
      * would be to do a single run, and then hack the buffer to insert the
      * map opcodes for message pack. Too hackish for this lib. */
+    luaL_checkstack(L, 3, "in function mp_encode_lua_table_as_map");
     lua_pushnil(L);
     while(lua_next(L,-2)) {
         lua_pop(L,1); /* remove value, keep key for next iteration.
*/ @@ -515,10 +517,14 @@ int mp_pack(lua_State *L) { if (nargs == 0) return luaL_argerror(L, 0, "MessagePack pack needs input."); + if (!lua_checkstack(L, nargs)) + return luaL_argerror(L, 0, "Too many arguments for MessagePack pack."); + buf = mp_buf_new(L); for(i = 1; i <= nargs; i++) { /* Copy argument i to top of stack for _encode processing; * the encode function pops it from the stack when complete. */ + luaL_checkstack(L, 1, "in function mp_check"); lua_pushvalue(L, i); mp_encode_lua_type(L,buf,0); @@ -547,6 +553,7 @@ void mp_decode_to_lua_array(lua_State *L, mp_cur *c, size_t len) { int index = 1; lua_newtable(L); + luaL_checkstack(L, 1, "in function mp_decode_to_lua_array"); while(len--) { lua_pushnumber(L,index++); mp_decode_to_lua_type(L,c); @@ -821,6 +828,9 @@ int mp_unpack_full(lua_State *L, int limit, int offset) { * subtract the entire buffer size from the unprocessed size * to get our next start offset */ int offset = (int)(len - c.left); /* WIN_PORT_FIX cast (int) */ + + luaL_checkstack(L, 1, "in function mp_unpack_full"); + /* Return offset -1 when we have have processed the entire buffer. */ lua_pushinteger(L, c.left == 0 ? -1 : offset); /* Results are returned with the arg elements still diff --git a/deps/lua/src/lua_struct.c b/deps/lua/src/lua_struct.c index 27b3a57cf..4986a9ba7 100644 --- a/deps/lua/src/lua_struct.c +++ b/deps/lua/src/lua_struct.c @@ -1,7 +1,7 @@ /* ** {====================================================== ** Library for packing/unpacking structures. -** $Id: struct.c,v 1.4 2012/07/04 18:54:29 roberto Exp $ +** $Id: struct.c,v 1.7 2018/05/11 22:04:31 roberto Exp $ ** See Copyright Notice at the end of this file ** ======================================================= */ @@ -15,8 +15,8 @@ ** h/H - signed/unsigned short ** l/L - signed/unsigned long ** T - size_t -** i/In - signed/unsigned integer with size `n' (default is size of int) -** cn - sequence of `n' chars (from/to a string); when packing, n==0 means +** i/In - signed/unsigned integer with size 'n' (default is size of int) +** cn - sequence of 'n' chars (from/to a string); when packing, n==0 means the whole string; when unpacking, n==0 means use the previous read number as the string length ** s - zero-terminated string @@ -295,21 +295,26 @@ static int b_unpack (lua_State *L) { const char *fmt = luaL_checkstring(L, 1); size_t ld; const char *data = luaL_checklstring(L, 2, &ld); - size_t pos = luaL_optinteger(L, 3, 1) - 1; + size_t pos = luaL_optinteger(L, 3, 1); + luaL_argcheck(L, pos > 0, 3, "offset must be 1 or greater"); + pos--; /* Lua indexes are 1-based, but here we want 0-based for C + * pointer math. 
*/ + int n = 0; /* number of results */ defaultoptions(&h); - lua_settop(L, 2); while (*fmt) { int opt = *fmt++; size_t size = optsize(L, opt, &fmt); pos += gettoalign(pos, &h, opt, size); - luaL_argcheck(L, pos+size <= ld, 2, "data string too short"); - luaL_checkstack(L, 1, "too many results"); + luaL_argcheck(L, size <= ld && pos <= ld - size, + 2, "data string too short"); + /* stack space for item + next position */ + luaL_checkstack(L, 2, "too many results"); switch (opt) { case 'b': case 'B': case 'h': case 'H': case 'l': case 'L': case 'T': case 'i': case 'I': { /* integer types */ int issigned = islower(opt); lua_Number res = getinteger(data+pos, h.endian, issigned, (int)size); - lua_pushnumber(L, res); + lua_pushnumber(L, res); n++; break; } case 'x': { @@ -319,25 +324,26 @@ static int b_unpack (lua_State *L) { float f; memcpy(&f, data+pos, size); correctbytes((char *)&f, sizeof(f), h.endian); - lua_pushnumber(L, f); + lua_pushnumber(L, f); n++; break; } case 'd': { double d; memcpy(&d, data+pos, size); correctbytes((char *)&d, sizeof(d), h.endian); - lua_pushnumber(L, d); + lua_pushnumber(L, d); n++; break; } case 'c': { if (size == 0) { - if (!lua_isnumber(L, -1)) - luaL_error(L, "format `c0' needs a previous size"); + if (n == 0 || !lua_isnumber(L, -1)) + luaL_error(L, "format 'c0' needs a previous size"); size = lua_tonumber(L, -1); - lua_pop(L, 1); - luaL_argcheck(L, pos+size <= ld, 2, "data string too short"); + lua_pop(L, 1); n--; + luaL_argcheck(L, size <= ld && pos <= ld - size, + 2, "data string too short"); } - lua_pushlstring(L, data+pos, size); + lua_pushlstring(L, data+pos, size); n++; break; } case 's': { @@ -345,15 +351,15 @@ static int b_unpack (lua_State *L) { if (e == NULL) luaL_error(L, "unfinished string in data"); size = (e - (data+pos)) + 1; - lua_pushlstring(L, data+pos, size - 1); + lua_pushlstring(L, data+pos, size - 1); n++; break; } default: controloptions(L, opt, &fmt, &h); } pos += size; } - lua_pushinteger(L, pos + 1); - return lua_gettop(L) - 2; + lua_pushinteger(L, pos + 1); /* next position */ + return n + 1; } @@ -399,7 +405,7 @@ LUALIB_API int luaopen_struct (lua_State *L) { /****************************************************************************** -* Copyright (C) 2010-2012 Lua.org, PUC-Rio. All rights reserved. +* Copyright (C) 2010-2018 Lua.org, PUC-Rio. All rights reserved. * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the diff --git a/deps/lua/src/luaconf.h b/deps/lua/src/luaconf.h index 8887fa2b3..9bbfe2764 100644 --- a/deps/lua/src/luaconf.h +++ b/deps/lua/src/luaconf.h @@ -733,7 +733,7 @@ union luai_Cast { double l_d; long l_l; }; @* in 'string.format'. @@ LUA_INTFRM_T is the integer type correspoding to the previous length @* modifier. -** CHANGE them if your system supports PORT_LONGLONG or does not support long. +** CHANGE them if your system supports long long or does not support long. */ #if defined(LUA_USELONGLONG) diff --git a/src/aof.c b/src/aof.c index beff1f0ec..ae9771b95 100644 --- a/src/aof.c +++ b/src/aof.c @@ -801,18 +801,26 @@ int loadAppendOnlyFile(char *filename) { argc = atoi(buf+1); if (argc < 1) goto fmterr; + /* Load the next command in the AOF as our fake client + * argv. */ argv = zmalloc(sizeof(robj*)*argc); fakeClient->argc = argc; fakeClient->argv = argv; for (j = 0; j < argc; j++) { - if (fgets(buf,sizeof(buf),fp) == NULL) { + /* Parse the argument len. 
*/ + char *readres = fgets(buf,sizeof(buf),fp); + if (readres == NULL || buf[0] != '$') { fakeClient->argc = j; /* Free up to j-1. */ freeFakeClientArgv(fakeClient); - goto readerr; + if (readres == NULL) + goto readerr; + else + goto fmterr; } - if (buf[0] != '$') goto fmterr; len = strtol(buf+1,NULL,10); + + /* Read it into a string object. */ argsds = sdsnewlen(SDS_NOINIT,len); if (len && fread(argsds,len,1,fp) == 0) { sdsfree(argsds); @@ -821,10 +829,12 @@ int loadAppendOnlyFile(char *filename) { goto readerr; } argv[j] = createObject(OBJ_STRING,argsds); + + /* Discard CRLF. */ if (fread(buf,2,1,fp) == 0) { fakeClient->argc = j+1; /* Free up to j. */ freeFakeClientArgv(fakeClient); - goto readerr; /* discard CRLF */ + goto readerr; } } @@ -1154,7 +1164,7 @@ int rioWriteBulkStreamID(rio *r,streamID *id) { int retval; sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq); - if ((retval = rioWriteBulkString(r,replyid,sdslen(replyid))) == 0) return 0; + retval = rioWriteBulkString(r,replyid,sdslen(replyid)); sdsfree(replyid); return retval; } @@ -1928,14 +1938,15 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { serverLog(LL_VERBOSE, "Background AOF rewrite signal handler took %lldus", ustime()-now); } else if (!bysignal && exitcode != 0) { + server.aof_lastbgrewrite_status = C_ERR; + + serverLog(LL_WARNING, + "Background AOF rewrite terminated with error"); + } else { /* SIGUSR1 is whitelisted, so we have a way to kill a child without * tirggering an error condition. */ if (bysignal != SIGUSR1) server.aof_lastbgrewrite_status = C_ERR; - serverLog(LL_WARNING, - "Background AOF rewrite terminated with error"); - } else { - server.aof_lastbgrewrite_status = C_ERR; serverLog(LL_WARNING, "Background AOF rewrite terminated by signal %d", bysignal); diff --git a/src/blocked.c b/src/blocked.c index 4416877ce..5668c2795 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -429,7 +429,7 @@ void handleClientsBlockedOnKeys(void) { if (streamCompareID(&s->last_id, gt) > 0) { streamID start = *gt; - start.seq++; /* Can't overflow, it's an uint64_t */ + streamIncrID(&start); /* Lookup the consumer for the group, if any. 
*/ streamConsumer *consumer = NULL; diff --git a/src/cluster.c b/src/cluster.c index c6aca26ed..4ba52729b 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -163,7 +163,10 @@ int clusterLoadConfig(char *filename) { } /* Regular config lines have at least eight fields */ - if (argc < 8) goto fmterr; + if (argc < 8) { + sdsfreesplitres(argv,argc); + goto fmterr; + } /* Create this node if it does not exist */ n = clusterLookupNode(argv[0]); @@ -172,7 +175,10 @@ int clusterLoadConfig(char *filename) { clusterAddNode(n); } /* Address and port */ - if ((p = strrchr(argv[1],':')) == NULL) goto fmterr; + if ((p = strrchr(argv[1],':')) == NULL) { + sdsfreesplitres(argv,argc); + goto fmterr; + } *p = '\0'; memcpy(n->ip,argv[1],strlen(argv[1])+1); char *port = p+1; @@ -253,7 +259,10 @@ int clusterLoadConfig(char *filename) { *p = '\0'; direction = p[1]; /* Either '>' or '<' */ slot = atoi(argv[j]+1); - if (slot < 0 || slot >= CLUSTER_SLOTS) goto fmterr; + if (slot < 0 || slot >= CLUSTER_SLOTS) { + sdsfreesplitres(argv,argc); + goto fmterr; + } p += 3; cn = clusterLookupNode(p); if (!cn) { @@ -273,8 +282,12 @@ int clusterLoadConfig(char *filename) { } else { start = stop = atoi(argv[j]); } - if (start < 0 || start >= CLUSTER_SLOTS) goto fmterr; - if (stop < 0 || stop >= CLUSTER_SLOTS) goto fmterr; + if (start < 0 || start >= CLUSTER_SLOTS || + stop < 0 || stop >= CLUSTER_SLOTS) + { + sdsfreesplitres(argv,argc); + goto fmterr; + } while(start <= stop) clusterAddSlot(n, start++); } diff --git a/src/db.c b/src/db.c index 16c96be24..dfe30167a 100644 --- a/src/db.c +++ b/src/db.c @@ -545,7 +545,7 @@ void keysCommand(client *c) { void *replylen = addDeferredMultiBulkLength(c); di = dictGetSafeIterator(c->db->dict); - allkeys = (pattern[0] == '*' && pattern[1] == '\0'); + allkeys = (pattern[0] == '*' && plen == 1); while((de = dictNext(di)) != NULL) { sds key = dictGetKey(de); robj *keyobj; diff --git a/src/hyperloglog.c b/src/hyperloglog.c index d71997982..7b6da1a90 100644 --- a/src/hyperloglog.c +++ b/src/hyperloglog.c @@ -1535,6 +1535,7 @@ void pfdebugCommand(client *c) { sds decoded = sdsempty(); if (hdr->encoding != HLL_SPARSE) { + sdsfree(decoded); addReplyError(c,"HLL encoding is not sparse"); return; } diff --git a/src/module.c b/src/module.c index 709511771..395055737 100644 --- a/src/module.c +++ b/src/module.c @@ -661,9 +661,9 @@ void RM_KeyAtPos(RedisModuleCtx *ctx, int pos) { * flags into the command flags used by the Redis core. * * It returns the set of flags, or -1 if unknown flags are found. */ -int commandFlagsFromString(char *s) { +int64_t commandFlagsFromString(char *s) { int count, j; - int flags = 0; + int64_t flags = 0; sds *tokens = sdssplitlen(s,strlen(s)," ",1,&count); for (j = 0; j < count; j++) { char *t = tokens[j]; @@ -741,7 +741,7 @@ int commandFlagsFromString(char *s) { * other reason. */ int RM_CreateCommand(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc, const char *strflags, int firstkey, int lastkey, int keystep) { - int flags = strflags ? commandFlagsFromString((char*)strflags) : 0; + int64_t flags = strflags ? commandFlagsFromString((char*)strflags) : 0; if (flags == -1) return REDISMODULE_ERR; if ((flags & CMD_MODULE_NO_CLUSTER) && server.cluster_enabled) return REDISMODULE_ERR; @@ -4022,9 +4022,9 @@ int RM_BlockedClientDisconnected(RedisModuleCtx *ctx) { * * To call non-reply APIs, the thread safe context must be prepared with: * - * RedisModule_ThreadSafeCallStart(ctx); + * RedisModule_ThreadSafeContextLock(ctx); * ... make your call here ... 
- * RedisModule_ThreadSafeCallStop(ctx); + * RedisModule_ThreadSafeContextUnlock(ctx); * * This is not needed when using `RedisModule_Reply*` functions, assuming * that a blocked client was used when the context was created, otherwise @@ -4392,10 +4392,13 @@ int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *m UNUSED(ctx); clusterNode *node = clusterLookupNode(id); - if (node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE)) + if (node == NULL || + node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE)) + { return REDISMODULE_ERR; + } - if (ip) memcpy(ip,node->name,REDISMODULE_NODE_ID_LEN); + if (ip) strncpy(ip,node->ip,NET_IP_STR_LEN); if (master_id) { /* If the information is not available, the function will set the diff --git a/src/networking.c b/src/networking.c index 4306a221f..dc6bad1f9 100644 --- a/src/networking.c +++ b/src/networking.c @@ -954,7 +954,7 @@ void freeClient(client *c) { /* We need to remember the time when we started to have zero * attached slaves, as after some time we'll free the replication * backlog. */ - if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0) + if (getClientType(c) == CLIENT_TYPE_SLAVE && listLength(server.slaves) == 0) server.repl_no_slaves_since = server.unixtime; refreshGoodSlavesCount(); } @@ -1107,8 +1107,8 @@ int writeToClient(int fd, client *c, int handler_installed) { * just deliver as much data as it is possible to deliver. * * Moreover, we also send as much as possible if the client is - * a slave (otherwise, on high-speed traffic, the replication - * buffer will grow indefinitely) */ + * a slave or a monitor (otherwise, on high-speed traffic, the + * replication/output buffer will grow indefinitely) */ if (totwritten > NET_MAX_WRITES_PER_EVENT && (server.maxmemory == 0 || zmalloc_used_memory() < server.maxmemory) && @@ -1318,7 +1318,7 @@ int processInlineBuffer(client *c) { /* Newline from slaves can be used to refresh the last ACK time. * This is useful for a slave to ping back while loading a big * RDB file. */ - if (querylen == 0 && c->flags & CLIENT_SLAVE) + if (querylen == 0 && getClientType(c) == CLIENT_TYPE_SLAVE) c->repl_ack_time = server.unixtime; /* Move querybuffer position to the next query in the buffer. */ @@ -1332,12 +1332,8 @@ int processInlineBuffer(client *c) { /* Create redis objects for all arguments. */ for (c->argc = 0, j = 0; j < argc; j++) { - if (sdslen(argv[j])) { - c->argv[c->argc] = createObject(OBJ_STRING,argv[j]); - c->argc++; - } else { - sdsfree(argv[j]); - } + c->argv[c->argc] = createObject(OBJ_STRING,argv[j]); + c->argc++; } zfree(argv); return C_OK; @@ -2139,12 +2135,14 @@ PORT_ULONG getClientOutputBufferMemoryUsage(client *c) { * * The function will return one of the following: * CLIENT_TYPE_NORMAL -> Normal client - * CLIENT_TYPE_SLAVE -> Slave or client executing MONITOR command + * CLIENT_TYPE_SLAVE -> Slave * CLIENT_TYPE_PUBSUB -> Client subscribed to Pub/Sub channels * CLIENT_TYPE_MASTER -> The client representing our replication master. */ int getClientType(client *c) { if (c->flags & CLIENT_MASTER) return CLIENT_TYPE_MASTER; + /* Even though MONITOR clients are marked as replicas, we + * want the expose them as normal clients. 
*/ if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) return CLIENT_TYPE_SLAVE; if (c->flags & CLIENT_PUBSUB) return CLIENT_TYPE_PUBSUB; diff --git a/src/scripting.c b/src/scripting.c index 468f97e74..a31b2b27c 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -2221,6 +2221,7 @@ void ldbEval(lua_State *lua, sds *argv, int argc) { ldbLog(sdscatfmt(sdsempty()," %s",lua_tostring(lua,-1))); lua_pop(lua,1); sdsfree(code); + sdsfree(expr); return; } } diff --git a/src/sds.h b/src/sds.h index a905b39e0..c57347876 100644 --- a/src/sds.h +++ b/src/sds.h @@ -34,7 +34,7 @@ #define __SDS_H #define SDS_MAX_PREALLOC (1024*1024) -const char *SDS_NOINIT; +extern const char *SDS_NOINIT; #ifdef _WIN32 #include "Win32_Interop/Win32_Portability.h" diff --git a/src/sentinel.c b/src/sentinel.c index 8705e54fe..686b5f486 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -4421,7 +4421,7 @@ void sentinelFailoverDetectEnd(sentinelRedisInstance *master) { sentinelRedisInstance *slave = dictGetVal(de); int retval; - if (slave->flags & (SRI_RECONF_DONE|SRI_RECONF_SENT)) continue; + if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE|SRI_RECONF_SENT)) continue; if (slave->link->disconnected) continue; retval = sentinelSendSlaveOf(slave, diff --git a/src/server.c b/src/server.c index 8670108de..14c1e256e 100644 --- a/src/server.c +++ b/src/server.c @@ -832,7 +832,7 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) { time_t now = now_ms/1000; if (server.maxidletime && - !(c->flags & CLIENT_SLAVE) && /* no timeout for slaves */ + !(c->flags & CLIENT_SLAVE) && /* no timeout for slaves and monitors */ !(c->flags & CLIENT_MASTER) && /* no timeout for masters */ !(c->flags & CLIENT_BLOCKED) && /* no timeout for BLPOP */ !(c->flags & CLIENT_PUBSUB) && /* no timeout for Pub/Sub clients */ diff --git a/src/stream.h b/src/stream.h index ef08753b5..00f42ede9 100644 --- a/src/stream.h +++ b/src/stream.h @@ -109,5 +109,6 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id); streamNACK *streamCreateNACK(streamConsumer *consumer); void streamDecodeID(void *buf, streamID *id); int streamCompareID(streamID *a, streamID *b); +void streamIncrID(streamID *id); #endif diff --git a/src/t_stream.c b/src/t_stream.c index 0a66722f0..0a1add859 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -67,6 +67,21 @@ void freeStream(stream *s) { zfree(s); } +/* Set 'id' to be its successor streamID */ +void streamIncrID(streamID *id) { + if (id->seq == UINT64_MAX) { + if (id->ms == UINT64_MAX) { + /* Special case where 'id' is the last possible streamID... */ + id->ms = id->seq = 0; + } else { + id->ms++; + id->seq = 0; + } + } else { + id->seq++; + } +} + /* Generate the next stream item ID given the previous one. If the current * milliseconds Unix time is greater than the previous one, just use this * as time part and start with sequence part of zero. Otherwise we use the @@ -77,8 +92,8 @@ void streamNextID(streamID *last_id, streamID *new_id) { new_id->ms = ms; new_id->seq = 0; } else { - new_id->ms = last_id->ms; - new_id->seq = last_id->seq+1; + *new_id = *last_id; + streamIncrID(new_id); } } @@ -776,6 +791,16 @@ int streamDeleteItem(stream *s, streamID *id) { return deleted; } +/* Get the last valid (non-tombstone) streamID of 's'. 
*/ +void streamLastValidID(stream *s, streamID *maxid) +{ + streamIterator si; + streamIteratorStart(&si,s,NULL,NULL,1); + int64_t numfields; + streamIteratorGetID(&si,maxid,&numfields); + streamIteratorStop(&si); +} + /* Emit a reply in the client output buffer by formatting a Stream ID * in the standard - format, using the simple string protocol * of REPL. */ @@ -817,7 +842,7 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam argv[11] = createStringObject("JUSTID",6); argv[12] = createStringObject("LASTID",6); argv[13] = createObjectFromStreamID(&group->last_id); - propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL); + alsoPropagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(argv[0]); decrRefCount(argv[3]); decrRefCount(argv[4]); @@ -844,7 +869,7 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna argv[2] = key; argv[3] = groupname; argv[4] = createObjectFromStreamID(&group->last_id); - propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); + alsoPropagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(argv[0]); decrRefCount(argv[1]); decrRefCount(argv[4]); @@ -1233,6 +1258,13 @@ void xaddCommand(client *c) { if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; s = o->ptr; + /* Return ASAP if the stream has reached the last possible ID */ + if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) { + addReplyError(c,"The stream has exhausted the last possible ID, " + "unable to add more items"); + return; + } + /* Append using the low level function and return the ID. */ if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2, &id, id_given ? &id : NULL) @@ -1496,20 +1528,23 @@ void xreadCommand(client *c) { { serve_synchronously = 1; serve_history = 1; - } else { + } else if (s->length) { /* We also want to serve a consumer in a consumer group * synchronously in case the group top item delivered is smaller * than what the stream has inside. */ - streamID *last = &groups[i]->last_id; - if (s->length && (streamCompareID(&s->last_id, last) > 0)) { + streamID maxid, *last = &groups[i]->last_id; + streamLastValidID(s, &maxid); + if (streamCompareID(&maxid, last) > 0) { serve_synchronously = 1; *gt = *last; } } - } else { + } else if (s->length) { /* For consumers without a group, we serve synchronously if we can * actually provide at least one item from the stream. */ - if (s->length && (streamCompareID(&s->last_id, gt) > 0)) { + streamID maxid; + streamLastValidID(s, &maxid); + if (streamCompareID(&maxid, gt) > 0) { serve_synchronously = 1; } } @@ -1521,7 +1556,7 @@ void xreadCommand(client *c) { * so start from the next ID, since we want only messages with * IDs greater than start. */ streamID start = *gt; - start.seq++; /* uint64_t can't overflow in this context. */ + streamIncrID(&start); /* Emit the two elements sub-array consisting of the name * of the stream and the data we extracted from it. */ @@ -1858,11 +1893,7 @@ void xsetidCommand(client *c) { * item, otherwise the fundamental ID monotonicity assumption is violated. 
*/ if (s->length > 0) { streamID maxid; - streamIterator si; - streamIteratorStart(&si,s,NULL,NULL,1); - int64_t numfields; - streamIteratorGetID(&si,&maxid,&numfields); - streamIteratorStop(&si); + streamLastValidID(s,&maxid); if (streamCompareID(&id,&maxid) < 0) { addReplyError(c,"The ID specified in XSETID is smaller than the " @@ -2233,7 +2264,7 @@ void xclaimCommand(client *c) { } /* Do the actual claiming. */ - streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr,1); + streamConsumer *consumer = NULL; void *arraylenptr = addDeferredMultiBulkLength(c); size_t arraylen = 0; for (int j = 5; j <= last_id_arg; j++) { @@ -2285,9 +2316,11 @@ void xclaimCommand(client *c) { if (nack->consumer) raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); /* Update the consumer and idle time. */ + if (consumer == NULL) + consumer = streamLookupConsumer(group,c->argv[3]->ptr,1); nack->consumer = consumer; nack->delivery_time = deliverytime; - /* Set the delivery attempts counter if given, otherwise + /* Set the delivery attempts counter if given, otherwise * autoincrement unless JUSTID option provided */ if (retrycount >= 0) { nack->delivery_count = retrycount; diff --git a/src/version.h b/src/version.h index 83cff7fc2..aef576c73 100644 --- a/src/version.h +++ b/src/version.h @@ -1 +1 @@ -#define REDIS_VERSION "5.0.7" +#define REDIS_VERSION "5.0.8" diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 34d4061c2..a59e168ef 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -147,6 +147,20 @@ start_server { assert {[lindex $res 0 1 1] == {2-0 {field1 B}}} } + test {Blocking XREADGROUP will not reply with an empty array} { + r del mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + r XADD mystream 666 f v + set res [r XREADGROUP GROUP mygroup Alice BLOCK 10 STREAMS mystream ">"] + assert {[lindex $res 0 1 0] == {666-0 {f v}}} + r XADD mystream 667 f2 v2 + r XDEL mystream 667 + set rd [redis_deferring_client] + $rd XREADGROUP GROUP mygroup Alice BLOCK 10 STREAMS mystream ">" + after 20 + assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {mystream {}} + } + test {XCLAIM can claim PEL items from another consumer} { # Add 3 items into the stream, and create a consumer group r del mystream diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index 656bac5de..5de9f0571 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -191,6 +191,17 @@ start_server { assert {[lindex $res 0 1 0 1] eq {old abcd1234}} } + test {Blocking XREAD will not reply with an empty array} { + r del s1 + r XADD s1 666 f v + r XADD s1 667 f2 v2 + r XDEL s1 667 + set rd [redis_deferring_client] + $rd XREAD BLOCK 10 STREAMS s1 666 + after 20 + assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {s1 {}} + } + test "XREAD: XADD + DEL should not awake client" { set rd [redis_deferring_client] r del s1 @@ -328,6 +339,33 @@ start_server { assert_equal [r xrevrange teststream2 1234567891245 -] {{1234567891240-0 {key1 value2}} {1234567891230-0 {key1 value1}}} } + + test {XREAD streamID edge (no-blocking)} { + r del x + r XADD x 1-1 f v + r XADD x 1-18446744073709551615 f v + r XADD x 2-1 f v + set res [r XREAD BLOCK 0 STREAMS x 1-18446744073709551615] + assert {[lindex $res 0 1 0] == {2-1 {f v}}} + } + + test {XREAD streamID edge (blocking)} { + r del x + set rd [redis_deferring_client] + $rd XREAD BLOCK 0 
STREAMS x 1-18446744073709551615 + r XADD x 1-1 f v + r XADD x 1-18446744073709551615 f v + r XADD x 2-1 f v + set res [$rd read] + assert {[lindex $res 0 1 0] == {2-1 {f v}}} + } + + test {XADD streamID edge} { + r del x + r XADD x 2577343934890-18446744073709551615 f v ;# we need the timestamp to be in the future + r XADD x * f2 v2 + assert_equal [r XRANGE x - +] {{2577343934890-18446744073709551615 {f v}} {2577343934891-0 {f2 v2}}} + } } start_server {tags {"stream"} overrides {appendonly yes}} {
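The stream edge-case tests above hinge on the new streamIncrID() helper from t_stream.c: incrementing 1-18446744073709551615 must carry into the milliseconds part instead of wrapping the sequence back to 0, which is why the blocking XREAD on that ID is answered by 2-1. A small stand-alone sketch of the same rule; the streamID struct is re-declared locally only so the example compiles outside the Redis tree:

    /* Stand-alone illustration of the ID increment used by XREAD/XADD
     * after this change.  The struct mirrors streamID from stream.h. */
    #include <inttypes.h>
    #include <stdint.h>
    #include <stdio.h>

    typedef struct { uint64_t ms, seq; } streamID;

    /* Same logic as streamIncrID() in t_stream.c. */
    static void incrID(streamID *id) {
        if (id->seq == UINT64_MAX) {
            if (id->ms == UINT64_MAX) {
                id->ms = id->seq = 0;   /* last possible ID: wrap around */
            } else {
                id->ms++;               /* carry into the ms part */
                id->seq = 0;
            }
        } else {
            id->seq++;
        }
    }

    int main(void) {
        streamID id = { 1, UINT64_MAX };  /* 1-18446744073709551615 */
        incrID(&id);                      /* -> 2-0, so only 2-1 and later
                                             entries match the range */
        printf("%" PRIu64 "-%" PRIu64 "\n", id.ms, id.seq);
        return 0;
    }

The old code incremented only the seq field, so a start ID whose sequence was already UINT64_MAX wrapped back to x-0 and re-served entries the client had already seen; the same helper also lets XADD refuse new entries once the stream reaches the absolute last ID, as the xaddCommand hunk above shows.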