temporal 获取集群信息 关键代码
所属分类 temporal
浏览量 643
gRPC 接口
getClusterInfo
/**
 * Calls the GetClusterInfo RPC on the workflow service reachable at the given address.
 *
 * @param serviceAddress address handed to buildWorkflowServiceBlockingStub to create the stub
 * @return the response returned by the blocking stub's getClusterInfo call
 */
public static GetClusterInfoResponse getClusterInfo(String serviceAddress) {
    // Build the stub first, then issue the call with an empty (default) request.
    WorkflowServiceGrpc.WorkflowServiceBlockingStub stub = buildWorkflowServiceBlockingStub(serviceAddress);
    return stub.getClusterInfo(GetClusterInfoRequest.newBuilder().build());
}
temporal/service/frontend/dcRedirectionHandler.go
// GetClusterInfo handles the GetClusterInfo API call by passing ctx and
// request straight through to the wrapped frontend handler and returning
// whatever it returns.
func (handler *DCRedirectionHandlerImpl) GetClusterInfo(
	ctx context.Context,
	request *workflowservice.GetClusterInfoRequest,
) (*workflowservice.GetClusterInfoResponse, error) {
	resp, err := handler.frontendHandler.GetClusterInfo(ctx, request)
	return resp, err
}
temporal/service/frontend/workflowHandler.go
// GetClusterInfo returns information about the current cluster. The context
// and request arguments are ignored; the response is assembled from the
// current cluster's persisted metadata, the server's header constants, and the
// names reported by the execution and visibility managers.
//
// If loading the cluster metadata fails, that error is returned unchanged.
func (wh *WorkflowHandler) GetClusterInfo(_ context.Context, _ *workflowservice.GetClusterInfoRequest) (_ *workflowservice.GetClusterInfoResponse, retError error) {
	// Deferred with a pointer to the named result; presumably a recovered
	// panic is reported through retError — confirm against log.CapturePanic.
	defer log.CapturePanic(wh.logger, &retError)

	current, err := wh.clusterMetadataManager.GetCurrentClusterMetadata()
	if err != nil {
		return nil, err
	}

	info := &workflowservice.GetClusterInfoResponse{
		SupportedClients:  headers.SupportedClients,
		ServerVersion:     headers.ServerVersion,
		ClusterId:         current.ClusterId,
		VersionInfo:       current.VersionInfo,
		ClusterName:       current.ClusterName,
		HistoryShardCount: current.HistoryShardCount,
		PersistenceStore:  wh.persistenceExecutionManager.GetName(),
		VisibilityStore:   wh.visibilityMrg.GetName(),
	}
	return info, nil
}
temporal/common/persistence/persistenceMetricClients.go
// GetCurrentClusterMetadata calls the wrapped persistence layer and records
// metrics around it: a request counter before the call, a latency timer
// spanning the call, and a failure counter when the call returns an error.
// The wrapped result and error are returned unchanged.
func (c *clusterMetadataPersistenceClient) GetCurrentClusterMetadata() (*GetClusterMetadataResponse, error) {
	// This method wraps the GetClusterMetadata API, so it reports under the
	// same metrics scope.
	c.metricClient.IncCounter(metrics.PersistenceGetClusterMetadataScope, metrics.PersistenceRequests)

	latency := c.metricClient.StartTimer(metrics.PersistenceGetClusterMetadataScope, metrics.PersistenceLatency)
	resp, err := c.persistence.GetCurrentClusterMetadata()
	latency.Stop()

	if err == nil {
		return resp, nil
	}

	c.metricClient.IncCounter(metrics.PersistenceGetClusterMetadataScope, metrics.PersistenceFailures)
	return resp, err
}
temporal/common/persistence/persistenceRateLimitedClients.go
// GetCurrentClusterMetadata asks the rate limiter for permission before
// delegating to the wrapped persistence layer. When the limiter refuses, the
// wrapped layer is not called and ErrPersistenceLimitExceeded is returned.
func (c *clusterMetadataRateLimitedPersistenceClient) GetCurrentClusterMetadata() (*GetClusterMetadataResponse, error) {
	if !c.rateLimiter.Allow() {
		return nil, ErrPersistenceLimitExceeded
	}

	return c.persistence.GetCurrentClusterMetadata()
}
temporal/common/persistence/clusterMetadataStore.go
// GetCurrentClusterMetadata loads the stored metadata for the manager's
// current cluster name, deserializes it, and returns it together with the
// stored version. An error from either the store or the serializer is
// returned unchanged.
func (m *clusterMetadataManagerImpl) GetCurrentClusterMetadata() (*GetClusterMetadataResponse, error) {
	request := &InternalGetClusterMetadataRequest{ClusterName: m.currentClusterName}

	stored, err := m.persistence.GetClusterMetadata(request)
	if err != nil {
		return nil, err
	}

	decoded, err := m.serializer.DeserializeClusterMetadata(stored.ClusterMetadata)
	if err != nil {
		return nil, err
	}

	return &GetClusterMetadataResponse{
		ClusterMetadata: *decoded,
		Version:         stored.Version,
	}, nil
}
temporal/common/persistence/sql/cluster_metadata.go
// GetClusterMetadata fetches the cluster metadata row for request.ClusterName
// from the SQL store and returns its data as a blob along with the row
// version. A database error is passed through convertCommonErrors before
// being returned.
func (s *sqlClusterMetadataManager) GetClusterMetadata(
	request *p.InternalGetClusterMetadataRequest,
) (*p.InternalGetClusterMetadataResponse, error) {
	ctx, cancel := newExecutionContext()
	defer cancel()

	filter := &sqlplugin.ClusterMetadataFilter{ClusterName: request.ClusterName}
	row, err := s.Db.GetClusterMetadata(ctx, filter)
	if err != nil {
		return nil, convertCommonErrors("GetClusterMetadata", err)
	}

	blob := p.NewDataBlob(row.Data, row.DataEncoding)
	return &p.InternalGetClusterMetadataResponse{
		ClusterMetadata: blob,
		Version:         row.Version,
	}, nil
}
temporal/common/persistence/sql/sqlplugin/postgresql/cluster_metadata.go
// GetClusterMetadata runs getClusterMetadataQry on the connection with the
// constant metadata partition and filter.ClusterName as its two parameters,
// and returns the resulting row. On a query error the row is discarded and
// the error is returned as-is.
func (pdb *db) GetClusterMetadata(
	ctx context.Context,
	filter *sqlplugin.ClusterMetadataFilter,
) (*sqlplugin.ClusterMetadataRow, error) {
	row := &sqlplugin.ClusterMetadataRow{}
	if err := pdb.conn.GetContext(
		ctx,
		row,
		getClusterMetadataQry,
		constMetadataPartition,
		filter.ClusterName,
	); err != nil {
		return nil, err
	}
	return row, nil
}
/Users/dugang/go/pkg/mod/github.com/jmoiron/sqlx@v1.3.4/sqlx_context.go
// GetContext forwards ctx, dest, query and args to the package-level
// GetContext function, supplying db itself as the second argument, and
// returns that function's error.
// NOTE(review): dest is presumably the scan target for the fetched row (the
// postgresql plugin passes a *ClusterMetadataRow) — confirm in the sqlx docs.
func (db *DB) GetContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
return GetContext(ctx, db, dest, query, args...)
}
// getClusterMetadataBase is the SELECT prefix that getClusterMetadataQry
// extends; it ends with a space so the WHERE clause can be appended directly.
getClusterMetadataBase = `SELECT data, data_encoding, version FROM cluster_metadata_info `
// getClusterMetadataQry filters by metadata partition ($1) and cluster name ($2).
getClusterMetadataQry = getClusterMetadataBase + `WHERE metadata_partition = $1 AND cluster_name = $2`
实际执行的 SQL：
SELECT data, data_encoding, version FROM cluster_metadata_info WHERE metadata_partition = $1 AND cluster_name = $2
调试时的参数值：
$1 (metadata_partition) = 0
$2 (cluster_name) = active
temporal/common/persistence/sql/sqlplugin/postgresql/db.go
// db represents a logical connection to a postgresql database.
type db struct {
dbKind sqlplugin.DbKind
dbName string
db *sqlx.DB
tx *sqlx.Tx
// conn is the handle query methods such as GetClusterMetadata execute
// through. NOTE(review): presumably it refers to either db or tx depending
// on whether a transaction is open — confirm against the constructor.
conn sqlplugin.Conn
converter DataConverter
}
go.temporal.io/server/common/persistence/sql/sqlplugin/postgresql.(*db).GetClusterMetadata (/Users/dugang/dugang/work/temporal/temporal/common/persistence/sql/sqlplugin/postgresql/cluster_metadata.go:140)
go.temporal.io/server/common/persistence/sql.(*DbConn).GetClusterMetadata (Unknown Source:1)
go.temporal.io/server/common/persistence/sql.(*sqlClusterMetadataManager).GetClusterMetadata (/Users/dugang/dugang/work/temporal/temporal/common/persistence/sql/cluster_metadata.go:95)
go.temporal.io/server/common/persistence.(*clusterMetadataManagerImpl).GetCurrentClusterMetadata (/Users/dugang/dugang/work/temporal/temporal/common/persistence/clusterMetadataStore.go:133)
go.temporal.io/server/common/persistence.(*clusterMetadataRateLimitedPersistenceClient).GetCurrentClusterMetadata (/Users/dugang/dugang/work/temporal/temporal/common/persistence/persistenceRateLimitedClients.go:819)
go.temporal.io/server/common/persistence.(*clusterMetadataPersistenceClient).GetCurrentClusterMetadata (/Users/dugang/dugang/work/temporal/temporal/common/persistence/persistenceMetricClients.go:1156)
go.temporal.io/server/service/frontend.(*WorkflowHandler).GetClusterInfo (/Users/dugang/dugang/work/temporal/temporal/service/frontend/workflowHandler.go:2919)
go.temporal.io/server/service/frontend.(*DCRedirectionHandlerImpl).GetClusterInfo (/Users/dugang/dugang/work/temporal/temporal/service/frontend/dcRedirectionHandler.go:1219)
go.temporal.io/api/workflowservice/v1._WorkflowService_GetClusterInfo_Handler.func1 (/Users/dugang/go/pkg/mod/go.temporal.io/api@v1.7.1-0.20220211205804-a4f685c2448b/workflowservice/v1/service.pb.go:1719)
go.temporal.io/server/common/rpc/interceptor.(*SDKVersionInterceptor).Intercept (/Users/dugang/dugang/work/temporal/temporal/common/rpc/interceptor/sdk_version.go:63)
go.temporal.io/server/common/rpc/interceptor.(*SDKVersionInterceptor).Intercept-fm (/Users/dugang/dugang/work/temporal/temporal/common/rpc/interceptor/sdk_version.go:53)
google.golang.org/grpc.chainUnaryInterceptors.func1.1 (/Users/dugang/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1116)
go.temporal.io/server/common/authorization.(*interceptor).Interceptor (/Users/dugang/dugang/work/temporal/temporal/common/authorization/interceptor.go:152)
go.temporal.io/server/common/authorization.(*interceptor).Interceptor-fm (/Users/dugang/dugang/work/temporal/temporal/common/authorization/interceptor.go:66)
google.golang.org/grpc.chainUnaryInterceptors.func1.1 (/Users/dugang/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1119)
go.temporal.io/server/common/rpc/interceptor.(*NamespaceCountLimitInterceptor).Intercept (/Users/dugang/dugang/work/temporal/temporal/common/rpc/interceptor/namespace_count_limit.go:99)
go.temporal.io/server/common/rpc/interceptor.(*NamespaceCountLimitInterceptor).Intercept-fm (/Users/dugang/dugang/work/temporal/temporal/common/rpc/interceptor/namespace_count_limit.go:76)
google.golang.org/grpc.chainUnaryInterceptors.func1.1 (/Users/dugang/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1119)
go.temporal.io/server/common/rpc/interceptor.(*NamespaceRateLimitInterceptor).Intercept (/Users/dugang/dugang/work/temporal/temporal/common/rpc/interceptor/namespace_rate_limit.go:89)
go.temporal.io/server/common/rpc/interceptor.(*NamespaceRateLimitInterceptor).Intercept-fm (/Users/dugang/dugang/work/temporal/temporal/common/rpc/interceptor/namespace_rate_limit.go:69)
上一篇
下一篇
java hex util
c语言获取UUID
UUID简介
postgresql jdbc 驱动 ResultSet 取值
temporal 获取 namespace信息 关键代码和堆栈信息
temporal 重要表说明