diff --git a/iniDataForMacOs b/iniDataForMacOs index 3215593..424d4e1 100755 Binary files a/iniDataForMacOs and b/iniDataForMacOs differ diff --git a/main.go b/main.go index 7dd00e2..b47fe86 100644 --- a/main.go +++ b/main.go @@ -103,22 +103,70 @@ func handleWebSocket(w http.ResponseWriter, r *http.Request) { return } fmt.Println("收到消息:", string(message)) - if string(message) == "a" { - fmt.Println("触发合并操作") - message := []byte("触发合并操作") - err := conn.WriteMessage(websocket.TextMessage, message) //发送消息给客户端 - if err != nil { - fmt.Println("发送消息失败:", err) - delete(clients, c) - return + + task := Task{} + var returnMessage []byte + err = json.Unmarshal([]byte(message), &task) + if err != nil { + returnMessage = []byte(`{"code": 2001,"err": "json指令解析失败"}`) + conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 + } else { + switch task.Command { + case "lastCall": + if !fileExists(path.Join(executableDir, lastCallPath, task.ExcludedFilename)) { + returnMessage = []byte(`{"code": 2003,"err": "task.ExcludedFilename 不存在"}`) + conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 + break + } + if !fileExists(path.Join(executableDir, txtPath, task.BatchFilename)) { + returnMessage = []byte(`{"code": 2004,"err": "task.BatchFilename 不存在"}`) + conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 + break + } + if !fileExists(path.Join(executableDir, txtPath, task.DataFilename)) { + returnMessage = []byte(`{"code": 2005,"err": "task.DataFilename 不存在"}`) + conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 + break + } + returnMessage = []byte(`{"code": 2000,"err": "开始处理lastCall"}`) + conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 + lastCallKeys, err := readExcludedFile(path.Join(executableDir, lastCallPath, task.ExcludedFilename)) + if err != nil { + returnMessage = []byte(`{"code": 2006,"err": "打开ExcludedFilename失败"}`) + conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 + break + } else { + returnMessage = []byte(`{"code": 2000,"err": "ExcludedFilename读取完成"}`) + conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 + } + if exists, _ := redisClient.Exists("iniLastCallDataStatus").Result(); exists == 1 { + returnMessage = []byte(`{"code": 2007,"err": "有lastCall执行中,请稍后重试"}`) + conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 + + } else { + //无错误,执行逻辑代码, + subject := "丝芙兰短信处理程序异常" + err := redisClient.Set("iniLastCallDataStatus", 1, 0).Err() + if err != nil { + body := fmt.Sprintf("写入lastCall任务执行中标记失败:%v", err) + applogger.Error(body) + returnMessage = []byte(fmt.Sprintf(`{"code": 2008,"err": "写入lastCall任务执行中标记失败%v"}`, err)) + conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 + SendEmail(subject, body) //发送邮件 + } else { + batchInsert(task.BatchFilename, true) //创建批次 + returnMessage = []byte(`{"code": 2000,"err": "批次创建完成"}`) + conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 + batchDataInsert(task.DataFilename, lastCallKeys) //添加数据 + redisClient.Del("iniLastCallDataStatus") //删除任务执行中标记 + returnMessage = []byte(`{"code": 2000,"err": "结束处理lastCall"}`) + conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 + } + } + default: + returnMessage = []byte(`{"code": 2002,"err": "task.Command 不存在"}`) + conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 } - } else if string(message) == "c" { - for c := range clients { - fmt.Printf("客户端 %s 在线\n", c.conn.RemoteAddr().String()) - } - } else if string(message) == "all" { - message = []byte("所有的客户端们你们好!") - go broadcast(message) } } }() @@ -133,6 +181,26 @@ func handleWebSocket(w http.ResponseWriter, r *http.Request) { } +func readExcludedFile(filename string) (map[string]bool, error) { + file, err := os.Open(filename) + if err != nil { + return nil, err + } + defer file.Close() + scanner := bufio.NewScanner(file) + scanner.Split(bufio.ScanLines) + scanner.Scan() // skip first line + lastCallKeys := make(map[string]bool) + for scanner.Scan() { + line := scanner.Text() + fields := strings.Split(line, ",") + lastCallKey := fmt.Sprintf("%s-%s", fields[0], fields[1]) + lastCallKeys[lastCallKey] = true + } + return lastCallKeys, nil +} + +/* // 聊天室消息广播 func broadcast(message []byte) { for c := range clients { @@ -142,7 +210,7 @@ func broadcast(message []byte) { delete(clients, c) } } -} +}*/ func downloadDecompression() { if exists, _ := redisClient.Exists("iniDataStatus").Result(); exists == 1 { @@ -266,7 +334,7 @@ func downloadDecompression() { continue } applogger.Info(fmt.Sprintf("%s(数据包)解压完成", zipFile.Name)) - processingStatus = batchDataInsert(zipFile.Name) + processingStatus = batchDataInsert(zipFile.Name, nil) } } else if filepath.Ext(file.Name()) == ".txt" { srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name())) @@ -292,7 +360,7 @@ func downloadDecompression() { continue } applogger.Info(fmt.Sprintf("%s(批次文件)下载完成", file.Name())) - processingStatus = batchInsert(file.Name()) + processingStatus = batchInsert(file.Name(), false) } if processingStatus != -1 { err = redisClient.Set(fileKey, 1, 0).Err() //写入文件处理完成 @@ -309,7 +377,7 @@ func downloadDecompression() { } // 批次入库 -func batchInsert(fileName string) int { +func batchInsert(fileName string, isLastCall bool) int { //fmt.Print("批次处理开始") start := time.Now() db, _ := connectToDB() @@ -335,9 +403,18 @@ func batchInsert(fileName string) int { t := time.Now() s := t.Format("2006-01-02 15:04:05") - //key := + var batchName, dataFileName string + + if isLastCall { + batchName = fmt.Sprintf("lastCall-%s-%s", record[1], strconv.Itoa(int(communicationChannelID))) + dataFileName = fmt.Sprintf("lastCall-%s", fileName) + } else { + batchName = fmt.Sprintf("%s-%s", record[1], strconv.Itoa(int(communicationChannelID))) + dataFileName = fileName + } + batchParams := BatchParams{ - BatchName: fmt.Sprintf("%s-%s", record[1], strconv.Itoa(int(communicationChannelID))), + BatchName: batchName, BatchDesc: record[4], IsPersonal: 0, Message: record[4], @@ -357,12 +434,12 @@ func batchInsert(fileName string) int { batch := Batch{ CommunicationChannelID: uint(communicationChannelID), - CommunicationName: record[1], + CommunicationName: batchName, TargetsMember: uint(TargetsMember), TemplateID: uint(templateID), Content: record[4], Status: status, - DataFileName: fileName, + DataFileName: dataFileName, Sid: sid, } db.Create(&batch) @@ -447,7 +524,7 @@ func smsApi(method string, sendSMSDataJson string) (int, error) { } } -func batchDataInsert(fileName string) int { +func batchDataInsert(fileName string, lastCallKeys map[string]bool) int { start := time.Now() // Open file fileKey := fmt.Sprintf("downloaded:%s", strings.Replace(fileName, filepath.Ext(fileName), ".zip", 1)) @@ -478,7 +555,6 @@ func batchDataInsert(fileName string) int { } else { break } - } } wg.Done() @@ -492,17 +568,31 @@ func batchDataInsert(fileName string) int { count := 0 // 总数 //数据文件中批次 batches := []Batch{} //查询批次 + fileNameDate := "" + dataFileName := "" - fields := strings.Split(fileName, "_") - datetime := fields[len(fields)-1] - fileNameDate := datetime[:8] - // 模糊查询文件名包含“20230103”字符串的批次记录 - db.Table("batches b1"). - Select("b1.*"). - Where("b1.data_file_name LIKE ?", "%"+fileNameDate+"%"). - Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at"). - Order("b1.created_at desc"). - Find(&batches) + if lastCallKeys != nil { + fields := strings.Split(fileName, "_") + datetime := fields[len(fields)-1] + fileNameDate = datetime[:8] + // 模糊查询文件名包含“20230103”字符串的批次记录 + db.Table("batches b1"). + Select("b1.*"). + Where("b1.data_file_name LIKE ?", "%"+fileNameDate+"%"). + Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at"). + Order("b1.created_at desc"). + Find(&batches) + dataFileName = fmt.Sprintf("lastCall-%s", fileName) + } else { + fileName = fmt.Sprintf("lastCall-%s", fileName) + db.Table("batches b1"). + Select("b1.*"). + Where("b1.data_file_name = ?", fileNameDate). + Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at"). + Order("b1.created_at desc"). + Find(&batches) + dataFileName = fileName + } batchCount := len(batches) sendMobiles := make(map[string]SmsData, batchCount) //短信数据,用于提交发送平台 @@ -535,7 +625,7 @@ func batchDataInsert(fileName string) int { "ReservedField3": row[7], "ReservedField4": row[8], "ReservedField5": row[9], - "DataFileName": fileName, + "DataFileName": dataFileName, "FullName": row[4], } reservedFieldsJson, err := json.Marshal(reservedFields) //个性化字段转json @@ -569,6 +659,10 @@ func batchDataInsert(fileName string) int { } continue } + if lastCallKeys != nil && lastCallKeys[fmt.Sprintf("%s-%s", row[2], row[3])] { + continue + } + // Add record to hashset hs[key] = true ccids[row[2]] = true @@ -584,7 +678,7 @@ func batchDataInsert(fileName string) int { } if batches, ok := result[row[2]]; ok && len(batches) > 0 && batches[0].Content != "" { - content := batches[0].Content + content := fmt.Sprintf("%s /", batches[0].Content) Sid := batches[0].Sid pattern := regexp.MustCompile(`\{.*\}`) matched := pattern.MatchString(content) @@ -763,6 +857,14 @@ func connectToDB() (*gorm.DB, error) { return db, nil } +func fileExists(filename string) bool { + info, err := os.Stat(filename) + if os.IsNotExist(err) { + return false + } + return !info.IsDir() +} + func iniPath() { // 获取可执行文件所在的路径 executablePath, err := os.Executable() @@ -780,6 +882,10 @@ func iniPath() { if err != nil { log.Fatal(err) } + err = os.MkdirAll(filepath.Join(executableDir, lastCallPath), 0755) + if err != nil { + log.Fatal(err) + } err = os.MkdirAll(filepath.Join(executableDir, logPath, time.Now().Format("2006_01")), 0755) if err != nil { log.Fatal(err) @@ -829,6 +935,7 @@ func iniConfi() { taskTime = 1 to = []string{"chejiulong@wemediacn.com"} token = "7100477930234217" + lastCallPath = "RawData/LastCall" case "prod": //fmt.Print("正式环境配置已生效\n") redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379" @@ -852,6 +959,7 @@ func iniConfi() { taskTime = 60 to = []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"} token = "7100477930234217" + lastCallPath = "RawData/LastCall" default: panic(fmt.Errorf("无效的运行模式: %s", env)) } @@ -934,6 +1042,7 @@ var ( //初始化变量 taskTime int to []string token string + lastCallPath string ) type Batch struct { @@ -1035,3 +1144,10 @@ type SmsData struct { Mobiles []string SmsList []SmsList } + +type Task struct { + Command string `json:"command"` + ExcludedFilename string `json:"excluded_filename"` + BatchFilename string `json:"batch_filename"` + DataFilename string `json:"data_filename"` +}