首页  

temporal worker 线程信息 及 轮询获取工作流关键代码     所属分类 temporal 浏览量 822
com.dyyx.temporal.demo.echo.EchoWorker at localhost:61729	
	Thread [main] (Running)	
	Daemon Thread [grpc-connection-manager-thread-0] (Running)	
	Daemon Thread [grpc-nio-worker-ELG-1-1] (Running)	
	Daemon Thread [grpc-nio-worker-ELG-1-2] (Running)	
	Daemon Thread [grpc-nio-worker-ELG-1-3] (Running)	
	Daemon Thread [grpc-nio-worker-ELG-1-4] (Running)	
	Daemon Thread [grpc-nio-worker-ELG-1-5] (Running)	
	Thread [Workflow Poller taskQueue="echo_taskq3", namespace="default": 1] (Running)	
	Thread [Workflow Poller taskQueue="echo_taskq3", namespace="default": 2] (Running)	
	Daemon Thread [grpc-nio-worker-ELG-1-6] (Running)	
	Daemon Thread [grpc-nio-worker-ELG-1-7] (Running)	
	Daemon Thread [grpc-nio-worker-ELG-1-8] (Running)	
	Thread [Local Activity Poller taskQueue="echo_taskq3", namespace="default": 1] (Running)	
	Thread [Activity Poller taskQueue="echo_taskq3", namespace="default": 1] (Running)	
	Thread [Activity Poller taskQueue="echo_taskq3", namespace="default": 2] (Running)	
	Thread [Activity Poller taskQueue="echo_taskq3", namespace="default": 3] (Running)	
	Thread [Activity Poller taskQueue="echo_taskq3", namespace="default": 4] (Running)	
	Thread [Activity Poller taskQueue="echo_taskq3", namespace="default": 5] (Running)	
	Thread [Host Local Workflow Poller: 1] (Running)	
	Thread [Host Local Workflow Poller: 2] (Running)	
	Thread [Host Local Workflow Poller: 3] (Running)	
	Thread [Host Local Workflow Poller: 4] (Running)	
	Thread [Host Local Workflow Poller: 5] (Running)	
	Daemon Thread [Thread-14] (Running)	
	Daemon Thread [grpc-default-executor-5] (Running)	

	
	
Thread [Workflow Poller taskQueue="echo_taskq3", namespace="default": 1] (Running)	
Thread [Activity Poller taskQueue="echo_taskq3", namespace="default": 1] (Running)	
Thread [Host Local Workflow Poller: 1] (Running)

注意线程名字
	






io.temporal.internal.worker.WorkflowPollTask.poll()

  @Override
  public PollWorkflowTaskQueueResponse poll() {
    PollWorkflowTaskQueueRequest pollRequest =
        PollWorkflowTaskQueueRequest.newBuilder()
            .setNamespace(namespace)
            .setBinaryChecksum(binaryChecksum)
            .setIdentity(identity)
            .setTaskQueue(TaskQueue.newBuilder().setName(taskQueue).build())
            .build();

    PollWorkflowTaskQueueResponse result =
          service
              .blockingStub()
              .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
              .pollWorkflowTaskQueue(pollRequest);







io.temporal.internal.worker.Poller<T>

ThreadPoolExecutor     pollExecutor =
        new ThreadPoolExecutor(
            pollerOptions.getPollThreadCount(),
            pollerOptions.getPollThreadCount(),
            1,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(pollerOptions.getPollThreadCount()));
    pollExecutor.setThreadFactory(
        new ExecutorThreadFactory(
            pollerOptions.getPollThreadNamePrefix(), pollerOptions.getUncaughtExceptionHandler()));
    

for (int i = 0; i < pollerOptions.getPollThreadCount(); i++) {
    pollExecutor.execute(new PollLoopTask(new PollExecutionTask()));
    metricsScope.counter(MetricsType.POLLER_START_COUNTER).inc(1);
}
    
    
PollLoopTask  
PollExecutionTask

注意 PollLoopTask  run方法没有使用循环  
而是在  finally 里 重新提交了任务 ,实现了 循环执行的效果
// Resubmit itself back to pollExecutor
pollExecutor.execute(this);
          
    
io.temporal.internal.worker.WorkflowWorker

private static final String POLL_THREAD_NAME_PREFIX = "Workflow Poller taskQueue=";

PollerOptions pollerOptions = options.getPollerOptions();
    if (pollerOptions.getPollThreadNamePrefix() == null) {
      pollerOptions =
          PollerOptions.newBuilder(pollerOptions)
              .setPollThreadNamePrefix(
                  POLL_THREAD_NAME_PREFIX
                      + "\""
                      + taskQueue
                      + "\", namespace=\""
                      + namespace
                      + "\"")
              .build();
    }
    
    

io.temporal.internal.worker.ActivityWorker
  private static final String POLL_THREAD_NAME_PREFIX = "Activity Poller taskQueue=";



io.temporal.internal.worker.WorkflowPollTask
io.temporal.internal.worker.ActivityPollTask

Thread [Workflow Poller taskQueue="echo_taskq3", namespace="default": 2] (Suspended (breakpoint at line 77 in WorkflowPollTask))	
	WorkflowPollTask.poll() line: 77	
	WorkflowPollTask.poll() line: 37	
	Poller$PollExecutionTask.run() line: 270	
	Poller$PollLoopTask.run() line: 235	
	ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1149	
	ThreadPoolExecutor$Worker.run() line: 624 [local variables unavailable]	
	Thread.run() line: 748 [local variables unavailable]	



设置线程数 Activity  和 Workflow PollThreadCount
// 方便调试  线程数设置为 1 
WorkerOptions workerOptions = WorkerOptions.newBuilder()
				.setActivityPollThreadCount(1)
				.setWorkflowPollThreadCount(1)
				.build();
				
		
WorkerFactory factory = WorkerFactory.newInstance(client);
Worker worker = factory.newWorker(TASK_QUEUE,workerOptions);
		
		
io.temporal.internal.worker.WorkflowPollTask.poll()
设置条件断点 
taskQueue.equals("echo_taskq3")


        


上一篇     下一篇
temporal 重要表说明

Temporal Clusters

Go检查结构体是否实现了指定接口

不使用for和while 实现循环效果

temporal 多个 worker 实例 测试说明

temporal 工作流注册关键代码