lastCall 相关逻辑完善,增加 lastCall 排重计数器

This commit is contained in:
chejiulong 2023-03-07 18:23:07 +08:00
parent 44810361a0
commit dd7f2b7038
3 changed files with 69 additions and 58 deletions

Binary file not shown.

Binary file not shown.

127
main.go
View File

@ -163,11 +163,11 @@ func handleWebSocket(w http.ResponseWriter, r *http.Request) {
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
SendEmail(subject, body) //发送邮件 SendEmail(subject, body) //发送邮件
} else { } else {
batchInsert(task.TaskData.BatchFilename, true) //创建批次 batchInsert(task.TaskData.BatchFilename, true, task.TaskData.ExcludedFilename) //创建批次
returnMessage = []byte(`{"code": 2000,"err": "批次创建完成"}`) returnMessage = []byte(`{"code": 2000,"err": "批次创建完成"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
batchDataInsert(task.TaskData.DataFilename, lastCallKeys) //添加数据 batchDataInsert(task.TaskData.DataFilename, lastCallKeys, task.TaskData.ExcludedFilename) //添加数据
redisClient.Del("iniLastCallDataStatus") //删除任务执行中标记 redisClient.Del("iniLastCallDataStatus") //删除任务执行中标记
returnMessage = []byte(`{"code": 2000,"err": "结束处理lastCall"}`) returnMessage = []byte(`{"code": 2000,"err": "结束处理lastCall"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端 conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
} }
@ -191,8 +191,6 @@ func handleWebSocket(w http.ResponseWriter, r *http.Request) {
} }
func verify_signature(signature string, nonce string, timestamp int64, data interface{}) bool { func verify_signature(signature string, nonce string, timestamp int64, data interface{}) bool {
key := "qZ6v1&H#Wjx+yRm2D@*sJF$tnfL83Ia"
fmt.Printf("Received signature: %s\n", signature) fmt.Printf("Received signature: %s\n", signature)
fmt.Printf("Received timestamp: %d\n", timestamp) fmt.Printf("Received timestamp: %d\n", timestamp)
fmt.Printf("Received nonce: %s\n", nonce) fmt.Printf("Received nonce: %s\n", nonce)
@ -212,7 +210,7 @@ func verify_signature(signature string, nonce string, timestamp int64, data inte
expected_data := fmt.Sprintf("%d|%s|%s", received_timestamp, received_nonce, received_data) expected_data := fmt.Sprintf("%d|%s|%s", received_timestamp, received_nonce, received_data)
fmt.Printf("Expected data: %s\n", expected_data) fmt.Printf("Expected data: %s\n", expected_data)
mac := hmac.New(sha256.New, []byte(key)) mac := hmac.New(sha256.New, []byte(verifySignatureKey))
mac.Write([]byte(expected_data)) mac.Write([]byte(expected_data))
expected_signature := hex.EncodeToString(mac.Sum(nil)) expected_signature := hex.EncodeToString(mac.Sum(nil))
fmt.Printf("Expected signature: %s\n", expected_signature) fmt.Printf("Expected signature: %s\n", expected_signature)
@ -378,7 +376,7 @@ func downloadDecompression() {
continue continue
} }
applogger.Info(fmt.Sprintf("%s数据包解压完成", zipFile.Name)) applogger.Info(fmt.Sprintf("%s数据包解压完成", zipFile.Name))
processingStatus = batchDataInsert(zipFile.Name, nil) processingStatus = batchDataInsert(zipFile.Name, nil, "")
} }
} else if filepath.Ext(file.Name()) == ".txt" { } else if filepath.Ext(file.Name()) == ".txt" {
srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name())) srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name()))
@ -404,7 +402,7 @@ func downloadDecompression() {
continue continue
} }
applogger.Info(fmt.Sprintf("%s批次文件下载完成", file.Name())) applogger.Info(fmt.Sprintf("%s批次文件下载完成", file.Name()))
processingStatus = batchInsert(file.Name(), false) processingStatus = batchInsert(file.Name(), false, "")
} }
if processingStatus != -1 { if processingStatus != -1 {
err = redisClient.Set(fileKey, 1, 0).Err() //写入文件处理完成 err = redisClient.Set(fileKey, 1, 0).Err() //写入文件处理完成
@ -421,7 +419,7 @@ func downloadDecompression() {
} }
// 批次入库 // 批次入库
func batchInsert(fileName string, isLastCall bool) int { func batchInsert(fileName string, isLastCall bool, excludedFilename string) int {
//fmt.Print("批次处理开始") //fmt.Print("批次处理开始")
start := time.Now() start := time.Now()
db, _ := connectToDB() db, _ := connectToDB()
@ -450,8 +448,8 @@ func batchInsert(fileName string, isLastCall bool) int {
var batchName, dataFileName string var batchName, dataFileName string
if isLastCall { if isLastCall {
batchName = fmt.Sprintf("lastCall-%s-%s", record[1], strconv.Itoa(int(communicationChannelID))) batchName = fmt.Sprintf("lastCall-%s-%s-%s", record[1], excludedFilename, strconv.Itoa(int(communicationChannelID)))
dataFileName = fmt.Sprintf("lastCall-%s", fileName) dataFileName = fmt.Sprintf("lastCall-%s-%s", excludedFilename, fileName)
} else { } else {
batchName = fmt.Sprintf("%s-%s", record[1], strconv.Itoa(int(communicationChannelID))) batchName = fmt.Sprintf("%s-%s", record[1], strconv.Itoa(int(communicationChannelID)))
dataFileName = fileName dataFileName = fileName
@ -568,7 +566,7 @@ func smsApi(method string, sendSMSDataJson string) (int, error) {
} }
} }
func batchDataInsert(fileName string, lastCallKeys map[string]bool) int { func batchDataInsert(fileName string, lastCallKeys map[string]bool, excludedFilename string) int {
start := time.Now() start := time.Now()
// Open file // Open file
fileKey := fmt.Sprintf("downloaded:%s", strings.Replace(fileName, filepath.Ext(fileName), ".zip", 1)) fileKey := fmt.Sprintf("downloaded:%s", strings.Replace(fileName, filepath.Ext(fileName), ".zip", 1))
@ -614,11 +612,12 @@ func batchDataInsert(fileName string, lastCallKeys map[string]bool) int {
batches := []Batch{} //查询批次 batches := []Batch{} //查询批次
fileNameDate := "" fileNameDate := ""
dataFileName := "" dataFileName := ""
fields := strings.Split(fileName, "_")
datetime := fields[len(fields)-1]
if lastCallKeys != nil { if lastCallKeys != nil {
fields := strings.Split(fileName, "_") newFileName := "Communication_definition_SMS_1_wemedia_" + datetime
datetime := fields[len(fields)-1] fileNameDate = fmt.Sprintf("lastCall-%s-%s", excludedFilename, newFileName)
fileNameDate = datetime[:8] fmt.Printf("fileNameDate : %s\n", fileNameDate)
// 模糊查询文件名包含“20230103”字符串的批次记录 // 模糊查询文件名包含“20230103”字符串的批次记录
db.Table("batches b1"). db.Table("batches b1").
Select("b1.*"). Select("b1.*").
@ -626,12 +625,12 @@ func batchDataInsert(fileName string, lastCallKeys map[string]bool) int {
Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at"). Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at").
Order("b1.created_at desc"). Order("b1.created_at desc").
Find(&batches) Find(&batches)
dataFileName = fmt.Sprintf("lastCall-%s", fileName) dataFileName = fmt.Sprintf("lastCall-%s-%s", excludedFilename, fileName)
} else { } else {
fileName = fmt.Sprintf("lastCall-%s", fileName) fileNameDate = datetime[:8]
db.Table("batches b1"). db.Table("batches b1").
Select("b1.*"). Select("b1.*").
Where("b1.data_file_name = ?", fileNameDate). Where("b1.data_file_name LIKE ?", "%"+fileNameDate+"%").
Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at"). Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at").
Order("b1.created_at desc"). Order("b1.created_at desc").
Find(&batches) Find(&batches)
@ -639,9 +638,10 @@ func batchDataInsert(fileName string, lastCallKeys map[string]bool) int {
} }
batchCount := len(batches) batchCount := len(batches)
sendMobiles := make(map[string]SmsData, batchCount) //短信数据,用于提交发送平台 sendMobiles := make(map[string]SmsData, batchCount) //短信数据,用于提交发送平台
duplicateCount := make(map[string]int, batchCount) //按批次重复数 duplicateCount := make(map[string]int, batchCount) //按批次重复数
insertsCount := make(map[string]int, batchCount) //按批次插入数 lastCallduplicateCount := make(map[string]int, batchCount) //按批次重复数
insertsCount := make(map[string]int, batchCount) //按批次插入数
ccids := make(map[string]bool, batchCount) ccids := make(map[string]bool, batchCount)
if batchCount > 0 { //如果查询到数据需要的批次信息,开始处理数据 if batchCount > 0 { //如果查询到数据需要的批次信息,开始处理数据
@ -680,6 +680,9 @@ func batchDataInsert(fileName string, lastCallKeys map[string]bool) int {
if _, ok := duplicateCount[row[2]]; !ok { if _, ok := duplicateCount[row[2]]; !ok {
duplicateCount[row[2]] = 0 duplicateCount[row[2]] = 0
} }
if _, ok := lastCallduplicateCount[row[2]]; !ok {
lastCallduplicateCount[row[2]] = 0
}
// Check if record exists in hashset // Check if record exists in hashset
key := fmt.Sprintf("%s-%s-%s", row[2], row[3], row[5]) key := fmt.Sprintf("%s-%s-%s", row[2], row[3], row[5])
@ -704,6 +707,7 @@ func batchDataInsert(fileName string, lastCallKeys map[string]bool) int {
continue continue
} }
if lastCallKeys != nil && lastCallKeys[fmt.Sprintf("%s-%s", row[2], row[3])] { if lastCallKeys != nil && lastCallKeys[fmt.Sprintf("%s-%s", row[2], row[3])] {
lastCallduplicateCount[row[2]]++
continue continue
} }
@ -837,10 +841,11 @@ func batchDataInsert(fileName string, lastCallKeys map[string]bool) int {
smsApi("finishbatch", string(sfJson)) smsApi("finishbatch", string(sfJson))
bpi := []BatchProcessingInformation{} bpi := []BatchProcessingInformation{}
bpi = append(bpi, BatchProcessingInformation{ bpi = append(bpi, BatchProcessingInformation{
CommunicationChannelID: ccid, CommunicationChannelID: ccid,
RepeatTargetsMember: duplicateCount[ccid], RepeatTargetsMember: duplicateCount[ccid],
InsertsTargetsMember: insertsCount[ccid], LastCallRepeatTargetsMember: lastCallduplicateCount[ccid],
DataFileName: fileName, InsertsTargetsMember: insertsCount[ccid],
DataFileName: dataFileName,
}) })
err = db.CreateInBatches(bpi, insertSize).Error err = db.CreateInBatches(bpi, insertSize).Error
if err != nil { if err != nil {
@ -980,6 +985,8 @@ func iniConfi() {
to = []string{"chejiulong@wemediacn.com"} to = []string{"chejiulong@wemediacn.com"}
token = "7100477930234217" token = "7100477930234217"
lastCallPath = "RawData/LastCall" lastCallPath = "RawData/LastCall"
verifySignatureKey = "qZ6v1&H#Wjx+yRm2D@*sJF$tnfL83Ia"
case "prod": case "prod":
//fmt.Print("正式环境配置已生效\n") //fmt.Print("正式环境配置已生效\n")
redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379" redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379"
@ -1004,6 +1011,8 @@ func iniConfi() {
to = []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"} to = []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"}
token = "7100477930234217" token = "7100477930234217"
lastCallPath = "RawData/LastCall" lastCallPath = "RawData/LastCall"
verifySignatureKey = "Lzxf6asgwFRTsANZ8gkXJnP3EmOyNO#&"
default: default:
panic(fmt.Errorf("无效的运行模式: %s", env)) panic(fmt.Errorf("无效的运行模式: %s", env))
} }
@ -1061,32 +1070,33 @@ func (f FileSorter) Less(i, j int) bool {
} }
var ( //初始化变量 var ( //初始化变量
env string env string
applogger *logrus.Logger applogger *logrus.Logger
redisClient *redis.Client redisClient *redis.Client
executableDir string executableDir string
redisAddress string redisAddress string
redisPassword string redisPassword string
redisDB int redisDB int
sftpAddress string sftpAddress string
sftpUser string sftpUser string
sftpPassword string sftpPassword string
sftpDir string sftpDir string
dbAddress string dbAddress string
dbUser string dbUser string
dbPassword string dbPassword string
dbName string dbName string
zipPath string zipPath string
txtPath string txtPath string
logPath string logPath string
batchSize int //提交数据 batchSize int //提交数据
insertSize int //一次性入库 insertSize int //一次性入库
insertChanSize int //通道缓冲数 insertChanSize int //通道缓冲数
goSize int //协程数 goSize int //协程数
taskTime int taskTime int
to []string to []string
token string token string
lastCallPath string lastCallPath string
verifySignatureKey string
) )
type Batch struct { type Batch struct {
@ -1115,11 +1125,12 @@ func (BatcheData) TableName() string {
} }
type BatchProcessingInformation struct { type BatchProcessingInformation struct {
ID uint `gorm:"primaryKey;autoIncrement"` ID uint `gorm:"primaryKey;autoIncrement"`
CommunicationChannelID string `gorm:"column:communication_channel_id"` CommunicationChannelID string `gorm:"column:communication_channel_id"`
RepeatTargetsMember int `gorm:"column:repeat_targets_member"` RepeatTargetsMember int `gorm:"column:repeat_targets_member"`
InsertsTargetsMember int `gorm:"column:inserts_targets_member"` LastCallRepeatTargetsMember int `gorm:"column:last_call_repeat_targets_member"`
DataFileName string `gorm:"column:data_file_name"` InsertsTargetsMember int `gorm:"column:inserts_targets_member"`
DataFileName string `gorm:"column:data_file_name"`
} }
func (BatchProcessingInformation) TableName() string { func (BatchProcessingInformation) TableName() string {