首页  

temporal 获取集群信息 关键代码     所属分类 temporal 浏览量 643
grpc接口 
getClusterInfo


public static GetClusterInfoResponse getClusterInfo(String serviceAddress) {
		WorkflowServiceGrpc.WorkflowServiceBlockingStub workflowServiceBlockingStub = buildWorkflowServiceBlockingStub(
				serviceAddress);

		GetClusterInfoRequest getClusterInfoRequest = GetClusterInfoRequest.newBuilder().build();

		GetClusterInfoResponse getClusterInfoResponse = workflowServiceBlockingStub
				.getClusterInfo(getClusterInfoRequest);

		return getClusterInfoResponse;
}
	
	

temporal/service/frontend/dcRedirectionHandler.go
// GetClusterInfo API call
func (handler *DCRedirectionHandlerImpl) GetClusterInfo(
	ctx context.Context,
	request *workflowservice.GetClusterInfoRequest,
) (*workflowservice.GetClusterInfoResponse, error) {
	return handler.frontendHandler.GetClusterInfo(ctx, request)
}


temporal/service/frontend/workflowHandler.go

func (wh *WorkflowHandler) GetClusterInfo(_ context.Context, _ *workflowservice.GetClusterInfoRequest) (_ *workflowservice.GetClusterInfoResponse, retError error) {
	defer log.CapturePanic(wh.logger, &retError)

	metadata, err := wh.clusterMetadataManager.GetCurrentClusterMetadata()
	if err != nil {
		return nil, err
	}

	return &workflowservice.GetClusterInfoResponse{
		SupportedClients:  headers.SupportedClients,
		ServerVersion:     headers.ServerVersion,
		ClusterId:         metadata.ClusterId,
		VersionInfo:       metadata.VersionInfo,
		ClusterName:       metadata.ClusterName,
		HistoryShardCount: metadata.HistoryShardCount,
		PersistenceStore:  wh.persistenceExecutionManager.GetName(),
		VisibilityStore:   wh.visibilityMrg.GetName(),
	}, nil
}


temporal/common/persistence/persistenceMetricClients.go

func (c *clusterMetadataPersistenceClient) GetCurrentClusterMetadata() (*GetClusterMetadataResponse, error) {
	//This is a wrapper of GetClusterMetadata API, use the same scope here
	c.metricClient.IncCounter(metrics.PersistenceGetClusterMetadataScope, metrics.PersistenceRequests)

	sw := c.metricClient.StartTimer(metrics.PersistenceGetClusterMetadataScope, metrics.PersistenceLatency)
	result, err := c.persistence.GetCurrentClusterMetadata()
	sw.Stop()

	if err != nil {
		c.metricClient.IncCounter(metrics.PersistenceGetClusterMetadataScope, metrics.PersistenceFailures)
	}

	return result, err
}

temporal/common/persistence/persistenceRateLimitedClients.go
func (c *clusterMetadataRateLimitedPersistenceClient) GetCurrentClusterMetadata() (*GetClusterMetadataResponse, error) {
	if ok := c.rateLimiter.Allow(); !ok {
		return nil, ErrPersistenceLimitExceeded
	}
	return c.persistence.GetCurrentClusterMetadata()
}

temporal/common/persistence/clusterMetadataStore.go
func (m *clusterMetadataManagerImpl) GetCurrentClusterMetadata() (*GetClusterMetadataResponse, error) {
	resp, err := m.persistence.GetClusterMetadata(&InternalGetClusterMetadataRequest{ClusterName: m.currentClusterName})
	if err != nil {
		return nil, err
	}

	mcm, err := m.serializer.DeserializeClusterMetadata(resp.ClusterMetadata)
	if err != nil {
		return nil, err
	}
	return &GetClusterMetadataResponse{ClusterMetadata: *mcm, Version: resp.Version}, nil
}

temporal/common/persistence/sql/cluster_metadata.go
func (s *sqlClusterMetadataManager) GetClusterMetadata(
	request *p.InternalGetClusterMetadataRequest,
) (*p.InternalGetClusterMetadataResponse, error) {
	ctx, cancel := newExecutionContext()
	defer cancel()
	row, err := s.Db.GetClusterMetadata(ctx, &sqlplugin.ClusterMetadataFilter{ClusterName: request.ClusterName})

	if err != nil {
		return nil, convertCommonErrors("GetClusterMetadata", err)
	}

	return &p.InternalGetClusterMetadataResponse{
		ClusterMetadata: p.NewDataBlob(row.Data, row.DataEncoding),
		Version:         row.Version,
	}, nil
}


temporal/common/persistence/sql/sqlplugin/postgresql/cluster_metadata.go
func (pdb *db) GetClusterMetadata(
	ctx context.Context,
	filter *sqlplugin.ClusterMetadataFilter,
) (*sqlplugin.ClusterMetadataRow, error) {
	var row sqlplugin.ClusterMetadataRow
	err := pdb.conn.GetContext(ctx,
		&row,
		getClusterMetadataQry,
		constMetadataPartition,
		filter.ClusterName,
	)
	if err != nil {
		return nil, err
	}
	return &row, err
}

