diff --git a/iniDataForLinux b/iniDataForLinux index 3d99aec..0744b9d 100755 Binary files a/iniDataForLinux and b/iniDataForLinux differ diff --git a/iniDataForMacOs b/iniDataForMacOs index a937bfa..5b5475b 100755 Binary files a/iniDataForMacOs and b/iniDataForMacOs differ diff --git a/main.go b/main.go index 85a9215..ee59961 100644 --- a/main.go +++ b/main.go @@ -57,8 +57,8 @@ func init() { } func main() { - ticker := time.NewTicker(time.Duration(taskTime) * time.Minute) // 数据下载、处理 - ticker_merge := time.NewTicker(5 * time.Minute) //名单合并 + ticker := time.NewTicker(time.Duration(taskTime) * time.Minute) // 数据下载、处理 + ticker_merge := time.NewTicker(time.Duration(batchStatusTaskTime) * time.Minute) //查询批次 defer ticker.Stop() defer ticker_merge.Stop() @@ -434,9 +434,7 @@ func downloadDecompression() { } } -// 批次入库 func batchInsert(fileName string, isLastCall bool, excludedFilename string) int { - //fmt.Print("批次处理开始") start := time.Now() db, _ := connectToDB() file, err := os.Open(path.Join(executableDir, txtPath, fileName)) @@ -450,13 +448,11 @@ func batchInsert(fileName string, isLastCall bool, excludedFilename string) int for { record, err := reader.Read() if err != nil { - //return -1 break } - //communicationChannelID, _ := strconv.ParseUint(record[0], 10, 32) - TargetsMember, _ := strconv.ParseUint(record[2], 10, 32) - templateID, _ := strconv.ParseUint(record[3], 10, 32) + TargetsMember, _ := strconv.ParseUint(strings.TrimSpace(record[2]), 10, 32) + templateID, _ := strconv.ParseUint(strings.TrimSpace(record[3]), 10, 32) status := 1 t := time.Now() @@ -482,13 +478,11 @@ func batchInsert(fileName string, isLastCall bool, excludedFilename string) int Token: token, } - // 调用发送短信批次接口 sid, err := CreateBatch(batchParams) if err != nil { fmt.Println(err) return -1 } - fmt.Println(sid) batch := Batch{ CommunicationChannelID: record[0], @@ -508,7 +502,7 @@ func batchInsert(fileName string, isLastCall bool, excludedFilename string) int subject := "丝芙兰批次文件处理完成" body := fmt.Sprintf("批次数:%d;\n批次文件:%s;\n处理完成,请前往管理平台查看处理。", batchRows, fileName) - SendEmail(subject, body) //发送邮件 + SendEmail(subject, body) applogger.Info(fmt.Sprintf("%s(批次文件)入库完成", fileName)) applogger.Info(fmt.Sprintf("%s(批次文件)执行时间%s 插入批次次数:%d", fileName, elapsed, batchRows)) @@ -900,82 +894,84 @@ func batchDataInsert(fileName string, lastCallKeys map[string]bool, excludedFile } func queryBatchState() { - db, _ := connectToDB() - //if err := db.AutoMigrate(&Batch{}); err != nil { - //applogger.Error(fmt.Sprintf("AutoMigrate,失败%s", err)) - //} + db, err := connectToDB() + if err != nil { + handleError(err, "数据库连接失败") + return + } + var batches []Batch - if err := db.Where("status NOT IN (?)", []int{6, 8}).Find(&batches).Error; err != nil { - applogger.Error(fmt.Sprintf("查询批次状态失败:%s", err)) - } else { - for _, batch := range batches { - sf := SmsFinish{Sid: batch.Sid, Token: token} - sfJson, _ := json.Marshal(sf) - url := "http://www.wemediacn.net/webservice/BatchService?service=sms.querybatchstate" - resp, err := http.Post(url, "application/json; charset=utf-8", strings.NewReader(string(sfJson))) - if err != nil { - applogger.Error(fmt.Sprintf("查询批次状态失败:%s", err)) - } else { - // 解析响应数据 - var retobj map[string]interface{} - err = json.NewDecoder(resp.Body).Decode(&retobj) - if err != nil { - fmt.Println(err) - } - jsonStr, err := json.Marshal(retobj) - if err != nil { - fmt.Println(err) - } - fmt.Printf("查询批次信息API 返回:%s\n", string(jsonStr)) - fmt.Print("\n") - fmt.Print("\n") - code := int(retobj["code"].(float64)) - if code == 0 { - status := int(retobj["state"].(float64)) - if batch.Status != status || status == 5 { - fmt.Println(batch) - fmt.Print("\n") - fmt.Print("状态不一致或者发送中,更新状态\n") - updates := map[string]interface{}{ - "status": status, - } - if endTime, ok := retobj["endTime"].(string); ok { - if endTimeTime, err := time.Parse("2006-01-02 15:04:05", endTime); err == nil { - updates["end_time"] = &endTimeTime - } - } else { - updates["end_time"] = nil - } - if endTime, ok := retobj["startTime"].(string); ok { - if endTimeTime, err := time.Parse("2006-01-02 15:04:05", endTime); err == nil { - updates["start_time"] = &endTimeTime - } - } else { - updates["start_time"] = nil - } - if mc, ok := retobj["mc"].(float64); ok { - updates["mc"] = int(mc) - } - if rc, ok := retobj["rc"].(float64); ok { - updates["rc"] = int(rc) - } - if sc, ok := retobj["sc"].(float64); ok { - updates["sc"] = int(sc) - } - fmt.Print("\n") - fmt.Print(updates) - fmt.Print("\n") - fmt.Print("\n") - if err := db.Model(&batch).Updates(updates).Error; err != nil { - applogger.Error(fmt.Sprintf("修改批次状态失败:%s", err)) - } - } - } else { - applogger.Error(fmt.Sprintf("查询批次状态失败返回不为0:%s", string(jsonStr))) - } - } - defer resp.Body.Close() + err = db.Where("status NOT IN (?)", []int{6, 8}).Find(&batches).Error + handleError(err, "查询批次状态失败") + if err != nil { + return + } + + for _, batch := range batches { + sf := SmsFinish{Sid: batch.Sid, Token: token} + sfJson, _ := json.Marshal(sf) + url := "http://www.wemediacn.net/webservice/BatchService?service=sms.querybatchstate" + resp, err := http.Post(url, "application/json; charset=utf-8", strings.NewReader(string(sfJson))) + if err != nil { + handleError(err, "查询批次状态失败") + continue } + defer resp.Body.Close() + + var retobj map[string]interface{} + err = json.NewDecoder(resp.Body).Decode(&retobj) + if err != nil { + handleError(err, "解析响应数据失败") + continue + } + + code := int(retobj["code"].(float64)) + if code == 0 { + status := int(retobj["state"].(float64)) + if batch.Status != status || status == 5 { + updates := createUpdatesMap(retobj) + err = db.Model(&batch).Updates(updates).Error + handleError(err, "修改批次状态失败") + } + } else { + jsonStr, _ := json.Marshal(retobj) + handleError(fmt.Errorf("返回不为0"), fmt.Sprintf("查询批次状态失败:%s", string(jsonStr))) + } + } +} + +func createUpdatesMap(retobj map[string]interface{}) map[string]interface{} { + updates := map[string]interface{}{ + "status": int(retobj["state"].(float64)), + } + + if endTime, ok := retobj["endTime"].(string); ok { + if endTimeTime, err := time.Parse("2006-01-02 15:04:05", endTime); err == nil { + updates["end_time"] = &endTimeTime + } + } + + if startTime, ok := retobj["startTime"].(string); ok { + if startTimeTime, err := time.Parse("2006-01-02 15:04:05", startTime); err == nil { + updates["start_time"] = &startTimeTime + } + } + + if mc, ok := retobj["mc"].(float64); ok { + updates["mc"] = int(mc) + } + if rc, ok := retobj["rc"].(float64); ok { + updates["rc"] = int(rc) + } + if sc, ok := retobj["sc"].(float64); ok { + updates["sc"] = int(sc) + } + return updates +} + +func handleError(err error, errMsg string) { + if err != nil { + applogger.Error(fmt.Sprintf("%s: %s", errMsg, err)) } } @@ -993,7 +989,8 @@ func connectToDB() (*gorm.DB, error) { break } if attempt >= maxAttempts { - applogger.Error(fmt.Sprintf("数据库连接失败,错误信息%v", err)) + //applogger.Error(fmt.Sprintf("数据库连接失败,错误信息%v", err)) + handleError(err, "数据库连接失败,错误信息") return nil, err } time.Sleep(backoff) @@ -1079,6 +1076,7 @@ func iniConfi() { insertChanSize = 10 //通道缓冲数 goSize = 10 //协程数 taskTime = 1 + batchStatusTaskTime = 1 to = []string{"chejiulong@wemediacn.com"} token = "7100477930234217" lastCallPath = "RawData/LastCall" @@ -1105,6 +1103,7 @@ func iniConfi() { insertChanSize = 100 //通道缓冲数 goSize = 50 //协程数 taskTime = 60 + batchStatusTaskTime = 5 to = []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"} token = "7100178600777091" //7100477930234217 lastCallPath = "RawData/LastCall" @@ -1167,33 +1166,34 @@ func (f FileSorter) Less(i, j int) bool { } var ( //初始化变量 - env string - applogger *logrus.Logger - redisClient *redis.Client - executableDir string - redisAddress string - redisPassword string - redisDB int - sftpAddress string - sftpUser string - sftpPassword string - sftpDir string - dbAddress string - dbUser string - dbPassword string - dbName string - zipPath string - txtPath string - logPath string - batchSize int //提交数据 - insertSize int //一次性入库 - insertChanSize int //通道缓冲数 - goSize int //协程数 - taskTime int - to []string - token string - lastCallPath string - verifySignatureKey string + env string + applogger *logrus.Logger + redisClient *redis.Client + executableDir string + redisAddress string + redisPassword string + redisDB int + sftpAddress string + sftpUser string + sftpPassword string + sftpDir string + dbAddress string + dbUser string + dbPassword string + dbName string + zipPath string + txtPath string + logPath string + batchSize int //提交数据 + insertSize int //一次性入库 + insertChanSize int //通道缓冲数 + goSize int //协程数 + taskTime int + batchStatusTaskTime int + to []string + token string + lastCallPath string + verifySignatureKey string ) type Batch struct {