Redis delayed queue
Several years ago we had to solve the problem of enqueuing events with an arbitrary delay, e.g. checking the status of a payment 3 hours later or sending a notification to a client in 45 minutes. At that time, we couldn't find a suitable library for this task that wouldn't require us to spend time on configuration and maintenance. After analysing possible solutions, we ended up building our own small Java library on top of the Redis storage engine. In this article I'll explain the capabilities of this library, the alternatives, and the problems we solved while creating it.
So what exactly is a delayed queue capable of? An event added to the delayed queue is delivered to a handler after an arbitrary delay. If event handling is unsuccessful, the event is delivered again later; however, the number of retries is limited. Redis does not provide strong durability guarantees (for example, acknowledged writes can be lost during a failover, because replication is asynchronous), so users should be prepared to deal with this. Nevertheless, in a clustered configuration Redis has shown sufficiently high reliability, and we haven't faced any issues during 1.5 years of usage.
An event is enqueued with an arbitrary delay (here, one hour):
eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1)).subscribe();
Please note that the method returns Mono, so you have to call one of the following methods to start execution:
subscribe(...)
block()
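Because a Mono is lazy, calling enqueueWithDelayNonBlocking on its own only describes the work, and nothing happens until something subscribes. The sketch below illustrates that deferred-execution idea with a plain java.util.function.Supplier instead of Reactor; the class LazyEnqueueSketch and its in-memory queue are hypothetical and are not part of the library's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a service whose enqueue method returns a lazy result,
// similar in spirit to a Reactor Mono. This is NOT the library's API.
class LazyEnqueueSketch {
    final List<String> queue = new ArrayList<>();

    // Only describes the work; nothing is added until the Supplier is invoked.
    Supplier<Boolean> enqueue(String event) {
        return () -> queue.add(event);
    }

    public static void main(String[] args) {
        LazyEnqueueSketch service = new LazyEnqueueSketch();
        Supplier<Boolean> pending = service.enqueue("event-1");
        System.out.println(service.queue.size()); // 0: described, not executed
        pending.get();                             // plays the role of subscribe()/block()
        System.out.println(service.queue.size()); // 1
    }
}
```

Forgetting the final subscribe() or block() in the real API is therefore a silent no-op rather than an error.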
More details on this can be found in the Project Reactor documentation. Event context can be added in the following manner:
eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1), Map.of("key", "value")).subscribe();
A handler for an event type is registered like this:
eventService.addHandler(DummyEvent.class, e -> Mono.just(true), 1);
The same action, but with event context:
eventService.addHandler(
        DummyEvent.class,
        e -> Mono
                .subscriberContext()
                .doOnNext(ctx -> {
                    // the event context is stored in the Reactor Context under "eventContext"
                    Map<String, String> eventContext = ctx.get("eventContext");
                    log.info("context key {}", eventContext.get("key"));
                })
                .thenReturn(true),
        1);
A handler is removed with:
eventService.removeHandler(DummyEvent.class);
You can rely on defaults:
import static com.github.fred84.queue.DelayedEventService.delayedEventService;

var eventService = delayedEventService().client(redisClient).build();
or configure everything yourself:
import static com.github.fred84.queue.DelayedEventService.delayedEventService;

var eventService = delayedEventService()
        .client(redisClient)
        .mapper(objectMapper)
        .handlerScheduler(Schedulers.fromExecutorService(executor))
        .schedulingInterval(Duration.ofSeconds(1))
        .schedulingBatchSize(SCHEDULING_BATCH_SIZE)
        .enableScheduling(false)
        .pollingTimeout(POLLING_TIMEOUT)
        .eventContextHandler(new DefaultEventContextHandler())
        .dataSetPrefix("")
        .retryAttempts(10)
        .metrics(new NoopMetrics())
        .refreshSubscriptionsInterval(Duration.ofMinutes(5))
        .build();
The service (and all open connections to Redis) can be shut down via eventService.close() or by a framework that supports the lifecycle annotation @javax.annotation.PreDestroy.
Any system is prone to faults, so it has to be monitored. For the delayed queue, we should look out for:
overall Redis memory usage
the size of the list for every event type (metric "delayed.queue.ready.for.handling.count" with a tag for the event type)
Here is a brief overview of how the delayed queue evolved over time. In 2018, we launched our small project in . Only 2 engineers were in charge of this project, so adding more components that required configuration and maintenance was discouraged. The main aim was "to use components managed by AWS unless they were too pricey".
The first two were rejected due to maintenance requirements. The last one (SQS) was not considered, because its maximum delay cannot exceed 15 minutes.
Unfortunately, we missed some libraries that could have met our needs; we discovered them much later:
The first one uses the same technology stack (Java and Redis), the latter is built on top of .
Initially, we already had a backup mechanism that polled a relational database once a day. After reading several articles on organising simple delayed queues, we decided to build our solution around Redis rather than an RDBMS. The structure inside Redis is as follows:
an event is added to a sorted set (sorted_set), where its weight (score) serves as the future execution time
once the weight becomes lower than or equal to now, the event is moved from sorted_set to list (which can be used as a queue with push and pop methods)
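The two-step layout can be modelled in memory to make the flow explicit. This sketch assumes one sorted set for all delayed events and one list per event type: a TreeMap keyed by execution time stands in for sorted_set (member uniqueness is not modelled) and an ArrayDeque per event type stands in for list. The class, the "type;payload" member encoding and the method names are hypothetical; this is not Redis client code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory model of the layout above; not Redis code. Assumptions: one sorted set for all
// delayed events, members encoded as "type;payload", one list per event type.
class DelayedLayoutSketch {
    // score (execution time in millis) -> members with that score, ordered like a sorted set
    private final TreeMap<Long, List<String>> sortedSet = new TreeMap<>();
    // event type -> ready-for-handling list
    final Map<String, Deque<String>> lists = new HashMap<>();

    void schedule(String type, String payload, long executeAt) {
        sortedSet.computeIfAbsent(executeAt, k -> new ArrayList<>()).add(type + ";" + payload);
    }

    // Moves every member whose score is <= now into its type's list; returns how many moved.
    int moveDue(long now) {
        int moved = 0;
        while (!sortedSet.isEmpty() && sortedSet.firstKey() <= now) {
            for (String member : sortedSet.pollFirstEntry().getValue()) {
                String[] parts = member.split(";", 2);
                // LPUSH equivalent: add at the head; consumers take from the tail
                lists.computeIfAbsent(parts[0], k -> new ArrayDeque<>()).addFirst(parts[1]);
                moved++;
            }
        }
        return moved;
    }
}
```

Calling moveDue(System.currentTimeMillis()) on a timer corresponds to one run of the dispatcher.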
The first version of the dispatcher, responsible for moving events from the sorted set to the list, was as follows (simplified code is shown here and below):
var events = redis.zrangebyscore("delayed_events", Range.create(-1, System.currentTimeMillis()), 100);
events.forEach(key -> {
    var payload = extractPayload(key);
    var listName = extractType(key);
    redis.lpush(listName, payload);
    redis.zrem("delayed_events", key);
});
Event handlers were built on top of , which executed the following command under the hood:
redis.brpop(listName)
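Pushing with LPUSH and taking with BRPOP turns a Redis list into a FIFO queue whose consumer blocks until an element arrives or a timeout expires. The JDK analogy below uses LinkedBlockingDeque (addFirst for LPUSH, pollLast with a timeout for BRPOP); the class BlockingListSketch is hypothetical and only illustrates the semantics, not how the library talks to Redis.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// JDK analogy for one event-type list: LPUSH adds at the head, BRPOP takes from the tail
// and blocks up to a timeout, so elements come out in insertion (FIFO) order.
class BlockingListSketch {
    final BlockingDeque<String> list = new LinkedBlockingDeque<>();

    void lpush(String payload) {
        list.addFirst(payload);
    }

    // Returns null when nothing arrives within the timeout, like BRPOP returning nil.
    String brpop(long timeoutMillis) {
        try {
            return list.pollLast(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingListSketch q = new BlockingListSketch();
        q.lpush("first");
        q.lpush("second");
        System.out.println(q.brpop(100)); // first
        System.out.println(q.brpop(100)); // second
        System.out.println(q.brpop(100)); // null after ~100 ms
    }
}
```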
The first problems came soon.
If an error occurred while adding an element to the list (e.g. a connection timeout after the element had already been added), the dispatcher retried the operation, which resulted in multiple copies of an event being added to the list. Luckily, Redis supports transactions, so we wrapped the 2 commands above into a transaction.
events.forEach(key -> {
    ...
    redis.multi();
    redis.zrem("delayed_events", key);
    redis.lpush(listName, payload);
    redis.exec();
});
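Why the transaction helps can be shown with a small simulation. The hypothetical AtomicMoveSketch below injects one fault per run: in the non-atomic variant the push succeeds and the timeout hits before the removal, so the retry pushes the event a second time; in the transactional variant both writes are applied as one unit and only the reply is lost, so the retry no longer finds the member in the sorted set. The in-memory collections stand in for Redis, and the fault model is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation; in-memory collections stand in for the sorted set and the list.
class AtomicMoveSketch {
    final Set<String> sortedSet = new LinkedHashSet<>();
    final List<String> list = new ArrayList<>();
    private boolean failOnce = true; // injected fault: the first move "times out"

    // Two independent commands: the push lands, then the timeout hits before ZREM.
    void moveNonAtomic(String key) {
        list.add(key);
        if (failOnce) {
            failOnce = false;
            throw new RuntimeException("timeout after LPUSH");
        }
        sortedSet.remove(key);
    }

    // MULTI/EXEC: both writes are applied as one unit; here only the reply is lost.
    void moveAtomic(String key) {
        sortedSet.remove(key);
        list.add(key);
        if (failOnce) {
            failOnce = false;
            throw new RuntimeException("timeout after EXEC");
        }
    }

    // Dispatcher: two passes over whatever is still in the sorted set, retrying on failure.
    void dispatch(boolean atomic) {
        for (int pass = 0; pass < 2; pass++) {
            for (String key : new ArrayList<>(sortedSet)) {
                try {
                    if (atomic) moveAtomic(key); else moveNonAtomic(key);
                } catch (RuntimeException e) {
                    // ignored: the next pass retries whatever is still in the sorted set
                }
            }
        }
    }
}
```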
Another problem lurked on the other side of the list: if a handler failed, the event was lost forever. As a solution, we chose to reschedule an event to a later time (unless the maximum number of attempts had been reached) and to delete it only after the handler had processed it successfully.
events.forEach(key -> {
    ...
    redis.multi();
    // schedule the next attempt first; it is deleted once the handler succeeds
    redis.zadd("delayed_events", nextAttempt(key));
    redis.zrem("delayed_events", key);
    redis.lpush(listName, payload);
    redis.exec();
});
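The snippet relies on a nextAttempt(key) helper that is not shown. A minimal sketch of what such a helper could look like, assuming the key starts with an attempt counter followed by ';' and the payload, a fixed one-minute back-off, and a limit of three attempts (all hypothetical; the builder's retryAttempts(10) suggests the real limit is configurable):

```java
import java.util.Optional;

// Hypothetical nextAttempt helper. Assumed key layout: "<attempt>;<payload>".
final class RetryKeys {
    static final int MAX_ATTEMPTS = 3;          // assumption; the library exposes retryAttempts(...)
    static final long BACKOFF_MILLIS = 60_000L; // assumption: fixed one-minute back-off

    static final class Scheduled {
        final String key;      // new sorted-set member
        final long executeAt;  // its score

        Scheduled(String key, long executeAt) {
            this.key = key;
            this.executeAt = executeAt;
        }
    }

    private RetryKeys() {}

    // Returns the member and score to ZADD before handing the event to a handler,
    // or empty when the attempt limit has been reached and the event should not be rescheduled.
    static Optional<Scheduled> nextAttempt(String key, long now) {
        int separator = key.indexOf(';');
        int attempt = Integer.parseInt(key.substring(0, separator));
        if (attempt >= MAX_ATTEMPTS) {
            return Optional.empty();
        }
        String payload = key.substring(separator + 1);
        return Optional.of(new Scheduled((attempt + 1) + ";" + payload, now + BACKOFF_MILLIS));
    }
}
```

Because the attempt counter is part of the member, each retry produces a new member; the next paragraph explains how that same property caused duplicates.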
As I mentioned before, we already had the fallback mechanism, which polled the RDBMS and re-added all "pending" entities to the delayed queue. At that time, the key in the sorted set was structured as metadata;payload, with mutable metadata (e.g. an attempt number, a log context, ...) and an immutable payload. A re-added event therefore carried different metadata and became a different member of the sorted set, so again multiple copies of an event ended up in the list. To solve this problem, we moved metadata;payload to a new Redis hset structure and kept only event type + event id as the key in the sorted set. Consequently, event enqueueing changed from:
var envelope = metadata + SEPARATOR + payload;
redis.zadd("delayed_events", scheduledAt, envelope);
into
var envelope = metadata + SEPARATOR + payload;
var key = eventType + SEPARATOR + eventId;
redis.multi();
redis.zadd("delayed_events", scheduledAt, key);
redis.hset("metadata", key, envelope);
redis.exec();
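The effect of the new layout follows from sorted-set semantics: ZADD on a member that already exists only updates its score, so a stable event type + event id member cannot be duplicated, while a member that embeds mutable metadata becomes a new member whenever the metadata changes. The hypothetical in-memory model below contrasts the two layouts, with HashMaps standing in for the sorted set and the "metadata" hash.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model; HashMaps stand in for the sorted set and the "metadata" hash.
class StableKeySketch {
    // Old layout: the whole "metadata;payload" envelope is the sorted-set member.
    final Map<String, Long> oldSortedSet = new HashMap<>();
    // New layout: the member is "type;id", and the envelope lives in a hash under the same key.
    final Map<String, Long> sortedSet = new HashMap<>();
    final Map<String, String> metadata = new HashMap<>();

    void enqueueOld(String metadataPart, String payload, long scheduledAt) {
        // ZADD semantics: a different envelope is a different member
        oldSortedSet.put(metadataPart + ";" + payload, scheduledAt);
    }

    void enqueueNew(String eventType, String eventId, String metadataPart, String payload, long scheduledAt) {
        String key = eventType + ";" + eventId;
        sortedSet.put(key, scheduledAt);                  // ZADD: insert or update the score
        metadata.put(key, metadataPart + ";" + payload);  // HSET: overwrite the envelope
    }
}
```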
All our handlers were idempotent, so we didn't pay much attention to duplicate events. However, there was still room for improvement. The dispatchers were running on all our application instances and from time to time ran simultaneously, which again resulted in duplicate events in the list. The solution was a trivial lock with a small TTL:
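// SET lockKey value NX PX <ttl>: only one dispatcher gets the lock, and it expires on its own.
// PX takes milliseconds (EX would take seconds, so toMillis() * 1000 seconds was far too long).
redis.set(lockKey, "value", px(lockTimeout.toMillis()).nx());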
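The lock relies on SET with the NX and expiry options: the key is written only if it does not exist, and it disappears by itself after the TTL, so a dispatcher that crashes while holding it cannot block the others forever. The hypothetical TtlLockSketch below imitates that behaviour in memory with an explicit clock value so it can be tested deterministically; in this sketch the lock is never released explicitly, it simply expires.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory imitation of SET key value NX PX ttl: the lock is granted only
// if the key is absent or the previous holder's TTL has expired.
class TtlLockSketch {
    private final Map<String, Long> expiresAt = new HashMap<>();

    // "now" is passed in explicitly so the behaviour is deterministic in tests.
    synchronized boolean tryLock(String lockKey, long ttlMillis, long now) {
        Long expiry = expiresAt.get(lockKey);
        if (expiry != null && expiry > now) {
            return false; // another dispatcher still holds the lock
        }
        expiresAt.put(lockKey, now + ttlMillis);
        return true;
    }
}
```

A dispatcher run would call tryLock first and skip the batch when it returns false; the TTL should be short but still longer than one dispatch run.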
When we needed the delayed queue in a project without Spring, we moved it into a standalone project. To accomplish this, we had to remove the following dependencies:
The first one was easily replaced with the Lettuce Redis driver. The second led to many more changes. By that time I had already acquired some experience working with reactive streams in general and with Project Reactor in particular, so we chose a "hot stream" as the source for our handlers. To achieve a uniform distribution of events among handlers on different application instances, we had to implement our own :
redis
        .reactive()
        .brpop(timeout, queue)
        .map(e -> deserialize(e))
        .subscribe(new InnerSubscriber<>(handler, ... params ..))
and
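class InnerSubscriber<T> extends BaseSubscriber<EventEnvelope<T>> {

    @Override
    protected void hookOnNext(@NotNull EventEnvelope<T> envelope) {
        // run the handler and request the next event only after it has emitted a result
        Mono<Boolean> promise = handler.apply(envelope.getPayload());
        promise.subscribe(r -> request(1));
    }
}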
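The essential idea in InnerSubscriber is demand control: it takes one event at a time and requests the next one only after the handler's Mono has produced a result, so an instance presumably pulls new events only as fast as its handlers finish. The sketch below reproduces that pattern with the JDK's java.util.concurrent.Flow API and SubmissionPublisher instead of Reactor; OneAtATimeSubscriber and its demo method are hypothetical, and a CompletableFuture plays the role of the handler's Mono.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical JDK Flow analogue of InnerSubscriber (not Reactor code): request one element,
// and request the next one only after the asynchronous handler has finished.
class OneAtATimeSubscriber<T> implements Flow.Subscriber<T> {
    private final Function<T, CompletableFuture<Boolean>> handler;
    private final CountDownLatch processed;
    final List<T> handled = new CopyOnWriteArrayList<>();
    final AtomicInteger inFlight = new AtomicInteger();
    final AtomicInteger maxInFlight = new AtomicInteger();
    private volatile Flow.Subscription subscription;

    OneAtATimeSubscriber(Function<T, CompletableFuture<Boolean>> handler, CountDownLatch processed) {
        this.handler = handler;
        this.processed = processed;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1); // ask for the first event only
    }

    @Override
    public void onNext(T item) {
        maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        // whenComplete also fires on failure, so a failing handler does not stall the stream
        handler.apply(item).whenComplete((ok, error) -> {
            handled.add(item);
            inFlight.decrementAndGet();
            processed.countDown();
            subscription.request(1); // pull the next event only now
        });
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void onComplete() {
    }

    // Publishes 0..n-1 and waits (up to 5 s) until all of them have been handled.
    static OneAtATimeSubscriber<Integer> demo(int n) {
        CountDownLatch processed = new CountDownLatch(n);
        OneAtATimeSubscriber<Integer> subscriber = new OneAtATimeSubscriber<>(
                i -> CompletableFuture.supplyAsync(() -> true), processed); // async "handler"
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < n; i++) {
                publisher.submit(i);
            }
            processed.await(5, TimeUnit.SECONDS); // close only after everything was handled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber;
    }
}
```

One deliberate difference: the simplified snippet above requests more only on a successful result (r -> request(1)), so a failed or empty Mono would issue no further demand; the sketch uses whenComplete, which also fires on failure.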
As a result, we created a library that delivers events to registered handlers (unlike Netflix dyno queue, where you have to poll the storage for events).
Planned improvements:
add a Kotlin DSL. Our new projects are currently written in Kotlin, so it would be handy to use suspend fun instead of direct interaction with the Project Reactor API
replace Redis transactions with a Lua script
Translated from: Redis delayed queue
Reposted from: http://ymbwd.baihongyu.com/