temporal worker 线程信息 及 轮询获取工作流关键代码
所属分类 temporal
浏览量 807
com.dyyx.temporal.demo.echo.EchoWorker at localhost:61729
Thread [main] (Running)
Daemon Thread [grpc-connection-manager-thread-0] (Running)
Daemon Thread [grpc-nio-worker-ELG-1-1] (Running)
Daemon Thread [grpc-nio-worker-ELG-1-2] (Running)
Daemon Thread [grpc-nio-worker-ELG-1-3] (Running)
Daemon Thread [grpc-nio-worker-ELG-1-4] (Running)
Daemon Thread [grpc-nio-worker-ELG-1-5] (Running)
Thread [Workflow Poller taskQueue="echo_taskq3", namespace="default": 1] (Running)
Thread [Workflow Poller taskQueue="echo_taskq3", namespace="default": 2] (Running)
Daemon Thread [grpc-nio-worker-ELG-1-6] (Running)
Daemon Thread [grpc-nio-worker-ELG-1-7] (Running)
Daemon Thread [grpc-nio-worker-ELG-1-8] (Running)
Thread [Local Activity Poller taskQueue="echo_taskq3", namespace="default": 1] (Running)
Thread [Activity Poller taskQueue="echo_taskq3", namespace="default": 1] (Running)
Thread [Activity Poller taskQueue="echo_taskq3", namespace="default": 2] (Running)
Thread [Activity Poller taskQueue="echo_taskq3", namespace="default": 3] (Running)
Thread [Activity Poller taskQueue="echo_taskq3", namespace="default": 4] (Running)
Thread [Activity Poller taskQueue="echo_taskq3", namespace="default": 5] (Running)
Thread [Host Local Workflow Poller: 1] (Running)
Thread [Host Local Workflow Poller: 2] (Running)
Thread [Host Local Workflow Poller: 3] (Running)
Thread [Host Local Workflow Poller: 4] (Running)
Thread [Host Local Workflow Poller: 5] (Running)
Daemon Thread [Thread-14] (Running)
Daemon Thread [grpc-default-executor-5] (Running)
Thread [Workflow Poller taskQueue="echo_taskq3", namespace="default": 1] (Running)
Thread [Activity Poller taskQueue="echo_taskq3", namespace="default": 1] (Running)
Thread [Host Local Workflow Poller: 1] (Running)
注意线程名字
io.temporal.internal.worker.WorkflowPollTask.poll()
@Override
public PollWorkflowTaskQueueResponse poll() {
PollWorkflowTaskQueueRequest pollRequest =
PollWorkflowTaskQueueRequest.newBuilder()
.setNamespace(namespace)
.setBinaryChecksum(binaryChecksum)
.setIdentity(identity)
.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue).build())
.build();
PollWorkflowTaskQueueResponse result =
service
.blockingStub()
.withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
.pollWorkflowTaskQueue(pollRequest);
io.temporal.internal.worker.Poller
ThreadPoolExecutor pollExecutor =
new ThreadPoolExecutor(
pollerOptions.getPollThreadCount(),
pollerOptions.getPollThreadCount(),
1,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(pollerOptions.getPollThreadCount()));
pollExecutor.setThreadFactory(
new ExecutorThreadFactory(
pollerOptions.getPollThreadNamePrefix(), pollerOptions.getUncaughtExceptionHandler()));
for (int i = 0; i < pollerOptions.getPollThreadCount(); i++) {
pollExecutor.execute(new PollLoopTask(new PollExecutionTask()));
metricsScope.counter(MetricsType.POLLER_START_COUNTER).inc(1);
}
PollLoopTask
PollExecutionTask
注意 PollLoopTask run方法没有使用循环
而是在 finally 里 重新提交了任务 ,实现了 循环执行的效果
// Resubmit itself back to pollExecutor
pollExecutor.execute(this);
io.temporal.internal.worker.WorkflowWorker
private static final String POLL_THREAD_NAME_PREFIX = "Workflow Poller taskQueue=";
PollerOptions pollerOptions = options.getPollerOptions();
if (pollerOptions.getPollThreadNamePrefix() == null) {
pollerOptions =
PollerOptions.newBuilder(pollerOptions)
.setPollThreadNamePrefix(
POLL_THREAD_NAME_PREFIX
+ "\""
+ taskQueue
+ "\", namespace=\""
+ namespace
+ "\"")
.build();
}
io.temporal.internal.worker.ActivityWorker
private static final String POLL_THREAD_NAME_PREFIX = "Activity Poller taskQueue=";
io.temporal.internal.worker.WorkflowPollTask
io.temporal.internal.worker.ActivityPollTask
Thread [Workflow Poller taskQueue="echo_taskq3", namespace="default": 2] (Suspended (breakpoint at line 77 in WorkflowPollTask))
WorkflowPollTask.poll() line: 77
WorkflowPollTask.poll() line: 37
Poller$PollExecutionTask.run() line: 270
Poller$PollLoopTask.run() line: 235
ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1149
ThreadPoolExecutor$Worker.run() line: 624 [local variables unavailable]
Thread.run() line: 748 [local variables unavailable]
设置线程数 Activity 和 Workflow PollThreadCount
// 方便调试 线程数设置为 1
WorkerOptions workerOptions = WorkerOptions.newBuilder()
.setActivityPollThreadCount(1)
.setWorkflowPollThreadCount(1)
.build();
WorkerFactory factory = WorkerFactory.newInstance(client);
Worker worker = factory.newWorker(TASK_QUEUE,workerOptions);
io.temporal.internal.worker.WorkflowPollTask.poll()
设置条件断点
taskQueue.equals("echo_taskq3")
上一篇
下一篇
temporal 重要表说明
Temporal Clusters
Go检查结构体是否实现了指定接口
不使用for和while 实现循环效果
temporal 多个 worker 实例 测试说明
temporal 工作流注册关键代码