/Users/dugang/go/pkg/mod/github.com/jmoiron/sqlx@v1.3.4/sqlx_context.go

func (db *DB) GetContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
	return GetContext(ctx, db, dest, query, args...)
}


getClusterMetadataBase      = `SELECT data, data_encoding, version FROM cluster_metadata_info `
getClusterMetadataQry       = getClusterMetadataBase + `WHERE metadata_partition = $1 AND cluster_name = $2`


SELECT data, data_encoding, version FROM cluster_metadata_info WHERE metadata_partition = $1 AND cluster_name = $2
metadata_partition = 0
cluster_name = active


temporal/common/persistence/sql/sqlplugin/postgresql/db.go
// db represents a logical connection to mysql database
type db struct {
	dbKind sqlplugin.DbKind
	dbName string

	db        *sqlx.DB
	tx        *sqlx.Tx
	conn      sqlplugin.Conn
	converter DataConverter
}



go.temporal.io/server/common/persistence/sql/sqlplugin/postgresql.(*db).GetClusterMetadata (/Users/dugang/dugang/work/temporal/temporal/common/persistence/sql/sqlplugin/postgresql/cluster_metadata.go:140) go.temporal.io/server/common/persistence/sql.(*DbConn).GetClusterMetadata (Unknown Source:1) go.temporal.io/server/common/persistence/sql.(*sqlClusterMetadataManager).GetClusterMetadata (/Users/dugang/dugang/work/temporal/temporal/common/persistence/sql/cluster_metadata.go:95) go.temporal.io/server/common/persistence.(*clusterMetadataManagerImpl).GetCurrentClusterMetadata (/Users/dugang/dugang/work/temporal/temporal/common/persistence/clusterMetadataStore.go:133) go.temporal.io/server/common/persistence.(*clusterMetadataRateLimitedPersistenceClient).GetCurrentClusterMetadata (/Users/dugang/dugang/work/temporal/temporal/common/persistence/persistenceRateLimitedClients.go:819) go.temporal.io/server/common/persistence.(*clusterMetadataPersistenceClient).GetCurrentClusterMetadata (/Users/dugang/dugang/work/temporal/temporal/common/persistence/persistenceMetricClients.go:1156) go.temporal.io/server/service/frontend.(*WorkflowHandler).GetClusterInfo (/Users/dugang/dugang/work/temporal/temporal/service/frontend/workflowHandler.go:2919) go.temporal.io/server/service/frontend.(*DCRedirectionHandlerImpl).GetClusterInfo (/Users/dugang/dugang/work/temporal/temporal/service/frontend/dcRedirectionHandler.go:1219) go.temporal.io/api/workflowservice/v1._WorkflowService_GetClusterInfo_Handler.func1 (/Users/dugang/go/pkg/mod/go.temporal.io/api@v1.7.1-0.20220211205804-a4f685c2448b/workflowservice/v1/service.pb.go:1719) go.temporal.io/server/common/rpc/interceptor.(*SDKVersionInterceptor).Intercept (/Users/dugang/dugang/work/temporal/temporal/common/rpc/interceptor/sdk_version.go:63) go.temporal.io/server/common/rpc/interceptor.(*SDKVersionInterceptor).Intercept-fm (/Users/dugang/dugang/work/temporal/temporal/common/rpc/interceptor/sdk_version.go:53) google.golang.org/grpc.chainUnaryInterceptors.func1.1 (/Users/dugang/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1116) go.temporal.io/server/common/authorization.(*interceptor).Interceptor (/Users/dugang/dugang/work/temporal/temporal/common/authorization/interceptor.go:152) go.temporal.io/server/common/authorization.(*interceptor).Interceptor-fm (/Users/dugang/dugang/work/temporal/temporal/common/authorization/interceptor.go:66) google.golang.org/grpc.chainUnaryInterceptors.func1.1 (/Users/dugang/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1119) go.temporal.io/server/common/rpc/interceptor.(*NamespaceCountLimitInterceptor).Intercept (/Users/dugang/dugang/work/temporal/temporal/common/rpc/interceptor/namespace_count_limit.go:99) go.temporal.io/server/common/rpc/interceptor.(*NamespaceCountLimitInterceptor).Intercept-fm (/Users/dugang/dugang/work/temporal/temporal/common/rpc/interceptor/namespace_count_limit.go:76) google.golang.org/grpc.chainUnaryInterceptors.func1.1 (/Users/dugang/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1119) go.temporal.io/server/common/rpc/interceptor.(*NamespaceRateLimitInterceptor).Intercept (/Users/dugang/dugang/work/temporal/temporal/common/rpc/interceptor/namespace_rate_limit.go:89) go.temporal.io/server/common/rpc/interceptor.(*NamespaceRateLimitInterceptor).Intercept-fm (/Users/dugang/dugang/work/temporal/temporal/common/rpc/interceptor/namespace_rate_limit.go:69)

上一篇     下一篇
java hex util

c语言获取UUID

UUID简介

postgresql jdbc 驱动 ResultSet 取值

temporal 获取 namespace信息 关键代码和堆栈信息

temporal 重要表说明