Skip to content

Commit

Permalink
Apply credentials from envs to final config
Browse files Browse the repository at this point in the history
Signed-off-by: sashayakovtseva <[email protected]>
  • Loading branch information
sashayakovtseva committed Aug 1, 2023
1 parent b3d9d2a commit b34051a
Showing 1 changed file with 26 additions and 9 deletions.
35 changes: 26 additions & 9 deletions task/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
)

var (
createTableSQL = `CREATE TABLE IF NOT EXISTS %s as %s.%s ENGINE=Merge('%s', '%s')`
createTableSQL = `CREATE TABLE IF NOT EXISTS %s AS %s.%s ENGINE=Merge('%s', '%s')`
dropTableSQL = `DROP TABLE IF EXISTS %s `
countSeriesSQL = `WITH (SELECT max(timestamp) FROM %s) AS m
SELECT count() FROM %s FINAL WHERE __series_id GLOBAL IN (
Expand Down Expand Up @@ -111,15 +111,17 @@ func (s *Sinker) Run() {
}

if s.rcm == nil {
if _, err = os.Stat(s.cmdOps.LocalCfgFile); err == nil {
if newCfg, err = config.ParseLocalCfgFile(s.cmdOps.LocalCfgFile); err != nil {
util.Logger.Fatal("config.ParseLocalCfgFile failed", zap.Error(err))
return
}
} else {
if _, err := os.Stat(s.cmdOps.LocalCfgFile); err != nil {
util.Logger.Fatal("expect --local-cfg-file or --nacos-dataid")
return
}

if newCfg, err = config.ParseLocalCfgFile(s.cmdOps.LocalCfgFile); err != nil {
util.Logger.Fatal("config.ParseLocalCfgFile failed", zap.Error(err))
return
}
applyCredentials(newCfg, s.cmdOps.Credentials)

ha := ""
if s.cmdOps.NacosServiceName != "" {
ha = s.httpAddr
Expand Down Expand Up @@ -302,7 +304,7 @@ func (s *Sinker) applyConfig(newCfg *config.Config) (err error) {
}

func (s *Sinker) applyFirstConfig(newCfg *config.Config) (err error) {
util.Logger.Info("going to apply the first config", zap.Reflect("config", newCfg))
util.Logger.Info("going to apply the first config")
// 1. Initialize clickhouse connections
chCfg := &newCfg.Clickhouse
if err = pool.InitClusterConn(chCfg.Hosts, chCfg.Port, chCfg.DB, chCfg.Username, chCfg.Password,
Expand Down Expand Up @@ -338,7 +340,7 @@ func (s *Sinker) applyFirstConfig(newCfg *config.Config) (err error) {
}

func (s *Sinker) applyAnotherConfig(newCfg *config.Config) (err error) {
util.Logger.Info("going to apply another config", zap.Int("number", s.numCfg), zap.Reflect("config", newCfg))
util.Logger.Info("going to apply another config", zap.Int("number", s.numCfg))
if !reflect.DeepEqual(newCfg.Kafka, s.curCfg.Kafka) || !reflect.DeepEqual(newCfg.Clickhouse, s.curCfg.Clickhouse) {
// 1. Stop tasks gracefully. Wait until all flying data be processed (write to CH and commit to Kafka).
s.stopAllTasks()
Expand Down Expand Up @@ -453,6 +455,21 @@ func (s *Sinker) applyAnotherConfig(newCfg *config.Config) (err error) {
return
}

func applyCredentials(newCfg *config.Config, cred util.Credentials) {
if cred.ClickhouseUsername != "" {
newCfg.Clickhouse.Username = cred.ClickhouseUsername
}
if cred.ClickhousePassword != "" {
newCfg.Clickhouse.Password = cred.ClickhousePassword
}
if cred.KafkaUsername != "" {
newCfg.Kafka.Sasl.Username = cred.KafkaUsername
}
if cred.KafkaPassword != "" {
newCfg.Kafka.Sasl.Password = cred.KafkaPassword
}
}

func (s *Sinker) commitFn() {
for {
select {
Expand Down

0 comments on commit b34051a

Please sign in to comment.