diff --git a/main.go b/main.go index 15a87bd..d979574 100644 --- a/main.go +++ b/main.go @@ -47,7 +47,7 @@ func init() { Password: redisPassword, DB: redisDB, }) - //go startWebSocket() // 启动 WebSocket 服务,使用协程方式运行 + go startWebSocket() // 启动 WebSocket 服务,使用协程方式运行 applogger.Info(fmt.Sprintf("程序启动,加载%s环境,尝试执行...", env)) go downloadDecompression() // 启动立即执行一次数据下载、处理 } @@ -150,10 +150,13 @@ func downloadDecompression() { if exists, _ := redisClient.Exists("iniDataStatus").Result(); exists == 1 { fmt.Print("上一批次执行中,跳过本次\n") } else { + subject := "丝芙兰短信处理程序异常" // 写入执行中标记 err := redisClient.Set("iniDataStatus", 1, 0).Err() if err != nil { - applogger.Error(fmt.Sprintf("写入任务执行中标记失败:%v", err)) + body := fmt.Sprintf("写入任务执行中标记失败:%v", err) + applogger.Error(body) + SendEmail(subject, body) //发送邮件 } // Connect to SFTP server sshConfig := &ssh.ClientConfig{ @@ -165,22 +168,28 @@ func downloadDecompression() { } sshClient, err := ssh.Dial("tcp", sftpAddress, sshConfig) if err != nil { - applogger.Error(fmt.Sprintf("写入任务执行中标记失败:%v", err)) + body := fmt.Sprintf("sshClient连接失败:%v", err) + applogger.Error(body) + SendEmail(subject, body) //发送邮件 } sftpClient, err := sftp.NewClient(sshClient) if err != nil { - applogger.Error(fmt.Sprintf("写入任务执行中标记失败:%v", err)) + body := fmt.Sprintf("sftp连接失败:%v", err) + applogger.Error(body) + SendEmail(subject, body) //发送邮件 } defer sftpClient.Close() files, err := sftpClient.ReadDir(sftpDir) if err != nil { - applogger.Error(fmt.Sprintf("sftp目录不存在: 错误信息:%v", err)) + body := fmt.Sprintf("sftp目录%s不存在, 错误信息:%v", sftpDir, err) + applogger.Error(body) + SendEmail(subject, body) //发送邮件 } it := 1 fmt.Printf("共%d个文件\n", len(files)) sort.Sort(FileSorter(files)) - + processingStatus := -1 for _, file := range files { fmt.Printf("第%d个文件处理中\n", it) it++ @@ -190,41 +199,45 @@ func downloadDecompression() { fmt.Println("跳过已处理过的文件:" + file.Name()) continue } - if filepath.Ext(file.Name()) == ".zip" { //fmt.Println("下载开始(数据包):" + file.Name()) // Download file srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name())) if err != nil { - applogger.Error(fmt.Sprintf("打开sftp文件失败,文件名:%s,错误信息: %v", file.Name(), err)) + body := fmt.Sprintf("打开sftp文件失败,文件名:%s,错误信息: %v", file.Name(), err) + applogger.Error(body) + SendEmail(subject, body) //发送邮件 continue } defer srcFile.Close() dstFile, err := os.Create(path.Join(executableDir, zipPath, file.Name())) if err != nil { - applogger.Error(fmt.Sprintf("创建本地文件失败,文件名: %s,错误信息:%v", file.Name(), err)) + body := fmt.Sprintf("创建本地文件失败,文件名: %s,错误信息:%v", file.Name(), err) + applogger.Error(body) + SendEmail(subject, body) //发送邮件 continue } defer dstFile.Close() if _, err := io.Copy(dstFile, srcFile); err != nil { - applogger.Error(fmt.Sprintf("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err)) + body := fmt.Sprintf("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err) + applogger.Error(body) + SendEmail(subject, body) //发送邮件 continue } - //fmt.Printf("%s(数据包)下载完成\n", file.Name()) applogger.Info(fmt.Sprintf("%s(数据包)下载完成", file.Name())) // Unzip file zipReader, err := zip.OpenReader(path.Join(executableDir, zipPath, file.Name())) if err != nil { - applogger.Error(fmt.Sprintf("解压文件失败: 文件名:%s,错误信息:%v", file.Name(), err)) + body := fmt.Sprintf("解压文件失败: 文件名:%s,错误信息:%v", file.Name(), err) + applogger.Error(body) + SendEmail(subject, body) //发送邮件 continue } defer zipReader.Close() - //fmt.Println("压缩报文件数量:", len(zipReader.File)) for _, zipFile := range zipReader.File { zipFileReader, err := zipFile.Open() if strings.Contains(zipFile.Name, "__MACOSX/._") { - //fmt.Print("系统文件.DS_Store,跳过处理", zipFile.Name) continue } else if filepath.Ext(zipFile.Name) != ".txt" { applogger.Error(fmt.Sprintf("文件类型不正确,跳过处理: %v", zipFile.Name)) @@ -239,60 +252,71 @@ func downloadDecompression() { // Create the file unzipFile, err := os.Create(path.Join(executableDir, txtPath, zipFile.Name)) if err != nil { - applogger.Error(fmt.Sprintf("创建压缩后的文件失败,文件名:%s,错误信息: %v", zipFile.Name, err)) + body := fmt.Sprintf("创建压缩后的文件失败,文件名:%s,错误信息: %v", zipFile.Name, err) + applogger.Error(body) + SendEmail(subject, body) //发送邮件 continue } defer unzipFile.Close() // Write the unzip data to the file _, err = io.Copy(unzipFile, zipFileReader) if err != nil { - applogger.Error(fmt.Sprintf("文件解压失败,文件名:%s,错误信息: %v", zipFileReader, err)) + body := fmt.Sprintf("文件解压失败,文件名:%s,错误信息: %v", zipFileReader, err) + applogger.Error(body) + SendEmail(subject, body) //发送邮件 continue } applogger.Info(fmt.Sprintf("%s(数据包)解压完成", zipFile.Name)) - //fmt.Printf("%s(数据包)解压完成\n", zipFile.Name) - batchDataInsert(zipFile.Name) - + processingStatus = batchDataInsert(zipFile.Name) } } else if filepath.Ext(file.Name()) == ".txt" { srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name())) if err != nil { - applogger.Error(fmt.Sprintf("打开sftp文件失败,文件名:%s,错误信息: %v", file.Name(), err)) + body := fmt.Sprintf("打开sftp文件失败,文件名:%s,错误信息: %v", file.Name(), err) + applogger.Error(body) + SendEmail(subject, body) //发送邮件 continue } defer srcFile.Close() dstFile, err := os.Create(path.Join(executableDir, txtPath, file.Name())) if err != nil { - applogger.Error(fmt.Sprintf("创建本地文件失败,文件名: %s,错误信息:%v", file.Name(), err)) + body := fmt.Sprintf("创建本地文件失败,文件名: %s,错误信息:%v", file.Name(), err) + applogger.Error(body) + SendEmail(subject, body) //发送邮件 continue } defer dstFile.Close() if _, err := io.Copy(dstFile, srcFile); err != nil { - applogger.Error(fmt.Sprintf("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err)) + body := fmt.Sprintf("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err) + applogger.Error(body) + SendEmail(subject, body) //发送邮件 continue } - //fmt.Printf("%s(批次文件)下载完成 \n", file.Name()) applogger.Info(fmt.Sprintf("%s(批次文件)下载完成", file.Name())) - batchInsert(file.Name()) + processingStatus = batchInsert(file.Name()) } - err = redisClient.Set(fileKey, 1, 0).Err() //写入下载、解压、入库完成标记 - if err != nil { - applogger.Info(fmt.Sprintf("写入文件处理完成标记失败文件名:%s,错误信息:%v\n", file.Name(), err)) + if processingStatus == 0 { + err = redisClient.Set(fileKey, 1, 0).Err() //写入文件处理完成 + if err != nil { + body := fmt.Sprintf("写入文件处理完成标记失败文件名:%s,错误信息:%v\n", file.Name(), err) + applogger.Error(body) + SendEmail(subject, body) //发送邮件 + } } + } redisClient.Del("iniDataStatus") //删除任务执行中标记 } - } // 批次入库 -func batchInsert(fileName string) { +func batchInsert(fileName string) int { //fmt.Print("批次处理开始") start := time.Now() db, _ := connectToDB() file, err := os.Open(path.Join(executableDir, txtPath, fileName)) if err != nil { - applogger.Error(fmt.Sprintf("文件打开失败文件名:%s,错误信息%v", fileName, err)) + return -1 } else { defer file.Close() reader := csv.NewReader(bufio.NewReader(file)) @@ -301,6 +325,7 @@ func batchInsert(fileName string) { for { record, err := reader.Read() if err != nil { + return -1 break } @@ -327,7 +352,7 @@ func batchInsert(fileName string) { sid, err := CreateBatch(batchParams) if err != nil { fmt.Println(err) - return + return -1 } fmt.Println(sid) @@ -346,11 +371,14 @@ func batchInsert(fileName string) { } time.Sleep(time.Second) elapsed := time.Since(start) + subject := "丝芙兰批次文件处理完成" - body := "批次文件:" + fileName + ";\n批次数:" + strconv.Itoa(batchRows) + ";\n处理完成,请前往管理平台查看处理。" + body := fmt.Sprintf("批次数:%d;\n批次文件:%s;\n处理完成,请前往管理平台查看处理。", batchRows, fileName) SendEmail(subject, body) //发送邮件 + applogger.Info(fmt.Sprintf("%s(批次文件)入库完成", fileName)) applogger.Info(fmt.Sprintf("%s(批次文件)执行时间%s 插入批次次数:%d", fileName, elapsed, batchRows)) + return 0 } } @@ -397,41 +425,38 @@ func smsApi(method string, sendSMSDataJson string) (int, error) { return -1, err } defer resp.Body.Close() - // 解析响应数据 var retobj map[string]interface{} err = json.NewDecoder(resp.Body).Decode(&retobj) if err != nil { return -1, err } - fmt.Print(retobj) - jsonStr, err := json.Marshal(retobj) if err != nil { fmt.Println(err) return -1, err } fmt.Printf("API 返回:%s\n", string(jsonStr)) - code := int(retobj["code"].(float64)) - - //fmt.Print("code:", code) - if code == 0 { //sid := int(retobj["sid"].(float64)) fmt.Printf("提交批次成功code:%d\n", code) return code, nil } else { - return -1, fmt.Errorf("create batch failed, error code: %d", code) + applogger.Error(string(jsonStr)) + return -1, fmt.Errorf(string(jsonStr)) } } -func batchDataInsert(fileName string) { +func batchDataInsert(fileName string) int { start := time.Now() // Open file + fileKey := fmt.Sprintf("downloaded:%s", strings.Replace(fileName, filepath.Ext(fileName), ".zip", 1)) file, err := os.Open(path.Join(executableDir, txtPath, fileName)) if err != nil { + redisClient.Del(fileKey) //删除文件处理完成的标志位 applogger.Error(fmt.Sprintf("文件打开失败,文件名:%s,错误信息%v", fileName, err)) + return -1 } else { defer file.Close() db, _ := connectToDB() @@ -461,249 +486,258 @@ func batchDataInsert(fileName string) { }() } - // Create and initialize hashset - hs := make(map[string]bool) - // Prepare batch data - dataBatch := make([]BatcheData, 0, batchSize) - dataBatchDuplicate := make([]BatchDataDuplicateLog, 0, batchSize) - // Parse file line by line and insert data in batches - scanner := bufio.NewScanner(file) - scanner.Split(bufio.ScanLines) - scanner.Scan() // skip first line - bi := 0 - duplicateCount := make(map[string]int) - insertsCount := make(map[string]int) - //sendSMSData := make(map[string][]Sms) - sendMobiles := make(map[string]map[string]interface{}) - var count int - ccids := make(map[string]bool) + hs := make(map[string]bool) //排重 + dataBatch := make([]BatcheData, 0, batchSize) //短信数据 + dataBatchDuplicate := make([]BatchDataDuplicateLog, 0, batchSize) //重复短信数据 + bi := 0 //重复总数 + count := 0 // 总数 + //数据文件中批次 + batches := []Batch{} //查询批次 - // 通过下划线将文件名拆分为多个字段 fields := strings.Split(fileName, "_") - - // 最后一个字段是日期和时间,我们只需要日期部分 datetime := fields[len(fields)-1] fileNameDate := datetime[:8] - var batches []Batch + // 模糊查询文件名包含“20230103”字符串的批次记录 + db.Table("batches b1"). + Select("b1.*"). + Where("b1.data_file_name LIKE ?", "%"+fileNameDate+"%"). + Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at"). + Order("b1.created_at desc"). + Find(&batches) - // 模糊查询包含“20230103”字符串的记录 - db.Where("data_file_name LIKE ?", "%"+fileNameDate+"%").Find(&batches) + batchCount := len(batches) + sendMobiles := make(map[string]SmsData, batchCount) //短信数据,用于提交发送平台 + duplicateCount := make(map[string]int, batchCount) //按批次重复数 + insertsCount := make(map[string]int, batchCount) //按批次插入数 + ccids := make(map[string]bool, batchCount) - // 定义一个名为result的map类型的变量,并以CommunicationChannelID作为键 - result := make(map[string][]Batch) - for _, batch := range batches { - cckdKey := strconv.FormatUint(uint64(batch.CommunicationChannelID), 10) - result[cckdKey] = append(result[cckdKey], batch) - print(batch.CommunicationChannelID, "\n") - } - - //ci := 1 - for scanner.Scan() { - line := scanner.Text() - row, err := csv.NewReader(strings.NewReader(line)).Read() - if err != nil { - applogger.Error(fmt.Sprintf("文件按行读取失败,文件名:%s,错误信息%v", fileName, err)) - continue - } - reservedFields := map[string]string{ //合并个性化字段 - "ReservedField1": row[5], - "ReservedField2": row[6], - "ReservedField3": row[7], - "ReservedField4": row[8], - "ReservedField5": row[9], - "DataFileName": fileName, - } - reservedFieldsJson, err := json.Marshal(reservedFields) //个性化字段转json - if err != nil { - applogger.Error(fmt.Sprintf("个性化字段合并失败,文件名:%s,错误信息%v", fileName, err)) - continue - } - if _, ok := duplicateCount[row[2]]; !ok { - duplicateCount[row[2]] = 0 + if batchCount > 0 { //如果查询到数据需要的批次信息,开始处理数据 + // 定义一个名为result的map类型的变量,并以CommunicationChannelID作为键 + result := make(map[string][]Batch) + for _, batch := range batches { + cckdKey := strconv.FormatUint(uint64(batch.CommunicationChannelID), 10) + result[cckdKey] = append(result[cckdKey], batch) + print(batch.CommunicationChannelID, "\n") } - // Check if record exists in hashset - key := fmt.Sprintf("%s-%s-%s", row[2], row[3], row[5]) - if _, exists := hs[key]; exists { //如果批次数据重复 - bi++ - // Increment duplicate count - duplicateCount[row[2]]++ - dataBatchDuplicate = append(dataBatchDuplicate, BatchDataDuplicateLog{ + scanner := bufio.NewScanner(file) + scanner.Split(bufio.ScanLines) + scanner.Scan() // skip first line + for scanner.Scan() { + line := scanner.Text() + row, err := csv.NewReader(strings.NewReader(line)).Read() + if err != nil { + applogger.Error(fmt.Sprintf("文件按行读取失败,文件名:%s,错误信息%v", fileName, err)) + continue + } + reservedFields := map[string]string{ //合并个性化字段 + "ReservedField1": row[5], + "ReservedField2": row[6], + "ReservedField3": row[7], + "ReservedField4": row[8], + "ReservedField5": row[9], + "DataFileName": fileName, + } + reservedFieldsJson, err := json.Marshal(reservedFields) //个性化字段转json + if err != nil { + applogger.Error(fmt.Sprintf("个性化字段合并失败,文件名:%s,错误信息%v", fileName, err)) + continue + } + if _, ok := duplicateCount[row[2]]; !ok { + duplicateCount[row[2]] = 0 + } + + // Check if record exists in hashset + key := fmt.Sprintf("%s-%s-%s", row[2], row[3], row[5]) + if _, exists := hs[key]; exists { //如果批次数据重复 + bi++ + // Increment duplicate count + duplicateCount[row[2]]++ + dataBatchDuplicate = append(dataBatchDuplicate, BatchDataDuplicateLog{ + CommunicationChannelID: row[2], + Mobile: row[3], + FullName: row[4], + ReservedField: string(reservedFieldsJson), + }) + + if len(dataBatchDuplicate) >= batchSize { + err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error + if err != nil { + applogger.Error(fmt.Sprintf("插入重复数据失败,文件名:%s,错误信息%v", fileName, err)) + } else { + dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize) + } + } + continue + } + // Add record to hashset + hs[key] = true + ccids[row[2]] = true + dataBatch = append(dataBatch, BatcheData{ CommunicationChannelID: row[2], Mobile: row[3], FullName: row[4], ReservedField: string(reservedFieldsJson), }) - if len(dataBatchDuplicate) >= batchSize { - err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error - if err != nil { - applogger.Error(fmt.Sprintf("插入重复数据失败,文件名:%s,错误信息%v", fileName, err)) - } else { - dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize) - } - + if len(dataBatch) >= batchSize { + dataBatchChan <- dataBatch + dataBatch = make([]BatcheData, 0, batchSize) } - continue - } - // Add record to hashset - hs[key] = true - //tccid, _ := strconv.ParseUint(row[2], 10, 32) - ccids[row[2]] = true - dataBatch = append(dataBatch, BatcheData{ - CommunicationChannelID: row[2], - Mobile: row[3], - FullName: row[4], - ReservedField: string(reservedFieldsJson), - }) - if len(dataBatch) >= batchSize { + if batches, ok := result[row[2]]; ok && len(batches) > 0 && batches[0].Content != "" { + content := batches[0].Content + Sid := batches[0].Sid + pattern := regexp.MustCompile(`\{.*\}`) + matched := pattern.MatchString(content) + if matched { //个性化短信 + smsData, ok := sendMobiles[row[2]] + if !ok { + smsData = SmsData{ + Sid: Sid, + IsPersonalizedMsg: true, + SmsList: make([]SmsList, 0), + } + } + placeholderMap := map[string]string{ + "{RESERVED_FIELD_1}": row[5], + "{RESERVED_FIELD_2}": row[6], + "{RESERVED_FIELD_3}": row[7], + "{RESERVED_FIELD_4}": row[8], + "{RESERVED_FIELD_5}": row[9], + } + for k, v := range placeholderMap { + content = strings.ReplaceAll(content, k, v) + } + smsData.SmsList = append(smsData.SmsList, SmsList{M: row[3], C: content, F: 8}) + if len(smsData.SmsList) >= batchSize/2 { + sd := Sms{Sid: Sid, Data: smsData.SmsList, Token: token} + sendSMSDataJson, _ := json.Marshal(sd) + // + resp, err := smsApi("appendbatchdata", string(sendSMSDataJson)) + if resp != 0 { + fmt.Printf("smsApi returned error: %d\n", err) + //发送提醒邮件 + subject := "丝芙兰数据包数据提交异常" + body := fmt.Sprintf("数据包:%s;\nCommunicationChannelID:%s;\n错误信息:%s\n 请前往管理平台查看处理。", fileName, row[2], err) + applogger.Error(body) + SendEmail(subject, body) //发送邮件 + } + smsData.SmsList = []SmsList{} // reset mobiles slice + } + sendMobiles[row[2]] = smsData + } else { + // 处理非个性化短信 + smsData, ok := sendMobiles[row[2]] + if !ok { + smsData = SmsData{ + Sid: Sid, + IsPersonalizedMsg: false, + Content: content, + Mobiles: make([]string, 0), + } + } + smsData.Mobiles = append(smsData.Mobiles, row[3]) + if len(smsData.Mobiles) >= batchSize { + mobiles := smsData.Mobiles + mobileStr := strings.Join(mobiles, ",") + var sl []SmsList + sl = append(sl, SmsList{M: mobileStr, C: content, F: 8}) + sd := Sms{Sid: Sid, Data: sl, Token: token} + sendSMSDataJson, _ := json.Marshal(sd) + resp, err := smsApi("appendbatchdata", string(sendSMSDataJson)) + if resp != 0 { + fmt.Printf("smsApi returned error: %d\n", err) + //发送提醒邮件 + subject := "丝芙兰数据包数据提交异常" + body := fmt.Sprintf("数据包:%s;\nCommunicationChannelID:%s;\n错误信息:%s\n 请前往管理平台查看处理。", fileName, row[2], err) + applogger.Error(body) + SendEmail(subject, body) //发送邮件 + } + smsData.Mobiles = []string{} // reset mobiles slice + } + sendMobiles[row[2]] = smsData + } + } + if _, ok := insertsCount[row[2]]; !ok { + insertsCount[row[2]] = 0 + } + insertsCount[row[2]]++ + count++ + } + + if len(dataBatch) > 0 { dataBatchChan <- dataBatch - dataBatch = make([]BatcheData, 0, batchSize) - //fmt.Print("提交通次数" + strconv.Itoa(ci)) - //ci++ } + close(dataBatchChan) - if batches, ok := result[row[2]]; ok && len(batches) > 0 && batches[0].Content != "" { - content := batches[0].Content - Sid := batches[0].Sid - pattern := regexp.MustCompile(`\{.*\}`) - - matched := pattern.MatchString(content) - if matched { //个性化短信 - if _, ok := sendMobiles[row[2]]; !ok { - sendMobiles[row[2]] = make(map[string]interface{}) - sendMobiles[row[2]]["sid"] = Sid - sendMobiles[row[2]]["isPersonalizedMsg"] = true - sendMobiles[row[2]]["mobiles"] = make([]SmsList, 0) - } - placeholderMap := map[string]string{ - "{RESERVED_FIELD_1}": row[5], - "{RESERVED_FIELD_2}": row[6], - "{RESERVED_FIELD_3}": row[7], - "{RESERVED_FIELD_4}": row[8], - "{RESERVED_FIELD_5}": row[9], - } - for k, v := range placeholderMap { - content = strings.ReplaceAll(content, k, v) - } - - sendMobiles[row[2]]["mobiles"] = append(sendMobiles[row[2]]["mobiles"].([]SmsList), SmsList{M: row[3], C: content, F: 8}) - - if len(sendMobiles[row[2]]["mobiles"].([]SmsList)) >= batchSize/2 { - sd := Sms{Sid: Sid, Data: sendMobiles[row[2]]["mobiles"].([]SmsList), Token: token} + for ccid := range ccids { + smsData, ok := sendMobiles[ccid] + if !ok { + continue + } + if smsData.IsPersonalizedMsg { //个性化 + smsList := smsData.SmsList + if len(smsList) > 0 { + sd := Sms{Sid: smsData.Sid, Data: smsList, Token: token} sendSMSDataJson, _ := json.Marshal(sd) smsApi("appendbatchdata", string(sendSMSDataJson)) - //fmt.Print(string(sendSMSDataJson)) - sendMobiles[row[2]]["mobiles"] = []SmsList{} // reset mobiles slice + smsData.SmsList = []SmsList{} // reset mobiles slice } - - } else { - // 处理非个性化短信 - if _, ok := sendMobiles[row[2]]; !ok { - sendMobiles[row[2]] = make(map[string]interface{}) - sendMobiles[row[2]]["sid"] = Sid - sendMobiles[row[2]]["isPersonalizedMsg"] = false - sendMobiles[row[2]]["content"] = content - sendMobiles[row[2]]["mobiles"] = []string{} - } - - sendMobiles[row[2]]["mobiles"] = append(sendMobiles[row[2]]["mobiles"].([]string), row[3]) - - if len(sendMobiles[row[2]]["mobiles"].([]string)) >= batchSize { - mobiles := sendMobiles[row[2]]["mobiles"].([]string) + } else { //非个性化 + mobiles, ok := smsData.Mobiles, len(smsData.Mobiles) > 0 && true + if ok && len(mobiles) > 0 { mobileStr := strings.Join(mobiles, ",") var sl []SmsList - sl = append(sl, SmsList{M: mobileStr, C: content, F: 8}) - sd := Sms{Sid: Sid, Data: sl, Token: token} + sl = append(sl, SmsList{M: mobileStr, C: smsData.Content, F: 8}) + sd := Sms{Sid: smsData.Sid, Data: sl, Token: token} sendSMSDataJson, _ := json.Marshal(sd) smsApi("appendbatchdata", string(sendSMSDataJson)) - sendMobiles[row[2]]["mobiles"] = []string{} // reset mobiles slice - + smsData.Mobiles = []string{} // reset mobiles slice } } - } - - if _, ok := insertsCount[row[2]]; !ok { - insertsCount[row[2]] = 0 - } - insertsCount[row[2]]++ - count++ - } - - if len(dataBatch) > 0 { - dataBatchChan <- dataBatch - } - - close(dataBatchChan) - - //fmt.Println("结束批次v%\n", ccids) - //fmt.Print("ccids 长度", len(ccids), "\n") - for ccid := range ccids { // 处理各个批次剩余数据,同时处理批次结束api - - fmt.Println("循环处理批次结束\n", ccid) - - //batches := result[ccid] - if sendMobiles[ccid]["isPersonalizedMsg"].(bool) { //个性化 - smsList := sendMobiles[ccid]["mobiles"].([]SmsList) - if len(smsList) > 0 { - sd := Sms{Sid: sendMobiles[ccid]["sid"].(int), Data: sendMobiles[ccid]["mobiles"].([]SmsList), Token: token} - sendSMSDataJson, _ := json.Marshal(sd) - smsApi("appendbatchdata", string(sendSMSDataJson)) - //fmt.Print(string(sendSMSDataJson)) - sendMobiles[ccid]["mobiles"] = []SmsList{} // reset mobiles slice + sf := SmsFinish{Sid: smsData.Sid, Token: token} + sfJson, _ := json.Marshal(sf) + smsApi("finishbatch", string(sfJson)) + bpi := []BatchProcessingInformation{} + bpi = append(bpi, BatchProcessingInformation{ + CommunicationChannelID: ccid, + RepeatTargetsMember: duplicateCount[ccid], + InsertsTargetsMember: insertsCount[ccid], + DataFileName: fileName, + }) + err = db.CreateInBatches(bpi, insertSize).Error + if err != nil { + applogger.Error(fmt.Sprintf("插入批次处理信息失败,文件名:%s,错误信息%v", fileName, err)) } + smsData = SmsData{} + sendMobiles[ccid] = smsData + } - } else { //非个性化 - if mobiles, ok := sendMobiles[ccid]["mobiles"].([]string); ok && len(mobiles) > 0 { - mobileStr := strings.Join(sendMobiles[ccid]["mobiles"].([]string), ",") - var sl []SmsList - sl = append(sl, SmsList{M: mobileStr, C: sendMobiles[ccid]["content"].(string), F: 8}) + wg.Wait() //所有入库全部完成 - sd := Sms{Sid: sendMobiles[ccid]["sid"].(int), Data: sl, Token: token} - sendSMSDataJson, _ := json.Marshal(sd) - smsApi("appendbatchdata", string(sendSMSDataJson)) + //插入批此重复数据 + if len(dataBatchDuplicate) > 0 { + err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error + if err != nil { + applogger.Error(fmt.Sprintf("插入重复数据失败,文件名:%s,错误信息%v", fileName, err)) + } else { + dataBatchDuplicate = nil } } - //关闭批次 - sf := SmsFinish{Sid: sendMobiles[ccid]["sid"].(int), Token: token} - sfJson, _ := json.Marshal(sf) - smsApi("finishbatch", string(sfJson)) + elapsed := time.Since(start) - //插入批次处理数据 - bpi := []BatchProcessingInformation{} - bpi = append(bpi, BatchProcessingInformation{ - CommunicationChannelID: ccid, - RepeatTargetsMember: duplicateCount[ccid], - InsertsTargetsMember: insertsCount[ccid], - DataFileName: fileName, - }) - err = db.CreateInBatches(bpi, insertSize).Error - if err != nil { - applogger.Error(fmt.Sprintf("插入批次处理信息失败,文件名:%s,错误信息%v", fileName, err)) - } + //发送提醒邮件 + subject := "丝芙兰数据包处理完成" + body := fmt.Sprintf("总数:%d;\n数据包:%s;\n过滤重复数:%d;\n过滤后总数:%d;\n处理完成,请前往管理平台查看处理。", count+bi, fileName, bi, count) + SendEmail(subject, body) //发送邮件 + applogger.Info(fmt.Sprintf("%s(数据包) 入库完成", fileName)) + applogger.Info(fmt.Sprintf("%s(数据包)执行时间:%s 插入数据:%d条 过滤数数:%d条", fileName, elapsed, count, bi)) + return 0 + } else { //如果没有查询到批次信息,跳过处理 + return -1 } - wg.Wait() //所有入库全部完成 - - //插入批此重复数据 - if len(dataBatchDuplicate) > 0 { - err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error - if err != nil { - applogger.Error(fmt.Sprintf("插入重复数据失败,文件名:%s,错误信息%v", fileName, err)) - } else { - dataBatchDuplicate = nil - } - } - - elapsed := time.Since(start) - - //发送提醒邮件 - subject := "丝芙兰数据包处理完成" - body := "数据包:" + fileName + ";\n总数:" + strconv.Itoa(count+bi) + ";\n过滤重复数:" + strconv.Itoa(bi) + ";\n过滤后总数:" + strconv.Itoa(count) + ";\n处理完成,请前往管理平台查看处理。" - SendEmail(subject, body) //发送邮件 - applogger.Info(fmt.Sprintf("%s(数据包) 入库完成", fileName)) - applogger.Info(fmt.Sprintf("%s(数据包)执行时间:%s 插入数据:%d条 过滤数数:%d条", fileName, elapsed, count, bi)) } } @@ -998,3 +1032,10 @@ type SmsFinish struct { Sid int `json:"sid"` Token string `json:"token"` } +type SmsData struct { + Sid int + IsPersonalizedMsg bool + Content string + Mobiles []string + SmsList []SmsList +}