Redis学习之Redisson布隆过滤器
相关阅读
简介
本文基于Spring Boot 2.6.6、redisson 3.16.0简单分析Redisson布隆过滤器的使用。
布隆过滤器是一个非常长的二进制向量和一系列随机哈希函数的组合,可用于检索一个元素是否存在;
使用场景如下:
- 解决Redis缓存穿透问题;
- 邮件过滤;
使用
- 建立一个二进制向量,所有位设置
0
; - 选择
K
个散列函数,用于对元素进行K
次散列,计算向量的位下标; - 添加元素:将
K
个散列函数作用于该元素,生成K
个值作为位下标,将向量的对应位设置为1
; - 检索元素:将
K
个散列函数作用于该元素,生成K
个值作为位下标,若向量的对应位都是1
,则说明该元素可能存在;否则,该元素肯定不存在;
Demo
Demo
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.0</version>
</dependency>
测试代码
public class BloomFilterDemo {
public static void main(String[] args) {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redissonClient = Redisson.create(config);
RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter("bloom-filter");
// 初始化布隆过滤器
bloomFilter.tryInit(200, 0.01);
List<String> elements = new ArrayList<>();
for (int i = 0; i < 200; i++) {
elements.add(UUID.randomUUID().toString());
}
// 向布隆过滤器中添加内容
init(bloomFilter, elements);
// 测试检索效果
test(bloomFilter, elements);
redissonClient.shutdown();
}
public static void init(RBloomFilter<String> bloomFilter, List<String> elements) {
for (int i = 0; i < elements.size(); i++) {
if (i % 2 == 0) {
bloomFilter.add(elements.get(i));
}
}
}
public static void test(RBloomFilter<String> bloomFilter, List<String> elements) {
int counter = 0;
for (String element : elements) {
if (bloomFilter.contains(element)) {
counter++;
}
}
System.out.println(counter);
}
}
简析
初始化
布隆过滤器的初始化方法tryInit
有两个参数:
expectedInsertions
:预期的插入元素数量;falseProbability
:预期的错误率;
布隆过滤器可以明确元素不存在,但对于元素存在的判断是存在错误率的;所以初始化时指定的这两个参数会决定布隆过滤器的向量长度和散列函数的个数;
RedissonBloomFilter.tryInit
方法代码如下:
public boolean tryInit(long expectedInsertions, double falseProbability) {
if (falseProbability > 1) {
throw new IllegalArgumentException("Bloom filter false probability can't be greater than 1");
}
if (falseProbability < 0) {
throw new IllegalArgumentException("Bloom filter false probability can't be negative");
}
// 根据元素个数和错误率计算得到向量长度
size = optimalNumOfBits(expectedInsertions, falseProbability);
if (size == 0) {
throw new IllegalArgumentException("Bloom filter calculated size is " + size);
}
if (size > getMaxSize()) {
throw new IllegalArgumentException("Bloom filter size can't be greater than " + getMaxSize() + ". But calculated size is " + size);
}
// 根据元素个数和向量长度计算得到散列函数的个数
hashIterations = optimalNumOfHashFunctions(expectedInsertions, size);
CommandBatchService executorService = new CommandBatchService(commandExecutor);
executorService.evalReadAsync(configName, codec, RedisCommands.EVAL_VOID,
"local size = redis.call('hget', KEYS[1], 'size');" +
"local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" +
"assert(size == false and hashIterations == false, 'Bloom filter config has been changed')",
Arrays.<Object>asList(configName), size, hashIterations);
executorService.writeAsync(configName, StringCodec.INSTANCE,
new RedisCommand<Void>("HMSET", new VoidReplayConvertor()), configName,
"size", size, "hashIterations", hashIterations,
"expectedInsertions", expectedInsertions, "falseProbability", BigDecimal.valueOf(falseProbability).toPlainString());
try {
executorService.execute();
} catch (RedisException e) {
if (e.getMessage() == null || !e.getMessage().contains("Bloom filter config has been changed")) {
throw e;
}
readConfig();
return false;
}
return true;
}
private long optimalNumOfBits(long n, double p) {
if (p == 0) {
p = Double.MIN_VALUE;
}
return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
}
private int optimalNumOfHashFunctions(long n, long m) {
return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
}
添加元素
向布隆过滤器中添加元素时,先使用一系列散列函数根据元素得到K
个位下标,然后将向量中位下标对应的位设置为1
;
RedissonBloomFilter.add
方法代码如下:
public boolean add(T object) {
// 根据带插入元素得到两个long类型散列值
long[] hashes = hash(object);
while (true) {
if (size == 0) {
readConfig();
}
int hashIterations = this.hashIterations;
long size = this.size;
// 得到位下标数组
// 以两个散列值根据指定策略生成hashIterations个散列值,从而得到位下标
long[] indexes = hash(hashes[0], hashes[1], hashIterations, size);
CommandBatchService executorService = new CommandBatchService(commandExecutor);
addConfigCheck(hashIterations, size, executorService);
RBitSetAsync bs = createBitSet(executorService);
for (int i = 0; i < indexes.length; i++) {
// 将位下标对应位设置1
bs.setAsync(indexes[i]);
}
try {
List<Boolean> result = (List<Boolean>) executorService.execute().getResponses();
for (Boolean val : result.subList(1, result.size()-1)) {
if (!val) {
// 元素添加成功
return true;
}
}
// 元素已存在
return false;
} catch (RedisException e) {
if (e.getMessage() == null || !e.getMessage().contains("Bloom filter config has been changed")) {
throw e;
}
}
}
}
private long[] hash(Object object) {
ByteBuf state = encode(object);
try {
return Hash.hash128(state);
} finally {
state.release();
}
}
private long[] hash(long hash1, long hash2, int iterations, long size) {
long[] indexes = new long[iterations];
long hash = hash1;
for (int i = 0; i < iterations; i++) {
indexes[i] = (hash & Long.MAX_VALUE) % size;
// 散列函数的实现方式
if (i % 2 == 0) {
// 新散列值
hash += hash2;
} else {
// 新散列值
hash += hash1;
}
}
return indexes;
}
hash(long hash1, long hash2, int iterations, long size)
方法中,利用根据元素得到的两个散列值,生成一系列散列函数,然后得到位下标数组;
检索元素
检索布隆过滤器中是否存在指定元素时,先使用一系列散列函数根据元素得到K
个位下标,然后判断向量中位下标对应的位是否为1
,若存在一个不为1
,则该元素不存在;否则认为存在;
RedissonBloomFilter.contains
方法代码如下:
public boolean contains(T object) {
// 根据带插入元素得到两个long类型散列值
long[] hashes = hash(object);
while (true) {
if (size == 0) {
readConfig();
}
int hashIterations = this.hashIterations;
long size = this.size;
// 得到位下标数组
// 以两个散列值根据指定策略生成hashIterations个散列值,从而得到位下标
long[] indexes = hash(hashes[0], hashes[1], hashIterations, size);
CommandBatchService executorService = new CommandBatchService(commandExecutor);
addConfigCheck(hashIterations, size, executorService);
RBitSetAsync bs = createBitSet(executorService);
for (int i = 0; i < indexes.length; i++) {
// 获取位下标对应位的值
bs.getAsync(indexes[i]);
}
try {
List<Boolean> result = (List<Boolean>) executorService.execute().getResponses();
for (Boolean val : result.subList(1, result.size()-1)) {
if (!val) {
// 若存在不为1的位,则认为元素不存在
return false;
}
}
// 都为1,则认为元素存在
return true;
} catch (RedisException e) {
if (e.getMessage() == null || !e.getMessage().contains("Bloom filter config has been changed")) {
throw e;
}
}
}
}