增加lastCall 的 WebSocket 服务

This commit is contained in:
chejiulong 2023-03-06 19:38:28 +08:00
parent e464a2dee3
commit 495fd10d45
2 changed files with 153 additions and 37 deletions

Binary file not shown.

190
main.go
View File

@ -103,22 +103,70 @@ func handleWebSocket(w http.ResponseWriter, r *http.Request) {
return return
} }
fmt.Println("收到消息:", string(message)) fmt.Println("收到消息:", string(message))
if string(message) == "a" {
fmt.Println("触发合并操作") task := Task{}
message := []byte("触发合并操作") var returnMessage []byte
err := conn.WriteMessage(websocket.TextMessage, message) //发送消息给客户端 err = json.Unmarshal([]byte(message), &task)
if err != nil { if err != nil {
fmt.Println("发送消息失败:", err) returnMessage = []byte(`{"code": 2001,"err": "json指令解析失败"}`)
delete(clients, c) conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
return } else {
switch task.Command {
case "lastCall":
if !fileExists(path.Join(executableDir, lastCallPath, task.ExcludedFilename)) {
returnMessage = []byte(`{"code": 2003,"err": "task.ExcludedFilename 不存在"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
break
}
if !fileExists(path.Join(executableDir, txtPath, task.BatchFilename)) {
returnMessage = []byte(`{"code": 2004,"err": "task.BatchFilename 不存在"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
break
}
if !fileExists(path.Join(executableDir, txtPath, task.DataFilename)) {
returnMessage = []byte(`{"code": 2005,"err": "task.DataFilename 不存在"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
break
}
returnMessage = []byte(`{"code": 2000,"err": "开始处理lastCall"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
lastCallKeys, err := readExcludedFile(path.Join(executableDir, lastCallPath, task.ExcludedFilename))
if err != nil {
returnMessage = []byte(`{"code": 2006,"err": "打开ExcludedFilename失败"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
break
} else {
returnMessage = []byte(`{"code": 2000,"err": "ExcludedFilename读取完成"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
}
if exists, _ := redisClient.Exists("iniLastCallDataStatus").Result(); exists == 1 {
returnMessage = []byte(`{"code": 2007,"err": "有lastCall执行中请稍后重试"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
} else {
//无错误,执行逻辑代码,
subject := "丝芙兰短信处理程序异常"
err := redisClient.Set("iniLastCallDataStatus", 1, 0).Err()
if err != nil {
body := fmt.Sprintf("写入lastCall任务执行中标记失败%v", err)
applogger.Error(body)
returnMessage = []byte(fmt.Sprintf(`{"code": 2008,"err": "写入lastCall任务执行中标记失败%v"}`, err))
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
SendEmail(subject, body) //发送邮件
} else {
batchInsert(task.BatchFilename, true) //创建批次
returnMessage = []byte(`{"code": 2000,"err": "批次创建完成"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
batchDataInsert(task.DataFilename, lastCallKeys) //添加数据
redisClient.Del("iniLastCallDataStatus") //删除任务执行中标记
returnMessage = []byte(`{"code": 2000,"err": "结束处理lastCall"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
}
}
default:
returnMessage = []byte(`{"code": 2002,"err": "task.Command 不存在"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
} }
} else if string(message) == "c" {
for c := range clients {
fmt.Printf("客户端 %s 在线\n", c.conn.RemoteAddr().String())
}
} else if string(message) == "all" {
message = []byte("所有的客户端们你们好!")
go broadcast(message)
} }
} }
}() }()
@ -133,6 +181,26 @@ func handleWebSocket(w http.ResponseWriter, r *http.Request) {
} }
func readExcludedFile(filename string) (map[string]bool, error) {
file, err := os.Open(filename)
if err != nil {
return nil, err
}
defer file.Close()
scanner := bufio.NewScanner(file)
scanner.Split(bufio.ScanLines)
scanner.Scan() // skip first line
lastCallKeys := make(map[string]bool)
for scanner.Scan() {
line := scanner.Text()
fields := strings.Split(line, ",")
lastCallKey := fmt.Sprintf("%s-%s", fields[0], fields[1])
lastCallKeys[lastCallKey] = true
}
return lastCallKeys, nil
}
/*
// 聊天室消息广播 // 聊天室消息广播
func broadcast(message []byte) { func broadcast(message []byte) {
for c := range clients { for c := range clients {
@ -142,7 +210,7 @@ func broadcast(message []byte) {
delete(clients, c) delete(clients, c)
} }
} }
} }*/
func downloadDecompression() { func downloadDecompression() {
if exists, _ := redisClient.Exists("iniDataStatus").Result(); exists == 1 { if exists, _ := redisClient.Exists("iniDataStatus").Result(); exists == 1 {
@ -266,7 +334,7 @@ func downloadDecompression() {
continue continue
} }
applogger.Info(fmt.Sprintf("%s数据包解压完成", zipFile.Name)) applogger.Info(fmt.Sprintf("%s数据包解压完成", zipFile.Name))
processingStatus = batchDataInsert(zipFile.Name) processingStatus = batchDataInsert(zipFile.Name, nil)
} }
} else if filepath.Ext(file.Name()) == ".txt" { } else if filepath.Ext(file.Name()) == ".txt" {
srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name())) srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name()))
@ -292,7 +360,7 @@ func downloadDecompression() {
continue continue
} }
applogger.Info(fmt.Sprintf("%s批次文件下载完成", file.Name())) applogger.Info(fmt.Sprintf("%s批次文件下载完成", file.Name()))
processingStatus = batchInsert(file.Name()) processingStatus = batchInsert(file.Name(), false)
} }
if processingStatus != -1 { if processingStatus != -1 {
err = redisClient.Set(fileKey, 1, 0).Err() //写入文件处理完成 err = redisClient.Set(fileKey, 1, 0).Err() //写入文件处理完成
@ -309,7 +377,7 @@ func downloadDecompression() {
} }
// 批次入库 // 批次入库
func batchInsert(fileName string) int { func batchInsert(fileName string, isLastCall bool) int {
//fmt.Print("批次处理开始") //fmt.Print("批次处理开始")
start := time.Now() start := time.Now()
db, _ := connectToDB() db, _ := connectToDB()
@ -335,9 +403,18 @@ func batchInsert(fileName string) int {
t := time.Now() t := time.Now()
s := t.Format("2006-01-02 15:04:05") s := t.Format("2006-01-02 15:04:05")
//key := var batchName, dataFileName string
if isLastCall {
batchName = fmt.Sprintf("lastCall-%s-%s", record[1], strconv.Itoa(int(communicationChannelID)))
dataFileName = fmt.Sprintf("lastCall-%s", fileName)
} else {
batchName = fmt.Sprintf("%s-%s", record[1], strconv.Itoa(int(communicationChannelID)))
dataFileName = fileName
}
batchParams := BatchParams{ batchParams := BatchParams{
BatchName: fmt.Sprintf("%s-%s", record[1], strconv.Itoa(int(communicationChannelID))), BatchName: batchName,
BatchDesc: record[4], BatchDesc: record[4],
IsPersonal: 0, IsPersonal: 0,
Message: record[4], Message: record[4],
@ -357,12 +434,12 @@ func batchInsert(fileName string) int {
batch := Batch{ batch := Batch{
CommunicationChannelID: uint(communicationChannelID), CommunicationChannelID: uint(communicationChannelID),
CommunicationName: record[1], CommunicationName: batchName,
TargetsMember: uint(TargetsMember), TargetsMember: uint(TargetsMember),
TemplateID: uint(templateID), TemplateID: uint(templateID),
Content: record[4], Content: record[4],
Status: status, Status: status,
DataFileName: fileName, DataFileName: dataFileName,
Sid: sid, Sid: sid,
} }
db.Create(&batch) db.Create(&batch)
@ -447,7 +524,7 @@ func smsApi(method string, sendSMSDataJson string) (int, error) {
} }
} }
func batchDataInsert(fileName string) int { func batchDataInsert(fileName string, lastCallKeys map[string]bool) int {
start := time.Now() start := time.Now()
// Open file // Open file
fileKey := fmt.Sprintf("downloaded:%s", strings.Replace(fileName, filepath.Ext(fileName), ".zip", 1)) fileKey := fmt.Sprintf("downloaded:%s", strings.Replace(fileName, filepath.Ext(fileName), ".zip", 1))
@ -478,7 +555,6 @@ func batchDataInsert(fileName string) int {
} else { } else {
break break
} }
} }
} }
wg.Done() wg.Done()
@ -492,17 +568,31 @@ func batchDataInsert(fileName string) int {
count := 0 // 总数 count := 0 // 总数
//数据文件中批次 //数据文件中批次
batches := []Batch{} //查询批次 batches := []Batch{} //查询批次
fileNameDate := ""
dataFileName := ""
fields := strings.Split(fileName, "_") if lastCallKeys != nil {
datetime := fields[len(fields)-1] fields := strings.Split(fileName, "_")
fileNameDate := datetime[:8] datetime := fields[len(fields)-1]
// 模糊查询文件名包含“20230103”字符串的批次记录 fileNameDate = datetime[:8]
db.Table("batches b1"). // 模糊查询文件名包含“20230103”字符串的批次记录
Select("b1.*"). db.Table("batches b1").
Where("b1.data_file_name LIKE ?", "%"+fileNameDate+"%"). Select("b1.*").
Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at"). Where("b1.data_file_name LIKE ?", "%"+fileNameDate+"%").
Order("b1.created_at desc"). Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at").
Find(&batches) Order("b1.created_at desc").
Find(&batches)
dataFileName = fmt.Sprintf("lastCall-%s", fileName)
} else {
fileName = fmt.Sprintf("lastCall-%s", fileName)
db.Table("batches b1").
Select("b1.*").
Where("b1.data_file_name = ?", fileNameDate).
Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at").
Order("b1.created_at desc").
Find(&batches)
dataFileName = fileName
}
batchCount := len(batches) batchCount := len(batches)
sendMobiles := make(map[string]SmsData, batchCount) //短信数据,用于提交发送平台 sendMobiles := make(map[string]SmsData, batchCount) //短信数据,用于提交发送平台
@ -535,7 +625,7 @@ func batchDataInsert(fileName string) int {
"ReservedField3": row[7], "ReservedField3": row[7],
"ReservedField4": row[8], "ReservedField4": row[8],
"ReservedField5": row[9], "ReservedField5": row[9],
"DataFileName": fileName, "DataFileName": dataFileName,
"FullName": row[4], "FullName": row[4],
} }
reservedFieldsJson, err := json.Marshal(reservedFields) //个性化字段转json reservedFieldsJson, err := json.Marshal(reservedFields) //个性化字段转json
@ -569,6 +659,10 @@ func batchDataInsert(fileName string) int {
} }
continue continue
} }
if lastCallKeys != nil && lastCallKeys[fmt.Sprintf("%s-%s", row[2], row[3])] {
continue
}
// Add record to hashset // Add record to hashset
hs[key] = true hs[key] = true
ccids[row[2]] = true ccids[row[2]] = true
@ -584,7 +678,7 @@ func batchDataInsert(fileName string) int {
} }
if batches, ok := result[row[2]]; ok && len(batches) > 0 && batches[0].Content != "" { if batches, ok := result[row[2]]; ok && len(batches) > 0 && batches[0].Content != "" {
content := batches[0].Content content := fmt.Sprintf("%s /", batches[0].Content)
Sid := batches[0].Sid Sid := batches[0].Sid
pattern := regexp.MustCompile(`\{.*\}`) pattern := regexp.MustCompile(`\{.*\}`)
matched := pattern.MatchString(content) matched := pattern.MatchString(content)
@ -763,6 +857,14 @@ func connectToDB() (*gorm.DB, error) {
return db, nil return db, nil
} }
func fileExists(filename string) bool {
info, err := os.Stat(filename)
if os.IsNotExist(err) {
return false
}
return !info.IsDir()
}
func iniPath() { func iniPath() {
// 获取可执行文件所在的路径 // 获取可执行文件所在的路径
executablePath, err := os.Executable() executablePath, err := os.Executable()
@ -780,6 +882,10 @@ func iniPath() {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
err = os.MkdirAll(filepath.Join(executableDir, lastCallPath), 0755)
if err != nil {
log.Fatal(err)
}
err = os.MkdirAll(filepath.Join(executableDir, logPath, time.Now().Format("2006_01")), 0755) err = os.MkdirAll(filepath.Join(executableDir, logPath, time.Now().Format("2006_01")), 0755)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -829,6 +935,7 @@ func iniConfi() {
taskTime = 1 taskTime = 1
to = []string{"chejiulong@wemediacn.com"} to = []string{"chejiulong@wemediacn.com"}
token = "7100477930234217" token = "7100477930234217"
lastCallPath = "RawData/LastCall"
case "prod": case "prod":
//fmt.Print("正式环境配置已生效\n") //fmt.Print("正式环境配置已生效\n")
redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379" redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379"
@ -852,6 +959,7 @@ func iniConfi() {
taskTime = 60 taskTime = 60
to = []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"} to = []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"}
token = "7100477930234217" token = "7100477930234217"
lastCallPath = "RawData/LastCall"
default: default:
panic(fmt.Errorf("无效的运行模式: %s", env)) panic(fmt.Errorf("无效的运行模式: %s", env))
} }
@ -934,6 +1042,7 @@ var ( //初始化变量
taskTime int taskTime int
to []string to []string
token string token string
lastCallPath string
) )
type Batch struct { type Batch struct {
@ -1035,3 +1144,10 @@ type SmsData struct {
Mobiles []string Mobiles []string
SmsList []SmsList SmsList []SmsList
} }
type Task struct {
Command string `json:"command"`
ExcludedFilename string `json:"excluded_filename"`
BatchFilename string `json:"batch_filename"`
DataFilename string `json:"data_filename"`
}