增加定时删除15天前数据功能、修改正式环境数据库为MySQL

This commit is contained in:
chejiulong 2023-04-04 16:02:28 +08:00
parent d6fb8c498e
commit 21119fa4c7
2 changed files with 87 additions and 41 deletions

89
main.go
View File

@ -58,9 +58,11 @@ func init() {
func main() { func main() {
ticker := time.NewTicker(time.Duration(taskTime) * time.Minute) // 数据下载、处理 ticker := time.NewTicker(time.Duration(taskTime) * time.Minute) // 数据下载、处理
ticker_merge := time.NewTicker(time.Duration(batchStatusTaskTime) * time.Minute) //查询批次 ticker_merge := time.NewTicker(time.Duration(batchStatusTaskTime) * time.Minute) //查询批次
notification := time.NewTicker(time.Until(nextNotificationTime())) // 每天凌晨3点输出提示
defer ticker.Stop() defer ticker.Stop()
defer ticker_merge.Stop() defer ticker_merge.Stop()
defer notification.Stop()
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
@ -73,10 +75,54 @@ func main() {
fmt.Print("查询批次状态...\n") fmt.Print("查询批次状态...\n")
queryBatchState() queryBatchState()
case <-notification.C:
iniLog()
applogger.Info("开始清除历史数据")
// 删除15天前的批次数据
rowsAffected, err := deleteOldData(&BatcheData{}, 15)
if err != nil {
handleError(err, "删除15天前的批次数据失败")
} else {
applogger.Info(fmt.Sprintf("成功删除 %d 行 15天前的批次数据", rowsAffected))
}
// 删除15天前的批次数据重复日志
rowsAffected, err = deleteOldData(&BatchDataDuplicateLog{}, 15)
if err != nil {
handleError(err, "删除15天前的批次数据重复日志失败")
} else {
applogger.Info(fmt.Sprintf("成功删除 %d 行 15天前的批次数据重复日志", rowsAffected))
}
notification.Reset(time.Until(nextNotificationTime()))
applogger.Info("清除历史数据完成")
} }
} }
} }
// 删除15天前的数据
func deleteOldData(model interface{}, daysAgo int) (int64, error) {
db, err := connectToDB()
if err != nil {
return 0, fmt.Errorf("连接数据库失败: %w", err)
}
threshold := time.Now().AddDate(0, 0, -daysAgo)
result := db.Where("created_at < ?", threshold).Delete(model)
if result.Error != nil {
return 0, result.Error
}
return result.RowsAffected, nil
}
// 计算下一次执行时间
func nextNotificationTime() time.Time {
now := time.Now()
notificationTime := time.Date(now.Year(), now.Month(), now.Day(), 3, 0, 0, 0, now.Location())
if now.After(notificationTime) {
notificationTime = notificationTime.Add(24 * time.Hour)
}
return notificationTime
}
func startWebSocket() { func startWebSocket() {
http.HandleFunc("/ws", handleWebSocket) http.HandleFunc("/ws", handleWebSocket)
err := http.ListenAndServe(":8080", nil) err := http.ListenAndServe(":8080", nil)
@ -87,6 +133,7 @@ func startWebSocket() {
} }
} }
// 客户端连接信息
func handleWebSocket(w http.ResponseWriter, r *http.Request) { func handleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil) conn, err := upgrader.Upgrade(w, r, nil)
if err != nil { if err != nil {
@ -205,6 +252,7 @@ func handleWebSocket(w http.ResponseWriter, r *http.Request) {
} }
// 验证签名
func verify_signature(signature string, nonce string, timestamp int64, data interface{}) bool { func verify_signature(signature string, nonce string, timestamp int64, data interface{}) bool {
fmt.Printf("Received signature: %s\n", signature) fmt.Printf("Received signature: %s\n", signature)
fmt.Printf("Received timestamp: %d\n", timestamp) fmt.Printf("Received timestamp: %d\n", timestamp)
@ -614,25 +662,23 @@ func batchDataInsert(fileName string, lastCallKeys map[string]bool, excludedFile
dataBatchDuplicate := make([]BatchDataDuplicateLog, 0, batchSize) //重复短信数据 dataBatchDuplicate := make([]BatchDataDuplicateLog, 0, batchSize) //重复短信数据
bi := 0 //重复总数 bi := 0 //重复总数
count := 0 // 总数 count := 0 // 总数
//数据文件中批次 // Data file batches
batches := []Batch{} //查询批次 batches := []Batch{} // Query batches
fileNameDate := "" fileNameDate := ""
dataFileName := "" dataFileName := ""
fields := strings.Split(fileName, "_") fields := strings.Split(fileName, "_")
datetime := fields[len(fields)-1] datetime := fields[len(fields)-1]
if lastCallKeys != nil { if lastCallKeys != nil {
fileNameDate = fmt.Sprintf("lastCall-%s", excludedFilename) fileNameDate = fmt.Sprintf("lastCall-%s", excludedFilename)
fmt.Printf("fileNameDate : %s\n", fileNameDate) fmt.Printf("fileNameDate : %s\n", fileNameDate)
// 模糊查询文件名包含“20230103”字符串的批次记录
db.Table("batches b1").
Select("b1.*").
Where("b1.data_file_name LIKE ?", "%"+fileNameDate+"%").
Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at").
Order("b1.created_at desc").
Find(&batches)
dataFileName = fmt.Sprintf("lastCall-%s", excludedFilename) dataFileName = fmt.Sprintf("lastCall-%s", excludedFilename)
} else { } else {
fileNameDate = datetime[:8] fileNameDate = datetime[:8]
dataFileName = strings.Replace(fileName, "targets", "definition", -1)
dataFileName = dataFileName[:len(dataFileName)-10]
}
db.Table("batches AS b1"). db.Table("batches AS b1").
Select("b1.*"). Select("b1.*").
Where("b1.data_file_name LIKE ?", "%"+fileNameDate+"%"). Where("b1.data_file_name LIKE ?", "%"+fileNameDate+"%").
@ -640,10 +686,6 @@ func batchDataInsert(fileName string, lastCallKeys map[string]bool, excludedFile
Order("b1.created_at DESC"). Order("b1.created_at DESC").
Find(&batches) Find(&batches)
dataFileName = strings.Replace(fileName, "targets", "definition", -1)
dataFileName = dataFileName[:len(dataFileName)-10]
}
batchCount := len(batches) batchCount := len(batches)
sendMobiles := make(map[string]SmsData, batchCount) //短信数据,用于提交发送平台 sendMobiles := make(map[string]SmsData, batchCount) //短信数据,用于提交发送平台
duplicateCount := make(map[string]int, batchCount) //按批次重复数 duplicateCount := make(map[string]int, batchCount) //按批次重复数
@ -1094,7 +1136,7 @@ func iniConfi() {
logPath = "logs/" logPath = "logs/"
batchSize = 5000 //提交数据 batchSize = 5000 //提交数据
insertSize = 5000 //一次性入库 insertSize = 5000 //一次性入库
insertChanSize = 10 //通道缓冲数 insertChanSize = 100000 //通道缓冲数
goSize = 10 //协程数 goSize = 10 //协程数
taskTime = 1 taskTime = 1
batchStatusTaskTime = 1 batchStatusTaskTime = 1
@ -1112,16 +1154,17 @@ func iniConfi() {
sftpUser = "CHN-CRMTOWemedia-wemedia" sftpUser = "CHN-CRMTOWemedia-wemedia"
sftpPassword = "uoWdMHEv39ZFjiOg" sftpPassword = "uoWdMHEv39ZFjiOg"
sftpDir = "/CN-CRMTOWemedia/SMS" sftpDir = "/CN-CRMTOWemedia/SMS"
dbAddress = "rm-bp16l424ln96q1ouk.sqlserver.rds.aliyuncs.com:3433" dbAddress = "rm-bp1cb0x329c1dwid5.mysql.rds.aliyuncs.com"
dbUser = "sephora_sms" dbPort = "3306"
dbPassword = "5ORiiLmgkniC0EqF" dbUser = "sephora"
dbName = "sephora_sms" dbPassword = "YfbGJWsFkH4pXgPY"
dbName = "sephora"
zipPath = "RawData/Zip/" zipPath = "RawData/Zip/"
txtPath = "RawData/Txt/" txtPath = "RawData/Txt/"
logPath = "logs/" logPath = "logs/"
batchSize = 5000 //提交数据 batchSize = 5000 //提交数据
insertSize = 500 //一次性入库 insertSize = 5000 //一次性入库
insertChanSize = 100 //通道缓冲数 insertChanSize = 100000 //通道缓冲数
goSize = 50 //协程数 goSize = 50 //协程数
taskTime = 60 taskTime = 60
batchStatusTaskTime = 5 batchStatusTaskTime = 5
@ -1243,6 +1286,8 @@ type BatcheData struct {
Mobile string `gorm:"column:mobile"` Mobile string `gorm:"column:mobile"`
ReservedField string `gorm:"column:reserved_field"` ReservedField string `gorm:"column:reserved_field"`
DataFileName string `gorm:"column:data_file_name"` DataFileName string `gorm:"column:data_file_name"`
CreatedAt time.Time `gorm:"default:CURRENT_TIMESTAMP"`
UpdatedAt time.Time `gorm:"default:CURRENT_TIMESTAMP"`
} }
func (BatcheData) TableName() string { func (BatcheData) TableName() string {
@ -1268,6 +1313,8 @@ type BatchDataDuplicateLog struct {
Mobile string `gorm:"column:mobile"` Mobile string `gorm:"column:mobile"`
ReservedField string `gorm:"column:reserved_field"` ReservedField string `gorm:"column:reserved_field"`
DataFileName string `gorm:"column:data_file_name"` DataFileName string `gorm:"column:data_file_name"`
CreatedAt time.Time `gorm:"default:CURRENT_TIMESTAMP"`
UpdatedAt time.Time `gorm:"default:CURRENT_TIMESTAMP"`
} }
func (BatchDataDuplicateLog) TableName() string { func (BatchDataDuplicateLog) TableName() string {

View File

@ -35,4 +35,3 @@ $data = array(
); );
echo sign_message($data); echo sign_message($data);