temporal namespace 创建和更新关键代码
所属分类 temporal
浏览量 576
查看 namespaces 表结构信息
select * from information_schema.columns where table_name='namespaces'
data 列的类型为 bytea,即二进制类型
temporal/common/persistence/sql/sqlplugin/namespace.go
NamespaceRow represents a row in namespace table
// NamespaceRow mirrors one record of the namespaces table. It is the value
// passed to InsertIntoNamespace / UpdateNamespace and the input of
// namespaceRowToGetNamespaceResponse.
NamespaceRow struct {
// ID is the namespace UUID; writers obtain it with primitives.ParseUUID.
ID primitives.UUID
// Name is the namespace name taken from the create/update request.
Name string
// Data holds the raw bytes of the namespace blob (request.Namespace.Data).
Data []byte
// DataEncoding is the string form of the blob's encoding type
// (EncodingType.String()); readers pass it back to persistence.NewDataBlob.
DataEncoding string
// IsGlobal is copied from the request's IsGlobal flag.
IsGlobal bool
// NotificationVersion is the metadata notification version recorded with
// the row when it is written.
NotificationVersion int64
}
data_encoding Proto3
引用 NamespaceRow 的地方
temporal/common/persistence/sql/metadata.go
CreateNamespace
UpdateNamespace
namespaceRowToGetNamespaceResponse
创建和更新 Namespace 会操作两张表
namespaces 和 namespace_metadata
数据库操作 见
temporal/common/persistence/sql/sqlplugin/postgresql/namespace.go
// UpdateNamespaceMetadata runs updateNamespaceMetadataQuery against the
// namespace_metadata table. The query receives, in order, the new
// notification version (row.NotificationVersion + 1), the version the row is
// expected to hold right now (row.NotificationVersion), and partitionID.
// The result and error of ExecContext are returned unchanged.
func (pdb *db) UpdateNamespaceMetadata(
	ctx context.Context,
	row *sqlplugin.NamespaceMetadataRow,
) (sql.Result, error) {
	expectedVersion := row.NotificationVersion
	nextVersion := expectedVersion + 1

	return pdb.conn.ExecContext(
		ctx,
		updateNamespaceMetadataQuery,
		nextVersion,
		expectedVersion,
		partitionID,
	)
}
updateNamespaceMetadataQuery 更新语句
UPDATE namespace_metadata SET notification_version = $1 WHERE notification_version = $2 AND partition_id=$3
// namespaceRowToGetNamespaceResponse converts a namespaces table row into an
// InternalGetNamespaceResponse. The row's Data and DataEncoding are combined
// into a blob with persistence.NewDataBlob; IsGlobal and NotificationVersion
// are copied as-is. The returned error is always nil.
func (m *sqlMetadataManagerV2) namespaceRowToGetNamespaceResponse(row *sqlplugin.NamespaceRow) (*persistence.InternalGetNamespaceResponse, error) {
	blob := persistence.NewDataBlob(row.Data, row.DataEncoding)

	resp := &persistence.InternalGetNamespaceResponse{
		Namespace:           blob,
		IsGlobal:            row.IsGlobal,
		NotificationVersion: row.NotificationVersion,
	}
	return resp, nil
}
namespaceRowToGetNamespaceResponse
NamespaceRow 转换成 InternalGetNamespaceResponse
// InternalGetNamespaceResponse is the response for GetNamespace at the
// persistence-plugin level: the namespace is still an encoded blob and has to
// be decoded (see ConvertInternalGetResponse) before it is handed to callers.
InternalGetNamespaceResponse struct {
// Namespace is the encoded namespace detail together with its encoding type.
Namespace *commonpb.DataBlob
// IsGlobal is the row's IsGlobal flag.
IsGlobal bool
// NotificationVersion is the notification version stored with the row.
NotificationVersion int64
}
// DataBlob pairs a byte payload with the encoding used to produce it.
// The struct tags come from protobuf code generation and must not be edited
// by hand.
type DataBlob struct {
// EncodingType is protobuf field 1, an enum of type
// temporal.api.enums.v1.EncodingType describing how Data is encoded.
EncodingType v1.EncodingType `protobuf:"varint,1,opt,name=encoding_type,json=encodingType,proto3,enum=temporal.api.enums.v1.EncodingType" json:"encoding_type,omitempty"`
// Data is protobuf field 2, the encoded payload bytes.
Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
}
Namespace: persistence.NewDataBlob(row.Data, row.DataEncoding)
InternalGetNamespaceResponse 转换成 GetNamespaceResponse
temporal/common/persistence/metadata_manager.go
// ConvertInternalGetResponse decodes the namespace blob carried by d and wraps
// the result in a GetNamespaceResponse.
//
// After decoding, a nil Info.Data map and a missing BadBinaries (or a nil
// Binaries map inside it) are replaced with empty values, and the replication
// settings are passed through GetOrUseDefaultActiveCluster and
// GetOrUseDefaultClusters together with m.clusterName. A decoding failure is
// returned unchanged with a nil response.
func (m *metadataManagerImpl) ConvertInternalGetResponse(d *InternalGetNamespaceResponse) (*GetNamespaceResponse, error) {
	detail, decodeErr := m.serializer.NamespaceDetailFromBlob(d.Namespace)
	if decodeErr != nil {
		return nil, decodeErr
	}

	// Callers get empty maps instead of nil ones.
	if detail.Info.Data == nil {
		detail.Info.Data = make(map[string]string)
	}
	if bad := detail.Config.BadBinaries; bad == nil || bad.Binaries == nil {
		detail.Config.BadBinaries = &namespacepb.BadBinaries{
			Binaries: make(map[string]*namespacepb.BadBinaryInfo),
		}
	}

	activeCluster := GetOrUseDefaultActiveCluster(m.clusterName, detail.ReplicationConfig.ActiveClusterName)
	detail.ReplicationConfig.ActiveClusterName = activeCluster

	clusters := GetOrUseDefaultClusters(m.clusterName, detail.ReplicationConfig.Clusters)
	detail.ReplicationConfig.Clusters = clusters

	response := &GetNamespaceResponse{
		Namespace:           detail,
		IsGlobalNamespace:   d.IsGlobal,
		NotificationVersion: d.NotificationVersion,
	}
	return response, nil
}
ns, err := m.serializer.NamespaceDetailFromBlob(d.Namespace)
temporal/common/persistence/serialization/serializer.go
// NamespaceDetailFromBlob decodes data into a freshly allocated
// persistencespb.NamespaceDetail using proto3DecodeBlob. The allocated message
// is returned even when decoding fails, alongside the decoding error.
func (t *serializerImpl) NamespaceDetailFromBlob(data *commonpb.DataBlob) (*persistencespb.NamespaceDetail, error) {
	detail := &persistencespb.NamespaceDetail{}
	decodeErr := proto3DecodeBlob(data, detail)
	return detail, decodeErr
}
// proto3DecodeBlob unmarshals the payload of data into result.
//
// It returns a deserialization error when data is nil, when the blob's
// encoding type is not ENCODING_TYPE_PROTO3, or when proto.Unmarshal fails;
// otherwise it returns nil.
func proto3DecodeBlob(data *commonpb.DataBlob, result proto.Message) error {
	switch {
	case data == nil:
		// TODO: should we return nil or error?
		return NewDeserializationError("cannot decode nil")
	case data.EncodingType != enumspb.ENCODING_TYPE_PROTO3:
		msg := fmt.Sprintf("encoding %v doesn't match expected encoding %v", data.EncodingType, enumspb.ENCODING_TYPE_PROTO3)
		return NewDeserializationError(msg)
	}

	unmarshalErr := proto.Unmarshal(data.Data, result)
	if unmarshalErr == nil {
		return nil
	}
	msg := fmt.Sprintf("error deserializing blob using %v encoding: %s", enumspb.ENCODING_TYPE_PROTO3, unmarshalErr)
	return NewDeserializationError(msg)
}
temporal/api/persistence/v1/namespaces.pb.go
Code generated by protoc-gen-gogo. DO NOT EDIT.
source: temporal/server/api/persistence/v1/namespaces.proto
namespaces.pb.go 是根据 proto 文件生成
也可以用同一个 proto 文件生成对应的 Java 代码,然后用它解析这些二进制数据
temporal/proto/internal/temporal/server/api/persistence/v1/namespaces.proto
func (m *sqlMetadataManagerV2) CreateNamespace(
request *persistence.InternalCreateNamespaceRequest,
) (*persistence.CreateNamespaceResponse, error)
// CreateNamespace inserts a new namespace row and then calls updateMetadata,
// all within one m.txExecute call.
//
// request.ID must parse as a UUID; a parse failure is returned before any
// database work. Inside the txExecute callback the metadata row is locked
// first, and the new row is stamped with the locked notification version. A
// duplicate-entry error from the insert is translated into a
// NamespaceAlreadyExists service error; every other error is returned as-is.
// On success the response carries request.ID.
func (m *sqlMetadataManagerV2) CreateNamespace(
	request *persistence.InternalCreateNamespaceRequest,
) (*persistence.CreateNamespaceResponse, error) {
	ctx, cancel := newExecutionContext()
	defer cancel()

	namespaceID, err := primitives.ParseUUID(request.ID)
	if err != nil {
		return nil, err
	}

	var created *persistence.CreateNamespaceResponse
	createInTx := func(tx sqlplugin.Tx) error {
		meta, lockErr := lockMetadata(ctx, tx)
		if lockErr != nil {
			return lockErr
		}

		row := &sqlplugin.NamespaceRow{
			Name:                request.Name,
			ID:                  namespaceID,
			Data:                request.Namespace.Data,
			DataEncoding:        request.Namespace.EncodingType.String(),
			IsGlobal:            request.IsGlobal,
			NotificationVersion: meta.NotificationVersion,
		}
		if _, insertErr := tx.InsertIntoNamespace(ctx, row); insertErr != nil {
			if !m.Db.IsDupEntryError(insertErr) {
				return insertErr
			}
			return serviceerror.NewNamespaceAlreadyExists(fmt.Sprintf("name: %v", request.Name))
		}

		if updateErr := updateMetadata(ctx, tx, meta.NotificationVersion); updateErr != nil {
			return updateErr
		}

		created = &persistence.CreateNamespaceResponse{ID: request.ID}
		return nil
	}

	err = m.txExecute(ctx, "CreateNamespace", createInTx)
	return created, err
}
// UpdateNamespace rewrites an existing namespace row and then calls
// updateMetadata, all within one m.txExecute call.
//
// request.Id must parse as a UUID; a parse failure is returned before any
// database work. The update is conditional: after the metadata row is locked,
// its notification version must equal request.NotificationVersion, otherwise a
// "conditional update error" is returned before the namespace row is touched.
// The update must affect exactly one row; any other count is reported as an
// error.
func (m *sqlMetadataManagerV2) UpdateNamespace(request *persistence.InternalUpdateNamespaceRequest) error {
	ctx, cancel := newExecutionContext()
	defer cancel()

	idBytes, err := primitives.ParseUUID(request.Id)
	if err != nil {
		return err
	}

	return m.txExecute(ctx, "UpdateNamespace", func(tx sqlplugin.Tx) error {
		metadata, err := lockMetadata(ctx, tx)
		if err != nil {
			return err
		}
		// Reject the write when the caller's view of the metadata is stale.
		if metadata.NotificationVersion != request.NotificationVersion {
			return fmt.Errorf(
				"conditional update error: expect: %v, actual: %v",
				request.NotificationVersion,
				metadata.NotificationVersion,
			)
		}

		result, err := tx.UpdateNamespace(ctx, &sqlplugin.NamespaceRow{
			Name:                request.Name,
			ID:                  idBytes,
			Data:                request.Namespace.Data,
			DataEncoding:        request.Namespace.EncodingType.String(),
			NotificationVersion: request.NotificationVersion,
			IsGlobal:            request.IsGlobal,
		})
		if err != nil {
			return err
		}

		noRowsAffected, err := result.RowsAffected()
		if err != nil {
			// %w yields the same message as %v but keeps the underlying
			// error reachable through errors.Is / errors.As.
			return fmt.Errorf("rowsAffected error: %w", err)
		}
		if noRowsAffected != 1 {
			return fmt.Errorf("%v rows updated instead of one", noRowsAffected)
		}

		return updateMetadata(ctx, tx, metadata.NotificationVersion)
	})
}
上一篇
下一篇
XMLHttpRequest 例子
ssh端口转发
PostgreSQL DBA 常用SQL
temporal workflow 列表 关键代码
go 单引号 双引号 反引号
GO json 序列化与反序列化