重构queryBatchState

This commit is contained in:
chejiulong 2023-03-15 16:59:09 +08:00
parent dc25ff389d
commit 7c83e5c87d
3 changed files with 113 additions and 113 deletions

Binary file not shown.

Binary file not shown.

226
main.go
View File

@ -57,8 +57,8 @@ func init() {
} }
func main() { func main() {
ticker := time.NewTicker(time.Duration(taskTime) * time.Minute) // 数据下载、处理 ticker := time.NewTicker(time.Duration(taskTime) * time.Minute) // 数据下载、处理
ticker_merge := time.NewTicker(5 * time.Minute) //名单合并 ticker_merge := time.NewTicker(time.Duration(batchStatusTaskTime) * time.Minute) //查询批次
defer ticker.Stop() defer ticker.Stop()
defer ticker_merge.Stop() defer ticker_merge.Stop()
@ -434,9 +434,7 @@ func downloadDecompression() {
} }
} }
// 批次入库
func batchInsert(fileName string, isLastCall bool, excludedFilename string) int { func batchInsert(fileName string, isLastCall bool, excludedFilename string) int {
//fmt.Print("批次处理开始")
start := time.Now() start := time.Now()
db, _ := connectToDB() db, _ := connectToDB()
file, err := os.Open(path.Join(executableDir, txtPath, fileName)) file, err := os.Open(path.Join(executableDir, txtPath, fileName))
@ -450,13 +448,11 @@ func batchInsert(fileName string, isLastCall bool, excludedFilename string) int
for { for {
record, err := reader.Read() record, err := reader.Read()
if err != nil { if err != nil {
//return -1
break break
} }
//communicationChannelID, _ := strconv.ParseUint(record[0], 10, 32) TargetsMember, _ := strconv.ParseUint(strings.TrimSpace(record[2]), 10, 32)
TargetsMember, _ := strconv.ParseUint(record[2], 10, 32) templateID, _ := strconv.ParseUint(strings.TrimSpace(record[3]), 10, 32)
templateID, _ := strconv.ParseUint(record[3], 10, 32)
status := 1 status := 1
t := time.Now() t := time.Now()
@ -482,13 +478,11 @@ func batchInsert(fileName string, isLastCall bool, excludedFilename string) int
Token: token, Token: token,
} }
// 调用发送短信批次接口
sid, err := CreateBatch(batchParams) sid, err := CreateBatch(batchParams)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
return -1 return -1
} }
fmt.Println(sid)
batch := Batch{ batch := Batch{
CommunicationChannelID: record[0], CommunicationChannelID: record[0],
@ -508,7 +502,7 @@ func batchInsert(fileName string, isLastCall bool, excludedFilename string) int
subject := "丝芙兰批次文件处理完成" subject := "丝芙兰批次文件处理完成"
body := fmt.Sprintf("批次数:%d;\n批次文件%s;\n处理完成请前往管理平台查看处理。", batchRows, fileName) body := fmt.Sprintf("批次数:%d;\n批次文件%s;\n处理完成请前往管理平台查看处理。", batchRows, fileName)
SendEmail(subject, body) //发送邮件 SendEmail(subject, body)
applogger.Info(fmt.Sprintf("%s批次文件入库完成", fileName)) applogger.Info(fmt.Sprintf("%s批次文件入库完成", fileName))
applogger.Info(fmt.Sprintf("%s批次文件执行时间%s 插入批次次数:%d", fileName, elapsed, batchRows)) applogger.Info(fmt.Sprintf("%s批次文件执行时间%s 插入批次次数:%d", fileName, elapsed, batchRows))
@ -900,82 +894,84 @@ func batchDataInsert(fileName string, lastCallKeys map[string]bool, excludedFile
} }
func queryBatchState() { func queryBatchState() {
db, _ := connectToDB() db, err := connectToDB()
//if err := db.AutoMigrate(&Batch{}); err != nil { if err != nil {
//applogger.Error(fmt.Sprintf("AutoMigrate失败%s", err)) handleError(err, "数据库连接失败")
//} return
}
var batches []Batch var batches []Batch
if err := db.Where("status NOT IN (?)", []int{6, 8}).Find(&batches).Error; err != nil { err = db.Where("status NOT IN (?)", []int{6, 8}).Find(&batches).Error
applogger.Error(fmt.Sprintf("查询批次状态失败:%s", err)) handleError(err, "查询批次状态失败")
} else { if err != nil {
for _, batch := range batches { return
sf := SmsFinish{Sid: batch.Sid, Token: token} }
sfJson, _ := json.Marshal(sf)
url := "http://www.wemediacn.net/webservice/BatchService?service=sms.querybatchstate" for _, batch := range batches {
resp, err := http.Post(url, "application/json; charset=utf-8", strings.NewReader(string(sfJson))) sf := SmsFinish{Sid: batch.Sid, Token: token}
if err != nil { sfJson, _ := json.Marshal(sf)
applogger.Error(fmt.Sprintf("查询批次状态失败:%s", err)) url := "http://www.wemediacn.net/webservice/BatchService?service=sms.querybatchstate"
} else { resp, err := http.Post(url, "application/json; charset=utf-8", strings.NewReader(string(sfJson)))
// 解析响应数据 if err != nil {
var retobj map[string]interface{} handleError(err, "查询批次状态失败")
err = json.NewDecoder(resp.Body).Decode(&retobj) continue
if err != nil {
fmt.Println(err)
}
jsonStr, err := json.Marshal(retobj)
if err != nil {
fmt.Println(err)
}
fmt.Printf("查询批次信息API 返回:%s\n", string(jsonStr))
fmt.Print("\n")
fmt.Print("\n")
code := int(retobj["code"].(float64))
if code == 0 {
status := int(retobj["state"].(float64))
if batch.Status != status || status == 5 {
fmt.Println(batch)
fmt.Print("\n")
fmt.Print("状态不一致或者发送中,更新状态\n")
updates := map[string]interface{}{
"status": status,
}
if endTime, ok := retobj["endTime"].(string); ok {
if endTimeTime, err := time.Parse("2006-01-02 15:04:05", endTime); err == nil {
updates["end_time"] = &endTimeTime
}
} else {
updates["end_time"] = nil
}
if endTime, ok := retobj["startTime"].(string); ok {
if endTimeTime, err := time.Parse("2006-01-02 15:04:05", endTime); err == nil {
updates["start_time"] = &endTimeTime
}
} else {
updates["start_time"] = nil
}
if mc, ok := retobj["mc"].(float64); ok {
updates["mc"] = int(mc)
}
if rc, ok := retobj["rc"].(float64); ok {
updates["rc"] = int(rc)
}
if sc, ok := retobj["sc"].(float64); ok {
updates["sc"] = int(sc)
}
fmt.Print("\n")
fmt.Print(updates)
fmt.Print("\n")
fmt.Print("\n")
if err := db.Model(&batch).Updates(updates).Error; err != nil {
applogger.Error(fmt.Sprintf("修改批次状态失败:%s", err))
}
}
} else {
applogger.Error(fmt.Sprintf("查询批次状态失败返回不为0%s", string(jsonStr)))
}
}
defer resp.Body.Close()
} }
defer resp.Body.Close()
var retobj map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&retobj)
if err != nil {
handleError(err, "解析响应数据失败")
continue
}
code := int(retobj["code"].(float64))
if code == 0 {
status := int(retobj["state"].(float64))
if batch.Status != status || status == 5 {
updates := createUpdatesMap(retobj)
err = db.Model(&batch).Updates(updates).Error
handleError(err, "修改批次状态失败")
}
} else {
jsonStr, _ := json.Marshal(retobj)
handleError(fmt.Errorf("返回不为0"), fmt.Sprintf("查询批次状态失败:%s", string(jsonStr)))
}
}
}
func createUpdatesMap(retobj map[string]interface{}) map[string]interface{} {
updates := map[string]interface{}{
"status": int(retobj["state"].(float64)),
}
if endTime, ok := retobj["endTime"].(string); ok {
if endTimeTime, err := time.Parse("2006-01-02 15:04:05", endTime); err == nil {
updates["end_time"] = &endTimeTime
}
}
if startTime, ok := retobj["startTime"].(string); ok {
if startTimeTime, err := time.Parse("2006-01-02 15:04:05", startTime); err == nil {
updates["start_time"] = &startTimeTime
}
}
if mc, ok := retobj["mc"].(float64); ok {
updates["mc"] = int(mc)
}
if rc, ok := retobj["rc"].(float64); ok {
updates["rc"] = int(rc)
}
if sc, ok := retobj["sc"].(float64); ok {
updates["sc"] = int(sc)
}
return updates
}
func handleError(err error, errMsg string) {
if err != nil {
applogger.Error(fmt.Sprintf("%s: %s", errMsg, err))
} }
} }
@ -993,7 +989,8 @@ func connectToDB() (*gorm.DB, error) {
break break
} }
if attempt >= maxAttempts { if attempt >= maxAttempts {
applogger.Error(fmt.Sprintf("数据库连接失败,错误信息%v", err)) //applogger.Error(fmt.Sprintf("数据库连接失败,错误信息%v", err))
handleError(err, "数据库连接失败,错误信息")
return nil, err return nil, err
} }
time.Sleep(backoff) time.Sleep(backoff)
@ -1079,6 +1076,7 @@ func iniConfi() {
insertChanSize = 10 //通道缓冲数 insertChanSize = 10 //通道缓冲数
goSize = 10 //协程数 goSize = 10 //协程数
taskTime = 1 taskTime = 1
batchStatusTaskTime = 1
to = []string{"chejiulong@wemediacn.com"} to = []string{"chejiulong@wemediacn.com"}
token = "7100477930234217" token = "7100477930234217"
lastCallPath = "RawData/LastCall" lastCallPath = "RawData/LastCall"
@ -1105,6 +1103,7 @@ func iniConfi() {
insertChanSize = 100 //通道缓冲数 insertChanSize = 100 //通道缓冲数
goSize = 50 //协程数 goSize = 50 //协程数
taskTime = 60 taskTime = 60
batchStatusTaskTime = 5
to = []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"} to = []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"}
token = "7100178600777091" //7100477930234217 token = "7100178600777091" //7100477930234217
lastCallPath = "RawData/LastCall" lastCallPath = "RawData/LastCall"
@ -1167,33 +1166,34 @@ func (f FileSorter) Less(i, j int) bool {
} }
var ( //初始化变量 var ( //初始化变量
env string env string
applogger *logrus.Logger applogger *logrus.Logger
redisClient *redis.Client redisClient *redis.Client
executableDir string executableDir string
redisAddress string redisAddress string
redisPassword string redisPassword string
redisDB int redisDB int
sftpAddress string sftpAddress string
sftpUser string sftpUser string
sftpPassword string sftpPassword string
sftpDir string sftpDir string
dbAddress string dbAddress string
dbUser string dbUser string
dbPassword string dbPassword string
dbName string dbName string
zipPath string zipPath string
txtPath string txtPath string
logPath string logPath string
batchSize int //提交数据 batchSize int //提交数据
insertSize int //一次性入库 insertSize int //一次性入库
insertChanSize int //通道缓冲数 insertChanSize int //通道缓冲数
goSize int //协程数 goSize int //协程数
taskTime int taskTime int
to []string batchStatusTaskTime int
token string to []string
lastCallPath string token string
verifySignatureKey string lastCallPath string
verifySignatureKey string
) )
type Batch struct { type Batch struct {