首页  

temporal namespace 创建和更新关键代码     所属分类 temporal 浏览量 576
查看 namespaces 表结构信息
select * from information_schema.columns where table_name='namespaces'
data  类型 bytea , 二进制类型


temporal/common/persistence/sql/sqlplugin/namespace.go
NamespaceRow represents a row in namespace table

NamespaceRow struct {
		ID                  primitives.UUID
		Name                string
		Data                []byte
		DataEncoding        string
		IsGlobal            bool
		NotificationVersion int64
	}
data_encoding  Proto3



引用 NamespaceRow 的地方

temporal/temporal/common/persistence/sql/metadata.go

CreateNamespace
UpdateNamespace
namespaceRowToGetNamespaceResponse


创建和更新 Namespace 会操作两张表
namespaces 和 namespace_metadata
数据库操作 见
temporal/common/persistence/sql/sqlplugin/postgresql/namespace.go

// UpdateNamespaceMetadata updates a single row in namespace_metadata table
func (pdb *db) UpdateNamespaceMetadata(
	ctx context.Context,
	row *sqlplugin.NamespaceMetadataRow,
) (sql.Result, error) {
	return pdb.conn.ExecContext(ctx,
		updateNamespaceMetadataQuery,
		row.NotificationVersion+1,
		row.NotificationVersion,
		partitionID,
	)
}

updateNamespaceMetadataQuery 更新语句
UPDATE namespace_metadata SET notification_version = $1 WHERE notification_version = $2 AND partition_id=$3



func (m *sqlMetadataManagerV2) namespaceRowToGetNamespaceResponse(row *sqlplugin.NamespaceRow) (*persistence.InternalGetNamespaceResponse, error) {
	return &persistence.InternalGetNamespaceResponse{
		Namespace:           persistence.NewDataBlob(row.Data, row.DataEncoding),
		IsGlobal:            row.IsGlobal,
		NotificationVersion: row.NotificationVersion,
	}, nil
}


namespaceRowToGetNamespaceResponse
namespaceRow 转换成 GetNamespaceResponse

// InternalGetNamespaceResponse is the response for GetNamespace
InternalGetNamespaceResponse struct {
  Namespace           *commonpb.DataBlob
  IsGlobal            bool
  NotificationVersion int64
}
	
