diff --git a/go.mod b/go.mod index 172d905..74d5cdc 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( require ( github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/kr/fs v0.1.0 // indirect diff --git a/go.sum b/go.sum index 2b664bf..ed0cf51 100644 --- a/go.sum +++ b/go.sum @@ -37,6 +37,8 @@ github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= diff --git a/iniDataForMacOs b/iniDataForMacOs index 2543a4e..853e2a4 100755 Binary files a/iniDataForMacOs and b/iniDataForMacOs differ diff --git a/main.go b/main.go index 0a853cb..15a87bd 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "archive/zip" "bufio" + "bytes" "crypto/tls" "encoding/csv" "encoding/json" @@ -10,9 +11,12 @@ import ( "fmt" "io" "log" + "net/http" "os" "path" "path/filepath" + "regexp" + "sort" "strconv" "strings" "sync" @@ -20,210 +24,124 @@ import ( "time" "github.com/go-redis/redis" + "github.com/gorilla/websocket" "github.com/pkg/sftp" "github.com/sirupsen/logrus" "golang.org/x/crypto/ssh" "gopkg.in/gomail.v2" "gopkg.in/natefinch/lumberjack.v2" "gorm.io/driver/sqlserver" - _ "gorm.io/driver/sqlserver" "gorm.io/gorm" "gorm.io/gorm/logger" ) -var ( //初始化变量 - env string - applogger *logrus.Logger - redisClient *redis.Client - executableDir string - redisAddress string - redisPassword string - redisDB int - sftpAddress string - sftpUser string - sftpPassword string - sftpDir string - dbAddress string - dbUser string - dbPassword string - dbName string - zipPath string - txtPath string - logPath string - batchSize int //提交数据 - insertSize int //一次性入库 - insertChanSize int //通道缓冲数 - goSize int //协程数 - taskTime int - to []string -) - -type Batch struct { - ID uint `gorm:"primary_key"` - CommunicationChannelID uint - CommunicationName string `gorm:"type:varchar(255)"` - TargetsMember uint `gorm:"type:int"` - TemplateID uint - Content string `gorm:"type:text"` - CreatedAt time.Time `gorm:"default:getdate()"` - UpdatedAt time.Time `gorm:"default:getdate()"` - Status uint `gorm:"type:int"` - DataFileName string `gorm:"type:text"` -} - -type BatcheData struct { - ID uint `gorm:"primary_key"` - CommunicationChannelID string `gorm:"column:communication_channel_id"` - Mobile string `gorm:"column:mobile"` - FullName string `gorm:"column:full_name"` - ReservedField string `gorm:"column:reserved_field"` -} - -func (BatcheData) TableName() string { - return "batche_datas" -} - -type BatchProcessingInformation struct { - ID uint `gorm:"primaryKey;autoIncrement"` - CommunicationChannelID string `gorm:"column:communication_channel_id"` - RepeatTargetsMember int `gorm:"column:repeat_targets_member"` - InsertsTargetsMember int `gorm:"column:inserts_targets_member"` - DataFileName string `gorm:"column:data_file_name"` -} - -func (BatchProcessingInformation) TableName() string { - return "batches_processing_informations" -} - -type BatchDataDuplicateLog struct { - ID int `gorm:"primaryKey;autoIncrement"` - CommunicationChannelID string `gorm:"column:communication_channel_id"` - Mobile string `gorm:"column:mobile"` - FullName string `gorm:"column:full_name"` - ReservedField string `gorm:"column:reserved_field"` -} - -func (BatchDataDuplicateLog) TableName() string { - return "batche_data_duplicate_logs" -} - func init() { - // 获取可执行文件所在的路径 - executablePath, err := os.Executable() - if err != nil { - log.Fatal("failed to get executable path: ", err) - } - executableDir = filepath.Dir(executablePath) - - flag.StringVar(&env, "env", "dev", "运行模式") - flag.Parse() - switch env { - case "dev": - //fmt.Print("测试环境配置已生效\n") - redisAddress = "mysql5.weu.me:6379" - redisPassword = "" - redisDB = 1 - sftpAddress = "192.168.10.86:49156" - sftpUser = "demo" - sftpPassword = "demo" - sftpDir = "/sftp/test" - dbAddress = "192.168.10.18:1433" - dbUser = "sa" - dbPassword = "Aa123123" - dbName = "sephora" - zipPath = "RawData/Zip/" - txtPath = "RawData/Txt/" - logPath = "logs/" - batchSize = 5000 //提交数据 - insertSize = 500 //一次性入库 - insertChanSize = 10 //通道缓冲数 - goSize = 10 //协程数 - taskTime = 1 - to = []string{"chejiulong@wemediacn.com"} - case "prod": - //fmt.Print("正式环境配置已生效\n") - redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379" - redisPassword = "3Nsb4Pmsl9bcLs24mL12l" - redisDB = 233 - sftpAddress = "esftp.sephora.com.cn:20981" - sftpUser = "CHN-CRMTOWemedia-wemedia" - sftpPassword = "uoWdMHEv39ZFjiOg" - sftpDir = "/CN-CRMTOWemedia/SMS" - dbAddress = "rm-bp16l424ln96q1ouk.sqlserver.rds.aliyuncs.com:3433" - dbUser = "sephora_sms" - dbPassword = "5ORiiLmgkniC0EqF" - dbName = "sephora_sms" - zipPath = "RawData/Zip/" - txtPath = "RawData/Txt/" - logPath = "logs/" - batchSize = 5000 //提交数据 - insertSize = 500 //一次性入库 - insertChanSize = 50 //通道缓冲数 - goSize = 50 //协程数 - taskTime = 60 - to = []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"} - default: - panic(fmt.Errorf("无效的运行模式: %s", env)) - } - - //判断目录是否存在,不存在则创建 - err = os.MkdirAll(filepath.Join(executableDir, zipPath), 0755) - err = os.MkdirAll(filepath.Join(executableDir, txtPath), 0755) - err = os.MkdirAll(filepath.Join(executableDir, logPath, time.Now().Format("2006_01")), 0755) - if err != nil { - log.Fatal(err) - } - + monopolize() //独占锁 + iniConfi() //初始化环境变量 + iniPath() //初始化路径 + applogger = logrus.New() //创建日志 + iniLog() //初始化日志配置 + //链接Redis redisClient = redis.NewClient(&redis.Options{ Addr: redisAddress, Password: redisPassword, DB: redisDB, }) - + //go startWebSocket() // 启动 WebSocket 服务,使用协程方式运行 + applogger.Info(fmt.Sprintf("程序启动,加载%s环境,尝试执行...", env)) + go downloadDecompression() // 启动立即执行一次数据下载、处理 } func main() { - // 打开一个文件作为锁 - lockFile, err := os.OpenFile(filepath.Join(executableDir, ".lock"), os.O_CREATE|os.O_RDWR, 0666) - if err != nil { - fmt.Println("打开锁文件失败:", err) - os.Exit(1) - } - defer lockFile.Close() - // 尝试获取文件的独占锁 - err = syscall.Flock(int(lockFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) - if err != nil { - fmt.Println("程序已经在运行,本程序无法同时运行多个") - os.Exit(1) - } - applogger = logrus.New() - logPath = filepath.Join(executableDir, "logs", time.Now().Format("2006_01")) - err = os.MkdirAll(logPath, 0755) - logFileName := "sms_processing_" + time.Now().Format("2006_01_02") + ".log" - logFileHook := &lumberjack.Logger{ - Filename: filepath.Join(logPath, logFileName), - } - logOutput := io.MultiWriter(os.Stdout, logFileHook) // 将日志同时输出到控制台和文件 - applogger.SetOutput(logOutput) - applogger.Info(fmt.Sprintf("程序启动,加载%s环境,尝试执行...", env)) - go downloadDecompression() // 启动立即执行一次 + ticker := time.NewTicker(time.Duration(taskTime) * time.Minute) // 数据下载、处理 + ticker_merge := time.NewTicker(time.Minute) //名单合并 + defer ticker.Stop() + defer ticker_merge.Stop() - ticker := time.NewTicker(time.Duration(taskTime) * time.Minute) // 创建一个定时器 - defer ticker.Stop() // 延迟关闭定时器,确保所有的定时器事件都被处理完毕 - // 循环处理任务 for { select { case <-ticker.C: - // 定时器触发时执行的任务函数 - logPath = filepath.Join(executableDir, "logs", time.Now().Format("2006_01")) - logFileName = "sms_processing_" + time.Now().Format("2006_01_02") + ".log" - logFileHook = &lumberjack.Logger{ - Filename: filepath.Join(logPath, logFileName), - } - logOutput := io.MultiWriter(os.Stdout, logFileHook) // 将日志同时输出到控制台和文件 - applogger.SetOutput(logOutput) - applogger.Info("尝试执行...") + iniLog() + applogger.Info("尝试执行数据处理...") go downloadDecompression() + + case <-ticker_merge.C: + iniLog() + fmt.Print("尝试执行名单合并...\n") + //go downloadDecompression() + } + } +} + +func startWebSocket() { + http.HandleFunc("/ws", handleWebSocket) + err := http.ListenAndServe(":8080", nil) + if err != nil { + fmt.Println("启动 WebSocket 服务失败:", err) + } else { + fmt.Println("启动 WebSocket 服务成功:") + } +} + +func handleWebSocket(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + fmt.Println("升级连接失败:", err) + return + } + // 打印客户端地址 + fmt.Printf("客户端 %s 连接成功\n", conn.RemoteAddr().String()) + + // 添加客户端连接信息 + c := &client{conn} + clients[c] = true + + // 接收消息 + go func() { + for { + _, message, err := conn.ReadMessage() + if err != nil { + fmt.Println("接收消息失败:", err) + return + } + fmt.Println("收到消息:", string(message)) + if string(message) == "a" { + fmt.Println("触发合并操作") + message := []byte("触发合并操作") + err := conn.WriteMessage(websocket.TextMessage, message) //发送消息给客户端 + if err != nil { + fmt.Println("发送消息失败:", err) + delete(clients, c) + return + } + } else if string(message) == "c" { + for c := range clients { + fmt.Printf("客户端 %s 在线\n", c.conn.RemoteAddr().String()) + } + } else if string(message) == "all" { + message = []byte("所有的客户端们你们好!") + go broadcast(message) + } + } + }() + + // 设置连接关闭时的回调函数 + conn.SetCloseHandler(func(code int, text string) error { + fmt.Println("连接已关闭,code:", code, "text:", text) + fmt.Printf("客户端 %s 关闭连接\n", conn.RemoteAddr().String()) + delete(clients, c) + return nil + }) + +} + +// 聊天室消息广播 +func broadcast(message []byte) { + for c := range clients { + err := c.conn.WriteMessage(websocket.TextMessage, message) + if err != nil { + fmt.Println("发送消息失败:", err) + delete(clients, c) } } } @@ -260,6 +178,9 @@ func downloadDecompression() { } it := 1 fmt.Printf("共%d个文件\n", len(files)) + + sort.Sort(FileSorter(files)) + for _, file := range files { fmt.Printf("第%d个文件处理中\n", it) it++ @@ -356,8 +277,7 @@ func downloadDecompression() { } err = redisClient.Set(fileKey, 1, 0).Err() //写入下载、解压、入库完成标记 if err != nil { - //fmt.Printf("写入文件处理完成标记失败文件名:%s,错误信息:%v\n", file.Name(), err) - applogger.Info(fmt.Sprintf("写入文件处理完成标记失败文件名:%s,错误信息:v%\n", file.Name(), err)) + applogger.Info(fmt.Sprintf("写入文件处理完成标记失败文件名:%s,错误信息:%v\n", file.Name(), err)) } } redisClient.Del("iniDataStatus") //删除任务执行中标记 @@ -383,11 +303,34 @@ func batchInsert(fileName string) { if err != nil { break } + communicationChannelID, _ := strconv.ParseUint(record[0], 10, 32) TargetsMember, _ := strconv.ParseUint(record[2], 10, 32) templateID, _ := strconv.ParseUint(record[3], 10, 32) status := uint(1) + t := time.Now() + s := t.Format("2006-01-02 15:04:05") + //key := + batchParams := BatchParams{ + BatchName: fmt.Sprintf("%s-%s", record[1], strconv.Itoa(int(communicationChannelID))), + BatchDesc: record[4], + IsPersonal: 0, + Message: record[4], + IsInternational: 0, + IsSchedule: 0, //点发 + ScheduleTime: s, + Token: token, + } + + // 调用发送短信批次接口 + sid, err := CreateBatch(batchParams) + if err != nil { + fmt.Println(err) + return + } + fmt.Println(sid) + batch := Batch{ CommunicationChannelID: uint(communicationChannelID), CommunicationName: record[1], @@ -396,6 +339,7 @@ func batchInsert(fileName string) { Content: record[4], Status: status, DataFileName: fileName, + Sid: sid, } db.Create(&batch) batchRows++ @@ -410,6 +354,78 @@ func batchInsert(fileName string) { } } +func CreateBatch(batchParams BatchParams) (int, error) { + // 将请求参数编码为JSON字符串 + jsonBytes, err := json.Marshal(batchParams) + if err != nil { + return -1, err + } + jsonStr := string(jsonBytes) + + // 发送POST请求 + url := "http://www.wemediacn.net/webservice/BatchService?service=sms.createbatch" + resp, err := http.Post(url, "application/json; charset=utf-8", bytes.NewBuffer([]byte(jsonStr))) + if err != nil { + return -1, err + } + defer resp.Body.Close() + + // 解析响应数据 + var retobj map[string]interface{} + err = json.NewDecoder(resp.Body).Decode(&retobj) + if err != nil { + return -1, err + } + fmt.Print(retobj) + + code := int(retobj["code"].(float64)) + fmt.Printf("批次创建API 返回:%d\n", code) + + if code == 0 { + sid := int(retobj["sid"].(float64)) + return sid, nil + } else { + return -1, fmt.Errorf("create batch failed, error code: %d", code) + } +} + +func smsApi(method string, sendSMSDataJson string) (int, error) { + // 发送POST请求 + url := "http://www.wemediacn.net/webservice/BatchService?service=sms." + method + resp, err := http.Post(url, "application/json; charset=utf-8", strings.NewReader(sendSMSDataJson)) + if err != nil { + return -1, err + } + defer resp.Body.Close() + + // 解析响应数据 + var retobj map[string]interface{} + err = json.NewDecoder(resp.Body).Decode(&retobj) + if err != nil { + return -1, err + } + fmt.Print(retobj) + + jsonStr, err := json.Marshal(retobj) + if err != nil { + fmt.Println(err) + return -1, err + } + fmt.Printf("API 返回:%s\n", string(jsonStr)) + + code := int(retobj["code"].(float64)) + + //fmt.Print("code:", code) + + if code == 0 { + //sid := int(retobj["sid"].(float64)) + fmt.Printf("提交批次成功code:%d\n", code) + return code, nil + } else { + return -1, fmt.Errorf("create batch failed, error code: %d", code) + } +} + func batchDataInsert(fileName string) { start := time.Now() // Open file @@ -457,7 +473,30 @@ func batchDataInsert(fileName string) { bi := 0 duplicateCount := make(map[string]int) insertsCount := make(map[string]int) + //sendSMSData := make(map[string][]Sms) + sendMobiles := make(map[string]map[string]interface{}) var count int + ccids := make(map[string]bool) + + // 通过下划线将文件名拆分为多个字段 + fields := strings.Split(fileName, "_") + + // 最后一个字段是日期和时间,我们只需要日期部分 + datetime := fields[len(fields)-1] + fileNameDate := datetime[:8] + var batches []Batch + + // 模糊查询包含“20230103”字符串的记录 + db.Where("data_file_name LIKE ?", "%"+fileNameDate+"%").Find(&batches) + + // 定义一个名为result的map类型的变量,并以CommunicationChannelID作为键 + result := make(map[string][]Batch) + for _, batch := range batches { + cckdKey := strconv.FormatUint(uint64(batch.CommunicationChannelID), 10) + result[cckdKey] = append(result[cckdKey], batch) + print(batch.CommunicationChannelID, "\n") + } + //ci := 1 for scanner.Scan() { line := scanner.Text() @@ -482,6 +521,7 @@ func batchDataInsert(fileName string) { if _, ok := duplicateCount[row[2]]; !ok { duplicateCount[row[2]] = 0 } + // Check if record exists in hashset key := fmt.Sprintf("%s-%s-%s", row[2], row[3], row[5]) if _, exists := hs[key]; exists { //如果批次数据重复 @@ -494,6 +534,7 @@ func batchDataInsert(fileName string) { FullName: row[4], ReservedField: string(reservedFieldsJson), }) + if len(dataBatchDuplicate) >= batchSize { err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error if err != nil { @@ -507,7 +548,8 @@ func batchDataInsert(fileName string) { } // Add record to hashset hs[key] = true - + //tccid, _ := strconv.ParseUint(row[2], 10, 32) + ccids[row[2]] = true dataBatch = append(dataBatch, BatcheData{ CommunicationChannelID: row[2], Mobile: row[3], @@ -521,6 +563,67 @@ func batchDataInsert(fileName string) { //fmt.Print("提交通次数" + strconv.Itoa(ci)) //ci++ } + + if batches, ok := result[row[2]]; ok && len(batches) > 0 && batches[0].Content != "" { + content := batches[0].Content + Sid := batches[0].Sid + pattern := regexp.MustCompile(`\{.*\}`) + + matched := pattern.MatchString(content) + if matched { //个性化短信 + if _, ok := sendMobiles[row[2]]; !ok { + sendMobiles[row[2]] = make(map[string]interface{}) + sendMobiles[row[2]]["sid"] = Sid + sendMobiles[row[2]]["isPersonalizedMsg"] = true + sendMobiles[row[2]]["mobiles"] = make([]SmsList, 0) + } + placeholderMap := map[string]string{ + "{RESERVED_FIELD_1}": row[5], + "{RESERVED_FIELD_2}": row[6], + "{RESERVED_FIELD_3}": row[7], + "{RESERVED_FIELD_4}": row[8], + "{RESERVED_FIELD_5}": row[9], + } + for k, v := range placeholderMap { + content = strings.ReplaceAll(content, k, v) + } + + sendMobiles[row[2]]["mobiles"] = append(sendMobiles[row[2]]["mobiles"].([]SmsList), SmsList{M: row[3], C: content, F: 8}) + + if len(sendMobiles[row[2]]["mobiles"].([]SmsList)) >= batchSize/2 { + sd := Sms{Sid: Sid, Data: sendMobiles[row[2]]["mobiles"].([]SmsList), Token: token} + sendSMSDataJson, _ := json.Marshal(sd) + smsApi("appendbatchdata", string(sendSMSDataJson)) + //fmt.Print(string(sendSMSDataJson)) + sendMobiles[row[2]]["mobiles"] = []SmsList{} // reset mobiles slice + } + + } else { + // 处理非个性化短信 + if _, ok := sendMobiles[row[2]]; !ok { + sendMobiles[row[2]] = make(map[string]interface{}) + sendMobiles[row[2]]["sid"] = Sid + sendMobiles[row[2]]["isPersonalizedMsg"] = false + sendMobiles[row[2]]["content"] = content + sendMobiles[row[2]]["mobiles"] = []string{} + } + + sendMobiles[row[2]]["mobiles"] = append(sendMobiles[row[2]]["mobiles"].([]string), row[3]) + + if len(sendMobiles[row[2]]["mobiles"].([]string)) >= batchSize { + mobiles := sendMobiles[row[2]]["mobiles"].([]string) + mobileStr := strings.Join(mobiles, ",") + var sl []SmsList + sl = append(sl, SmsList{M: mobileStr, C: content, F: 8}) + sd := Sms{Sid: Sid, Data: sl, Token: token} + sendSMSDataJson, _ := json.Marshal(sd) + smsApi("appendbatchdata", string(sendSMSDataJson)) + sendMobiles[row[2]]["mobiles"] = []string{} // reset mobiles slice + + } + } + } + if _, ok := insertsCount[row[2]]; !ok { insertsCount[row[2]] = 0 } @@ -530,28 +633,58 @@ func batchDataInsert(fileName string) { if len(dataBatch) > 0 { dataBatchChan <- dataBatch - dataBatch = make([]BatcheData, 0, batchSize) - //fmt.Print("文件读取完成,最后一批提交至通道") } close(dataBatchChan) - wg.Wait() //所有入库全部完成 + //fmt.Println("结束批次v%\n", ccids) + //fmt.Print("ccids 长度", len(ccids), "\n") + for ccid := range ccids { // 处理各个批次剩余数据,同时处理批次结束api - //插入批次处理信息 - bpi := []BatchProcessingInformation{} - for key, value := range duplicateCount { + fmt.Println("循环处理批次结束\n", ccid) + + //batches := result[ccid] + if sendMobiles[ccid]["isPersonalizedMsg"].(bool) { //个性化 + smsList := sendMobiles[ccid]["mobiles"].([]SmsList) + if len(smsList) > 0 { + sd := Sms{Sid: sendMobiles[ccid]["sid"].(int), Data: sendMobiles[ccid]["mobiles"].([]SmsList), Token: token} + sendSMSDataJson, _ := json.Marshal(sd) + smsApi("appendbatchdata", string(sendSMSDataJson)) + //fmt.Print(string(sendSMSDataJson)) + sendMobiles[ccid]["mobiles"] = []SmsList{} // reset mobiles slice + } + + } else { //非个性化 + if mobiles, ok := sendMobiles[ccid]["mobiles"].([]string); ok && len(mobiles) > 0 { + mobileStr := strings.Join(sendMobiles[ccid]["mobiles"].([]string), ",") + var sl []SmsList + sl = append(sl, SmsList{M: mobileStr, C: sendMobiles[ccid]["content"].(string), F: 8}) + + sd := Sms{Sid: sendMobiles[ccid]["sid"].(int), Data: sl, Token: token} + sendSMSDataJson, _ := json.Marshal(sd) + smsApi("appendbatchdata", string(sendSMSDataJson)) + } + } + + //关闭批次 + sf := SmsFinish{Sid: sendMobiles[ccid]["sid"].(int), Token: token} + sfJson, _ := json.Marshal(sf) + smsApi("finishbatch", string(sfJson)) + + //插入批次处理数据 + bpi := []BatchProcessingInformation{} bpi = append(bpi, BatchProcessingInformation{ - CommunicationChannelID: key, - RepeatTargetsMember: value, - InsertsTargetsMember: insertsCount[key], + CommunicationChannelID: ccid, + RepeatTargetsMember: duplicateCount[ccid], + InsertsTargetsMember: insertsCount[ccid], DataFileName: fileName, }) + err = db.CreateInBatches(bpi, insertSize).Error + if err != nil { + applogger.Error(fmt.Sprintf("插入批次处理信息失败,文件名:%s,错误信息%v", fileName, err)) + } } - err = db.CreateInBatches(bpi, insertSize).Error - if err != nil { - applogger.Error(fmt.Sprintf("插入批次处理信息失败,文件名:%s,错误信息%v", fileName, err)) - } + wg.Wait() //所有入库全部完成 //插入批此重复数据 if len(dataBatchDuplicate) > 0 { @@ -569,7 +702,7 @@ func batchDataInsert(fileName string) { subject := "丝芙兰数据包处理完成" body := "数据包:" + fileName + ";\n总数:" + strconv.Itoa(count+bi) + ";\n过滤重复数:" + strconv.Itoa(bi) + ";\n过滤后总数:" + strconv.Itoa(count) + ";\n处理完成,请前往管理平台查看处理。" SendEmail(subject, body) //发送邮件 - applogger.Info(fmt.Sprintf(fmt.Sprintf("%s(数据包) 入库完成", fileName))) + applogger.Info(fmt.Sprintf("%s(数据包) 入库完成", fileName)) applogger.Info(fmt.Sprintf("%s(数据包)执行时间:%s 插入数据:%d条 过滤数数:%d条", fileName, elapsed, count, bi)) } } @@ -598,6 +731,111 @@ func connectToDB() (*gorm.DB, error) { return db, nil } +func iniPath() { + // 获取可执行文件所在的路径 + executablePath, err := os.Executable() + if err != nil { + log.Fatal("failed to get executable path: ", err) + } + executableDir = filepath.Dir(executablePath) + + //判断目录是否存在,不存在则创建 + err = os.MkdirAll(filepath.Join(executableDir, zipPath), 0755) + if err != nil { + log.Fatal(err) + } + err = os.MkdirAll(filepath.Join(executableDir, txtPath), 0755) + if err != nil { + log.Fatal(err) + } + err = os.MkdirAll(filepath.Join(executableDir, logPath, time.Now().Format("2006_01")), 0755) + if err != nil { + log.Fatal(err) + } +} + +func monopolize() { + // 打开一个文件作为锁 + lockFile, err := os.OpenFile(filepath.Join(executableDir, ".lock"), os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + fmt.Println("打开锁文件失败:", err) + os.Exit(1) + } + defer lockFile.Close() + // 尝试获取文件的独占锁 + err = syscall.Flock(int(lockFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) + if err != nil { + fmt.Println("程序已经在运行,本程序无法同时运行多个") + os.Exit(1) + } +} + +func iniConfi() { + flag.StringVar(&env, "env", "dev", "运行模式") + flag.Parse() + switch env { + case "dev": + //fmt.Print("测试环境配置已生效\n") + redisAddress = "mysql5.weu.me:6379" + redisPassword = "" + redisDB = 1 + sftpAddress = "192.168.10.86:49156" + sftpUser = "demo" + sftpPassword = "demo" + sftpDir = "/sftp/test" + dbAddress = "192.168.10.18:1433" + dbUser = "sa" + dbPassword = "Aa123123" + dbName = "sephora" + zipPath = "RawData/Zip/" + txtPath = "RawData/Txt/" + logPath = "logs/" + batchSize = 5000 //提交数据 + insertSize = 500 //一次性入库 + insertChanSize = 10 //通道缓冲数 + goSize = 10 //协程数 + taskTime = 1 + to = []string{"chejiulong@wemediacn.com"} + token = "7100477930234217" + case "prod": + //fmt.Print("正式环境配置已生效\n") + redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379" + redisPassword = "3Nsb4Pmsl9bcLs24mL12l" + redisDB = 233 + sftpAddress = "esftp.sephora.com.cn:20981" + sftpUser = "CHN-CRMTOWemedia-wemedia" + sftpPassword = "uoWdMHEv39ZFjiOg" + sftpDir = "/CN-CRMTOWemedia/SMS" + dbAddress = "rm-bp16l424ln96q1ouk.sqlserver.rds.aliyuncs.com:3433" + dbUser = "sephora_sms" + dbPassword = "5ORiiLmgkniC0EqF" + dbName = "sephora_sms" + zipPath = "RawData/Zip/" + txtPath = "RawData/Txt/" + logPath = "logs/" + batchSize = 5000 //提交数据 + insertSize = 500 //一次性入库 + insertChanSize = 50 //通道缓冲数 + goSize = 50 //协程数 + taskTime = 60 + to = []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"} + token = "7100477930234217" + default: + panic(fmt.Errorf("无效的运行模式: %s", env)) + } +} + +func iniLog() { + logPath = filepath.Join(executableDir, "logs", time.Now().Format("2006_01")) + os.MkdirAll(logPath, 0755) + logFileName := "sms_processing_" + time.Now().Format("2006_01_02") + ".log" + logFileHook := &lumberjack.Logger{ + Filename: filepath.Join(logPath, logFileName), + } + logOutput := io.MultiWriter(os.Stdout, logFileHook) // 将日志同时输出到控制台和文件 + applogger.SetOutput(logOutput) +} + func SendEmail(subject string, body string) error { // 邮箱认证信息 smtpHost := "smtp.exmail.qq.com" @@ -620,3 +858,143 @@ func SendEmail(subject string, body string) error { } return nil } + +func (f FileSorter) Len() int { + return len(f) +} + +func (f FileSorter) Swap(i, j int) { + f[i], f[j] = f[j], f[i] +} + +func (f FileSorter) Less(i, j int) bool { + // 首先检查文件扩展名是否为“.txt”,如果是,则将其移到文件列表的开头 + if strings.HasSuffix(f[i].Name(), ".txt") && !strings.HasSuffix(f[j].Name(), ".txt") { + return true + } + // 如果文件扩展名都是“.txt”,或都不是“.txt”,则按字母顺序排序 + return f[i].Name() < f[j].Name() +} + +var ( //初始化变量 + env string + applogger *logrus.Logger + redisClient *redis.Client + executableDir string + redisAddress string + redisPassword string + redisDB int + sftpAddress string + sftpUser string + sftpPassword string + sftpDir string + dbAddress string + dbUser string + dbPassword string + dbName string + zipPath string + txtPath string + logPath string + batchSize int //提交数据 + insertSize int //一次性入库 + insertChanSize int //通道缓冲数 + goSize int //协程数 + taskTime int + to []string + token string +) + +type Batch struct { + ID uint `gorm:"primary_key"` + CommunicationChannelID uint + CommunicationName string `gorm:"type:varchar(255)"` + TargetsMember uint `gorm:"type:int"` + TemplateID uint + Content string `gorm:"type:text"` + CreatedAt time.Time `gorm:"default:getdate()"` + UpdatedAt time.Time `gorm:"default:getdate()"` + Status uint `gorm:"type:int"` + DataFileName string `gorm:"type:text"` + Sid int `gorm:"type:int"` +} + +type BatcheData struct { + ID uint `gorm:"primary_key"` + CommunicationChannelID string `gorm:"column:communication_channel_id"` + Mobile string `gorm:"column:mobile"` + FullName string `gorm:"column:full_name"` + ReservedField string `gorm:"column:reserved_field"` +} + +func (BatcheData) TableName() string { + return "batche_datas" +} + +type BatchProcessingInformation struct { + ID uint `gorm:"primaryKey;autoIncrement"` + CommunicationChannelID string `gorm:"column:communication_channel_id"` + RepeatTargetsMember int `gorm:"column:repeat_targets_member"` + InsertsTargetsMember int `gorm:"column:inserts_targets_member"` + DataFileName string `gorm:"column:data_file_name"` +} + +func (BatchProcessingInformation) TableName() string { + return "batches_processing_informations" +} + +type BatchDataDuplicateLog struct { + ID int `gorm:"primaryKey;autoIncrement"` + CommunicationChannelID string `gorm:"column:communication_channel_id"` + Mobile string `gorm:"column:mobile"` + FullName string `gorm:"column:full_name"` + ReservedField string `gorm:"column:reserved_field"` +} + +func (BatchDataDuplicateLog) TableName() string { + return "batche_data_duplicate_logs" +} + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +// 客户端连接信息 +type client struct { + conn *websocket.Conn +} + +// 在线客户端列表 + +var clients = make(map[*client]bool) + +// 定义一个FileSorter结构体 +type FileSorter []os.FileInfo + +type BatchParams struct { + BatchName string `json:"batchName"` + BatchDesc string `json:"batchDesc"` + IsPersonal int `json:"isPersonal"` + Message string `json:"message"` + IsInternational int `json:"isInternational"` + IsSchedule int `json:"isSchedule"` + ScheduleTime string `json:"scheduleTime"` + Token string `json:"token"` +} + +type SmsList struct { + M string `json:"m"` + C string `json:"c"` + F int `json:"f"` +} + +type Sms struct { + Sid int `json:"sid"` + Data []SmsList `json:"data"` + Token string `json:"token"` +} + +type SmsFinish struct { + Sid int `json:"sid"` + Token string `json:"token"` +}