diff --git a/main.go b/main.go index e3a40aa..a40373f 100644 --- a/main.go +++ b/main.go @@ -58,9 +58,11 @@ func init() { func main() { ticker := time.NewTicker(time.Duration(taskTime) * time.Minute) // 数据下载、处理 ticker_merge := time.NewTicker(time.Duration(batchStatusTaskTime) * time.Minute) //查询批次 + notification := time.NewTicker(time.Until(nextNotificationTime())) // 每天凌晨3点输出提示 + defer ticker.Stop() defer ticker_merge.Stop() - + defer notification.Stop() for { select { case <-ticker.C: @@ -73,10 +75,54 @@ func main() { fmt.Print("查询批次状态...\n") queryBatchState() + case <-notification.C: + iniLog() + applogger.Info("开始清除历史数据") + // 删除15天前的批次数据 + rowsAffected, err := deleteOldData(&BatcheData{}, 15) + if err != nil { + handleError(err, "删除15天前的批次数据失败") + } else { + applogger.Info(fmt.Sprintf("成功删除 %d 行 15天前的批次数据", rowsAffected)) + } + + // 删除15天前的批次数据重复日志 + rowsAffected, err = deleteOldData(&BatchDataDuplicateLog{}, 15) + if err != nil { + handleError(err, "删除15天前的批次数据重复日志失败") + } else { + applogger.Info(fmt.Sprintf("成功删除 %d 行 15天前的批次数据重复日志", rowsAffected)) + } + notification.Reset(time.Until(nextNotificationTime())) + applogger.Info("清除历史数据完成") } } } +// 删除15天前的数据 +func deleteOldData(model interface{}, daysAgo int) (int64, error) { + db, err := connectToDB() + if err != nil { + return 0, fmt.Errorf("连接数据库失败: %w", err) + } + threshold := time.Now().AddDate(0, 0, -daysAgo) + result := db.Where("created_at < ?", threshold).Delete(model) + if result.Error != nil { + return 0, result.Error + } + return result.RowsAffected, nil +} + +// 计算下一次执行时间 +func nextNotificationTime() time.Time { + now := time.Now() + notificationTime := time.Date(now.Year(), now.Month(), now.Day(), 3, 0, 0, 0, now.Location()) + if now.After(notificationTime) { + notificationTime = notificationTime.Add(24 * time.Hour) + } + return notificationTime +} + func startWebSocket() { http.HandleFunc("/ws", handleWebSocket) err := http.ListenAndServe(":8080", nil) @@ -87,6 +133,7 @@ func startWebSocket() { } } +// 客户端连接信息 func handleWebSocket(w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { @@ -205,6 +252,7 @@ func handleWebSocket(w http.ResponseWriter, r *http.Request) { } +// 验证签名 func verify_signature(signature string, nonce string, timestamp int64, data interface{}) bool { fmt.Printf("Received signature: %s\n", signature) fmt.Printf("Received timestamp: %d\n", timestamp) @@ -614,36 +662,30 @@ func batchDataInsert(fileName string, lastCallKeys map[string]bool, excludedFile dataBatchDuplicate := make([]BatchDataDuplicateLog, 0, batchSize) //重复短信数据 bi := 0 //重复总数 count := 0 // 总数 - //数据文件中批次 - batches := []Batch{} //查询批次 + // Data file batches + batches := []Batch{} // Query batches fileNameDate := "" dataFileName := "" fields := strings.Split(fileName, "_") datetime := fields[len(fields)-1] + if lastCallKeys != nil { fileNameDate = fmt.Sprintf("lastCall-%s", excludedFilename) fmt.Printf("fileNameDate : %s\n", fileNameDate) - // 模糊查询文件名包含“20230103”字符串的批次记录 - db.Table("batches b1"). - Select("b1.*"). - Where("b1.data_file_name LIKE ?", "%"+fileNameDate+"%"). - Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at"). - Order("b1.created_at desc"). - Find(&batches) dataFileName = fmt.Sprintf("lastCall-%s", excludedFilename) } else { fileNameDate = datetime[:8] - db.Table("batches AS b1"). - Select("b1.*"). - Where("b1.data_file_name LIKE ?", "%"+fileNameDate+"%"). - Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS max_created_at FROM batches GROUP BY communication_channel_id) AS b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.max_created_at"). - Order("b1.created_at DESC"). - Find(&batches) - dataFileName = strings.Replace(fileName, "targets", "definition", -1) dataFileName = dataFileName[:len(dataFileName)-10] } + db.Table("batches AS b1"). + Select("b1.*"). + Where("b1.data_file_name LIKE ?", "%"+fileNameDate+"%"). + Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS max_created_at FROM batches GROUP BY communication_channel_id) AS b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.max_created_at"). + Order("b1.created_at DESC"). + Find(&batches) + batchCount := len(batches) sendMobiles := make(map[string]SmsData, batchCount) //短信数据,用于提交发送平台 duplicateCount := make(map[string]int, batchCount) //按批次重复数 @@ -1092,10 +1134,10 @@ func iniConfi() { zipPath = "RawData/Zip/" txtPath = "RawData/Txt/" logPath = "logs/" - batchSize = 5000 //提交数据 - insertSize = 5000 //一次性入库 - insertChanSize = 10 //通道缓冲数 - goSize = 10 //协程数 + batchSize = 5000 //提交数据 + insertSize = 5000 //一次性入库 + insertChanSize = 100000 //通道缓冲数 + goSize = 10 //协程数 taskTime = 1 batchStatusTaskTime = 1 to = []string{"chejiulong@wemediacn.com"} @@ -1112,17 +1154,18 @@ func iniConfi() { sftpUser = "CHN-CRMTOWemedia-wemedia" sftpPassword = "uoWdMHEv39ZFjiOg" sftpDir = "/CN-CRMTOWemedia/SMS" - dbAddress = "rm-bp16l424ln96q1ouk.sqlserver.rds.aliyuncs.com:3433" - dbUser = "sephora_sms" - dbPassword = "5ORiiLmgkniC0EqF" - dbName = "sephora_sms" + dbAddress = "rm-bp1cb0x329c1dwid5.mysql.rds.aliyuncs.com" + dbPort = "3306" + dbUser = "sephora" + dbPassword = "YfbGJWsFkH4pXgPY" + dbName = "sephora" zipPath = "RawData/Zip/" txtPath = "RawData/Txt/" logPath = "logs/" - batchSize = 5000 //提交数据 - insertSize = 500 //一次性入库 - insertChanSize = 100 //通道缓冲数 - goSize = 50 //协程数 + batchSize = 5000 //提交数据 + insertSize = 5000 //一次性入库 + insertChanSize = 100000 //通道缓冲数 + goSize = 50 //协程数 taskTime = 60 batchStatusTaskTime = 5 to = []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"} @@ -1238,11 +1281,13 @@ type Batch struct { } type BatcheData struct { - ID uint `gorm:"primary_key"` - CommunicationChannelID string `gorm:"column:communication_channel_id"` - Mobile string `gorm:"column:mobile"` - ReservedField string `gorm:"column:reserved_field"` - DataFileName string `gorm:"column:data_file_name"` + ID uint `gorm:"primary_key"` + CommunicationChannelID string `gorm:"column:communication_channel_id"` + Mobile string `gorm:"column:mobile"` + ReservedField string `gorm:"column:reserved_field"` + DataFileName string `gorm:"column:data_file_name"` + CreatedAt time.Time `gorm:"default:CURRENT_TIMESTAMP"` + UpdatedAt time.Time `gorm:"default:CURRENT_TIMESTAMP"` } func (BatcheData) TableName() string { @@ -1263,11 +1308,13 @@ func (BatchProcessingInformation) TableName() string { } type BatchDataDuplicateLog struct { - ID int `gorm:"primaryKey;autoIncrement"` - CommunicationChannelID string `gorm:"column:communication_channel_id"` - Mobile string `gorm:"column:mobile"` - ReservedField string `gorm:"column:reserved_field"` - DataFileName string `gorm:"column:data_file_name"` + ID int `gorm:"primaryKey;autoIncrement"` + CommunicationChannelID string `gorm:"column:communication_channel_id"` + Mobile string `gorm:"column:mobile"` + ReservedField string `gorm:"column:reserved_field"` + DataFileName string `gorm:"column:data_file_name"` + CreatedAt time.Time `gorm:"default:CURRENT_TIMESTAMP"` + UpdatedAt time.Time `gorm:"default:CURRENT_TIMESTAMP"` } func (BatchDataDuplicateLog) TableName() string { diff --git a/sign_message.php b/sign_message.php index 87561d5..d400dfc 100644 --- a/sign_message.php +++ b/sign_message.php @@ -34,5 +34,4 @@ $data = array( "data_filename" => "Communication_targets_SMS_1_wemedia_20230303185518.txt" ); -echo sign_message($data); - +echo sign_message($data); \ No newline at end of file