move tso to independent thread
Signed-off-by: Ryan Leung <[email protected]>
rleungx committed Oct 16, 2024
1 parent 3d4c416 · commit 0696f75
Showing 4 changed files with 93 additions and 44 deletions.
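For orientation before the per-file hunks: the commit removes the inline global TSO allocator setup from campaignLeader in server.go and instead has RaftCluster (which now also holds the PD member and the TSO AllocatorManager) poll leadership from its service-check goroutine. The old single serviceCheckInterval is split into schedulingServiceCheckInterval (10s) and a much tighter tsoServiceCheckInterval (100ms), and checkTSOService starts the TSO jobs while this member leads and stops them otherwise, skipping the check entirely in API service mode. Below is a minimal, self-contained sketch of that check-loop shape, with the PD-specific types replaced by plain function hooks; the names are simplified and this is not the PD code itself.

package main

import (
	"context"
	"fmt"
	"time"
)

// runTSOCheck periodically reconciles TSO ownership with leadership: the
// leader starts the TSO jobs, a non-leader stops them. It mirrors the shape of
// RaftCluster.runServiceCheckJob/checkTSOService in this commit.
func runTSOCheck(ctx context.Context, isLeader func() bool, start, stop func() error, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			var err error
			if isLeader() {
				err = start() // initialize the global allocator if it is not yet initialized
			} else {
				err = stop() // reset the allocator group once leadership is lost
			}
			if err != nil {
				// Errors are not fatal here: just wait for the next tick, as checkTSOService does.
				fmt.Println("tso check:", err)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	runTSOCheck(ctx,
		func() bool { return true },                                // pretend this member is always the leader
		func() error { fmt.Println("start TSO jobs"); return nil }, // stands in for startTSOJobs
		func() error { fmt.Println("stop TSO jobs"); return nil },  // stands in for stopTSOJobs
		100*time.Millisecond,                                       // tsoServiceCheckInterval in the commit
	)
}

Because startTSOJobs only initializes the allocator when it is not already initialized, and stopTSOJobs only resets it when it is, repeated ticks under stable leadership are cheap no-ops.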
4 changes: 3 additions & 1 deletion client/client.go
@@ -203,7 +203,9 @@ func (k *serviceModeKeeper) close() {
defer k.Unlock()
switch k.serviceMode {
case pdpb.ServiceMode_API_SVC_MODE:
k.tsoSvcDiscovery.Close()
if k.tsoSvcDiscovery != nil {
k.tsoSvcDiscovery.Close()
}
fallthrough
case pdpb.ServiceMode_PD_SVC_MODE:
if k.tsoClient != nil {
101 changes: 86 additions & 15 deletions server/cluster/cluster.go
@@ -17,6 +17,7 @@ package cluster
import (
"context"
"encoding/json"
errorspkg "errors"
"fmt"
"io"
"math"
@@ -43,6 +44,7 @@ import (
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/memory"
"github.com/tikv/pd/pkg/progress"
"github.com/tikv/pd/pkg/ratelimit"
@@ -56,6 +58,7 @@ import (
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/syncer"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/unsaferecovery"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
@@ -88,12 +91,13 @@ const (
// nodeStateCheckJobInterval is the interval to run node state check job.
nodeStateCheckJobInterval = 10 * time.Second
// metricsCollectionJobInterval is the interval to run metrics collection job.
metricsCollectionJobInterval = 10 * time.Second
updateStoreStatsInterval = 9 * time.Millisecond
clientTimeout = 3 * time.Second
defaultChangedRegionsLimit = 10000
gcTombstoneInterval = 30 * 24 * time.Hour
serviceCheckInterval = 10 * time.Second
metricsCollectionJobInterval = 10 * time.Second
updateStoreStatsInterval = 9 * time.Millisecond
clientTimeout = 3 * time.Second
defaultChangedRegionsLimit = 10000
gcTombstoneInterval = 30 * 24 * time.Hour
schedulingServiceCheckInterval = 10 * time.Second
tsoServiceCheckInterval = 100 * time.Millisecond
// persistLimitRetryTimes is used to reduce the probability of the persistent error
// since once the store is added or removed, we shouldn't return an error even if the store limit fails to persist.
persistLimitRetryTimes = 5
@@ -144,6 +148,7 @@ type RaftCluster struct {
cancel context.CancelFunc

*core.BasicCluster // cached cluster info
member *member.EmbeddedEtcdMember

etcdClient *clientv3.Client
httpClient *http.Client
@@ -174,6 +179,7 @@ type RaftCluster struct {
keyspaceGroupManager *keyspace.GroupManager
independentServices sync.Map
hbstreams *hbstream.HeartbeatStreams
tsoAllocator *tso.AllocatorManager

// heartbeatRunner is used to process the subtree update task asynchronously.
heartbeatRunner ratelimit.Runner
@@ -194,16 +200,18 @@ type Status struct {
}

// NewRaftCluster create a new cluster.
func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client,
httpClient *http.Client) *RaftCluster {
func NewRaftCluster(ctx context.Context, clusterID uint64, member *member.EmbeddedEtcdMember, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client,
httpClient *http.Client, tsoAllocator *tso.AllocatorManager) *RaftCluster {
return &RaftCluster{
serverCtx: ctx,
clusterID: clusterID,
member: member,
regionSyncer: regionSyncer,
httpClient: httpClient,
etcdClient: etcdClient,
BasicCluster: basicCluster,
storage: storage,
tsoAllocator: tsoAllocator,
heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
miscRunner: ratelimit.NewConcurrentRunner(miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
@@ -314,11 +322,13 @@ func (c *RaftCluster) Start(s Server) error {
if err != nil {
return err
}
c.checkTSOService()
cluster, err := c.LoadClusterInfo()
if err != nil {
return err
}
if cluster == nil {
log.Warn("cluster is not bootstrapped")
return nil
}

Expand Down Expand Up @@ -351,7 +361,7 @@ func (c *RaftCluster) Start(s Server) error {
return err
}
}
c.checkServices()
c.checkSchedulingService()
c.wg.Add(9)
go c.runServiceCheckJob()
go c.runMetricsCollectionJob()
@@ -370,7 +380,7 @@ func (c *RaftCluster) Start(s Server) error {
return nil
}

func (c *RaftCluster) checkServices() {
func (c *RaftCluster) checkSchedulingService() {
if c.isAPIServiceMode {
servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), constant.SchedulingServiceName)
if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) {
@@ -390,27 +400,87 @@ func (c *RaftCluster) checkServices() {
}
}

// checkTSOService checks the TSO service.
func (c *RaftCluster) checkTSOService() {
if !c.isAPIServiceMode {
if c.member.IsLeader() {
if err := c.startTSOJobs(); err != nil {
// If there is an error, we need to wait for the next check.
return
}
} else {
// the leader has exited, so reset the allocator group
if err := c.stopTSOJobs(); err != nil {
// If there is an error, we need to wait for the next check.
return
}

failpoint.Inject("updateAfterResetTSO", func() {
allocator, _ := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
if err := allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) {
log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err))
}
if allocator.IsInitialize() {
log.Panic("the allocator should be uninitialized after reset")
}
})
}
}
}

func (c *RaftCluster) runServiceCheckJob() {
defer logutil.LogPanic()
defer c.wg.Done()

ticker := time.NewTicker(serviceCheckInterval)
schedulingTicker := time.NewTicker(schedulingServiceCheckInterval)
failpoint.Inject("highFrequencyClusterJobs", func() {
ticker.Reset(time.Millisecond)
schedulingTicker.Reset(time.Millisecond)
})
defer ticker.Stop()
defer schedulingTicker.Stop()
tsoTicker := time.NewTicker(tsoServiceCheckInterval)
defer tsoTicker.Stop()

for {
select {
case <-c.ctx.Done():
log.Info("service check job is stopped")
return
case <-ticker.C:
c.checkServices()
case <-schedulingTicker.C:
c.checkSchedulingService()
case <-tsoTicker.C:
c.checkTSOService()
}
}
}

func (c *RaftCluster) startTSOJobs() error {
allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
if err != nil {
log.Error("failed to get global TSO allocator", errs.ZapError(err))
return err
}
if !allocator.IsInitialize() {
log.Info("initializing the global TSO allocator")
if err := allocator.Initialize(0); err != nil {
log.Error("failed to initialize the global TSO allocator", errs.ZapError(err))
return err
}
}
return nil
}

func (c *RaftCluster) stopTSOJobs() error {
allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
if err != nil {
log.Error("failed to get global TSO allocator", errs.ZapError(err))
return err
}
if allocator.IsInitialize() {
c.tsoAllocator.ResetAllocatorGroup(tso.GlobalDCLocation, true)
}
return nil
}

// startGCTuner
func (c *RaftCluster) startGCTuner() {
defer logutil.LogPanic()
@@ -757,6 +827,7 @@ func (c *RaftCluster) Stop() {
if !c.IsServiceIndependent(constant.SchedulingServiceName) {
c.stopSchedulingJobs()
}
c.stopTSOJobs()

[Check failure on line 830 in server/cluster/cluster.go, GitHub Actions / statics: Error return value of `c.stopTSOJobs` is not checked (errcheck)]
c.heartbeatRunner.Stop()
c.miscRunner.Stop()
c.logRunner.Stop()
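The statics check above flags the new c.stopTSOJobs() call in Stop() because its error return is discarded. One possible follow-up that would satisfy errcheck while keeping shutdown best-effort is sketched below; it reuses only identifiers already visible in this diff and is a suggestion, not part of the commit:

	// In RaftCluster.Stop(): log the error instead of silently dropping it.
	if err := c.stopTSOJobs(); err != nil {
		log.Error("failed to stop TSO jobs", errs.ZapError(err))
	}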
26 changes: 1 addition & 25 deletions server/server.go
@@ -17,7 +17,6 @@ package server
import (
"bytes"
"context"
errorspkg "errors"
"fmt"
"math/rand"
"net/http"
@@ -490,7 +489,7 @@ func (s *Server) startServer(ctx context.Context) error {

s.gcSafePointManager = gc.NewSafePointManager(s.storage, s.cfg.PDServerCfg)
s.basicCluster = core.NewBasicCluster()
s.cluster = cluster.NewRaftCluster(ctx, clusterID, s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient)
s.cluster = cluster.NewRaftCluster(ctx, clusterID, s.GetMember(), s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient, s.tsoAllocatorManager)
keyspaceIDAllocator := id.NewAllocator(&id.AllocatorParams{
Client: s.client,
RootPath: s.rootPath,
@@ -1715,29 +1714,6 @@ func (s *Server) campaignLeader() {
s.member.KeepLeader(ctx)
log.Info(fmt.Sprintf("campaign %s leader ok", s.mode), zap.String("campaign-leader-name", s.Name()))

if !s.IsAPIServiceMode() {
allocator, err := s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation)
if err != nil {
log.Error("failed to get the global TSO allocator", errs.ZapError(err))
return
}
log.Info("initializing the global TSO allocator")
if err := allocator.Initialize(0); err != nil {
log.Error("failed to initialize the global TSO allocator", errs.ZapError(err))
return
}
defer func() {
s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation, false)
failpoint.Inject("updateAfterResetTSO", func() {
if err = allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) {
log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err))
}
if allocator.IsInitialize() {
log.Panic("the allocator should be uninitialized after reset")
}
})
}()
}
if err := s.reloadConfigFromKV(); err != nil {
log.Error("failed to reload configuration", errs.ZapError(err))
return
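A side effect of moving this block is that the updateAfterResetTSO failpoint now lives in the server/cluster package instead of server, so any test that enables it by its old path would presumably need the new one. A hedged example of such a test hook is shown below; the exact failpoint path is an assumption inferred from the new package, not confirmed by this diff:

	// Hypothetical test setup for the relocated failpoint.
	re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/updateAfterResetTSO", `return(true)`))
	defer func() {
		re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/updateAfterResetTSO"))
	}()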
6 changes: 3 additions & 3 deletions tests/server/cluster/cluster_test.go
@@ -912,7 +912,7 @@ func TestLoadClusterInfo(t *testing.T) {
tc.WaitLeader()
leaderServer := tc.GetLeaderServer()
svr := leaderServer.GetServer()
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager())

// Cluster is not bootstrapped.
rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager())
@@ -952,7 +952,7 @@ func TestLoadClusterInfo(t *testing.T) {
}
re.NoError(testStorage.Flush())

raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), basicCluster, testStorage, syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())
raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), basicCluster, testStorage, syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager())
raftCluster.InitCluster(mockid.NewIDAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager())
raftCluster, err = raftCluster.LoadClusterInfo()
re.NoError(err)
@@ -1666,7 +1666,7 @@ func TestTransferLeaderBack(t *testing.T) {
tc.WaitLeader()
leaderServer := tc.GetLeaderServer()
svr := leaderServer.GetServer()
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager())
rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager())
storage := rc.GetStorage()
meta := &metapb.Cluster{Id: 123}
