temporal 多个 worker 实例 测试说明
所属分类 temporal
浏览量 759
worker 注册 workflow 和 activity
然后分别 轮询 workflow 和 activity taskqueue 执行
两个 worker 实例
work1 一个 只有 1个 activity
work2 一个 有 2个 activity
模拟 生产环境 升级更新 ,代码实现不一致的问题
运行工作流 会得到 3种结果
1个 activity 时的结果
2个 activity 时的结果
错误结果 , 第二个activity 被 work1 取走执行, 报错,因为他根本没有 注册第二个activty
EVENT_TYPE_WORKFLOW_EXECUTION_FAILED
Activity Type "echo2" is not registered with a worker. Known types are: echo"
注意查看工作流历史事件信息
identity 为具体的执行者
event_id: 13
event_time {
seconds: 1650952955
nanos: 778616000
}
event_type: EVENT_TYPE_ACTIVITY_TASK_FAILED
task_id: 25166370
activity_task_failed_event_attributes {
failure {
message: "Activity Type \"echo2\" is not registered with a worker. Known types are: echo"
source: "JavaSDK"
stack_trace: "
io.temporal.internal.sync.POJOActivityTaskHandler.handle(POJOActivityTaskHandler.java:199)
io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:193)
io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:151)
io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)"
application_failure_info {
type: "java.lang.IllegalArgumentException"
}
}
scheduled_event_id: 11
started_event_id: 12
identity: "12666@dugang"
retry_state: RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED
}
可以开启 LocalActivity , activity就不会调度到别的机器上
private final EchoActivities activities = Workflow.newActivityStub(EchoActivities.class,
// 超时设置
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(TIMEOUT_SECONDS))
// 设置最大重试次数 ,否则会重试直到最大次数
.setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build()).build());
// LocalActivity , 避免被调度到其他机器上 (被别的worker 拉取到) ,可提升性能
private final EchoActivities activities = Workflow.newLocalActivityStub(EchoActivities.class,
// 超时设置
LocalActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(TIMEOUT_SECONDS))
// 设置最大重试次数 ,否则会重试直到最大次数
.setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build()).build());
工作流历史事件从 17个变为10个
基本可以断定
数据库 和 temporal-server 不会存 工作流注册信息的
一个worker注册的工作流信息 ,只有他自己知道
任务的路由匹配机制 只基于 名字和 taskqueue , 跟 注册的工作流信息无关
com.dyyx.temporal.demo.echo.EchoWorker
com.dyyx.temporal.demo.echo.EchoWorkerOneActivity
完整代码
https://gitee.com/dyyx/hellocode/tree/master/demo/temporal/sdkdemo
上一篇
下一篇
Go检查结构体是否实现了指定接口
temporal worker 线程信息 及 轮询获取工作流关键代码
不使用for和while 实现循环效果
temporal 工作流注册关键代码
PostgreSQL encode 函数
MySQL jdbc 版本问题导致 连接错误