redis源码学习-hyperloglog篇
0 条评论redis源码学习-hyperloglog篇
hyperloglog介绍
hyperloglog源于基数统计,通过概率算法,将超大数据量的集合转为特定形式存储,用惊人的数据压缩来达到统计大数据集合中不同元素个数的目的。
正如上而言,由于是概率算法,hyperloglog得到的不同元素数目为近似值而非正确值,近似值的误差大约在0.81%,通过12k的内存能过存储高达2^64个值,找出不同元素的数目的时间复杂度也达到了惊人的O(log2(log2(Nmax)))。
hyperloglog也广泛使用在大数据相关中间键中,例如flink,redis,Kylin等。
一些证明过程来自论文,需要自取(较为硬核,非数学系或概率论爱好者慎入)
hyperloglog逻辑分解
首先,计算出每个元素的hash值,并使用二进制来表达。最终形成类似10*****,00***,01***等等的值,通过如上二进制表达式,找出其第一个1最晚出现的位置来估算不重复的哈希值的个数。10出现的概率为1/2,则第一个1最晚出现的位置在1上,hyperloglog就认为不重复元素的个数估计是两个,01中出现的概率为1/4,第一个1最晚出现的位置则为2,hyperloglog就认为不重复元素的个数估计是四个以此类推。一次遍历所有元素,拿到最大的第一个1最晚出现的位置,对应的估值就为这个集合的不同个数了。hyperloglog做出如上猜测的来源为伯努利过程。
伯努利过程详见专业概率论相关的文章,这里不做赘述。
但由于是概率论,在日常真正使用时总会有传说中的欧皇或者非酋,总有连续10,20,30次连续抛到正面的几率在,因此hyperloglog引入了分桶的概念。
分桶的逻辑为,将所有的hash值分到n个桶中,对于每个桶做下估算,最终对所有的桶的计算结果做调和平均数,然后再*n,得到了最终的不同元素个数。
redis中hyperloglog分析
想要理解redis的hyperloglog,需要先额外理解HLL_DENSE 和HLL_SPARSE
HLL_DENSE :
密集存储型,使用6位存储一个数据。当数据较多时采用密集存储。
HLL_SPARSE:
稀疏存储型,当数据少时使用稀疏存储。稀疏有三种方式,分别为ZERO,XZERO,VAL
ZERO:6bit用来表示连续设置为0的寄存器的个数,表示1-64个连续寄存器被设置为0
XZERO:14bit用来表示连续设置为0的寄存器的个数,表示1-16834个连续寄存器被设置为0
VAL:5bit用来表示寄存器的值,可以表示1到32个连续寄存器被设置为0
hyperloglog数据结构:
struct hllhdr {
char magic[4]; /* "HYLL" */
uint8_t encoding; /* HLL_DENSE or HLL_SPARSE. */
uint8_t notused[3]; /* Reserved for future use, must be zero. */
uint8_t card[8]; /* Cached cardinality, little endian. */
uint8_t registers[]; /* Data bytes. */
};
magic用前四个字节表明hll对象。encoding代表着hll使用哪种存储方式,HLL_DENSE 或者HLL_SPARSE。notused[3]为保留字段,由于hll全部使用字节对齐,声明一下方便看代码的理解。card为当前缓存的hll基数值。registers是用来存储元素使用的,对于不同的encoding,这里的registers也不定。假如说encoding为HLL_DENSE,那么registers就为12k的连续数组,HLL_SPARSE的话这里为长度不定的数组。
创建hll(hyperloglog):
/* ========================== HyperLogLog commands ========================== */
/* Create an HLL object. We always create the HLL using sparse encoding.
* This will be upgraded to the dense representation as needed. */
robj *createHLLObject(void) {
robj *o;
struct hllhdr *hdr;
sds s;
uint8_t *p;
//最终值依赖宏,在下单独分析
int sparselen = HLL_HDR_SIZE +
(((HLL_REGISTERS+(HLL_SPARSE_XZERO_MAX_LEN-1)) /
HLL_SPARSE_XZERO_MAX_LEN)*2);
int aux;
/* Populate the sparse representation with as many XZERO opcodes as
* needed to represent all the registers. */
aux = HLL_REGISTERS;
s = sdsnewlen(NULL,sparselen);
p = (uint8_t*)s + HLL_HDR_SIZE;
while(aux) {
int xzero = HLL_SPARSE_XZERO_MAX_LEN;
if (xzero > aux) xzero = aux;
HLL_SPARSE_XZERO_SET(p,xzero);
p += 2;
aux -= xzero;
}
serverAssert((p-(uint8_t*)s) == sparselen);
/* Create the actual object. */
o = createObject(OBJ_STRING,s);
hdr = o->ptr;
memcpy(hdr->magic,"HYLL",4);
hdr->encoding = HLL_SPARSE;
return o;
}
默认的hll都使用sparse的方式创建,用来节约内存。
最开始的sparselen = HLL_HDR_SIZE + (((HLL_REGISTERS+(HLL_SPARSE_XZERO_MAX_LEN-1)) / HLL_SPARSE_XZERO_MAX_LEN)*2);,需要结合宏来看
#define HLL_P 14 /* The greater is P, the smaller the error. */
#define HLL_Q (64-HLL_P) /* The number of bits of the hash value used for
determining the number of leading zeros. */
#define HLL_REGISTERS (1<<HLL_P) /* With P=14, 16384 registers. */
#define HLL_P_MASK (HLL_REGISTERS-1) /* Mask to index register. */
可以看出,sparsenlen最后=HLL_HDR_SIZE +(16384 + (16384-1) / 16384 * 2)。在redis中,默认的桶都是0,那么最终只需要两个字节存储,极大的节约内存。
新增元素:
/* Call hllDenseAdd() or hllSparseAdd() according to the HLL encoding. */
int hllAdd(robj *o, unsigned char *ele, size_t elesize) {
struct hllhdr *hdr = o->ptr;
switch(hdr->encoding) {
case HLL_DENSE: return hllDenseAdd(hdr->registers,ele,elesize);
case HLL_SPARSE: return hllSparseAdd(o,ele,elesize);
default: return -1; /* Invalid representation. */
}
}
HLL_DENSE的add方式:
/* "Add" the element in the dense hyperloglog data structure.
* Actually nothing is added, but the max 0 pattern counter of the subset
* the element belongs to is incremented if needed.
*
* This is just a wrapper to hllDenseSet(), performing the hashing of the
* element in order to retrieve the index and zero-run count. */
int hllDenseAdd(uint8_t *registers, unsigned char *ele, size_t elesize) {
long index;
//index为桶的下标,count最终就是得到后边50个bit里1第一次出现的位置
uint8_t count = hllPatLen(ele,elesize,&index);
/* Update the register if this element produced a longer run of zeroes. */
return hllDenseSet(registers,index,count);
}
/* Given a string element to add to the HyperLogLog, returns the length
* of the pattern 000..1 of the element hash. As a side effect 'regp' is
* set to the register index this element hashes to. */
int hllPatLen(unsigned char *ele, size_t elesize, long *regp) {
uint64_t hash, bit, index;
int count;
/* Count the number of zeroes starting from bit HLL_REGISTERS
* (that is a power of two corresponding to the first bit we don't use
* as index). The max run can be 64-P+1 = Q+1 bits.
*
* Note that the final "1" ending the sequence of zeroes must be
* included in the count, so if we find "001" the count is 3, and
* the smallest count possible is no zeroes at all, just a 1 bit
* at the first position, that is a count of 1.
*
* This may sound like inefficient, but actually in the average case
* there are high probabilities to find a 1 after a few iterations. */
hash = MurmurHash64A(ele,elesize,0xadc83b19ULL);
index = hash & HLL_P_MASK; /* Register index. */
hash >>= HLL_P; /* Remove bits used to address the register. */
hash |= ((uint64_t)1<<HLL_Q); /* Make sure the loop terminates
and count will be <= Q+1. */
bit = 1;
count = 1; /* Initialized to 1 since we count the "00000...1" pattern. */
while((hash & bit) == 0) {
count++;
bit <<= 1;
}
*regp = (int) index;
return count;
}
/* ================== Dense representation implementation ================== */
/* Low level function to set the dense HLL register at 'index' to the
* specified value if the current value is smaller than 'count'.
*
* 'registers' is expected to have room for HLL_REGISTERS plus an
* additional byte on the right. This requirement is met by sds strings
* automatically since they are implicitly null terminated.
*
* The function always succeed, however if as a result of the operation
* the approximated cardinality changed, 1 is returned. Otherwise 0
* is returned. */
int hllDenseSet(uint8_t *registers, long index, uint8_t count) {
uint8_t oldcount;
HLL_DENSE_GET_REGISTER(oldcount,registers,index);
if (count > oldcount) {
//更新值,假如说新的1最后出现的位置的值大于原来的,那就将最大的登记
HLL_DENSE_SET_REGISTER(registers,index,count);
return 1;
} else {
return 0;
}
}
HLL_SPARSE的add方式:
/* "Add" the element in the sparse hyperloglog data structure.
* Actually nothing is added, but the max 0 pattern counter of the subset
* the element belongs to is incremented if needed.
*
* This function is actually a wrapper for hllSparseSet(), it only performs
* the hashing of the element to obtain the index and zeros run length. */
int hllSparseAdd(robj *o, unsigned char *ele, size_t elesize) {
long index;
uint8_t count = hllPatLen(ele,elesize,&index);
/* Update the register if this element produced a longer run of zeroes. */
return hllSparseSet(o,index,count);
}
/* Given a string element to add to the HyperLogLog, returns the length
* of the pattern 000..1 of the element hash. As a side effect 'regp' is
* set to the register index this element hashes to. */
int hllPatLen(unsigned char *ele, size_t elesize, long *regp) {
uint64_t hash, bit, index;
int count;
/* Count the number of zeroes starting from bit HLL_REGISTERS
* (that is a power of two corresponding to the first bit we don't use
* as index). The max run can be 64-P+1 = Q+1 bits.
*
* Note that the final "1" ending the sequence of zeroes must be
* included in the count, so if we find "001" the count is 3, and
* the smallest count possible is no zeroes at all, just a 1 bit
* at the first position, that is a count of 1.
*
* This may sound like inefficient, but actually in the average case
* there are high probabilities to find a 1 after a few iterations. */
hash = MurmurHash64A(ele,elesize,0xadc83b19ULL);
index = hash & HLL_P_MASK; /* Register index. */
hash >>= HLL_P; /* Remove bits used to address the register. */
hash |= ((uint64_t)1<<HLL_Q); /* Make sure the loop terminates
and count will be <= Q+1. */
bit = 1;
count = 1; /* Initialized to 1 since we count the "00000...1" pattern. */
while((hash & bit) == 0) {
count++;
bit <<= 1;
}
*regp = (int) index;
return count;
}
/* Low level function to set the sparse HLL register at 'index' to the
* specified value if the current value is smaller than 'count'.
*
* The object 'o' is the String object holding the HLL. The function requires
* a reference to the object in order to be able to enlarge the string if
* needed.
*
* On success, the function returns 1 if the cardinality changed, or 0
* if the register for this element was not updated.
* On error (if the representation is invalid) -1 is returned.
*
* As a side effect the function may promote the HLL representation from
* sparse to dense: this happens when a register requires to be set to a value
* not representable with the sparse representation, or when the resulting
* size would be greater than server.hll_sparse_max_bytes. */
int hllSparseSet(robj *o, long index, uint8_t count) {
struct hllhdr *hdr;
uint8_t oldcount, *sparse, *end, *p, *prev, *next;
long first, span;
long is_zero = 0, is_xzero = 0, is_val = 0, runlen = 0;
/* If the count is too big to be representable by the sparse representation
* switch to dense representation. */
if (count > HLL_SPARSE_VAL_MAX_VALUE) goto promote;
/* When updating a sparse representation, sometimes we may need to
* enlarge the buffer for up to 3 bytes in the worst case (XZERO split
* into XZERO-VAL-XZERO). Make sure there is enough space right now
* so that the pointers we take during the execution of the function
* will be valid all the time. */
o->ptr = sdsMakeRoomFor(o->ptr,3);
/* Step 1: we need to locate the opcode we need to modify to check
* if a value update is actually needed. */
sparse = p = ((uint8_t*)o->ptr) + HLL_HDR_SIZE;
end = p + sdslen(o->ptr) - HLL_HDR_SIZE;
first = 0;
prev = NULL; /* Points to previous opcode at the end of the loop. */
next = NULL; /* Points to the next opcode at the end of the loop. */
span = 0;
while(p < end) {
long oplen;
/* Set span to the number of registers covered by this opcode.
*
* This is the most performance critical loop of the sparse
* representation. Sorting the conditionals from the most to the
* least frequent opcode in many-bytes sparse HLLs is faster. */
oplen = 1;
if (HLL_SPARSE_IS_ZERO(p)) {
span = HLL_SPARSE_ZERO_LEN(p);
} else if (HLL_SPARSE_IS_VAL(p)) {
span = HLL_SPARSE_VAL_LEN(p);
} else { /* XZERO. */
span = HLL_SPARSE_XZERO_LEN(p);
oplen = 2;
}
/* Break if this opcode covers the register as 'index'. */
if (index <= first+span-1) break;
prev = p;
p += oplen;
first += span;
}
if (span == 0 || p >= end) return -1; /* Invalid format. */
next = HLL_SPARSE_IS_XZERO(p) ? p+2 : p+1;
if (next >= end) next = NULL;
/* Cache current opcode type to avoid using the macro again and
* again for something that will not change.
* Also cache the run-length of the opcode. */
if (HLL_SPARSE_IS_ZERO(p)) {
is_zero = 1;
runlen = HLL_SPARSE_ZERO_LEN(p);
} else if (HLL_SPARSE_IS_XZERO(p)) {
is_xzero = 1;
runlen = HLL_SPARSE_XZERO_LEN(p);
} else {
is_val = 1;
runlen = HLL_SPARSE_VAL_LEN(p);
}
/* Step 2: After the loop:
*
* 'first' stores to the index of the first register covered
* by the current opcode, which is pointed by 'p'.
*
* 'next' ad 'prev' store respectively the next and previous opcode,
* or NULL if the opcode at 'p' is respectively the last or first.
*
* 'span' is set to the number of registers covered by the current
* opcode.
*
* There are different cases in order to update the data structure
* in place without generating it from scratch:
*
* A) If it is a VAL opcode already set to a value >= our 'count'
* no update is needed, regardless of the VAL run-length field.
* In this case PFADD returns 0 since no changes are performed.
*
* B) If it is a VAL opcode with len = 1 (representing only our
* register) and the value is less than 'count', we just update it
* since this is a trivial case. */
if (is_val) {
oldcount = HLL_SPARSE_VAL_VALUE(p);
/* Case A. */
if (oldcount >= count) return 0;
/* Case B. */
if (runlen == 1) {
HLL_SPARSE_VAL_SET(p,count,1);
goto updated;
}
}
/* C) Another trivial to handle case is a ZERO opcode with a len of 1.
* We can just replace it with a VAL opcode with our value and len of 1. */
if (is_zero && runlen == 1) {
HLL_SPARSE_VAL_SET(p,count,1);
goto updated;
}
/* D) General case.
*
* The other cases are more complex: our register requires to be updated
* and is either currently represented by a VAL opcode with len > 1,
* by a ZERO opcode with len > 1, or by an XZERO opcode.
*
* In those cases the original opcode must be split into multiple
* opcodes. The worst case is an XZERO split in the middle resulting into
* XZERO - VAL - XZERO, so the resulting sequence max length is
* 5 bytes.
*
* We perform the split writing the new sequence into the 'new' buffer
* with 'newlen' as length. Later the new sequence is inserted in place
* of the old one, possibly moving what is on the right a few bytes
* if the new sequence is longer than the older one. */
uint8_t seq[5], *n = seq;
int last = first+span-1; /* Last register covered by the sequence. */
int len;
if (is_zero || is_xzero) {
/* Handle splitting of ZERO / XZERO. */
if (index != first) {
len = index-first;
if (len > HLL_SPARSE_ZERO_MAX_LEN) {
HLL_SPARSE_XZERO_SET(n,len);
n += 2;
} else {
HLL_SPARSE_ZERO_SET(n,len);
n++;
}
}
HLL_SPARSE_VAL_SET(n,count,1);
n++;
if (index != last) {
len = last-index;
if (len > HLL_SPARSE_ZERO_MAX_LEN) {
HLL_SPARSE_XZERO_SET(n,len);
n += 2;
} else {
HLL_SPARSE_ZERO_SET(n,len);
n++;
}
}
} else {
/* Handle splitting of VAL. */
int curval = HLL_SPARSE_VAL_VALUE(p);
if (index != first) {
len = index-first;
HLL_SPARSE_VAL_SET(n,curval,len);
n++;
}
HLL_SPARSE_VAL_SET(n,count,1);
n++;
if (index != last) {
len = last-index;
HLL_SPARSE_VAL_SET(n,curval,len);
n++;
}
}
/* Step 3: substitute the new sequence with the old one.
*
* Note that we already allocated space on the sds string
* calling sdsMakeRoomFor(). */
int seqlen = n-seq;
int oldlen = is_xzero ? 2 : 1;
int deltalen = seqlen-oldlen;
if (deltalen > 0 &&
sdslen(o->ptr)+deltalen > server.hll_sparse_max_bytes) goto promote;
if (deltalen && next) memmove(next+deltalen,next,end-next);
sdsIncrLen(o->ptr,deltalen);
memcpy(p,seq,seqlen);
end += deltalen;
updated:
/* Step 4: Merge adjacent values if possible.
*
* The representation was updated, however the resulting representation
* may not be optimal: adjacent VAL opcodes can sometimes be merged into
* a single one. */
p = prev ? prev : sparse;
int scanlen = 5; /* Scan up to 5 upcodes starting from prev. */
while (p < end && scanlen--) {
if (HLL_SPARSE_IS_XZERO(p)) {
p += 2;
continue;
} else if (HLL_SPARSE_IS_ZERO(p)) {
p++;
continue;
}
/* We need two adjacent VAL opcodes to try a merge, having
* the same value, and a len that fits the VAL opcode max len. */
if (p+1 < end && HLL_SPARSE_IS_VAL(p+1)) {
int v1 = HLL_SPARSE_VAL_VALUE(p);
int v2 = HLL_SPARSE_VAL_VALUE(p+1);
if (v1 == v2) {
int len = HLL_SPARSE_VAL_LEN(p)+HLL_SPARSE_VAL_LEN(p+1);
if (len <= HLL_SPARSE_VAL_MAX_LEN) {
HLL_SPARSE_VAL_SET(p+1,v1,len);
memmove(p,p+1,end-p);
sdsIncrLen(o->ptr,-1);
end--;
/* After a merge we reiterate without incrementing 'p'
* in order to try to merge the just merged value with
* a value on its right. */
continue;
}
}
}
p++;
}
/* Invalidate the cached cardinality. */
hdr = o->ptr;
HLL_INVALIDATE_CACHE(hdr);
return 1;
promote: /* Promote to dense representation. */
if (hllSparseToDense(o) == C_ERR) return -1; /* Corrupted HLL. */
hdr = o->ptr;
/* We need to call hllDenseAdd() to perform the operation after the
* conversion. However the result must be 1, since if we need to
* convert from sparse to dense a register requires to be updated.
*
* Note that this in turn means that PFADD will make sure the command
* is propagated to slaves / AOF, so if there is a sparse -> dense
* conversion, it will be performed in all the slaves as well. */
int dense_retval = hllDenseSet(hdr->registers,index,count);
serverAssert(dense_retval == 1);
return dense_retval;
}
HLL_SPARSE的add相对来说相当复杂,在这里只讲下处理思路。
sparse在redis的hll中负责存储数据量小的,当数据量过多时就改为密集存储也就是HLL_DENSE,详见代码
if (count > HLL_SPARSE_VAL_MAX_VALUE) goto promote;
假如说还在sparse的范围内,首先去修改桶,假如不用修改直接到下边,如果len=1且原值时val,那么小于现在的值就不做变动,大的话修改max值。如果len>1,且是val的话,无需考虑别的,不做更新。归根结底,就是根据条件适当修改HLL_SPARSE所采用的结构为ZERO,XZERO,VAL中的一个,最终做优化,将一些可以合并的val合并。
获取count:
#define HLL_REGISTERS (1<<HLL_P) /* With P=14, 16384 registers. */
/* Return the approximated cardinality of the set based on the harmonic
* mean of the registers values. 'hdr' points to the start of the SDS
* representing the String object holding the HLL representation.
*
* If the sparse representation of the HLL object is not valid, the integer
* pointed by 'invalid' is set to non-zero, otherwise it is left untouched.
*
* hllCount() supports a special internal-only encoding of HLL_RAW, that
* is, hdr->registers will point to an uint8_t array of HLL_REGISTERS element.
* This is useful in order to speedup PFCOUNT when called against multiple
* keys (no need to work with 6-bit integers encoding). */
uint64_t hllCount(struct hllhdr *hdr, int *invalid) {
double m = HLL_REGISTERS;
double E;
int j;
/* Note that reghisto size could be just HLL_Q+2, because HLL_Q+1 is
* the maximum frequency of the "000...1" sequence the hash function is
* able to return. However it is slow to check for sanity of the
* input: instead we history array at a safe size: overflows will
* just write data to wrong, but correctly allocated, places. */
int reghisto[64] = {0};
/* Compute register histogram */
if (hdr->encoding == HLL_DENSE) {
hllDenseRegHisto(hdr->registers,reghisto);
} else if (hdr->encoding == HLL_SPARSE) {
hllSparseRegHisto(hdr->registers,
sdslen((sds)hdr)-HLL_HDR_SIZE,invalid,reghisto);
} else if (hdr->encoding == HLL_RAW) {
hllRawRegHisto(hdr->registers,reghisto);
} else {
serverPanic("Unknown HyperLogLog encoding in hllCount()");
}
/* Estimate cardinality from register histogram. See:
* "New cardinality estimation algorithms for HyperLogLog sketches"
* Otmar Ertl, arXiv:1702.01284 */
//修正的过程,来源于论文
double z = m * hllTau((m-reghisto[HLL_Q+1])/(double)m);
for (j = HLL_Q; j >= 1; --j) {
z += reghisto[j];
z *= 0.5;
}
z += m * hllSigma(reghisto[0]/(double)m);
E = llroundl(HLL_ALPHA_INF*m*m/z);
return (uint64_t) E;
}
/* Helper function tau as defined in
* "New cardinality estimation algorithms for HyperLogLog sketches"
* Otmar Ertl, arXiv:1702.01284 */
double hllTau(double x) {
if (x == 0. || x == 1.) return 0.;
double zPrime;
double y = 1.0;
double z = 1 - x;
do {
x = sqrt(x);
zPrime = z;
y *= 0.5;
z -= pow(1 - x, 2)*y;
} while(zPrime != z);
return z / 3;
}
计算一个集合中的不同元素数量,首先将元素散列到n个counter中,计算每个counter中的第一个1出现位置的max值,最后对n个桶做调和平均数,最终得到概率性的不同元素数量
特殊说明
12k内存的来源:
对于超大的数据集,redis中基本都是使用dense的方式了。redis中首先将元素计算出来64位的hash值,这64位中的前14为当为index,后50位作为真正的计算使用。前14位作为index,那么214=16384,记作16384个桶。右边的50位中,第一次出现1的位置一定是小于50的,换算后26刚好够存下来,因此每个桶只需要6位就够存储。计算内存使用的话,16384*6/8=12288 byte 所以大约就是12k的内存。
也因此,hll本身无法存储元数据信息,精度本身也不做保证,只能说基本上是准确的,但是并不是一定正确。也因此特性,redis中的hll常用来统计网站的流量等等特殊需求