diff --git a/iniDataForLinux b/iniDataForLinux index 2bb96f6..8a66b9b 100755 Binary files a/iniDataForLinux and b/iniDataForLinux differ diff --git a/iniDataForMacOs b/iniDataForMacOs index 99ce46c..c541e65 100755 Binary files a/iniDataForMacOs and b/iniDataForMacOs differ diff --git a/main.go b/main.go index d56831a..94ee420 100644 --- a/main.go +++ b/main.go @@ -163,11 +163,11 @@ func handleWebSocket(w http.ResponseWriter, r *http.Request) { conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 SendEmail(subject, body) //发送邮件 } else { - batchInsert(task.TaskData.BatchFilename, true) //创建批次 + batchInsert(task.TaskData.BatchFilename, true, task.TaskData.ExcludedFilename) //创建批次 returnMessage = []byte(`{"code": 2000,"err": "批次创建完成"}`) - conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 - batchDataInsert(task.TaskData.DataFilename, lastCallKeys) //添加数据 - redisClient.Del("iniLastCallDataStatus") //删除任务执行中标记 + conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 + batchDataInsert(task.TaskData.DataFilename, lastCallKeys, task.TaskData.ExcludedFilename) //添加数据 + redisClient.Del("iniLastCallDataStatus") //删除任务执行中标记 returnMessage = []byte(`{"code": 2000,"err": "结束处理lastCall"}`) conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 } @@ -191,8 +191,6 @@ func handleWebSocket(w http.ResponseWriter, r *http.Request) { } func verify_signature(signature string, nonce string, timestamp int64, data interface{}) bool { - key := "qZ6v1&H#Wjx+yRm2D@*sJF$tnfL83Ia" - fmt.Printf("Received signature: %s\n", signature) fmt.Printf("Received timestamp: %d\n", timestamp) fmt.Printf("Received nonce: %s\n", nonce) @@ -212,7 +210,7 @@ func verify_signature(signature string, nonce string, timestamp int64, data inte expected_data := fmt.Sprintf("%d|%s|%s", received_timestamp, received_nonce, received_data) fmt.Printf("Expected data: %s\n", expected_data) - mac := hmac.New(sha256.New, []byte(key)) + mac := hmac.New(sha256.New, []byte(verifySignatureKey)) mac.Write([]byte(expected_data)) expected_signature := hex.EncodeToString(mac.Sum(nil)) fmt.Printf("Expected signature: %s\n", expected_signature) @@ -378,7 +376,7 @@ func downloadDecompression() { continue } applogger.Info(fmt.Sprintf("%s(数据包)解压完成", zipFile.Name)) - processingStatus = batchDataInsert(zipFile.Name, nil) + processingStatus = batchDataInsert(zipFile.Name, nil, "") } } else if filepath.Ext(file.Name()) == ".txt" { srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name())) @@ -404,7 +402,7 @@ func downloadDecompression() { continue } applogger.Info(fmt.Sprintf("%s(批次文件)下载完成", file.Name())) - processingStatus = batchInsert(file.Name(), false) + processingStatus = batchInsert(file.Name(), false, "") } if processingStatus != -1 { err = redisClient.Set(fileKey, 1, 0).Err() //写入文件处理完成 @@ -421,7 +419,7 @@ func downloadDecompression() { } // 批次入库 -func batchInsert(fileName string, isLastCall bool) int { +func batchInsert(fileName string, isLastCall bool, excludedFilename string) int { //fmt.Print("批次处理开始") start := time.Now() db, _ := connectToDB() @@ -450,8 +448,8 @@ func batchInsert(fileName string, isLastCall bool) int { var batchName, dataFileName string if isLastCall { - batchName = fmt.Sprintf("lastCall-%s-%s", record[1], strconv.Itoa(int(communicationChannelID))) - dataFileName = fmt.Sprintf("lastCall-%s", fileName) + batchName = fmt.Sprintf("lastCall-%s-%s-%s", record[1], excludedFilename, strconv.Itoa(int(communicationChannelID))) + dataFileName = fmt.Sprintf("lastCall-%s-%s", excludedFilename, fileName) } else { batchName = fmt.Sprintf("%s-%s", record[1], strconv.Itoa(int(communicationChannelID))) dataFileName = fileName @@ -568,7 +566,7 @@ func smsApi(method string, sendSMSDataJson string) (int, error) { } } -func batchDataInsert(fileName string, lastCallKeys map[string]bool) int { +func batchDataInsert(fileName string, lastCallKeys map[string]bool, excludedFilename string) int { start := time.Now() // Open file fileKey := fmt.Sprintf("downloaded:%s", strings.Replace(fileName, filepath.Ext(fileName), ".zip", 1)) @@ -614,11 +612,12 @@ func batchDataInsert(fileName string, lastCallKeys map[string]bool) int { batches := []Batch{} //查询批次 fileNameDate := "" dataFileName := "" - + fields := strings.Split(fileName, "_") + datetime := fields[len(fields)-1] if lastCallKeys != nil { - fields := strings.Split(fileName, "_") - datetime := fields[len(fields)-1] - fileNameDate = datetime[:8] + newFileName := "Communication_definition_SMS_1_wemedia_" + datetime + fileNameDate = fmt.Sprintf("lastCall-%s-%s", excludedFilename, newFileName) + fmt.Printf("fileNameDate : %s\n", fileNameDate) // 模糊查询文件名包含“20230103”字符串的批次记录 db.Table("batches b1"). Select("b1.*"). @@ -626,12 +625,12 @@ func batchDataInsert(fileName string, lastCallKeys map[string]bool) int { Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at"). Order("b1.created_at desc"). Find(&batches) - dataFileName = fmt.Sprintf("lastCall-%s", fileName) + dataFileName = fmt.Sprintf("lastCall-%s-%s", excludedFilename, fileName) } else { - fileName = fmt.Sprintf("lastCall-%s", fileName) + fileNameDate = datetime[:8] db.Table("batches b1"). Select("b1.*"). - Where("b1.data_file_name = ?", fileNameDate). + Where("b1.data_file_name LIKE ?", "%"+fileNameDate+"%"). Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at"). Order("b1.created_at desc"). Find(&batches) @@ -639,9 +638,10 @@ func batchDataInsert(fileName string, lastCallKeys map[string]bool) int { } batchCount := len(batches) - sendMobiles := make(map[string]SmsData, batchCount) //短信数据,用于提交发送平台 - duplicateCount := make(map[string]int, batchCount) //按批次重复数 - insertsCount := make(map[string]int, batchCount) //按批次插入数 + sendMobiles := make(map[string]SmsData, batchCount) //短信数据,用于提交发送平台 + duplicateCount := make(map[string]int, batchCount) //按批次重复数 + lastCallduplicateCount := make(map[string]int, batchCount) //按批次重复数 + insertsCount := make(map[string]int, batchCount) //按批次插入数 ccids := make(map[string]bool, batchCount) if batchCount > 0 { //如果查询到数据需要的批次信息,开始处理数据 @@ -680,6 +680,9 @@ func batchDataInsert(fileName string, lastCallKeys map[string]bool) int { if _, ok := duplicateCount[row[2]]; !ok { duplicateCount[row[2]] = 0 } + if _, ok := lastCallduplicateCount[row[2]]; !ok { + lastCallduplicateCount[row[2]] = 0 + } // Check if record exists in hashset key := fmt.Sprintf("%s-%s-%s", row[2], row[3], row[5]) @@ -704,6 +707,7 @@ func batchDataInsert(fileName string, lastCallKeys map[string]bool) int { continue } if lastCallKeys != nil && lastCallKeys[fmt.Sprintf("%s-%s", row[2], row[3])] { + lastCallduplicateCount[row[2]]++ continue } @@ -837,10 +841,11 @@ func batchDataInsert(fileName string, lastCallKeys map[string]bool) int { smsApi("finishbatch", string(sfJson)) bpi := []BatchProcessingInformation{} bpi = append(bpi, BatchProcessingInformation{ - CommunicationChannelID: ccid, - RepeatTargetsMember: duplicateCount[ccid], - InsertsTargetsMember: insertsCount[ccid], - DataFileName: fileName, + CommunicationChannelID: ccid, + RepeatTargetsMember: duplicateCount[ccid], + LastCallRepeatTargetsMember: lastCallduplicateCount[ccid], + InsertsTargetsMember: insertsCount[ccid], + DataFileName: dataFileName, }) err = db.CreateInBatches(bpi, insertSize).Error if err != nil { @@ -980,6 +985,8 @@ func iniConfi() { to = []string{"chejiulong@wemediacn.com"} token = "7100477930234217" lastCallPath = "RawData/LastCall" + verifySignatureKey = "qZ6v1&H#Wjx+yRm2D@*sJF$tnfL83Ia" + case "prod": //fmt.Print("正式环境配置已生效\n") redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379" @@ -1004,6 +1011,8 @@ func iniConfi() { to = []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"} token = "7100477930234217" lastCallPath = "RawData/LastCall" + verifySignatureKey = "Lzxf6asgwFRTsANZ8gkXJnP3EmOyNO#&" + default: panic(fmt.Errorf("无效的运行模式: %s", env)) } @@ -1061,32 +1070,33 @@ func (f FileSorter) Less(i, j int) bool { } var ( //初始化变量 - env string - applogger *logrus.Logger - redisClient *redis.Client - executableDir string - redisAddress string - redisPassword string - redisDB int - sftpAddress string - sftpUser string - sftpPassword string - sftpDir string - dbAddress string - dbUser string - dbPassword string - dbName string - zipPath string - txtPath string - logPath string - batchSize int //提交数据 - insertSize int //一次性入库 - insertChanSize int //通道缓冲数 - goSize int //协程数 - taskTime int - to []string - token string - lastCallPath string + env string + applogger *logrus.Logger + redisClient *redis.Client + executableDir string + redisAddress string + redisPassword string + redisDB int + sftpAddress string + sftpUser string + sftpPassword string + sftpDir string + dbAddress string + dbUser string + dbPassword string + dbName string + zipPath string + txtPath string + logPath string + batchSize int //提交数据 + insertSize int //一次性入库 + insertChanSize int //通道缓冲数 + goSize int //协程数 + taskTime int + to []string + token string + lastCallPath string + verifySignatureKey string ) type Batch struct { @@ -1115,11 +1125,12 @@ func (BatcheData) TableName() string { } type BatchProcessingInformation struct { - ID uint `gorm:"primaryKey;autoIncrement"` - CommunicationChannelID string `gorm:"column:communication_channel_id"` - RepeatTargetsMember int `gorm:"column:repeat_targets_member"` - InsertsTargetsMember int `gorm:"column:inserts_targets_member"` - DataFileName string `gorm:"column:data_file_name"` + ID uint `gorm:"primaryKey;autoIncrement"` + CommunicationChannelID string `gorm:"column:communication_channel_id"` + RepeatTargetsMember int `gorm:"column:repeat_targets_member"` + LastCallRepeatTargetsMember int `gorm:"column:last_call_repeat_targets_member"` + InsertsTargetsMember int `gorm:"column:inserts_targets_member"` + DataFileName string `gorm:"column:data_file_name"` } func (BatchProcessingInformation) TableName() string {