type DataBlob struct {
	EncodingType v1.EncodingType `protobuf:"varint,1,opt,name=encoding_type,json=encodingType,proto3,enum=temporal.api.enums.v1.EncodingType" json:"encoding_type,omitempty"`
	Data         []byte          `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
}

Namespace:  persistence.NewDataBlob(row.Data, row.DataEncoding)



InternalGetNamespaceResponse 转换成 GetNamespaceResponse temporal/common/persistence/metadata_manager.go func (m *metadataManagerImpl) ConvertInternalGetResponse(d *InternalGetNamespaceResponse) (*GetNamespaceResponse, error) { ns, err := m.serializer.NamespaceDetailFromBlob(d.Namespace) if err != nil { return nil, err } if ns.Info.Data == nil { ns.Info.Data = map[string]string{} } if ns.Config.BadBinaries == nil || ns.Config.BadBinaries.Binaries == nil { ns.Config.BadBinaries = &namespacepb.BadBinaries{Binaries: map[string]*namespacepb.BadBinaryInfo{}} } ns.ReplicationConfig.ActiveClusterName = GetOrUseDefaultActiveCluster(m.clusterName, ns.ReplicationConfig.ActiveClusterName) ns.ReplicationConfig.Clusters = GetOrUseDefaultClusters(m.clusterName, ns.ReplicationConfig.Clusters) return &GetNamespaceResponse{ Namespace: ns, IsGlobalNamespace: d.IsGlobal, NotificationVersion: d.NotificationVersion, }, nil } ns, err := m.serializer.NamespaceDetailFromBlob(d.Namespace) temporal/common/persistence/serialization/serializer.go func (t *serializerImpl) NamespaceDetailFromBlob(data *commonpb.DataBlob) (*persistencespb.NamespaceDetail, error) { result := &persistencespb.NamespaceDetail{} return result, proto3DecodeBlob(data, result) } func proto3DecodeBlob(data *commonpb.DataBlob, result proto.Message) error { if data == nil { // TODO: should we return nil or error? return NewDeserializationError("cannot decode nil") } if data.EncodingType != enumspb.ENCODING_TYPE_PROTO3 { return NewDeserializationError(fmt.Sprintf("encoding %v doesn't match expected encoding %v", data.EncodingType, enumspb.ENCODING_TYPE_PROTO3)) } if err := proto.Unmarshal(data.Data, result); err != nil { return NewDeserializationError(fmt.Sprintf("error deserializing blob using %v encoding: %s", enumspb.ENCODING_TYPE_PROTO3, err)) } return nil } temporal/api/persistence/v1/namespaces.pb.go Code generated by protoc-gen-gogo. DO NOT EDIT. source: temporal/server/api/persistence/v1/namespaces.proto namespaces.pb.go 是根据 proto 文件生成 可以生成java 对应的文件 ,然后 解析 二进制文件 temporal/proto/internal/temporal/server/api/persistence/v1/namespaces.proto
func (m *sqlMetadataManagerV2) CreateNamespace( request *persistence.InternalCreateNamespaceRequest, ) (*persistence.CreateNamespaceResponse, error) func (m *sqlMetadataManagerV2) CreateNamespace( request *persistence.InternalCreateNamespaceRequest, ) (*persistence.CreateNamespaceResponse, error) { ctx, cancel := newExecutionContext() defer cancel() idBytes, err := primitives.ParseUUID(request.ID) if err != nil { return nil, err } var resp *persistence.CreateNamespaceResponse err = m.txExecute(ctx, "CreateNamespace", func(tx sqlplugin.Tx) error { metadata, err := lockMetadata(ctx, tx) if err != nil { return err } if _, err := tx.InsertIntoNamespace(ctx, &sqlplugin.NamespaceRow{ Name: request.Name, ID: idBytes, Data: request.Namespace.Data, DataEncoding: request.Namespace.EncodingType.String(), IsGlobal: request.IsGlobal, NotificationVersion: metadata.NotificationVersion, }); err != nil { if m.Db.IsDupEntryError(err) { return serviceerror.NewNamespaceAlreadyExists(fmt.Sprintf("name: %v", request.Name)) } return err } if err := updateMetadata(ctx, tx, metadata.NotificationVersion, ); err != nil { return err } resp = &persistence.CreateNamespaceResponse{ID: request.ID} return nil }) return resp, err }
func (m *sqlMetadataManagerV2) UpdateNamespace(request *persistence.InternalUpdateNamespaceRequest) error { ctx, cancel := newExecutionContext() defer cancel() idBytes, err := primitives.ParseUUID(request.Id) if err != nil { return err } return m.txExecute(ctx, "UpdateNamespace", func(tx sqlplugin.Tx) error { metadata, err := lockMetadata(ctx, tx) if err != nil { return err } if metadata.NotificationVersion != request.NotificationVersion { return fmt.Errorf( "conditional update error: expect: %v, actual: %v", request.NotificationVersion, metadata.NotificationVersion, ) } result, err := tx.UpdateNamespace(ctx, &sqlplugin.NamespaceRow{ Name: request.Name, ID: idBytes, Data: request.Namespace.Data, DataEncoding: request.Namespace.EncodingType.String(), NotificationVersion: request.NotificationVersion, IsGlobal: request.IsGlobal, }) if err != nil { return err } noRowsAffected, err := result.RowsAffected() if err != nil { return fmt.Errorf("rowsAffected error: %v", err) } if noRowsAffected != 1 { return fmt.Errorf("%v rows updated instead of one", noRowsAffected) } return updateMetadata(ctx, tx, metadata.NotificationVersion) }) }

上一篇     下一篇
XMLHttpRequest 例子

ssh端口转发

PostgreSQL DBA 常用SQL

temporal workflow 列表 关键代码

go 单引号 双引号 反引号

GO json 序列化与反序列化