package main import ( "archive/zip" "bufio" "bytes" "crypto/hmac" "crypto/sha256" "crypto/tls" "encoding/csv" "encoding/hex" "encoding/json" "flag" "fmt" "io" "log" "net/http" "os" "path" "path/filepath" "regexp" "sort" "strconv" "strings" "sync" "syscall" "time" "github.com/go-redis/redis" "github.com/gorilla/websocket" "github.com/pkg/sftp" "github.com/sirupsen/logrus" "golang.org/x/crypto/ssh" "gopkg.in/gomail.v2" "gopkg.in/natefinch/lumberjack.v2" "gorm.io/driver/sqlserver" "gorm.io/gorm" "gorm.io/gorm/logger" ) func init() { monopolize() //独占锁 iniConfi() //初始化环境变量 iniPath() //初始化路径 applogger = logrus.New() //创建日志 iniLog() //初始化日志配置 redisClient = redis.NewClient(&redis.Options{ Addr: redisAddress, Password: redisPassword, DB: redisDB, }) go startWebSocket() // 启动 WebSocket 服务,使用协程方式运行 applogger.Info(fmt.Sprintf("程序启动,加载%s环境,尝试执行...", env)) go downloadDecompression() // 启动立即执行一次数据下载、处理 go queryBatchState() } func main() { ticker := time.NewTicker(time.Duration(taskTime) * time.Minute) // 数据下载、处理 ticker_merge := time.NewTicker(time.Duration(batchStatusTaskTime) * time.Minute) //查询批次 defer ticker.Stop() defer ticker_merge.Stop() for { select { case <-ticker.C: iniLog() applogger.Info("尝试执行数据处理...") go downloadDecompression() case <-ticker_merge.C: iniLog() fmt.Print("查询批次状态...\n") queryBatchState() } } } func startWebSocket() { http.HandleFunc("/ws", handleWebSocket) err := http.ListenAndServe(":8080", nil) if err != nil { fmt.Println("启动 WebSocket 服务失败:", err) } else { fmt.Println("启动 WebSocket 服务成功:") } } func handleWebSocket(w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { fmt.Println("升级连接失败:", err) return } // 打印客户端地址 fmt.Printf("客户端 %s 连接成功\n", conn.RemoteAddr().String()) // 添加客户端连接信息 c := &client{conn} clients[c] = true // 接收消息 go func() { for { _, message, err := conn.ReadMessage() if err != nil { fmt.Println("接收消息失败:", err) return } fmt.Println("收到消息:", string(message)) var returnMessage []byte task := Task{} err = json.Unmarshal([]byte(message), &task) if err != nil { returnMessage = []byte(`{"code": 2001,"err": "json指令解析失败"}`) conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 } else { if !verify_signature(task.Signature.Signature, task.Signature.Nonce, task.Signature.Timestamp, task.TaskData) { // 签名验证失败或超时 returnMessage = []byte(`{"code": 2401,"err": "Unauthorized"}`) conn.WriteMessage(websocket.TextMessage, returnMessage) conn.Close() break } switch task.TaskData.Command { case "lastCall": //判断lastCall是否有执行中 if exists, _ := redisClient.Exists("iniLastCallDataStatus").Result(); exists == 1 { returnMessage = []byte(`{"code": 2007,"err": "有lastCall执行中,请稍后重试"}`) conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 break } //判断ExcludedFilename是否执行过 lastCallTask := fmt.Sprintf("lastCallTask:%s", task.TaskData.ExcludedFilename) if exists, _ := redisClient.Exists(lastCallTask).Result(); exists == 1 { returnMessage = []byte(`{"code": 2008,"err": "ExcludedFilename重复跳过执行"}`) conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 break } //判断ExcludedFilename是否存在 if !fileExists(path.Join(executableDir, lastCallPath, task.TaskData.ExcludedFilename)) { returnMessage = []byte(`{"code": 2003,"err": "task.ExcludedFilename 不存在"}`) conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 break } //判断BatchFilename是否存在 if !fileExists(path.Join(executableDir, txtPath, task.TaskData.BatchFilename)) { returnMessage = []byte(`{"code": 2004,"err": "task.BatchFilename 不存在"}`) conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 break } //判断DataFilename是否存在 if !fileExists(path.Join(executableDir, txtPath, task.TaskData.DataFilename)) { returnMessage = []byte(`{"code": 2005,"err": "task.DataFilename 不存在"}`) conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 break } returnMessage = []byte(`{"code": 2000,"err": "开始处理lastCall"}`) conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 //读取排重文件 lastCallKeys, err := readExcludedFile(path.Join(executableDir, lastCallPath, task.TaskData.ExcludedFilename)) if err != nil { returnMessage = []byte(`{"code": 2006,"err": "打开ExcludedFilename失败"}`) conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 break } else { returnMessage = []byte(`{"code": 2000,"err": "ExcludedFilename读取完成"}`) conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 } //无错误,执行逻辑代码, subject := "丝芙兰短信处理程序异常" err = redisClient.Set("iniLastCallDataStatus", 1, 0).Err() if err != nil { body := fmt.Sprintf("写入lastCall任务执行中标记失败:%v", err) applogger.Error(body) returnMessage = []byte(fmt.Sprintf(`{"code": 2008,"err": "写入lastCall任务执行中标记失败%v"}`, err)) conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 SendEmail(subject, body) //发送邮件 } else { batchInsert(task.TaskData.BatchFilename, true, task.TaskData.ExcludedFilename) //创建批次 returnMessage = []byte(`{"code": 2000,"err": "批次创建完成"}`) conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 batchDataInsert(task.TaskData.DataFilename, lastCallKeys, task.TaskData.ExcludedFilename) //添加数据 redisClient.Del("iniLastCallDataStatus") //删除任务执行中标记 returnMessage = []byte(`{"code": 2000,"err": "结束处理lastCall"}`) conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 redisClient.Set(lastCallTask, 1, 0).Err() //记录ExcludedFilename执行完成 } default: returnMessage = []byte(`{"code": 2002,"err": "task.Command 不存在"}`) conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 } } } }() // 设置连接关闭时的回调函数 conn.SetCloseHandler(func(code int, text string) error { fmt.Println("连接已关闭,code:", code, "text:", text) fmt.Printf("客户端 %s 关闭连接\n", conn.RemoteAddr().String()) delete(clients, c) return nil }) } func verify_signature(signature string, nonce string, timestamp int64, data interface{}) bool { fmt.Printf("Received signature: %s\n", signature) fmt.Printf("Received timestamp: %d\n", timestamp) fmt.Printf("Received nonce: %s\n", nonce) fmt.Printf("Received data: %v\n", data) received_signature := signature received_timestamp, _ := strconv.ParseInt(fmt.Sprintf("%v", timestamp), 10, 64) received_nonce := nonce //strconv.Atoi(fmt.Sprintf("%v", nonce)) if time.Now().Unix()-received_timestamp > 7200 { fmt.Println("Timestamp expired") return false } received_data_bytes, _ := json.Marshal(data) received_data := string(received_data_bytes) expected_data := fmt.Sprintf("%d|%s|%s", received_timestamp, received_nonce, received_data) fmt.Printf("Expected data: %s\n", expected_data) mac := hmac.New(sha256.New, []byte(verifySignatureKey)) mac.Write([]byte(expected_data)) expected_signature := hex.EncodeToString(mac.Sum(nil)) fmt.Printf("Expected signature: %s\n", expected_signature) if received_signature != expected_signature { fmt.Println("Signature does not match") return false } return true } func readExcludedFile(filename string) (map[string]bool, error) { file, err := os.Open(filename) if err != nil { return nil, err } defer file.Close() scanner := bufio.NewScanner(file) scanner.Split(bufio.ScanLines) scanner.Scan() // skip first line lastCallKeys := make(map[string]bool) for scanner.Scan() { line := scanner.Text() fields := strings.Split(line, ",") lastCallKey := fmt.Sprintf("%s-%s", fields[0], fields[1]) lastCallKeys[lastCallKey] = true } return lastCallKeys, nil } /* // 聊天室消息广播 func broadcast(message []byte) { for c := range clients { err := c.conn.WriteMessage(websocket.TextMessage, message) if err != nil { fmt.Println("发送消息失败:", err) delete(clients, c) } } }*/ func downloadDecompression() { if exists, _ := redisClient.Exists("iniDataStatus").Result(); exists == 1 { fmt.Print("上一批次执行中,跳过本次\n") } else { subject := "丝芙兰短信处理程序异常" // 写入执行中标记 err := redisClient.Set("iniDataStatus", 1, 0).Err() if err != nil { body := fmt.Sprintf("写入任务执行中标记失败:%v", err) applogger.Error(body) SendEmail(subject, body) //发送邮件 } // Connect to SFTP server sshConfig := &ssh.ClientConfig{ User: sftpUser, Auth: []ssh.AuthMethod{ ssh.Password(sftpPassword), }, HostKeyCallback: ssh.InsecureIgnoreHostKey(), } sshClient, err := ssh.Dial("tcp", sftpAddress, sshConfig) if err != nil { body := fmt.Sprintf("sshClient连接失败:%v", err) applogger.Error(body) SendEmail(subject, body) //发送邮件 } sftpClient, err := sftp.NewClient(sshClient) if err != nil { body := fmt.Sprintf("sftp连接失败:%v", err) applogger.Error(body) SendEmail(subject, body) //发送邮件 } defer sftpClient.Close() files, err := sftpClient.ReadDir(sftpDir) if err != nil { body := fmt.Sprintf("sftp目录%s不存在, 错误信息:%v", sftpDir, err) applogger.Error(body) SendEmail(subject, body) //发送邮件 } it := 1 fmt.Printf("共%d个文件\n", len(files)) sort.Sort(FileSorter(files)) for _, file := range files { processingStatus := -1 fmt.Printf("第%d个文件处理中\n", it) it++ // Check if file has been downloaded before fileKey := fmt.Sprintf("downloaded:%s", file.Name()) if exists, _ := redisClient.Exists(fileKey).Result(); exists == 1 { fmt.Println("跳过已处理过的文件:" + file.Name()) continue } if filepath.Ext(file.Name()) == ".zip" { //fmt.Println("下载开始(数据包):" + file.Name()) // Download file srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name())) if err != nil { body := fmt.Sprintf("打开sftp文件失败,文件名:%s,错误信息: %v", file.Name(), err) applogger.Error(body) SendEmail(subject, body) //发送邮件 continue } defer srcFile.Close() dstFile, err := os.Create(path.Join(executableDir, zipPath, file.Name())) if err != nil { body := fmt.Sprintf("创建本地文件失败,文件名: %s,错误信息:%v", file.Name(), err) applogger.Error(body) SendEmail(subject, body) //发送邮件 continue } defer dstFile.Close() if _, err := io.Copy(dstFile, srcFile); err != nil { body := fmt.Sprintf("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err) applogger.Error(body) SendEmail(subject, body) //发送邮件 continue } applogger.Info(fmt.Sprintf("%s(数据包)下载完成", file.Name())) // Unzip file zipReader, err := zip.OpenReader(path.Join(executableDir, zipPath, file.Name())) if err != nil { body := fmt.Sprintf("解压文件失败: 文件名:%s,错误信息:%v", file.Name(), err) applogger.Error(body) SendEmail(subject, body) //发送邮件 continue } defer zipReader.Close() for _, zipFile := range zipReader.File { zipFileReader, err := zipFile.Open() if strings.Contains(zipFile.Name, "__MACOSX/._") { continue } else if filepath.Ext(zipFile.Name) != ".txt" { applogger.Error(fmt.Sprintf("文件类型不正确,跳过处理: %v", zipFile.Name)) continue } if err != nil || zipFileReader == nil { applogger.Error(fmt.Sprintf("Failed to open zip file: %v", err)) fmt.Print("压缩文件处理结束") continue } defer zipFileReader.Close() // Create the file unzipFile, err := os.Create(path.Join(executableDir, txtPath, zipFile.Name)) if err != nil { body := fmt.Sprintf("创建压缩后的文件失败,文件名:%s,错误信息: %v", zipFile.Name, err) applogger.Error(body) SendEmail(subject, body) //发送邮件 continue } defer unzipFile.Close() // Write the unzip data to the file _, err = io.Copy(unzipFile, zipFileReader) if err != nil { body := fmt.Sprintf("文件解压失败,文件名:%s,错误信息: %v", zipFileReader, err) applogger.Error(body) SendEmail(subject, body) //发送邮件 continue } applogger.Info(fmt.Sprintf("%s(数据包)解压完成", zipFile.Name)) processingStatus = batchDataInsert(zipFile.Name, nil, "") } } else if filepath.Ext(file.Name()) == ".txt" { srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name())) if err != nil { body := fmt.Sprintf("打开sftp文件失败,文件名:%s,错误信息: %v", file.Name(), err) applogger.Error(body) SendEmail(subject, body) //发送邮件 continue } defer srcFile.Close() dstFile, err := os.Create(path.Join(executableDir, txtPath, file.Name())) if err != nil { body := fmt.Sprintf("创建本地文件失败,文件名: %s,错误信息:%v", file.Name(), err) applogger.Error(body) SendEmail(subject, body) //发送邮件 continue } defer dstFile.Close() if _, err := io.Copy(dstFile, srcFile); err != nil { body := fmt.Sprintf("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err) applogger.Error(body) SendEmail(subject, body) //发送邮件 continue } applogger.Info(fmt.Sprintf("%s(批次文件)下载完成", file.Name())) processingStatus = batchInsert(file.Name(), false, "") } if processingStatus != -1 { err = redisClient.Set(fileKey, 1, 0).Err() //写入文件处理完成 if err != nil { body := fmt.Sprintf("写入文件处理完成标记失败文件名:%s,错误信息:%v\n", file.Name(), err) applogger.Error(body) SendEmail(subject, body) //发送邮件 } } } redisClient.Del("iniDataStatus") //删除任务执行中标记 } } func batchInsert(fileName string, isLastCall bool, excludedFilename string) int { start := time.Now() db, err := connectToDB() handleError(err, "连接数据库失败") file, err := os.Open(path.Join(executableDir, txtPath, fileName)) handleError(err, fmt.Sprintf("打开文件失败: %s", fileName)) defer file.Close() reader := csv.NewReader(bufio.NewReader(file)) reader.Read() batchRows := 0 for { record, err := reader.Read() if err != nil { break } TargetsMember, _ := strconv.ParseUint(strings.TrimSpace(record[2]), 10, 32) templateID, _ := strconv.ParseUint(strings.TrimSpace(record[3]), 10, 32) status := 1 s := time.Now().Format("2006-01-02 15:04:05") var batchName, dataFileName string if isLastCall { batchName = fmt.Sprintf("lastCall-%s-%s-%s", record[1], record[0], excludedFilename) dataFileName = fmt.Sprintf("lastCall-%s", excludedFilename) } else { batchName = fmt.Sprintf("%s-%s", record[1], record[0]) dataFileName = fileName[:len(fileName)-10] } batchParams := BatchParams{ BatchName: batchName, BatchDesc: record[4], IsPersonal: 0, Message: record[4], IsInternational: 0, IsSchedule: 0, ScheduleTime: s, Token: token, } sid, err := CreateBatch(batchParams) handleError(err, "创建批次失败") batch := Batch{ CommunicationChannelID: record[0], CommunicationName: batchName, TargetsMember: uint(TargetsMember), TemplateID: uint(templateID), Content: record[4], Status: status, DataFileName: dataFileName, Sid: sid, } db.Create(&batch) batchRows++ } time.Sleep(time.Second) elapsed := time.Since(start) subject := "丝芙兰批次文件处理完成" body := fmt.Sprintf("批次数:%d;\n批次文件:%s;\n处理完成,请前往管理平台查看处理。", batchRows, fileName) SendEmail(subject, body) applogger.Info(fmt.Sprintf("%s(批次文件)入库完成", fileName)) applogger.Info(fmt.Sprintf("%s(批次文件)执行时间%s 插入批次次数:%d", fileName, elapsed, batchRows)) return 0 } func CreateBatch(batchParams BatchParams) (int, error) { // 将请求参数编码为JSON字符串 jsonBytes, err := json.Marshal(batchParams) if err != nil { return -1, err } jsonStr := string(jsonBytes) // 发送POST请求 url := "http://www.wemediacn.net/webservice/BatchService?service=sms.createbatch" resp, err := http.Post(url, "application/json; charset=utf-8", bytes.NewBuffer([]byte(jsonStr))) if err != nil { return -1, err } defer resp.Body.Close() // 解析响应数据 var retobj map[string]interface{} err = json.NewDecoder(resp.Body).Decode(&retobj) if err != nil { return -1, err } fmt.Print(retobj) code := int(retobj["code"].(float64)) fmt.Printf("批次创建API 返回:%d\n", code) if code == 0 { sid := int(retobj["sid"].(float64)) return sid, nil } else { return -1, fmt.Errorf("create batch failed, error code: %d", code) } } func smsApi(method string, sendSMSDataJson string) (int, error) { // 发送POST请求 url := "http://www.wemediacn.net/webservice/BatchService?service=sms." + method resp, err := http.Post(url, "application/json; charset=utf-8", strings.NewReader(sendSMSDataJson)) if err != nil { return -1, err } defer resp.Body.Close() // 解析响应数据 var retobj map[string]interface{} err = json.NewDecoder(resp.Body).Decode(&retobj) if err != nil { return -1, err } jsonStr, err := json.Marshal(retobj) if err != nil { fmt.Println(err) return -1, err } fmt.Printf("API 返回:%s\n", string(jsonStr)) code := int(retobj["code"].(float64)) if code == 0 { //sid := int(retobj["sid"].(float64)) fmt.Printf("提交批次成功code:%d\n", code) return code, nil } else { applogger.Error(string(jsonStr)) return -1, fmt.Errorf(string(jsonStr)) } } func batchDataInsert(fileName string, lastCallKeys map[string]bool, excludedFilename string) int { start := time.Now() // Open file fileKey := fmt.Sprintf("downloaded:%s", strings.Replace(fileName, filepath.Ext(fileName), ".zip", 1)) file, err := os.Open(path.Join(executableDir, txtPath, fileName)) if err != nil { redisClient.Del(fileKey) //删除文件处理完成的标志位 applogger.Error(fmt.Sprintf("文件打开失败,文件名:%s,错误信息%v", fileName, err)) return -1 } else { defer file.Close() db, _ := connectToDB() // Insert data in batches using multiple goroutines var wg sync.WaitGroup dataBatchChan := make(chan []BatcheData, insertChanSize) for i := 0; i < goSize; i++ { wg.Add(1) go func() { for batch := range dataBatchChan { retryCount := 0 for { if err := db.CreateInBatches(batch, insertSize).Error; err != nil { if retryCount >= 5 { panic(err) } fmt.Printf("Insert failed, retrying in %v seconds...\n", 2*retryCount) time.Sleep(time.Duration(2*retryCount) * time.Second) retryCount++ } else { break } } } wg.Done() }() } hs := make(map[string]bool) //排重 dataBatch := make([]BatcheData, 0, batchSize) //短信数据 dataBatchDuplicate := make([]BatchDataDuplicateLog, 0, batchSize) //重复短信数据 bi := 0 //重复总数 count := 0 // 总数 //数据文件中批次 batches := []Batch{} //查询批次 fileNameDate := "" dataFileName := "" fields := strings.Split(fileName, "_") datetime := fields[len(fields)-1] if lastCallKeys != nil { fileNameDate = fmt.Sprintf("lastCall-%s", excludedFilename) fmt.Printf("fileNameDate : %s\n", fileNameDate) // 模糊查询文件名包含“20230103”字符串的批次记录 db.Table("batches b1"). Select("b1.*"). Where("b1.data_file_name LIKE ?", "%"+fileNameDate+"%"). Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at"). Order("b1.created_at desc"). Find(&batches) dataFileName = fmt.Sprintf("lastCall-%s", excludedFilename) } else { fileNameDate = datetime[:8] db.Table("batches b1"). Select("b1.*"). Where("b1.data_file_name LIKE ?", "%"+fileNameDate+"%"). Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at"). Order("b1.created_at desc"). Find(&batches) dataFileName = strings.Replace(fileName, "targets", "definition", -1) dataFileName = dataFileName[:len(dataFileName)-10] } batchCount := len(batches) sendMobiles := make(map[string]SmsData, batchCount) //短信数据,用于提交发送平台 duplicateCount := make(map[string]int, batchCount) //按批次重复数 lastCallduplicateCount := make(map[string]int, batchCount) //按批次重复数 insertsCount := make(map[string]int, batchCount) //按批次插入数 ccids := make(map[string]bool, batchCount) if batchCount > 0 { //如果查询到数据需要的批次信息,开始处理数据 // 定义一个名为result的map类型的变量,并以CommunicationChannelID作为键 result := make(map[string][]Batch) for _, batch := range batches { cckdKey := batch.CommunicationChannelID result[cckdKey] = append(result[cckdKey], batch) print(batch.CommunicationChannelID, "\n") } scanner := bufio.NewScanner(file) scanner.Split(bufio.ScanLines) scanner.Scan() // skip first line for scanner.Scan() { line := scanner.Text() row, err := csv.NewReader(strings.NewReader(line)).Read() if err != nil { applogger.Error(fmt.Sprintf("文件按行读取失败,文件名:%s,错误信息%v", fileName, err)) continue } reservedFields := map[string]string{ //合并个性化字段 "ReservedField1": row[5], "ReservedField2": row[6], "ReservedField3": row[7], "ReservedField4": row[8], "ReservedField5": row[9], "FullName": row[4], } reservedFieldsJson, err := json.Marshal(reservedFields) //个性化字段转json if err != nil { applogger.Error(fmt.Sprintf("个性化字段合并失败,文件名:%s,错误信息%v", fileName, err)) continue } if _, ok := duplicateCount[row[2]]; !ok { duplicateCount[row[2]] = 0 } if _, ok := lastCallduplicateCount[row[2]]; !ok { lastCallduplicateCount[row[2]] = 0 } // Check if record exists in hashset key := fmt.Sprintf("%s-%s-%s", row[2], row[3], row[5]) if _, exists := hs[key]; exists { //如果批次数据重复 bi++ // Increment duplicate count duplicateCount[row[2]]++ dataBatchDuplicate = append(dataBatchDuplicate, BatchDataDuplicateLog{ CommunicationChannelID: row[2], Mobile: row[3], ReservedField: string(reservedFieldsJson), }) if len(dataBatchDuplicate) >= batchSize { err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error if err != nil { applogger.Error(fmt.Sprintf("插入重复数据失败,文件名:%s,错误信息%v", fileName, err)) } else { dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize) } } continue } if lastCallKeys != nil && lastCallKeys[fmt.Sprintf("%s-%s", row[2], row[3])] { lastCallduplicateCount[row[2]]++ continue } // Add record to hashset hs[key] = true ccids[row[2]] = true dataBatch = append(dataBatch, BatcheData{ CommunicationChannelID: row[2], Mobile: row[3], ReservedField: string(reservedFieldsJson), DataFileName: dataFileName, }) if len(dataBatch) >= batchSize { dataBatchChan <- dataBatch dataBatch = make([]BatcheData, 0, batchSize) } if batches, ok := result[row[2]]; ok && len(batches) > 0 && batches[0].Content != "" { content := fmt.Sprintf("%s /", batches[0].Content) Sid := batches[0].Sid pattern := regexp.MustCompile(`\{.*\}`) matched := pattern.MatchString(content) if matched { //个性化短信 smsData, ok := sendMobiles[row[2]] if !ok { smsData = SmsData{ Sid: Sid, IsPersonalizedMsg: true, SmsList: make([]SmsList, 0), } } placeholderMap := map[string]string{ "{RESERVED_FIELD_1}": row[5], "{RESERVED_FIELD_2}": row[6], "{RESERVED_FIELD_3}": row[7], "{RESERVED_FIELD_4}": row[8], "{RESERVED_FIELD_5}": row[9], } for k, v := range placeholderMap { content = strings.ReplaceAll(content, k, v) } smsData.SmsList = append(smsData.SmsList, SmsList{M: row[3], C: content, F: 8}) if len(smsData.SmsList) >= batchSize/2 { sd := Sms{Sid: Sid, Data: smsData.SmsList, Token: token} sendSMSDataJson, _ := json.Marshal(sd) // resp, err := smsApi("appendbatchdata", string(sendSMSDataJson)) if resp != 0 { fmt.Printf("smsApi returned error: %d\n", err) //发送提醒邮件 subject := "丝芙兰数据包数据提交异常" body := fmt.Sprintf("数据包:%s;\nCommunicationChannelID:%s;\n错误信息:%s\n 请前往管理平台查看处理。", fileName, row[2], err) applogger.Error(body) SendEmail(subject, body) //发送邮件 } smsData.SmsList = []SmsList{} // reset mobiles slice } sendMobiles[row[2]] = smsData } else { // 处理非个性化短信 smsData, ok := sendMobiles[row[2]] if !ok { smsData = SmsData{ Sid: Sid, IsPersonalizedMsg: false, Content: content, Mobiles: make([]string, 0), } } smsData.Mobiles = append(smsData.Mobiles, row[3]) if len(smsData.Mobiles) >= batchSize { mobiles := smsData.Mobiles mobileStr := strings.Join(mobiles, ",") var sl []SmsList sl = append(sl, SmsList{M: mobileStr, C: content, F: 8}) sd := Sms{Sid: Sid, Data: sl, Token: token} sendSMSDataJson, _ := json.Marshal(sd) resp, err := smsApi("appendbatchdata", string(sendSMSDataJson)) if resp != 0 { fmt.Printf("smsApi returned error: %d\n", err) //发送提醒邮件 subject := "丝芙兰数据包数据提交异常" body := fmt.Sprintf("数据包:%s;\nCommunicationChannelID:%s;\n错误信息:%s\n 请前往管理平台查看处理。", fileName, row[2], err) applogger.Error(body) SendEmail(subject, body) //发送邮件 } smsData.Mobiles = []string{} // reset mobiles slice } sendMobiles[row[2]] = smsData } } if _, ok := insertsCount[row[2]]; !ok { insertsCount[row[2]] = 0 } insertsCount[row[2]]++ count++ } if len(dataBatch) > 0 { dataBatchChan <- dataBatch } close(dataBatchChan) for ccid := range ccids { smsData, ok := sendMobiles[ccid] if !ok { continue } if smsData.IsPersonalizedMsg { //个性化 smsList := smsData.SmsList if len(smsList) > 0 { sd := Sms{Sid: smsData.Sid, Data: smsList, Token: token} sendSMSDataJson, _ := json.Marshal(sd) smsApi("appendbatchdata", string(sendSMSDataJson)) smsData.SmsList = []SmsList{} // reset mobiles slice } } else { //非个性化 mobiles, ok := smsData.Mobiles, len(smsData.Mobiles) > 0 && true if ok && len(mobiles) > 0 { mobileStr := strings.Join(mobiles, ",") var sl []SmsList sl = append(sl, SmsList{M: mobileStr, C: smsData.Content, F: 8}) sd := Sms{Sid: smsData.Sid, Data: sl, Token: token} sendSMSDataJson, _ := json.Marshal(sd) smsApi("appendbatchdata", string(sendSMSDataJson)) smsData.Mobiles = []string{} // reset mobiles slice } } sf := SmsFinish{Sid: smsData.Sid, Token: token} sfJson, _ := json.Marshal(sf) smsApi("finishbatch", string(sfJson)) bpi := []BatchProcessingInformation{} bpi = append(bpi, BatchProcessingInformation{ CommunicationChannelID: ccid, RepeatTargetsMember: duplicateCount[ccid], LastCallRepeatTargetsMember: lastCallduplicateCount[ccid], InsertsTargetsMember: insertsCount[ccid], DataFileName: dataFileName, }) err = db.CreateInBatches(bpi, insertSize).Error if err != nil { applogger.Error(fmt.Sprintf("插入批次处理信息失败,文件名:%s,错误信息%v", fileName, err)) } smsData = SmsData{} sendMobiles[ccid] = smsData } wg.Wait() //所有入库全部完成 //插入批此重复数据 if len(dataBatchDuplicate) > 0 { err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error if err != nil { applogger.Error(fmt.Sprintf("插入重复数据失败,文件名:%s,错误信息%v", fileName, err)) } else { dataBatchDuplicate = nil } } elapsed := time.Since(start) //发送提醒邮件 subject := "丝芙兰数据包处理完成" body := fmt.Sprintf("总数:%d;\n数据包:%s;\n过滤重复数:%d;\n过滤后总数:%d;\n处理完成,请前往管理平台查看处理。", count+bi, fileName, bi, count) SendEmail(subject, body) //发送邮件 applogger.Info(fmt.Sprintf("%s(数据包) 入库完成", fileName)) applogger.Info(fmt.Sprintf("%s(数据包)执行时间:%s 插入数据:%d条 过滤数数:%d条", fileName, elapsed, count, bi)) return 0 } else { //如果没有查询到批次信息,跳过处理 applogger.Error(fmt.Sprintf("未查询到批次数据,文件名:%s,截取批次标识:%s", fileName, dataFileName)) return -1 } } } func queryBatchState() { db, err := connectToDB() if err != nil { handleError(err, "数据库连接失败") return } var batches []Batch err = db.Where("status NOT IN (?)", []int{6, 8}).Find(&batches).Error handleError(err, "查询批次状态失败") if err != nil { return } for _, batch := range batches { sf := SmsFinish{Sid: batch.Sid, Token: token} sfJson, _ := json.Marshal(sf) url := "http://www.wemediacn.net/webservice/BatchService?service=sms.querybatchstate" resp, err := http.Post(url, "application/json; charset=utf-8", strings.NewReader(string(sfJson))) if err != nil { handleError(err, "查询批次状态失败") continue } defer resp.Body.Close() var retobj map[string]interface{} err = json.NewDecoder(resp.Body).Decode(&retobj) if err != nil { handleError(err, "解析响应数据失败") continue } code := int(retobj["code"].(float64)) if code == 0 { status := int(retobj["state"].(float64)) if batch.Status != status || status == 5 { updates := createUpdatesMap(retobj) err = db.Model(&batch).Updates(updates).Error handleError(err, "修改批次状态失败") } } else { jsonStr, _ := json.Marshal(retobj) handleError(fmt.Errorf("返回不为0"), fmt.Sprintf("查询批次状态失败:%s", string(jsonStr))) } } } func createUpdatesMap(retobj map[string]interface{}) map[string]interface{} { updates := map[string]interface{}{ "status": int(retobj["state"].(float64)), } if endTime, ok := retobj["endTime"].(string); ok { if endTimeTime, err := time.Parse("2006-01-02 15:04:05", endTime); err == nil { updates["end_time"] = &endTimeTime } } if startTime, ok := retobj["startTime"].(string); ok { if startTimeTime, err := time.Parse("2006-01-02 15:04:05", startTime); err == nil { updates["start_time"] = &startTimeTime } } if mc, ok := retobj["mc"].(float64); ok { updates["mc"] = int(mc) } if rc, ok := retobj["rc"].(float64); ok { updates["rc"] = int(rc) } if sc, ok := retobj["sc"].(float64); ok { updates["sc"] = int(sc) } return updates } func handleError(err error, errMsg string) { if err != nil { applogger.Error(fmt.Sprintf("%s: %s", errMsg, err)) } } func connectToDB() (*gorm.DB, error) { dsn := "sqlserver://" + dbUser + ":" + dbPassword + "@" + dbAddress + "?database=" + dbName + "&charset=utf8mb4" var db *gorm.DB var err error attempt := 1 maxAttempts := 5 backoff := time.Second logger := logger.Default.LogMode(logger.Error) //Silent、Error、Warn、Info for { db, err = gorm.Open(sqlserver.Open(dsn), &gorm.Config{Logger: logger, SkipDefaultTransaction: true}) if err == nil { break } if attempt >= maxAttempts { //applogger.Error(fmt.Sprintf("数据库连接失败,错误信息%v", err)) handleError(err, "数据库连接失败,错误信息") return nil, err } time.Sleep(backoff) backoff *= 2 attempt++ } return db, nil } func fileExists(filename string) bool { info, err := os.Stat(filename) if os.IsNotExist(err) { return false } return !info.IsDir() } func iniPath() { // 获取可执行文件所在的路径 executablePath, err := os.Executable() if err != nil { log.Fatal("failed to get executable path: ", err) } executableDir = filepath.Dir(executablePath) //判断目录是否存在,不存在则创建 err = os.MkdirAll(filepath.Join(executableDir, zipPath), 0755) if err != nil { log.Fatal(err) } err = os.MkdirAll(filepath.Join(executableDir, txtPath), 0755) if err != nil { log.Fatal(err) } err = os.MkdirAll(filepath.Join(executableDir, lastCallPath), 0755) if err != nil { log.Fatal(err) } err = os.MkdirAll(filepath.Join(executableDir, logPath, time.Now().Format("2006_01")), 0755) if err != nil { log.Fatal(err) } } func monopolize() { // 打开一个文件作为锁 lockFile, err := os.OpenFile(filepath.Join(executableDir, ".lock"), os.O_CREATE|os.O_RDWR, 0666) if err != nil { fmt.Println("打开锁文件失败:", err) os.Exit(1) } defer lockFile.Close() // 尝试获取文件的独占锁 err = syscall.Flock(int(lockFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) if err != nil { fmt.Println("程序已经在运行,本程序无法同时运行多个") os.Exit(1) } } func iniConfi() { flag.StringVar(&env, "env", "dev", "运行模式") flag.Parse() switch env { case "dev": //fmt.Print("测试环境配置已生效\n") redisAddress = "mysql5.weu.me:6379" redisPassword = "" redisDB = 1 sftpAddress = "192.168.10.86:49156" sftpUser = "demo" sftpPassword = "demo" sftpDir = "/sftp/test" dbAddress = "192.168.10.18:1433" dbUser = "sa" dbPassword = "Aa123123" dbName = "sephora" zipPath = "RawData/Zip/" txtPath = "RawData/Txt/" logPath = "logs/" batchSize = 5000 //提交数据 insertSize = 500 //一次性入库 insertChanSize = 10 //通道缓冲数 goSize = 10 //协程数 taskTime = 1 batchStatusTaskTime = 1 to = []string{"chejiulong@wemediacn.com"} token = "7100477930234217" lastCallPath = "RawData/LastCall" verifySignatureKey = "qZ6v1&H#Wjx+yRm2D@*sJF$tnfL83Ia" case "prod": //fmt.Print("正式环境配置已生效\n") redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379" redisPassword = "3Nsb4Pmsl9bcLs24mL12l" redisDB = 233 sftpAddress = "esftp.sephora.com.cn:20981" sftpUser = "CHN-CRMTOWemedia-wemedia" sftpPassword = "uoWdMHEv39ZFjiOg" sftpDir = "/CN-CRMTOWemedia/SMS" dbAddress = "rm-bp16l424ln96q1ouk.sqlserver.rds.aliyuncs.com:3433" dbUser = "sephora_sms" dbPassword = "5ORiiLmgkniC0EqF" dbName = "sephora_sms" zipPath = "RawData/Zip/" txtPath = "RawData/Txt/" logPath = "logs/" batchSize = 5000 //提交数据 insertSize = 500 //一次性入库 insertChanSize = 100 //通道缓冲数 goSize = 50 //协程数 taskTime = 60 batchStatusTaskTime = 5 to = []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"} token = "7100178600777091" //7100477930234217 lastCallPath = "RawData/LastCall" verifySignatureKey = "Lzxf6asgwFRTsANZ8gkXJnP3EmOyNO#&" default: panic(fmt.Errorf("无效的运行模式: %s", env)) } } func iniLog() { logPath = filepath.Join(executableDir, "logs", time.Now().Format("2006_01")) os.MkdirAll(logPath, 0755) logFileName := "sms_processing_" + time.Now().Format("2006_01_02") + ".log" logFileHook := &lumberjack.Logger{ Filename: filepath.Join(logPath, logFileName), } logOutput := io.MultiWriter(os.Stdout, logFileHook) // 将日志同时输出到控制台和文件 applogger.SetOutput(logOutput) } func SendEmail(subject string, body string) error { // 邮箱认证信息 smtpHost := "smtp.exmail.qq.com" smtpPort := 465 from := "auto_system@wemediacn.com" password := "yJJYYcy5NKx2y69r" // 邮件内容 m := gomail.NewMessage() m.SetHeader("From", from) m.SetHeader("To", to...) m.SetHeader("Subject", subject) m.SetBody("text/plain", body) // 邮件发送 d := gomail.NewDialer(smtpHost, smtpPort, from, password) d.TLSConfig = &tls.Config{InsecureSkipVerify: true} err := d.DialAndSend(m) if err != nil { applogger.Warn(fmt.Sprintf("邮件发送失败,错误信息%v", err)) } return nil } func (f FileSorter) Len() int { return len(f) } func (f FileSorter) Swap(i, j int) { f[i], f[j] = f[j], f[i] } func (f FileSorter) Less(i, j int) bool { // 首先检查文件扩展名是否为“.txt”,如果是,则将其移到文件列表的开头 if strings.HasSuffix(f[i].Name(), ".txt") && !strings.HasSuffix(f[j].Name(), ".txt") { return true } // 如果文件扩展名都是“.txt”,或都不是“.txt”,则按字母顺序排序 return f[i].Name() < f[j].Name() } var ( //初始化变量 env string applogger *logrus.Logger redisClient *redis.Client executableDir string redisAddress string redisPassword string redisDB int sftpAddress string sftpUser string sftpPassword string sftpDir string dbAddress string dbUser string dbPassword string dbName string zipPath string txtPath string logPath string batchSize int //提交数据 insertSize int //一次性入库 insertChanSize int //通道缓冲数 goSize int //协程数 taskTime int batchStatusTaskTime int to []string token string lastCallPath string verifySignatureKey string ) type Batch struct { ID uint `gorm:"primary_key"` CommunicationChannelID string CommunicationName string `gorm:"type:varchar(255)"` TargetsMember uint `gorm:"type:int"` TemplateID uint Content string `gorm:"type:text"` CreatedAt time.Time `gorm:"default:getdate()"` UpdatedAt time.Time `gorm:"default:getdate()"` Status int `gorm:"column:status"` DataFileName string `gorm:"type:text"` Sid int `gorm:"type:int"` EndTime *time.Time `gorm:"column:end_time"` StartTime *time.Time `gorm:"column:start_time"` MC *int `gorm:"column:mc"` RC *int `gorm:"column:rc"` SC *int `gorm:"column:sc"` } type BatcheData struct { ID uint `gorm:"primary_key"` CommunicationChannelID string `gorm:"column:communication_channel_id"` Mobile string `gorm:"column:mobile"` ReservedField string `gorm:"column:reserved_field"` DataFileName string `gorm:"column:data_file_name"` } func (BatcheData) TableName() string { return "batche_datas" } type BatchProcessingInformation struct { ID uint `gorm:"primaryKey;autoIncrement"` CommunicationChannelID string `gorm:"column:communication_channel_id"` RepeatTargetsMember int `gorm:"column:repeat_targets_member"` LastCallRepeatTargetsMember int `gorm:"column:last_call_repeat_targets_member"` InsertsTargetsMember int `gorm:"column:inserts_targets_member"` DataFileName string `gorm:"column:data_file_name"` } func (BatchProcessingInformation) TableName() string { return "batches_processing_informations" } type BatchDataDuplicateLog struct { ID int `gorm:"primaryKey;autoIncrement"` CommunicationChannelID string `gorm:"column:communication_channel_id"` Mobile string `gorm:"column:mobile"` ReservedField string `gorm:"column:reserved_field"` DataFileName string `gorm:"column:data_file_name"` } func (BatchDataDuplicateLog) TableName() string { return "batche_data_duplicate_logs" } var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, } // 客户端连接信息 type client struct { conn *websocket.Conn } // 在线客户端列表 var clients = make(map[*client]bool) // 定义一个FileSorter结构体 type FileSorter []os.FileInfo type BatchParams struct { BatchName string `json:"batchName"` BatchDesc string `json:"batchDesc"` IsPersonal int `json:"isPersonal"` Message string `json:"message"` IsInternational int `json:"isInternational"` IsSchedule int `json:"isSchedule"` ScheduleTime string `json:"scheduleTime"` Token string `json:"token"` } type SmsList struct { M string `json:"m"` C string `json:"c"` F int `json:"f"` } type Sms struct { Sid int `json:"sid"` Data []SmsList `json:"data"` Token string `json:"token"` } type SmsFinish struct { Sid int `json:"sid"` Token string `json:"token"` } type SmsData struct { Sid int IsPersonalizedMsg bool Content string Mobiles []string SmsList []SmsList } type Task struct { TaskData TaskData Signature Signature } type TaskData struct { Command string `json:"command"` ExcludedFilename string `json:"excluded_filename"` BatchFilename string `json:"batch_filename"` DataFilename string `json:"data_filename"` } type Signature struct { Signature string `json:"signature"` Timestamp int64 `json:"timestamp"` Nonce string `json:"nonce"` }