修改环境变量获取模式

-env dev
-env prod
This commit is contained in:
chejiulong 2023-02-27 11:58:39 +08:00
parent 7e5bd5923c
commit d6d8a6e45f
3 changed files with 97 additions and 65 deletions

Binary file not shown.

Binary file not shown.

162
main.go
View File

@ -6,6 +6,7 @@ import (
"crypto/tls" "crypto/tls"
"encoding/csv" "encoding/csv"
"encoding/json" "encoding/json"
"flag"
"fmt" "fmt"
"io" "io"
"log" "log"
@ -30,47 +31,30 @@ import (
"gorm.io/gorm/logger" "gorm.io/gorm/logger"
) )
/* var ( //初始化变量
var ( env string
applogger *logrus.Logger applogger *logrus.Logger
redisClient *redis.Client redisClient *redis.Client
executableDir string executableDir string
redisAddress = "mysql5.weu.me:6379" redisAddress string
redisPassword = "" redisPassword string
redisDB = 1 redisDB int
sftpAddress = "192.168.10.86:49156" sftpAddress string
sftpUser = "demo" sftpUser string
sftpPassword = "demo" sftpPassword string
sftpDir = "/sftp/test" sftpDir string
dbAddress = "192.168.10.18:1433" dbAddress string
dbUser = "sa" dbUser string
dbPassword = "Aa123123" dbPassword string
dbName = "sephora" dbName string
zipPath = "RawData/Zip/" zipPath string
txtPath = "RawData/Txt/" txtPath string
logPath = "logs/" logPath string
) batchSize int //提交数据
*/ insertSize int //一次性入库
insertChanSize int //通道缓冲数
var ( //正式环境 goSize int //协程数
taskTime int
applogger *logrus.Logger
redisClient *redis.Client
executableDir string
redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379"
redisPassword = "3Nsb4Pmsl9bcLs24mL12l"
redisDB = 233
sftpAddress = "esftp.sephora.com.cn:20981"
sftpUser = "CHN-CRMTOWemedia-wemedia"
sftpPassword = "uoWdMHEv39ZFjiOg"
sftpDir = "/CN-CRMTOWemedia/SMS"
dbAddress = "rm-bp16l424ln96q1ouk.sqlserver.rds.aliyuncs.com:3433"
dbUser = "sephora_sms"
dbPassword = "5ORiiLmgkniC0EqF"
dbName = "sephora_sms"
zipPath = "RawData/Zip/"
txtPath = "RawData/Txt/"
logPath = "logs/"
) )
type Batch struct { type Batch struct {
@ -137,6 +121,55 @@ func init() {
log.Fatal(err) log.Fatal(err)
} }
flag.StringVar(&env, "env", "dev", "运行模式")
flag.Parse()
switch env {
case "dev":
fmt.Print("测试环境配置以生效\n")
redisAddress = "mysql5.weu.me:6379"
redisPassword = ""
redisDB = 1
sftpAddress = "192.168.10.86:49156"
sftpUser = "demo"
sftpPassword = "demo"
sftpDir = "/sftp/test"
dbAddress = "192.168.10.18:1433"
dbUser = "sa"
dbPassword = "Aa123123"
dbName = "sephora"
zipPath = "RawData/Zip/"
txtPath = "RawData/Txt/"
logPath = "logs/"
batchSize = 5000 //提交数据
insertSize = 500 //一次性入库
insertChanSize = 10 //通道缓冲数
goSize = 10 //协程数
taskTime = 1
case "prod":
fmt.Print("正式环境配置以生效\n")
redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379"
redisPassword = "3Nsb4Pmsl9bcLs24mL12l"
redisDB = 233
sftpAddress = "esftp.sephora.com.cn:20981"
sftpUser = "CHN-CRMTOWemedia-wemedia"
sftpPassword = "uoWdMHEv39ZFjiOg"
sftpDir = "/CN-CRMTOWemedia/SMS"
dbAddress = "rm-bp16l424ln96q1ouk.sqlserver.rds.aliyuncs.com:3433"
dbUser = "sephora_sms"
dbPassword = "5ORiiLmgkniC0EqF"
dbName = "sephora_sms"
zipPath = "RawData/Zip/"
txtPath = "RawData/Txt/"
logPath = "logs/"
batchSize = 5000 //提交数据
insertSize = 500 //一次性入库
insertChanSize = 50 //通道缓冲数
goSize = 150 //协程数
taskTime = 60
default:
panic(fmt.Errorf("无效的运行模式: %s", env))
}
redisClient = redis.NewClient(&redis.Options{ redisClient = redis.NewClient(&redis.Options{
Addr: redisAddress, Addr: redisAddress,
Password: redisPassword, Password: redisPassword,
@ -160,7 +193,7 @@ func main() {
} }
applogger = logrus.New() applogger = logrus.New()
logPath := filepath.Join(executableDir, "logs") logPath := filepath.Join(executableDir, "logs")
logFileName := "sms_processing_" + time.Now().Format("2006-01-02") + ".log" logFileName := "sms_processing_" + time.Now().Format("2006_01_02") + ".log"
logFileHook := &lumberjack.Logger{ logFileHook := &lumberjack.Logger{
Filename: filepath.Join(logPath, logFileName), Filename: filepath.Join(logPath, logFileName),
} }
@ -168,18 +201,21 @@ func main() {
go downloadDecompression() // 启动立即执行一次 go downloadDecompression() // 启动立即执行一次
ticker := time.NewTicker(60 * time.Minute) // 创建一个定时器,每隔一分钟触发一次任务函数 ticker := time.NewTicker(time.Duration(taskTime) * time.Minute) // 创建一个定时器,每隔一分钟触发一次任务函数
tickCount := 1 //记录循环次数 tickCount := 2 //记录循环次数
defer ticker.Stop() // 延迟关闭定时器,确保所有的定时器事件都被处理完毕 defer ticker.Stop() // 延迟关闭定时器,确保所有的定时器事件都被处理完毕
// 循环处理任务 // 循环处理任务
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
// 定时器触发时执行的任务函数 // 定时器触发时执行的任务函数
logFileName := "sms_processing_" + time.Now().Format("2006-01-02") + ".log" logFileName := "sms_processing_" + time.Now().Format("2006_01_02") + ".log"
logFileHook.Filename = filepath.Join(logPath, logFileName) logFileHook := &lumberjack.Logger{
fmt.Printf("尝试第%d此执行....\n", tickCount) Filename: filepath.Join(logPath, logFileName),
applogger.Info(fmt.Sprintf("尝试第%d此执行....\n", tickCount)) }
applogger.SetOutput(logFileHook)
fmt.Printf("尝试第%d次执行....\n", tickCount)
applogger.Info(fmt.Sprintf("尝试第%d次执行....", tickCount))
go downloadDecompression() // 在新协程中异步执行 go downloadDecompression() // 在新协程中异步执行
tickCount++ tickCount++
} }
@ -252,8 +288,8 @@ func downloadDecompression() {
applogger.Info("下载文件失败,文件名: %s错误信息%v", file.Name(), err) applogger.Info("下载文件失败,文件名: %s错误信息%v", file.Name(), err)
continue continue
} }
fmt.Printf("%s数据包下载完成\n", file.Name()) fmt.Printf("%s数据包下载完成\n", file.Name())
applogger.Info(fmt.Sprintf("%s数据包下载完成", file.Name())) applogger.Info(fmt.Sprintf("%s数据包下载完成", file.Name()))
// Unzip file // Unzip file
zipReader, err := zip.OpenReader(path.Join(executableDir, zipPath, file.Name())) zipReader, err := zip.OpenReader(path.Join(executableDir, zipPath, file.Name()))
if err != nil { if err != nil {
@ -370,8 +406,8 @@ func batchInsert(fileName string) {
SendEmail(subject, body) //发送邮件 SendEmail(subject, body) //发送邮件
fmt.Printf("%s批次文件入库完成 \n", fileName) fmt.Printf("%s批次文件入库完成 \n", fileName)
applogger.Info(fmt.Sprintf("%s批次文件入库完成", fileName)) applogger.Info(fmt.Sprintf("%s批次文件入库完成", fileName))
fmt.Printf("执行时间%s 插入批次次数:%d\n\n\n", elapsed, batchRows) fmt.Printf("%s批次文件执行时间%s 插入批次次数:%d\n\n\n", fileName, elapsed, batchRows)
applogger.Info(fmt.Sprintf("执行时间%s 插入批次次数:%d", elapsed, batchRows)) applogger.Info(fmt.Sprintf("%s批次文件执行时间%s 插入批次次数:%d", fileName, elapsed, batchRows))
} }
} }
@ -386,10 +422,6 @@ func batchDataInsert(fileName string) {
defer file.Close() defer file.Close()
db, _ := connectToDB() db, _ := connectToDB()
// Insert data in batches using multiple goroutines // Insert data in batches using multiple goroutines
batchSize := 5000 //提交数据
insertSize := 500 //一次性入库
insertChanSize := 50 //通道缓冲数
goSize := 150 //协程数
var wg sync.WaitGroup var wg sync.WaitGroup
dataBatchChan := make(chan []BatcheData, insertChanSize) dataBatchChan := make(chan []BatcheData, insertChanSize)
for i := 0; i < goSize; i++ { for i := 0; i < goSize; i++ {
@ -451,14 +483,14 @@ func batchDataInsert(fileName string) {
applogger.Info("个性化字段合并失败,文件名:%s错误信息%v", fileName, err) applogger.Info("个性化字段合并失败,文件名:%s错误信息%v", fileName, err)
continue continue
} }
if _, ok := duplicateCount[row[2]]; !ok {
duplicateCount[row[2]] = 0
}
// Check if record exists in hashset // Check if record exists in hashset
key := fmt.Sprintf("%s-%s", row[2], row[3]) key := fmt.Sprintf("%s-%s", row[2], row[3])
if _, exists := hs[key]; exists { //如果批次数据重复 if _, exists := hs[key]; exists { //如果批次数据重复
bi++ bi++
// Increment duplicate count // Increment duplicate count
if _, ok := duplicateCount[row[2]]; !ok {
duplicateCount[row[2]] = 0
}
duplicateCount[row[2]]++ duplicateCount[row[2]]++
dataBatchDuplicate = append(dataBatchDuplicate, BatchDataDuplicateLog{ dataBatchDuplicate = append(dataBatchDuplicate, BatchDataDuplicateLog{
CommunicationChannelID: row[2], CommunicationChannelID: row[2],
@ -544,10 +576,10 @@ func batchDataInsert(fileName string) {
subject := "丝芙兰数据包处理完成" subject := "丝芙兰数据包处理完成"
body := "数据包:" + fileName + ";\n总数" + strconv.Itoa(count+bi) + ";\n过滤重复数" + strconv.Itoa(bi) + ";\n过滤后总数" + strconv.Itoa(count) + ";\n处理完成请前往管理平台查看处理。" body := "数据包:" + fileName + ";\n总数" + strconv.Itoa(count+bi) + ";\n过滤重复数" + strconv.Itoa(bi) + ";\n过滤后总数" + strconv.Itoa(count) + ";\n处理完成请前往管理平台查看处理。"
SendEmail(subject, body) //发送邮件 SendEmail(subject, body) //发送邮件
applogger.Info(fmt.Sprintf("%s数据包 ,入库完成", fileName)) applogger.Info(fmt.Sprintf("%s数据包 ,入库完成", fileName))
fmt.Printf("%s数据包 入库完成\n", fileName) fmt.Printf("%s数据包 入库完成\n", fileName)
fmt.Printf("执行时间:%s 插入数据:%d 过滤数:%d\n\n\n", elapsed, count, bi) fmt.Printf("%s数据包 执行时间:%s 插入数据:%d 过滤数:%d\n\n\n", fileName, elapsed, count, bi)
applogger.Info(fmt.Sprintf("执行时间:%s 插入数据:%d条 过滤数数:%d条", elapsed, count, bi)) applogger.Info(fmt.Sprintf("%s数据包执行时间:%s 插入数据:%d条 过滤数数:%d条", fileName, elapsed, count, bi))
} }
} }
@ -582,8 +614,8 @@ func SendEmail(subject string, body string) error {
smtpPort := 465 smtpPort := 465
from := "auto_system@wemediacn.com" from := "auto_system@wemediacn.com"
password := "yJJYYcy5NKx2y69r" password := "yJJYYcy5NKx2y69r"
//to := []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"} to := []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"}
to := []string{"chejiulong@wemediacn.com"} //to := []string{"chejiulong@wemediacn.com"}
// 邮件内容 // 邮件内容
m := gomail.NewMessage() m := gomail.NewMessage()