region_cache: more comments and minor refactor (#427)
Signed-off-by: youjiali1995 <[email protected]>
youjiali1995 authored Feb 23, 2022
1 parent 79b962a commit f43cc3a
Showing 3 changed files with 58 additions and 65 deletions.
116 changes: 55 additions & 61 deletions internal/locate/region_cache.go
@@ -126,12 +126,19 @@ type AccessIndex int
// regionStore represents region stores info.
// it will be stored as unsafe.Pointer and loaded atomically
type regionStore struct {
workTiKVIdx AccessIndex // point to current work peer in meta.Peers and work store in stores(same idx) for tikv peer
proxyTiKVIdx AccessIndex // point to the tikv peer that can forward requests to the leader. -1 means not using proxy
workTiFlashIdx int32 // point to current work peer in meta.Peers and work store in stores(same idx) for tiflash peer
stores []*Store // stores in this region
storeEpochs []uint32 // snapshots of store's epoch, need reload when `storeEpochs[curr] != stores[cur].fail`
accessIndex [numAccessMode][]int // AccessMode => idx in stores
// corresponding stores (in the same order) of Region.meta.Peers in this region.
stores []*Store
// snapshots of store's epoch, need to reload when `storeEpochs[curr] != stores[curr].fail`
storeEpochs []uint32
// A region can consist of stores with different types (TiKV and TiFlash). It maintains AccessMode => idx in stores,
// e.g., stores[accessIndex[tiKVOnly][workTiKVIdx]] is the current working TiKV.
accessIndex [numAccessMode][]int
// accessIndex[tiKVOnly][workTiKVIdx] is the index of the current working TiKV in stores.
workTiKVIdx AccessIndex
// accessIndex[tiKVOnly][proxyTiKVIdx] is the index in stores of the TiKV that can forward requests to the leader; -1 means not using a proxy.
proxyTiKVIdx AccessIndex
// accessIndex[tiFlashOnly][workTiFlashIdx] is the index of the current working TiFlash in stores.
workTiFlashIdx int32
}

func (r *regionStore) accessStore(mode accessMode, idx AccessIndex) (int, *Store) {
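The accessStore helper whose signature closes this hunk is where the new accessIndex comments come together. A minimal sketch of the two-level lookup they describe follows; the body is inferred from those comments and the Sketch name is made up, not part of this diff:

func (r *regionStore) accessStoreSketch(mode accessMode, idx AccessIndex) (int, *Store) {
	sidx := r.accessIndex[mode][idx] // AccessIndex -> position within r.stores for this access mode
	return sidx, r.stores[sidx]      // e.g. the current working TiKV when mode == tiKVOnly and idx == r.workTiKVIdx
}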
@@ -155,14 +162,14 @@ func (r *regionStore) accessStoreNum(mode accessMode) int {
// clone clones region store struct.
func (r *regionStore) clone() *regionStore {
storeEpochs := make([]uint32, len(r.stores))
copy(storeEpochs, r.storeEpochs)
rs := &regionStore{
workTiFlashIdx: r.workTiFlashIdx,
proxyTiKVIdx: r.proxyTiKVIdx,
workTiKVIdx: r.workTiKVIdx,
stores: r.stores,
storeEpochs: storeEpochs,
}
copy(storeEpochs, r.storeEpochs)
for i := 0; i < int(numAccessMode); i++ {
rs.accessIndex[i] = make([]int, len(r.accessIndex[i]))
copy(rs.accessIndex[i], r.accessIndex[i])
@@ -218,9 +225,9 @@ func (r *regionStore) filterStoreCandidate(aidx AccessIndex, op *storeSelectorOp
return s.IsLabelsMatch(op.labels)
}

// init initializes region after constructed.
func (r *Region) init(bo *retry.Backoffer, c *RegionCache) error {
// region store pull used store from global store map
func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Region, error) {
r := &Region{meta: pdRegion.Meta}
// regionStore pulls used stores from the global store map
// to avoid acquiring storeMu in later accesses.
rs := &regionStore{
workTiKVIdx: 0,
@@ -229,6 +236,9 @@ func (r *Region) init(bo *retry.Backoffer, c *RegionCache) error {
stores: make([]*Store, 0, len(r.meta.Peers)),
storeEpochs: make([]uint32, 0, len(r.meta.Peers)),
}

leader := pdRegion.Leader
var leaderAccessIdx AccessIndex
availablePeers := r.meta.GetPeers()[:0]
for _, p := range r.meta.Peers {
c.storeMu.RLock()
@@ -239,12 +249,15 @@ func (r *Region) init(bo *retry.Backoffer, c *RegionCache) error {
}
addr, err := store.initResolve(bo, c)
if err != nil {
return err
return nil, err
}
// Filter the peer on a tombstone store.
if addr == "" {
continue
}
if isSamePeer(p, leader) {
leaderAccessIdx = AccessIndex(len(rs.accessIndex[tiKVOnly]))
}
availablePeers = append(availablePeers, p)
switch store.storeType {
case tikvrpc.TiKV:
@@ -258,15 +271,16 @@ func (r *Region) init(bo *retry.Backoffer, c *RegionCache) error {
// TODO(youjiali1995): It's possible the region info in PD is stale for now but it can recover.
// Maybe we need backoff here.
if len(availablePeers) == 0 {
return errors.Errorf("no available peers, region: {%v}", r.meta)
return nil, errors.Errorf("no available peers, region: {%v}", r.meta)
}
rs.workTiKVIdx = leaderAccessIdx
r.meta.Peers = availablePeers

atomic.StorePointer(&r.store, unsafe.Pointer(rs))

// mark that the region has been accessed at initialization.
r.lastAccess = time.Now().Unix()
return nil
return r, nil
}

func (r *Region) getStore() (store *regionStore) {
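The getStore accessor whose signature closes this hunk is the read side of the atomic publication done in newRegion above (atomic.StorePointer into r.store). A minimal sketch of that load, assuming the body is a plain atomic.LoadPointer; the body and the Sketch name are inferred, not part of this diff:

func (r *Region) getStoreSketch() *regionStore {
	// Load the regionStore snapshot published by newRegion or a later update;
	// readers never observe a partially written regionStore.
	return (*regionStore)(atomic.LoadPointer(&r.store))
}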
@@ -781,7 +795,6 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey

// OnSendFailForTiFlash handles send request fail logic for tiflash.
func (c *RegionCache) OnSendFailForTiFlash(bo *retry.Backoffer, store *Store, region RegionVerID, prev *metapb.Region, scheduleReload bool, err error, skipSwitchPeerLog bool) {

r := c.GetCachedRegionWithRLock(region)
if r == nil {
return
@@ -1064,7 +1077,7 @@ func (c *RegionCache) UpdateLeader(regionID RegionVerID, leader *metapb.Peer, cu
return
}

if !c.switchWorkLeaderToPeer(r, leader) {
if !r.switchWorkLeaderToPeer(leader) {
logutil.BgLogger().Info("invalidate region cache due to cannot find peer when updating leader",
zap.Uint64("regionID", regionID.GetID()),
zap.Int("currIdx", int(currentPeerIdx)),
@@ -1274,15 +1287,7 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool)
searchPrev = true
continue
}
region := &Region{meta: reg.Meta}
err = region.init(bo, c)
if err != nil {
return nil, err
}
if reg.Leader != nil {
c.switchWorkLeaderToPeer(region, reg.Leader)
}
return region, nil
return newRegion(bo, c, reg)
}
}

@@ -1322,15 +1327,7 @@ func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Reg
if len(reg.Meta.Peers) == 0 {
return nil, errors.New("receive Region with no available peer")
}
region := &Region{meta: reg.Meta}
err = region.init(bo, c)
if err != nil {
return nil, err
}
if reg.Leader != nil {
c.switchWorkLeaderToPeer(region, reg.Leader)
}
return region, nil
return newRegion(bo, c, reg)
}
}

@@ -1376,17 +1373,15 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte,
}
regions := make([]*Region, 0, len(regionsInfo))
for _, r := range regionsInfo {
region := &Region{meta: r.Meta}
err := region.init(bo, c)
// Leader id = 0 indicates no leader.
if r.Leader == nil || r.Leader.GetId() == 0 {
continue
}
region, err := newRegion(bo, c, r)
if err != nil {
return nil, err
}
leader := r.Leader
// Leader id = 0 indicates no leader.
if leader != nil && leader.GetId() != 0 {
c.switchWorkLeaderToPeer(region, leader)
regions = append(regions, region)
}
regions = append(regions, region)
}
if len(regions) == 0 {
return nil, errors.New("receive Regions with no peer")
@@ -1548,8 +1543,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext
return false, errors.Errorf("newRegion's range key is not encoded: %v, %v", meta, err)
}
}
region := &Region{meta: meta}
err := region.init(bo, c)
region, err := newRegion(bo, c, &pd.Region{Meta: meta})
if err != nil {
return false, err
}
@@ -1559,7 +1553,7 @@
} else {
initLeaderStoreID = ctx.Store.storeID
}
c.switchWorkLeaderToPeer(region, region.getPeerOnStore(initLeaderStoreID))
region.switchWorkLeaderToPeer(region.getPeerOnStore(initLeaderStoreID))
newRegions = append(newRegions, region)
if ctx.Region == region.VerID() {
needInvalidateOld = false
@@ -1728,10 +1722,24 @@ func (r *Region) EndKey() []byte {
return r.meta.EndKey
}

func (r *Region) getPeerStoreIndex(peer *metapb.Peer) (idx int, found bool) {
if len(r.meta.Peers) == 0 || peer == nil {
return
}
for i, p := range r.meta.Peers {
if isSamePeer(p, peer) {
idx = i
found = true
return
}
}
return
}

// switchWorkLeaderToPeer switches the current working store to the one the given peer is on. It returns
// false if no peer in the region matches the given peer.
func (c *RegionCache) switchWorkLeaderToPeer(r *Region, peer *metapb.Peer) (found bool) {
globalStoreIdx, found := c.getPeerStoreIndex(r, peer)
func (r *Region) switchWorkLeaderToPeer(peer *metapb.Peer) (found bool) {
globalStoreIdx, found := r.getPeerStoreIndex(peer)
if !found {
return
}
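For context, a hedged usage sketch of the method in its new Region-scoped form, mirroring the UpdateLeader call site earlier in this diff; the invalidate helper and the Other reason are assumptions about the surrounding package, not part of this change:

// Hypothetical caller: move the working leader index to the peer reported by
// the server, and drop the cached region if that peer is unknown to it.
if !region.switchWorkLeaderToPeer(leader) {
	region.invalidate(Other)
}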
@@ -1811,20 +1819,6 @@ func (r *Region) getPeerOnStore(storeID uint64) *metapb.Peer {
return nil
}

func (c *RegionCache) getPeerStoreIndex(r *Region, peer *metapb.Peer) (idx int, found bool) {
if len(r.meta.Peers) == 0 || peer == nil {
return
}
for i, p := range r.meta.Peers {
if isSamePeer(p, peer) {
idx = i
found = true
return
}
}
return
}

// Contains checks whether the key is in the region; for the maximum region, endKey is empty.
// startKey <= key < endKey.
func (r *Region) Contains(key []byte) bool {
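For reference, a sketch of the containment check that comment describes, treating an empty endKey as unbounded; the function body and the Sketch name are inferred from the comment, not part of this diff:

// containsSketch reports whether startKey <= key < endKey, where an empty
// endKey means the region extends to the maximum key. Requires the "bytes" import.
func containsSketch(r *Region, key []byte) bool {
	return bytes.Compare(r.meta.StartKey, key) <= 0 &&
		(bytes.Compare(key, r.meta.EndKey) < 0 || len(r.meta.EndKey) == 0)
}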
@@ -2226,7 +2220,7 @@ func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, h

cfg := config.GetGlobalConfig()

opt := grpc.WithInsecure() //nolint
opt := grpc.WithInsecure()
if len(cfg.Security.ClusterSSLCA) != 0 {
tlsConfig, err := cfg.Security.ToTLSConfig()
if err != nil {
3 changes: 1 addition & 2 deletions internal/locate/region_cache_test.go
@@ -1263,8 +1263,7 @@ func (s *testRegionCacheSuite) TestPeersLenChange() {
DownPeers: []*metapb.Peer{{Id: s.peer1, StoreId: s.store1}},
}
filterUnavailablePeers(cpRegion)
region := &Region{meta: cpRegion.Meta}
err = region.init(s.bo, s.cache)
region, err := newRegion(s.bo, s.cache, cpRegion)
s.Nil(err)
s.cache.insertRegionToCache(region)

4 changes: 2 additions & 2 deletions internal/locate/region_request.go
@@ -387,7 +387,7 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
}

func (state *tryFollower) onSendSuccess(selector *replicaSelector) {
if !selector.regionCache.switchWorkLeaderToPeer(selector.region, selector.targetReplica().peer) {
if !selector.region.switchWorkLeaderToPeer(selector.targetReplica().peer) {
panic("the store must exist")
}
}
@@ -822,7 +822,7 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
}
s.state = &accessKnownLeader{leaderIdx: AccessIndex(i)}
// Update the workTiKVIdx so that following requests can be sent to the leader immediately.
if !s.regionCache.switchWorkLeaderToPeer(s.region, leader) {
if !s.region.switchWorkLeaderToPeer(leader) {
panic("the store must exist")
}
logutil.BgLogger().Debug("switch region leader to specific leader due to kv return NotLeader",
