type
Post
status
Published
date
Sep 23, 2022
slug
dapr-actor-1
summary
介绍 Dapr 的 Actor 模块,以及如何使用它来完成一些有意思的需求
tags
Dapr
Actor Pattern
Java
category
技术分享
icon
password
Property
Sep 27, 2022 09:59 AM
Dapr 是什么
Dapr 是分布式应用程序运行时(Distribution APplication Runtime)的缩写,是一个可移植的、事件驱动的运行时,它使得开发人员能够基于此运行时轻松构建出弹性的、无状态和有状态的应用程序,它可运行在云平台或边缘计算中,同时支持多种编程语言和开发框架。
Dapr 本身是开源的(),目前也是 CNCF 孵化期的项目,采用 Golang 语言编写。在云原生技术越来越成熟和被重视的现在,Dapr 也是颇有前景的项目,值得技术人员投入和研究。
详细的功能及搭建方式,参看官方文档。
Actor 是什么
Actor 计算模型是一个上了年纪的通用并发编程模型,是由 Carl Hewitt 在 1973 年定义(论文地址), 然后由 Erlang OTP 发扬光大,其后大大小小的框架或者编程语言带来了多多少少与纯 Actor 模型有差异的实现,Actor 模型就慢慢的进入了麻瓜开发者们(不能手撸 Actor 模型的开发者)的世界,现在各位麻瓜们偶尔可见其施展魔法留下的痕迹并习以为常了。
那么 Actor 模型是用来干嘛的?就原始定义来说,Actor是一个并发处理的数学模型,用来解决并发计算控制中的各种问题。Actor 模式声明 Actors 为并发计算的通用原语,换句话说,我们可以将代码写入独立单元 ( 称为Actor) ,该单元接收消息并一次处理消息,而不进行任何类型的并发控制或线程处理,当代码处理一条消息时,它可以向其他 Actor 发送一条或多条消息,或者创建新的 Actors。 底层运行时将管理每个 Actor 的运行方式、时机和位置,并在 Actors 之间传递消息,大量 Actors 可以同时执行,Actors 彼此相互独立执行。
Dapr 包含专门实现 virtual actors 模式 的运行时,通过 Dapr 的实现,我们可以根据 Actors 模型编写 Dapr Actor,而 Dapr 利用底层平台提供可扩展性和可靠性保证。
Dapr Actor 的使用和实现
适用场景
Actor 设计模式可以很好试用于一些分布式系统问题和场景,但我们首先应该考虑的是模式的约束。 一般来说,在下列场景下可以考虑 Actor 模式来模拟你的问题或场景:
- 您的问题空间涉及大量(数千或更多) 的独立和孤立的微小计算单元和逻辑处理单元
- 您想要处理单线程对象,这些对象不需要外部组件的大量交互,例如在一组 Actors 之间查询状态
- 您的 Actor 实例不会通过发出 I/O 操作来阻塞调用方
Actors 生命周期
Dapr Actors 是虚拟的,它们不需要显式创建或销毁,一切都交给 Dapr Actor 运行时来管理
- Dapr Actors 运行时在第一次接收到该 Actor ID 的请求时自动激活 Actor
- 如果 Actor 在一段时间内未被使用,那么 Dapr Actors 运行时将回收内存对象
- 重新启动该 Actor ID 的 Actor 时,它还将持有回收前的一切原有数据(虽然不是同一个 Actor 实例了)
- Dapr 运行时用来检查 Actor 是否需要回收时,Actor 空闲超时时间和运行时的扫描间隔是可以配置的
Dapr Actor 提供的功能
Dapr Actor 提供了一下的功能来实现 Virtual Actor 模式
- 调用特定的 Actor 方法
POST/GET/PUT/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/method/<method>
- Actor Timers/Reminders 来注册一个定时执行的方法
- Actor 状态管理,用来保存 Actor 运行时的各种状态数据
初体验
下面我们就用 Dapr 的 Java SDK 来小小体验一下 Dapr Actor,Dapr Java SDK 基于 Reactor 项目提供了异步无阻塞的响应式编程模型,使用之前需要大家提前学习和掌握这个框架,它也是 Spring Webflux 的核心框架哦,学就完了。
正式开始,按照步骤来
- 确保正确安装 Dapr,参看
root@vm1:~/dapr-actor-demo# dapr -v CLI version: 1.8.1 Runtime version: 1.8.4
- 编写一个提供服务的接口
package io.github.wynn5a; import io.dapr.actors.ActorMethod; import io.dapr.actors.ActorType; import reactor.core.publisher.Mono; @ActorType(name = "some-actor") public interface DemoActor { //创建一个 Reminder void registerReminder(); //一个 Actor 方法 @ActorMethod(name = "say_something") String say(String something); //一个 Actor 方法,带有异步返回值 @ActorMethod(returns = Integer.class) Mono<Integer> incrementAndGet(int delta); }
- 实现这些服务
package io.github.wynn5a; import io.dapr.actors.ActorId; import io.dapr.actors.runtime.AbstractActor; import io.dapr.actors.runtime.ActorRuntimeContext; import io.dapr.actors.runtime.Remindable; import io.dapr.utils.TypeRef; import reactor.core.publisher.Mono; import java.time.Duration; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Random; public class DemoActorImpl extends AbstractActor implements DemoActor, Remindable<Integer> { public static final String COUNTER_KEY = "counter"; private final DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME; private final Random random = new Random(); /** * Instantiates a new Actor. * * @param runtimeContext Context for the runtime. * @param id Actor identifier. */ public DemoActorImpl(ActorRuntimeContext runtimeContext, ActorId id) { super(runtimeContext, id); } /** * create reminder */ @Override public void registerReminder() { super.registerReminder("reminder-1", random.nextInt(1000), Duration.ofSeconds(5), Duration.ofSeconds(2)).block(); } @Override public String say(String something) { System.out.println("Get from client: " + something); return LocalDateTime.now().format(formatter) + " --> " + something; } /** * use dapr state manager to update and save counter * * @param delta amount to be added to counter * @return new counter value */ @Override public Mono<Integer> incrementAndGet(int delta) { return getActorStateManager().contains(COUNTER_KEY) .flatMap(exists -> exists ? super.getActorStateManager().get(COUNTER_KEY, int.class) : Mono.just(0)) .map(c -> c + delta).flatMap(c -> super.getActorStateManager().set(COUNTER_KEY, c).thenReturn(c)); } /** * return type of reminder to consume * @return type reference */ @Override public TypeRef<Integer> getStateType() { return TypeRef.INT; } /** * do something to consume reminders * * @param reminderName The name of reminder provided during registration. * @param state The user state provided during registration. * @param dueTime The invocation due time provided during registration. * @param period The invocation period provided during registration. * @return nothing */ @Override public Mono<Void> receiveReminder(String reminderName, Integer state, Duration dueTime, Duration period) { return Mono.fromRunnable(() -> { String message = String.format("Server received reminder from actor id:%s and name:%s with state: %d @ %s", this.getId(), reminderName, state, LocalDateTime.now().format(formatter)); // Handles the request by printing message. System.out.println(message); }); } }
- 启动应用,并设置 Actor
package io.github.wynn5a; import io.dapr.actors.runtime.ActorRuntime; import io.dapr.actors.runtime.ActorRuntimeConfig; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import java.time.Duration; /** * How to run: <br/> * dapr run --components-path ./components --app-id demo-actor-server --app-port 8080 \ * -- java -jar target/dapr-actor-demo-1.0.0.jar */ @SpringBootApplication public class DemoActorApplication { public static void main(String[] args) throws Exception { ActorRuntimeConfig config = ActorRuntime.getInstance().getConfig(); // Idle timeout until actor instance is deactivated. config.setActorIdleTimeout(Duration.ofSeconds(30)); // How often actor instances are scanned for deactivation and balance. config.setActorScanInterval(Duration.ofSeconds(10)); // How long to wait until for draining an ongoing API call for an actor instance. config.setDrainOngoingCallTimeout(Duration.ofSeconds(10)); // Determines whether to drain API calls for actors instances being balanced. config.setDrainBalancedActors(true); // Register the Actor class. ActorRuntime.getInstance().registerActor(DemoActorImpl.class); SpringApplication.run(DemoActorApplication.class); } }
- 编写一个 Client 去生成 Actor 并且调用 Actor 服务中的方法。测试流程:启动两个 Actor 来模拟并发,他们会分别注册 Reminder,分别调用
incrementAndGet
和say
两个 Actor 方法
package io.github.wynn5a; import io.dapr.actors.ActorId; import io.dapr.actors.client.ActorClient; import io.dapr.actors.client.ActorProxyBuilder; import java.util.ArrayList; import java.util.List; /** * How to run client: <br/> * dapr run --components-path ./components --app-id actor-client \ * -- mvn compile exec:java -Dexec.mainClass="io.github.wynn5a.DemoActorClient" */ public class DemoActorClient { private static final int NUM_ACTORS = 2; public static void main(String[] args) throws InterruptedException { try (ActorClient client = new ActorClient()) { ActorProxyBuilder<DemoActor> builder = new ActorProxyBuilder<>(DemoActor.class, client); List<Thread> threads = new ArrayList<>(NUM_ACTORS); // Creates multiple actors. for (int i = 0; i < NUM_ACTORS; i++) { ActorId actorId = ActorId.createRandom(); DemoActor actor = builder.build(actorId); // Start a thread per actor. Thread thread = new Thread(() -> callActorForever(actorId.toString(), actor)); thread.start(); threads.add(thread); } // Waits for threads to finish. for (Thread thread : threads) { thread.join(); } } System.out.println("Done."); } /** * Makes multiple method calls into actor until interrupted. * * @param actorId Actor's identifier. * @param actor Actor to be invoked. */ private static void callActorForever(String actorId, DemoActor actor) { // First, call register reminder. actor.registerReminder(); // Now, we run until thread is interrupted. while (!Thread.currentThread().isInterrupted()) { // Invoke actor method to increment counter by 1, then build message. actor.incrementAndGet(1).map(i -> String.format("Actor %s said message #%d", actorId, i)) .map(actor::say) .subscribe(s -> System.out.printf("Actor %s got a reply: %s%n", actorId, s)); try { // Waits for up to 2 second. Thread.sleep((long) (2000 * Math.random())); } catch (InterruptedException e) { // We have been interrupted, so we set the interrupted flag to exit gracefully. Thread.currentThread().interrupt(); } } } }
- 需要注册一个 Dapr component 来存储状态,这个需要支持事务的存储后端来做,我们就用 Dapr 默认的 Redis
apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: statestore spec: type: state.redis version: v1 metadata: - name: redisHost value: localhost:6379 - name: redisPassword value: "" - name: actorStateStore value: "true"
- 然后用 maven 把上面的东西放到一块,启动命令如下
- 启动 Service
dapr run --components-path ./components --app-id demo-actor-server --app-port 8080 -- java -jar target/dapr-actor-demo-1.0.0.jar
dapr run --components-path ./components --app-id actor-client -- mvn compile exec:java -Dexec.mainClass="io.github.wynn5a.DemoActorClient"
总结
上面的代码,我们简单的体验了 Dapr Actor 带来的方法调用和 Reminders 的功能,算是一个体验性质的玩具,还有很多进阶的玩法,需要我们下次去探索
此次示例,完整代码见下方连接