增加异常情况邮件提醒,增加WebSocket服务

This commit is contained in:
chejiulong 2023-03-05 17:51:04 +08:00
parent 34ab8962e0
commit 2fb1d934db

537
main.go
View File

@ -47,7 +47,7 @@ func init() {
Password: redisPassword, Password: redisPassword,
DB: redisDB, DB: redisDB,
}) })
//go startWebSocket() // 启动 WebSocket 服务,使用协程方式运行 go startWebSocket() // 启动 WebSocket 服务,使用协程方式运行
applogger.Info(fmt.Sprintf("程序启动,加载%s环境尝试执行...", env)) applogger.Info(fmt.Sprintf("程序启动,加载%s环境尝试执行...", env))
go downloadDecompression() // 启动立即执行一次数据下载、处理 go downloadDecompression() // 启动立即执行一次数据下载、处理
} }
@ -150,10 +150,13 @@ func downloadDecompression() {
if exists, _ := redisClient.Exists("iniDataStatus").Result(); exists == 1 { if exists, _ := redisClient.Exists("iniDataStatus").Result(); exists == 1 {
fmt.Print("上一批次执行中,跳过本次\n") fmt.Print("上一批次执行中,跳过本次\n")
} else { } else {
subject := "丝芙兰短信处理程序异常"
// 写入执行中标记 // 写入执行中标记
err := redisClient.Set("iniDataStatus", 1, 0).Err() err := redisClient.Set("iniDataStatus", 1, 0).Err()
if err != nil { if err != nil {
applogger.Error(fmt.Sprintf("写入任务执行中标记失败:%v", err)) body := fmt.Sprintf("写入任务执行中标记失败:%v", err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
} }
// Connect to SFTP server // Connect to SFTP server
sshConfig := &ssh.ClientConfig{ sshConfig := &ssh.ClientConfig{
@ -165,22 +168,28 @@ func downloadDecompression() {
} }
sshClient, err := ssh.Dial("tcp", sftpAddress, sshConfig) sshClient, err := ssh.Dial("tcp", sftpAddress, sshConfig)
if err != nil { if err != nil {
applogger.Error(fmt.Sprintf("写入任务执行中标记失败:%v", err)) body := fmt.Sprintf("sshClient连接失败%v", err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
} }
sftpClient, err := sftp.NewClient(sshClient) sftpClient, err := sftp.NewClient(sshClient)
if err != nil { if err != nil {
applogger.Error(fmt.Sprintf("写入任务执行中标记失败:%v", err)) body := fmt.Sprintf("sftp连接失败%v", err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
} }
defer sftpClient.Close() defer sftpClient.Close()
files, err := sftpClient.ReadDir(sftpDir) files, err := sftpClient.ReadDir(sftpDir)
if err != nil { if err != nil {
applogger.Error(fmt.Sprintf("sftp目录不存在: 错误信息:%v", err)) body := fmt.Sprintf("sftp目录%s不存在, 错误信息:%v", sftpDir, err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
} }
it := 1 it := 1
fmt.Printf("共%d个文件\n", len(files)) fmt.Printf("共%d个文件\n", len(files))
sort.Sort(FileSorter(files)) sort.Sort(FileSorter(files))
processingStatus := -1
for _, file := range files { for _, file := range files {
fmt.Printf("第%d个文件处理中\n", it) fmt.Printf("第%d个文件处理中\n", it)
it++ it++
@ -190,41 +199,45 @@ func downloadDecompression() {
fmt.Println("跳过已处理过的文件:" + file.Name()) fmt.Println("跳过已处理过的文件:" + file.Name())
continue continue
} }
if filepath.Ext(file.Name()) == ".zip" { if filepath.Ext(file.Name()) == ".zip" {
//fmt.Println("下载开始(数据包):" + file.Name()) //fmt.Println("下载开始(数据包):" + file.Name())
// Download file // Download file
srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name())) srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name()))
if err != nil { if err != nil {
applogger.Error(fmt.Sprintf("打开sftp文件失败文件名:%s错误信息 %v", file.Name(), err)) body := fmt.Sprintf("打开sftp文件失败文件名:%s错误信息 %v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
continue continue
} }
defer srcFile.Close() defer srcFile.Close()
dstFile, err := os.Create(path.Join(executableDir, zipPath, file.Name())) dstFile, err := os.Create(path.Join(executableDir, zipPath, file.Name()))
if err != nil { if err != nil {
applogger.Error(fmt.Sprintf("创建本地文件失败,文件名: %s错误信息%v", file.Name(), err)) body := fmt.Sprintf("创建本地文件失败,文件名: %s错误信息%v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
continue continue
} }
defer dstFile.Close() defer dstFile.Close()
if _, err := io.Copy(dstFile, srcFile); err != nil { if _, err := io.Copy(dstFile, srcFile); err != nil {
applogger.Error(fmt.Sprintf("下载文件失败,文件名: %s错误信息%v", file.Name(), err)) body := fmt.Sprintf("下载文件失败,文件名: %s错误信息%v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
continue continue
} }
//fmt.Printf("%s数据包下载完成\n", file.Name())
applogger.Info(fmt.Sprintf("%s数据包下载完成", file.Name())) applogger.Info(fmt.Sprintf("%s数据包下载完成", file.Name()))
// Unzip file // Unzip file
zipReader, err := zip.OpenReader(path.Join(executableDir, zipPath, file.Name())) zipReader, err := zip.OpenReader(path.Join(executableDir, zipPath, file.Name()))
if err != nil { if err != nil {
applogger.Error(fmt.Sprintf("解压文件失败: 文件名:%s错误信息%v", file.Name(), err)) body := fmt.Sprintf("解压文件失败: 文件名:%s错误信息%v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
continue continue
} }
defer zipReader.Close() defer zipReader.Close()
//fmt.Println("压缩报文件数量:", len(zipReader.File))
for _, zipFile := range zipReader.File { for _, zipFile := range zipReader.File {
zipFileReader, err := zipFile.Open() zipFileReader, err := zipFile.Open()
if strings.Contains(zipFile.Name, "__MACOSX/._") { if strings.Contains(zipFile.Name, "__MACOSX/._") {
//fmt.Print("系统文件.DS_Store,跳过处理", zipFile.Name)
continue continue
} else if filepath.Ext(zipFile.Name) != ".txt" { } else if filepath.Ext(zipFile.Name) != ".txt" {
applogger.Error(fmt.Sprintf("文件类型不正确,跳过处理: %v", zipFile.Name)) applogger.Error(fmt.Sprintf("文件类型不正确,跳过处理: %v", zipFile.Name))
@ -239,60 +252,71 @@ func downloadDecompression() {
// Create the file // Create the file
unzipFile, err := os.Create(path.Join(executableDir, txtPath, zipFile.Name)) unzipFile, err := os.Create(path.Join(executableDir, txtPath, zipFile.Name))
if err != nil { if err != nil {
applogger.Error(fmt.Sprintf("创建压缩后的文件失败,文件名:%s错误信息 %v", zipFile.Name, err)) body := fmt.Sprintf("创建压缩后的文件失败,文件名:%s错误信息 %v", zipFile.Name, err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
continue continue
} }
defer unzipFile.Close() defer unzipFile.Close()
// Write the unzip data to the file // Write the unzip data to the file
_, err = io.Copy(unzipFile, zipFileReader) _, err = io.Copy(unzipFile, zipFileReader)
if err != nil { if err != nil {
applogger.Error(fmt.Sprintf("文件解压失败,文件名:%s错误信息 %v", zipFileReader, err)) body := fmt.Sprintf("文件解压失败,文件名:%s错误信息 %v", zipFileReader, err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
continue continue
} }
applogger.Info(fmt.Sprintf("%s数据包解压完成", zipFile.Name)) applogger.Info(fmt.Sprintf("%s数据包解压完成", zipFile.Name))
//fmt.Printf("%s数据包解压完成\n", zipFile.Name) processingStatus = batchDataInsert(zipFile.Name)
batchDataInsert(zipFile.Name)
} }
} else if filepath.Ext(file.Name()) == ".txt" { } else if filepath.Ext(file.Name()) == ".txt" {
srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name())) srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name()))
if err != nil { if err != nil {
applogger.Error(fmt.Sprintf("打开sftp文件失败文件名:%s错误信息 %v", file.Name(), err)) body := fmt.Sprintf("打开sftp文件失败文件名:%s错误信息 %v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
continue continue
} }
defer srcFile.Close() defer srcFile.Close()
dstFile, err := os.Create(path.Join(executableDir, txtPath, file.Name())) dstFile, err := os.Create(path.Join(executableDir, txtPath, file.Name()))
if err != nil { if err != nil {
applogger.Error(fmt.Sprintf("创建本地文件失败,文件名: %s错误信息%v", file.Name(), err)) body := fmt.Sprintf("创建本地文件失败,文件名: %s错误信息%v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
continue continue
} }
defer dstFile.Close() defer dstFile.Close()
if _, err := io.Copy(dstFile, srcFile); err != nil { if _, err := io.Copy(dstFile, srcFile); err != nil {
applogger.Error(fmt.Sprintf("下载文件失败,文件名: %s错误信息%v", file.Name(), err)) body := fmt.Sprintf("下载文件失败,文件名: %s错误信息%v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
continue continue
} }
//fmt.Printf("%s批次文件下载完成 \n", file.Name())
applogger.Info(fmt.Sprintf("%s批次文件下载完成", file.Name())) applogger.Info(fmt.Sprintf("%s批次文件下载完成", file.Name()))
batchInsert(file.Name()) processingStatus = batchInsert(file.Name())
} }
err = redisClient.Set(fileKey, 1, 0).Err() //写入下载、解压、入库完成标记 if processingStatus == 0 {
if err != nil { err = redisClient.Set(fileKey, 1, 0).Err() //写入文件处理完成
applogger.Info(fmt.Sprintf("写入文件处理完成标记失败文件名:%s错误信息%v\n", file.Name(), err)) if err != nil {
body := fmt.Sprintf("写入文件处理完成标记失败文件名:%s错误信息%v\n", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
}
} }
} }
redisClient.Del("iniDataStatus") //删除任务执行中标记 redisClient.Del("iniDataStatus") //删除任务执行中标记
} }
} }
// 批次入库 // 批次入库
func batchInsert(fileName string) { func batchInsert(fileName string) int {
//fmt.Print("批次处理开始") //fmt.Print("批次处理开始")
start := time.Now() start := time.Now()
db, _ := connectToDB() db, _ := connectToDB()
file, err := os.Open(path.Join(executableDir, txtPath, fileName)) file, err := os.Open(path.Join(executableDir, txtPath, fileName))
if err != nil { if err != nil {
applogger.Error(fmt.Sprintf("文件打开失败文件名:%s错误信息%v", fileName, err)) return -1
} else { } else {
defer file.Close() defer file.Close()
reader := csv.NewReader(bufio.NewReader(file)) reader := csv.NewReader(bufio.NewReader(file))
@ -301,6 +325,7 @@ func batchInsert(fileName string) {
for { for {
record, err := reader.Read() record, err := reader.Read()
if err != nil { if err != nil {
return -1
break break
} }
@ -327,7 +352,7 @@ func batchInsert(fileName string) {
sid, err := CreateBatch(batchParams) sid, err := CreateBatch(batchParams)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
return return -1
} }
fmt.Println(sid) fmt.Println(sid)
@ -346,11 +371,14 @@ func batchInsert(fileName string) {
} }
time.Sleep(time.Second) time.Sleep(time.Second)
elapsed := time.Since(start) elapsed := time.Since(start)
subject := "丝芙兰批次文件处理完成" subject := "丝芙兰批次文件处理完成"
body := "批次文件:" + fileName + ";\n批次数" + strconv.Itoa(batchRows) + ";\n处理完成请前往管理平台查看处理。" body := fmt.Sprintf("批次数:%d;\n批次文件%s;\n处理完成请前往管理平台查看处理。", batchRows, fileName)
SendEmail(subject, body) //发送邮件 SendEmail(subject, body) //发送邮件
applogger.Info(fmt.Sprintf("%s批次文件入库完成", fileName)) applogger.Info(fmt.Sprintf("%s批次文件入库完成", fileName))
applogger.Info(fmt.Sprintf("%s批次文件执行时间%s 插入批次次数:%d", fileName, elapsed, batchRows)) applogger.Info(fmt.Sprintf("%s批次文件执行时间%s 插入批次次数:%d", fileName, elapsed, batchRows))
return 0
} }
} }
@ -397,41 +425,38 @@ func smsApi(method string, sendSMSDataJson string) (int, error) {
return -1, err return -1, err
} }
defer resp.Body.Close() defer resp.Body.Close()
// 解析响应数据 // 解析响应数据
var retobj map[string]interface{} var retobj map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&retobj) err = json.NewDecoder(resp.Body).Decode(&retobj)
if err != nil { if err != nil {
return -1, err return -1, err
} }
fmt.Print(retobj)
jsonStr, err := json.Marshal(retobj) jsonStr, err := json.Marshal(retobj)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
return -1, err return -1, err
} }
fmt.Printf("API 返回:%s\n", string(jsonStr)) fmt.Printf("API 返回:%s\n", string(jsonStr))
code := int(retobj["code"].(float64)) code := int(retobj["code"].(float64))
//fmt.Print("code", code)
if code == 0 { if code == 0 {
//sid := int(retobj["sid"].(float64)) //sid := int(retobj["sid"].(float64))
fmt.Printf("提交批次成功code%d\n", code) fmt.Printf("提交批次成功code%d\n", code)
return code, nil return code, nil
} else { } else {
return -1, fmt.Errorf("create batch failed, error code: %d", code) applogger.Error(string(jsonStr))
return -1, fmt.Errorf(string(jsonStr))
} }
} }
func batchDataInsert(fileName string) { func batchDataInsert(fileName string) int {
start := time.Now() start := time.Now()
// Open file // Open file
fileKey := fmt.Sprintf("downloaded:%s", strings.Replace(fileName, filepath.Ext(fileName), ".zip", 1))
file, err := os.Open(path.Join(executableDir, txtPath, fileName)) file, err := os.Open(path.Join(executableDir, txtPath, fileName))
if err != nil { if err != nil {
redisClient.Del(fileKey) //删除文件处理完成的标志位
applogger.Error(fmt.Sprintf("文件打开失败,文件名:%s错误信息%v", fileName, err)) applogger.Error(fmt.Sprintf("文件打开失败,文件名:%s错误信息%v", fileName, err))
return -1
} else { } else {
defer file.Close() defer file.Close()
db, _ := connectToDB() db, _ := connectToDB()
@ -461,249 +486,258 @@ func batchDataInsert(fileName string) {
}() }()
} }
// Create and initialize hashset hs := make(map[string]bool) //排重
hs := make(map[string]bool) dataBatch := make([]BatcheData, 0, batchSize) //短信数据
// Prepare batch data dataBatchDuplicate := make([]BatchDataDuplicateLog, 0, batchSize) //重复短信数据
dataBatch := make([]BatcheData, 0, batchSize) bi := 0 //重复总数
dataBatchDuplicate := make([]BatchDataDuplicateLog, 0, batchSize) count := 0 // 总数
// Parse file line by line and insert data in batches //数据文件中批次
scanner := bufio.NewScanner(file) batches := []Batch{} //查询批次
scanner.Split(bufio.ScanLines)
scanner.Scan() // skip first line
bi := 0
duplicateCount := make(map[string]int)
insertsCount := make(map[string]int)
//sendSMSData := make(map[string][]Sms)
sendMobiles := make(map[string]map[string]interface{})
var count int
ccids := make(map[string]bool)
// 通过下划线将文件名拆分为多个字段
fields := strings.Split(fileName, "_") fields := strings.Split(fileName, "_")
// 最后一个字段是日期和时间,我们只需要日期部分
datetime := fields[len(fields)-1] datetime := fields[len(fields)-1]
fileNameDate := datetime[:8] fileNameDate := datetime[:8]
var batches []Batch // 模糊查询文件名包含“20230103”字符串的批次记录
db.Table("batches b1").
Select("b1.*").
Where("b1.data_file_name LIKE ?", "%"+fileNameDate+"%").
Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at").
Order("b1.created_at desc").
Find(&batches)
// 模糊查询包含“20230103”字符串的记录 batchCount := len(batches)
db.Where("data_file_name LIKE ?", "%"+fileNameDate+"%").Find(&batches) sendMobiles := make(map[string]SmsData, batchCount) //短信数据,用于提交发送平台
duplicateCount := make(map[string]int, batchCount) //按批次重复数
insertsCount := make(map[string]int, batchCount) //按批次插入数
ccids := make(map[string]bool, batchCount)
// 定义一个名为result的map类型的变量并以CommunicationChannelID作为键 if batchCount > 0 { //如果查询到数据需要的批次信息,开始处理数据
result := make(map[string][]Batch) // 定义一个名为result的map类型的变量并以CommunicationChannelID作为键
for _, batch := range batches { result := make(map[string][]Batch)
cckdKey := strconv.FormatUint(uint64(batch.CommunicationChannelID), 10) for _, batch := range batches {
result[cckdKey] = append(result[cckdKey], batch) cckdKey := strconv.FormatUint(uint64(batch.CommunicationChannelID), 10)
print(batch.CommunicationChannelID, "\n") result[cckdKey] = append(result[cckdKey], batch)
} print(batch.CommunicationChannelID, "\n")
//ci := 1
for scanner.Scan() {
line := scanner.Text()
row, err := csv.NewReader(strings.NewReader(line)).Read()
if err != nil {
applogger.Error(fmt.Sprintf("文件按行读取失败,文件名:%s错误信息%v", fileName, err))
continue
}
reservedFields := map[string]string{ //合并个性化字段
"ReservedField1": row[5],
"ReservedField2": row[6],
"ReservedField3": row[7],
"ReservedField4": row[8],
"ReservedField5": row[9],
"DataFileName": fileName,
}
reservedFieldsJson, err := json.Marshal(reservedFields) //个性化字段转json
if err != nil {
applogger.Error(fmt.Sprintf("个性化字段合并失败,文件名:%s错误信息%v", fileName, err))
continue
}
if _, ok := duplicateCount[row[2]]; !ok {
duplicateCount[row[2]] = 0
} }
// Check if record exists in hashset scanner := bufio.NewScanner(file)
key := fmt.Sprintf("%s-%s-%s", row[2], row[3], row[5]) scanner.Split(bufio.ScanLines)
if _, exists := hs[key]; exists { //如果批次数据重复 scanner.Scan() // skip first line
bi++ for scanner.Scan() {
// Increment duplicate count line := scanner.Text()
duplicateCount[row[2]]++ row, err := csv.NewReader(strings.NewReader(line)).Read()
dataBatchDuplicate = append(dataBatchDuplicate, BatchDataDuplicateLog{ if err != nil {
applogger.Error(fmt.Sprintf("文件按行读取失败,文件名:%s错误信息%v", fileName, err))
continue
}
reservedFields := map[string]string{ //合并个性化字段
"ReservedField1": row[5],
"ReservedField2": row[6],
"ReservedField3": row[7],
"ReservedField4": row[8],
"ReservedField5": row[9],
"DataFileName": fileName,
}
reservedFieldsJson, err := json.Marshal(reservedFields) //个性化字段转json
if err != nil {
applogger.Error(fmt.Sprintf("个性化字段合并失败,文件名:%s错误信息%v", fileName, err))
continue
}
if _, ok := duplicateCount[row[2]]; !ok {
duplicateCount[row[2]] = 0
}
// Check if record exists in hashset
key := fmt.Sprintf("%s-%s-%s", row[2], row[3], row[5])
if _, exists := hs[key]; exists { //如果批次数据重复
bi++
// Increment duplicate count
duplicateCount[row[2]]++
dataBatchDuplicate = append(dataBatchDuplicate, BatchDataDuplicateLog{
CommunicationChannelID: row[2],
Mobile: row[3],
FullName: row[4],
ReservedField: string(reservedFieldsJson),
})
if len(dataBatchDuplicate) >= batchSize {
err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error
if err != nil {
applogger.Error(fmt.Sprintf("插入重复数据失败,文件名:%s错误信息%v", fileName, err))
} else {
dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize)
}
}
continue
}
// Add record to hashset
hs[key] = true
ccids[row[2]] = true
dataBatch = append(dataBatch, BatcheData{
CommunicationChannelID: row[2], CommunicationChannelID: row[2],
Mobile: row[3], Mobile: row[3],
FullName: row[4], FullName: row[4],
ReservedField: string(reservedFieldsJson), ReservedField: string(reservedFieldsJson),
}) })
if len(dataBatchDuplicate) >= batchSize { if len(dataBatch) >= batchSize {
err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error dataBatchChan <- dataBatch
if err != nil { dataBatch = make([]BatcheData, 0, batchSize)
applogger.Error(fmt.Sprintf("插入重复数据失败,文件名:%s错误信息%v", fileName, err))
} else {
dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize)
}
} }
continue
}
// Add record to hashset
hs[key] = true
//tccid, _ := strconv.ParseUint(row[2], 10, 32)
ccids[row[2]] = true
dataBatch = append(dataBatch, BatcheData{
CommunicationChannelID: row[2],
Mobile: row[3],
FullName: row[4],
ReservedField: string(reservedFieldsJson),
})
if len(dataBatch) >= batchSize { if batches, ok := result[row[2]]; ok && len(batches) > 0 && batches[0].Content != "" {
content := batches[0].Content
Sid := batches[0].Sid
pattern := regexp.MustCompile(`\{.*\}`)
matched := pattern.MatchString(content)
if matched { //个性化短信
smsData, ok := sendMobiles[row[2]]
if !ok {
smsData = SmsData{
Sid: Sid,
IsPersonalizedMsg: true,
SmsList: make([]SmsList, 0),
}
}
placeholderMap := map[string]string{
"{RESERVED_FIELD_1}": row[5],
"{RESERVED_FIELD_2}": row[6],
"{RESERVED_FIELD_3}": row[7],
"{RESERVED_FIELD_4}": row[8],
"{RESERVED_FIELD_5}": row[9],
}
for k, v := range placeholderMap {
content = strings.ReplaceAll(content, k, v)
}
smsData.SmsList = append(smsData.SmsList, SmsList{M: row[3], C: content, F: 8})
if len(smsData.SmsList) >= batchSize/2 {
sd := Sms{Sid: Sid, Data: smsData.SmsList, Token: token}
sendSMSDataJson, _ := json.Marshal(sd)
//
resp, err := smsApi("appendbatchdata", string(sendSMSDataJson))
if resp != 0 {
fmt.Printf("smsApi returned error: %d\n", err)
//发送提醒邮件
subject := "丝芙兰数据包数据提交异常"
body := fmt.Sprintf("数据包:%s;\nCommunicationChannelID%s;\n错误信息%s\n 请前往管理平台查看处理。", fileName, row[2], err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
}
smsData.SmsList = []SmsList{} // reset mobiles slice
}
sendMobiles[row[2]] = smsData
} else {
// 处理非个性化短信
smsData, ok := sendMobiles[row[2]]
if !ok {
smsData = SmsData{
Sid: Sid,
IsPersonalizedMsg: false,
Content: content,
Mobiles: make([]string, 0),
}
}
smsData.Mobiles = append(smsData.Mobiles, row[3])
if len(smsData.Mobiles) >= batchSize {
mobiles := smsData.Mobiles
mobileStr := strings.Join(mobiles, ",")
var sl []SmsList
sl = append(sl, SmsList{M: mobileStr, C: content, F: 8})
sd := Sms{Sid: Sid, Data: sl, Token: token}
sendSMSDataJson, _ := json.Marshal(sd)
resp, err := smsApi("appendbatchdata", string(sendSMSDataJson))
if resp != 0 {
fmt.Printf("smsApi returned error: %d\n", err)
//发送提醒邮件
subject := "丝芙兰数据包数据提交异常"
body := fmt.Sprintf("数据包:%s;\nCommunicationChannelID%s;\n错误信息%s\n 请前往管理平台查看处理。", fileName, row[2], err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
}
smsData.Mobiles = []string{} // reset mobiles slice
}
sendMobiles[row[2]] = smsData
}
}
if _, ok := insertsCount[row[2]]; !ok {
insertsCount[row[2]] = 0
}
insertsCount[row[2]]++
count++
}
if len(dataBatch) > 0 {
dataBatchChan <- dataBatch dataBatchChan <- dataBatch
dataBatch = make([]BatcheData, 0, batchSize)
//fmt.Print("提交通次数" + strconv.Itoa(ci))
//ci++
} }
close(dataBatchChan)
if batches, ok := result[row[2]]; ok && len(batches) > 0 && batches[0].Content != "" { for ccid := range ccids {
content := batches[0].Content smsData, ok := sendMobiles[ccid]
Sid := batches[0].Sid if !ok {
pattern := regexp.MustCompile(`\{.*\}`) continue
}
matched := pattern.MatchString(content) if smsData.IsPersonalizedMsg { //个性化
if matched { //个性化短信 smsList := smsData.SmsList
if _, ok := sendMobiles[row[2]]; !ok { if len(smsList) > 0 {
sendMobiles[row[2]] = make(map[string]interface{}) sd := Sms{Sid: smsData.Sid, Data: smsList, Token: token}
sendMobiles[row[2]]["sid"] = Sid
sendMobiles[row[2]]["isPersonalizedMsg"] = true
sendMobiles[row[2]]["mobiles"] = make([]SmsList, 0)
}
placeholderMap := map[string]string{
"{RESERVED_FIELD_1}": row[5],
"{RESERVED_FIELD_2}": row[6],
"{RESERVED_FIELD_3}": row[7],
"{RESERVED_FIELD_4}": row[8],
"{RESERVED_FIELD_5}": row[9],
}
for k, v := range placeholderMap {
content = strings.ReplaceAll(content, k, v)
}
sendMobiles[row[2]]["mobiles"] = append(sendMobiles[row[2]]["mobiles"].([]SmsList), SmsList{M: row[3], C: content, F: 8})
if len(sendMobiles[row[2]]["mobiles"].([]SmsList)) >= batchSize/2 {
sd := Sms{Sid: Sid, Data: sendMobiles[row[2]]["mobiles"].([]SmsList), Token: token}
sendSMSDataJson, _ := json.Marshal(sd) sendSMSDataJson, _ := json.Marshal(sd)
smsApi("appendbatchdata", string(sendSMSDataJson)) smsApi("appendbatchdata", string(sendSMSDataJson))
//fmt.Print(string(sendSMSDataJson)) smsData.SmsList = []SmsList{} // reset mobiles slice
sendMobiles[row[2]]["mobiles"] = []SmsList{} // reset mobiles slice
} }
} else { //非个性化
} else { mobiles, ok := smsData.Mobiles, len(smsData.Mobiles) > 0 && true
// 处理非个性化短信 if ok && len(mobiles) > 0 {
if _, ok := sendMobiles[row[2]]; !ok {
sendMobiles[row[2]] = make(map[string]interface{})
sendMobiles[row[2]]["sid"] = Sid
sendMobiles[row[2]]["isPersonalizedMsg"] = false
sendMobiles[row[2]]["content"] = content
sendMobiles[row[2]]["mobiles"] = []string{}
}
sendMobiles[row[2]]["mobiles"] = append(sendMobiles[row[2]]["mobiles"].([]string), row[3])
if len(sendMobiles[row[2]]["mobiles"].([]string)) >= batchSize {
mobiles := sendMobiles[row[2]]["mobiles"].([]string)
mobileStr := strings.Join(mobiles, ",") mobileStr := strings.Join(mobiles, ",")
var sl []SmsList var sl []SmsList
sl = append(sl, SmsList{M: mobileStr, C: content, F: 8}) sl = append(sl, SmsList{M: mobileStr, C: smsData.Content, F: 8})
sd := Sms{Sid: Sid, Data: sl, Token: token} sd := Sms{Sid: smsData.Sid, Data: sl, Token: token}
sendSMSDataJson, _ := json.Marshal(sd) sendSMSDataJson, _ := json.Marshal(sd)
smsApi("appendbatchdata", string(sendSMSDataJson)) smsApi("appendbatchdata", string(sendSMSDataJson))
sendMobiles[row[2]]["mobiles"] = []string{} // reset mobiles slice smsData.Mobiles = []string{} // reset mobiles slice
} }
} }
} sf := SmsFinish{Sid: smsData.Sid, Token: token}
sfJson, _ := json.Marshal(sf)
if _, ok := insertsCount[row[2]]; !ok { smsApi("finishbatch", string(sfJson))
insertsCount[row[2]] = 0 bpi := []BatchProcessingInformation{}
} bpi = append(bpi, BatchProcessingInformation{
insertsCount[row[2]]++ CommunicationChannelID: ccid,
count++ RepeatTargetsMember: duplicateCount[ccid],
} InsertsTargetsMember: insertsCount[ccid],
DataFileName: fileName,
if len(dataBatch) > 0 { })
dataBatchChan <- dataBatch err = db.CreateInBatches(bpi, insertSize).Error
} if err != nil {
applogger.Error(fmt.Sprintf("插入批次处理信息失败,文件名:%s错误信息%v", fileName, err))
close(dataBatchChan)
//fmt.Println("结束批次v%\n", ccids)
//fmt.Print("ccids 长度", len(ccids), "\n")
for ccid := range ccids { // 处理各个批次剩余数据同时处理批次结束api
fmt.Println("循环处理批次结束\n", ccid)
//batches := result[ccid]
if sendMobiles[ccid]["isPersonalizedMsg"].(bool) { //个性化
smsList := sendMobiles[ccid]["mobiles"].([]SmsList)
if len(smsList) > 0 {
sd := Sms{Sid: sendMobiles[ccid]["sid"].(int), Data: sendMobiles[ccid]["mobiles"].([]SmsList), Token: token}
sendSMSDataJson, _ := json.Marshal(sd)
smsApi("appendbatchdata", string(sendSMSDataJson))
//fmt.Print(string(sendSMSDataJson))
sendMobiles[ccid]["mobiles"] = []SmsList{} // reset mobiles slice
} }
smsData = SmsData{}
sendMobiles[ccid] = smsData
}
} else { //非个性化 wg.Wait() //所有入库全部完成
if mobiles, ok := sendMobiles[ccid]["mobiles"].([]string); ok && len(mobiles) > 0 {
mobileStr := strings.Join(sendMobiles[ccid]["mobiles"].([]string), ",")
var sl []SmsList
sl = append(sl, SmsList{M: mobileStr, C: sendMobiles[ccid]["content"].(string), F: 8})
sd := Sms{Sid: sendMobiles[ccid]["sid"].(int), Data: sl, Token: token} //插入批此重复数据
sendSMSDataJson, _ := json.Marshal(sd) if len(dataBatchDuplicate) > 0 {
smsApi("appendbatchdata", string(sendSMSDataJson)) err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error
if err != nil {
applogger.Error(fmt.Sprintf("插入重复数据失败,文件名:%s错误信息%v", fileName, err))
} else {
dataBatchDuplicate = nil
} }
} }
//关闭批次 elapsed := time.Since(start)
sf := SmsFinish{Sid: sendMobiles[ccid]["sid"].(int), Token: token}
sfJson, _ := json.Marshal(sf)
smsApi("finishbatch", string(sfJson))
//插入批次处理数据 //发送提醒邮件
bpi := []BatchProcessingInformation{} subject := "丝芙兰数据包处理完成"
bpi = append(bpi, BatchProcessingInformation{ body := fmt.Sprintf("总数:%d;\n数据包%s;\n过滤重复数%d;\n过滤后总数%d;\n处理完成请前往管理平台查看处理。", count+bi, fileName, bi, count)
CommunicationChannelID: ccid, SendEmail(subject, body) //发送邮件
RepeatTargetsMember: duplicateCount[ccid], applogger.Info(fmt.Sprintf("%s数据包 入库完成", fileName))
InsertsTargetsMember: insertsCount[ccid], applogger.Info(fmt.Sprintf("%s数据包执行时间:%s 插入数据:%d条 过滤数数:%d条", fileName, elapsed, count, bi))
DataFileName: fileName, return 0
}) } else { //如果没有查询到批次信息,跳过处理
err = db.CreateInBatches(bpi, insertSize).Error return -1
if err != nil {
applogger.Error(fmt.Sprintf("插入批次处理信息失败,文件名:%s错误信息%v", fileName, err))
}
} }
wg.Wait() //所有入库全部完成
//插入批此重复数据
if len(dataBatchDuplicate) > 0 {
err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error
if err != nil {
applogger.Error(fmt.Sprintf("插入重复数据失败,文件名:%s错误信息%v", fileName, err))
} else {
dataBatchDuplicate = nil
}
}
elapsed := time.Since(start)
//发送提醒邮件
subject := "丝芙兰数据包处理完成"
body := "数据包:" + fileName + ";\n总数" + strconv.Itoa(count+bi) + ";\n过滤重复数" + strconv.Itoa(bi) + ";\n过滤后总数" + strconv.Itoa(count) + ";\n处理完成请前往管理平台查看处理。"
SendEmail(subject, body) //发送邮件
applogger.Info(fmt.Sprintf("%s数据包 入库完成", fileName))
applogger.Info(fmt.Sprintf("%s数据包执行时间:%s 插入数据:%d条 过滤数数:%d条", fileName, elapsed, count, bi))
} }
} }
@ -998,3 +1032,10 @@ type SmsFinish struct {
Sid int `json:"sid"` Sid int `json:"sid"`
Token string `json:"token"` Token string `json:"token"`
} }
type SmsData struct {
Sid int
IsPersonalizedMsg bool
Content string
Mobiles []string
SmsList []SmsList
}