diff --git a/go.mod b/go.mod index dde88ab..172d905 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/sirupsen/logrus v1.9.0 golang.org/x/crypto v0.6.0 gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df + gopkg.in/natefinch/lumberjack.v2 v2.2.1 gorm.io/driver/sqlserver v1.4.2 gorm.io/gorm v1.24.5 ) diff --git a/go.sum b/go.sum index 85de0eb..2b664bf 100644 --- a/go.sum +++ b/go.sum @@ -159,6 +159,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df h1:n7WqCuqOuCbNr617RXOY0AWRXxgwEyPp2z+p0+hgMuE= gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df/go.mod h1:LRQQ+SO6ZHR7tOkpBDuZnXENFzX8qRjMDMyPD6BRkCw= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce h1:+JknDZhAj8YMt7GC73Ei8pv4MzjDUNPHgQWJdtMAaDU= gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:5AcXVHNjg+BDxry382+8OKon8SEWiKktQR07RKPsv1c= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= diff --git a/iniData.exe b/iniData similarity index 63% rename from iniData.exe rename to iniData index ef97ab0..69b8f44 100755 Binary files a/iniData.exe and b/iniData differ diff --git a/iniDataForLinux b/iniDataForLinux index 8e8c8a7..6992c85 100755 Binary files a/iniDataForLinux and b/iniDataForLinux differ diff --git a/main.go b/main.go index 9e7967c..7664275 100644 --- a/main.go +++ b/main.go @@ -10,7 +10,6 @@ import ( "io" "log" "os" - "os/signal" "path" "path/filepath" "strconv" @@ -24,6 +23,7 @@ import ( "github.com/sirupsen/logrus" "golang.org/x/crypto/ssh" "gopkg.in/gomail.v2" + "gopkg.in/natefinch/lumberjack.v2" "gorm.io/driver/sqlserver" _ "gorm.io/driver/sqlserver" "gorm.io/gorm" @@ -32,7 +32,6 @@ import ( /* var ( - applogger *logrus.Logger redisClient *redis.Client executableDir string @@ -50,23 +49,21 @@ var ( zipPath = "RawData/Zip/" txtPath = "RawData/Txt/" logPath = "logs/" - batchSize = 5000 //提交数据 - insertSize = 500 //一次性入库 - cSize = 10 //入库协程数 - ) */ + var ( //正式环境 + applogger *logrus.Logger redisClient *redis.Client executableDir string redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379" redisPassword = "3Nsb4Pmsl9bcLs24mL12l" redisDB = 233 - sftpAddress = "10.11.0.63:22" - sftpUser = "wyeth2018" - sftpPassword = "nYydNHOu" - sftpDir = "/wyeth2018/sephora_sms" + sftpAddress = "esftp.sephora.com.cn:20981" + sftpUser = "CHN-CRMTOWemedia-wemedia" + sftpPassword = "uoWdMHEv39ZFjiOg" + sftpDir = "/CN-CRMTOWemedia/SMS" dbAddress = "rm-bp16l424ln96q1ouk.sqlserver.rds.aliyuncs.com:3433" dbUser = "sephora_sms" dbPassword = "5ORiiLmgkniC0EqF" @@ -74,10 +71,6 @@ var ( //正式环境 zipPath = "RawData/Zip/" txtPath = "RawData/Txt/" logPath = "logs/" - batchSize = 5000 //提交数据 - insertSize = 500 //一次性入库 - cSize = 10 //入库协程数 - ) type Batch struct { @@ -143,20 +136,6 @@ func init() { if err != nil { log.Fatal(err) } - applogger = logrus.New() - logFile := filepath.Join(executableDir, logPath, "download.log") - _, err = os.Stat(logFile) - if os.IsNotExist(err) { - if _, err := os.Create(logFile); err != nil { - log.Fatal(err) - } - } - f, err := os.OpenFile(logFile, os.O_APPEND|os.O_WRONLY, 0666) - if err != nil { - log.Fatal(err) - } - //defer f.Close() - applogger.SetOutput(f) redisClient = redis.NewClient(&redis.Options{ Addr: redisAddress, @@ -166,33 +145,41 @@ func init() { } func main() { - // 创建一个channel用于接收信号 - signalChan := make(chan os.Signal, 1) + // 打开一个文件作为锁 + lockFile, err := os.OpenFile(filepath.Join(executableDir, ".lock"), os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + fmt.Println("打开锁文件失败:", err) + os.Exit(1) + } + defer lockFile.Close() + // 尝试获取文件的独占锁 + err = syscall.Flock(int(lockFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) + if err != nil { + fmt.Println("程序已经在运行,本程序无法同时运行多个") + os.Exit(1) + } + applogger = logrus.New() + logPath := filepath.Join(executableDir, "logs") + logFileName := "sms_processing_" + time.Now().Format("2006-01-02") + ".log" + logFileHook := &lumberjack.Logger{ + Filename: filepath.Join(logPath, logFileName), + } + applogger.SetOutput(logFileHook) - // 注册SIGINT, SIGTERM信号处理函数 - signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM) + go downloadDecompression() // 启动立即执行一次 - // 启动goroutine等待信号的到来 - go func() { - for { - select { - case <-signalChan: - // 当信号到来时,执行特定的代码 - fmt.Println("程序已经在运行,本程序无法同时运行多个") - os.Exit(1) - } - } - }() - - ticker := time.NewTicker(time.Minute) // 创建一个定时器,每隔一分钟触发一次任务函数 - tickCount := 1 //记录循环次数 - defer ticker.Stop() // 延迟关闭定时器,确保所有的定时器事件都被处理完毕 + ticker := time.NewTicker(60 * time.Minute) // 创建一个定时器,每隔一分钟触发一次任务函数 + tickCount := 1 //记录循环次数 + defer ticker.Stop() // 延迟关闭定时器,确保所有的定时器事件都被处理完毕 // 循环处理任务 for { select { case <-ticker.C: // 定时器触发时执行的任务函数 - fmt.Printf("尝试第%d执行....\n", tickCount) + logFileName := "sms_processing_" + time.Now().Format("2006-01-02") + ".log" + logFileHook.Filename = filepath.Join(logPath, logFileName) + fmt.Printf("尝试第%d此执行....\n", tickCount) + applogger.Info(fmt.Sprintf("尝试第%d此执行....\n", tickCount)) go downloadDecompression() // 在新协程中异步执行 tickCount++ } @@ -201,12 +188,13 @@ func main() { func downloadDecompression() { if exists, _ := redisClient.Exists("iniDataStatus").Result(); exists == 1 { - fmt.Println("批次执行中,跳过本次") + fmt.Print("上一批次执行中,跳过本次\n") } else { // 写入执行中标记 err := redisClient.Set("iniDataStatus", 1, 0).Err() if err != nil { - fmt.Println("写入标记失败:%d", err) + fmt.Printf("写入任务执行中标记失败:%s\n", err.Error()) + applogger.Info("写入任务执行中标记失败:%v", err) } // Connect to SFTP server sshConfig := &ssh.ClientConfig{ @@ -218,25 +206,29 @@ func downloadDecompression() { } sshClient, err := ssh.Dial("tcp", sftpAddress, sshConfig) if err != nil { - panic(err) + fmt.Printf("写入任务执行中标记失败:%s\n", err.Error()) + applogger.Info("写入任务执行中标记失败:%v", err) } sftpClient, err := sftp.NewClient(sshClient) if err != nil { - panic(err) + fmt.Printf("写入任务执行中标记失败:%s\n", err.Error()) + applogger.Info("写入任务执行中标记失败:%v", err) } defer sftpClient.Close() files, err := sftpClient.ReadDir(sftpDir) if err != nil { - applogger.Fatalf("sftp目录不存在: %v", err) + fmt.Printf("sftp目录不存在: 错误信息:%s,\n", err.Error()) + applogger.Info("sftp目录不存在: 错误信息:%v", err) } it := 1 fmt.Printf("共%d个文件\n", len(files)) for _, file := range files { - fmt.Printf("第%d个文件\n", it) + fmt.Printf("第%d个文件处理中\n", it) it++ // Check if file has been downloaded before fileKey := fmt.Sprintf("downloaded:%s", file.Name()) if exists, _ := redisClient.Exists(fileKey).Result(); exists == 1 { + fmt.Println("跳过已处理过的文件:" + file.Name()) continue } @@ -245,27 +237,27 @@ func downloadDecompression() { // Download file srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name())) if err != nil { - applogger.Fatalf("Failed to download file: %v", err) + applogger.Info("打开sftp文件失败,文件名:%s,错误信息: %v", file.Name(), err) continue } defer srcFile.Close() dstFile, err := os.Create(path.Join(executableDir, zipPath, file.Name())) if err != nil { - applogger.Fatalf("Failed to download file: %v", err) + applogger.Info("创建本地文件失败,文件名: %s,错误信息:%v", file.Name(), err) continue } defer dstFile.Close() if _, err := io.Copy(dstFile, srcFile); err != nil { - applogger.Fatalf("Failed to download file: %v", err) + applogger.Info("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err) continue } - fmt.Println("下载完成(数据包):" + file.Name()) - applogger.Info("下载完成(数据包):%d", file.Name()) + fmt.Printf("%s(数据包):下载完成\n", file.Name()) + applogger.Info(fmt.Sprintf("%s(数据包):下载完成", file.Name())) // Unzip file zipReader, err := zip.OpenReader(path.Join(executableDir, zipPath, file.Name())) if err != nil { - applogger.Fatalf("Failed to download file: %v", err) + applogger.Info("解压文件失败: 文件名:%s,错误信息:%v", file.Name(), err) continue } defer zipReader.Close() @@ -280,7 +272,7 @@ func downloadDecompression() { continue } if err != nil || zipFileReader == nil { - applogger.Fatalf("Failed to open zip file: %v", err) + applogger.Info("Failed to open zip file: %v", err) fmt.Print("压缩文件处理结束") continue } @@ -288,50 +280,46 @@ func downloadDecompression() { // Create the file unzipFile, err := os.Create(path.Join(executableDir, txtPath, zipFile.Name)) if err != nil { - - applogger.Fatalf("Failed to create unzip file: %v", err) + applogger.Info("创建压缩后的文件失败,文件名:%s,错误信息: %v", zipFile.Name, err) continue } defer unzipFile.Close() // Write the unzip data to the file _, err = io.Copy(unzipFile, zipFileReader) if err != nil { - applogger.Fatalf("Failed to write unzip data to file %v", err) + applogger.Info("文件解压失败,文件名:%s,错误信息: %v", zipFileReader, err) continue } - applogger.Info("解压完成(数据包):%d", zipFile.Name) - fmt.Print("解压完成(数据包):" + zipFile.Name) + applogger.Info(fmt.Sprintf("%s(数据包)解压完成", zipFile.Name)) + fmt.Printf("%s(数据包)解压完成\n", zipFile.Name) batchDataInsert(zipFile.Name) - applogger.Info("入库完成(数据包):%d", zipFile.Name) - fmt.Print("入库完成(数据包)::" + zipFile.Name) + } } else if filepath.Ext(file.Name()) == ".txt" { srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name())) if err != nil { - applogger.Fatalf("sftp目录不存在: %v", err) + applogger.Info("打开sftp文件失败,文件名:%s,错误信息: %v", file.Name(), err) continue } defer srcFile.Close() dstFile, err := os.Create(path.Join(executableDir, txtPath, file.Name())) if err != nil { - applogger.Fatalf("Failed to download file: %v", err) + applogger.Info("创建本地文件失败,文件名: %s,错误信息:%v", file.Name(), err) continue } defer dstFile.Close() if _, err := io.Copy(dstFile, srcFile); err != nil { - applogger.Fatalf("Failed to download file: %v", err) + applogger.Info("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err) continue } - fmt.Printf("下载完成(批次文件):%s \n", file.Name()) - applogger.Info("下载完成(批次文件):%d", file.Name()) + fmt.Printf("%s(批次文件)下载完成 \n", file.Name()) + applogger.Info(fmt.Sprintf("%s(批次文件)下载完成", file.Name())) batchInsert(file.Name()) - fmt.Printf("入库完成(批次文件)%s \n", file.Name()) - - applogger.Info("入库完成(批次文件):%d\n", file.Name()) } err = redisClient.Set(fileKey, 1, 0).Err() //写入下载、解压、入库完成标记 if err != nil { - fmt.Println("写入标记失败:%", err) + fmt.Printf("写入文件处理完成标记失败文件名:%s,错误信息:%v\n", file.Name(), err) + applogger.Info("写入文件处理完成标记失败文件名:%s,错误信息:v%\n", file.Name(), err) } } redisClient.Del("iniDataStatus") //删除任务执行中标记 @@ -343,222 +331,224 @@ func downloadDecompression() { func batchInsert(fileName string) { //fmt.Print("批次处理开始") start := time.Now() - db, err := connectToDB() - if err != nil { - panic("failed to connect database") - } + db, _ := connectToDB() file, err := os.Open(path.Join(executableDir, txtPath, fileName)) - defer file.Close() if err != nil { - panic("failed to open file") - applogger.Fatalf("ailed to open file: %v", fileName) - } - reader := csv.NewReader(bufio.NewReader(file)) - reader.Read() - batchRows := 0 - for { - record, err := reader.Read() - if err != nil { - break - } - communicationChannelID, _ := strconv.ParseUint(record[0], 10, 32) - TargetsMember, _ := strconv.ParseUint(record[2], 10, 32) - templateID, _ := strconv.ParseUint(record[3], 10, 32) - status := uint(1) + fmt.Printf("文件打开失败文件名:%s,错误信息%v\n", fileName, err) + applogger.Info("文件打开失败文件名:%s,错误信息%v", fileName, err) + } else { + defer file.Close() + reader := csv.NewReader(bufio.NewReader(file)) + reader.Read() + batchRows := 0 + for { + record, err := reader.Read() + if err != nil { + break + } + communicationChannelID, _ := strconv.ParseUint(record[0], 10, 32) + TargetsMember, _ := strconv.ParseUint(record[2], 10, 32) + templateID, _ := strconv.ParseUint(record[3], 10, 32) + status := uint(1) - batch := Batch{ - CommunicationChannelID: uint(communicationChannelID), - CommunicationName: record[1], - TargetsMember: uint(TargetsMember), - TemplateID: uint(templateID), - Content: record[4], - Status: status, - DataFileName: fileName, + batch := Batch{ + CommunicationChannelID: uint(communicationChannelID), + CommunicationName: record[1], + TargetsMember: uint(TargetsMember), + TemplateID: uint(templateID), + Content: record[4], + Status: status, + DataFileName: fileName, + } + db.Create(&batch) + batchRows++ } - db.Create(&batch) - batchRows++ + time.Sleep(time.Second) + elapsed := time.Since(start) + subject := "丝芙兰批次文件处理完成" + body := "批次文件:" + fileName + ";\n批次数:" + strconv.Itoa(batchRows) + ";\n处理完成,请前往管理平台查看处理。" + SendEmail(subject, body) //发送邮件 + fmt.Printf("%s(批次文件)入库完成 \n", fileName) + applogger.Info(fmt.Sprintf("%s(批次文件)入库完成", fileName)) + fmt.Printf("执行时间%s 插入批次次数:%d\n\n\n", elapsed, batchRows) + applogger.Info(fmt.Sprintf("执行时间%s 插入批次次数:%d", elapsed, batchRows)) } - time.Sleep(time.Second) - - elapsed := time.Since(start) - subject := "丝芙兰批次文件处理完成" - body := "批次文件:" + fileName + ";\n批次数:" + strconv.Itoa(batchRows) + ";\n处理完成,请前往管理平台查看处理。" - err = SendEmail(subject, body) //发送邮件 - if err != nil { - applogger.Info("邮件发送失:%d", err) - fmt.Print(err) - } - fmt.Printf("执行时间%s\n插入批次次数:%d\n", elapsed, batchRows) } func batchDataInsert(fileName string) { - //fmt.Print("批次数据处理开始") start := time.Now() - db, err := connectToDB() - if err != nil { - panic("failed to connect database") - } - // Insert data in batches using multiple goroutines - var wg sync.WaitGroup - dataBatchChan := make(chan []BatcheData, cSize) - for i := 0; i < cSize; i++ { - wg.Add(1) - go func() { - for batch := range dataBatchChan { - retryCount := 0 - for { - if err := db.CreateInBatches(batch, insertSize).Error; err != nil { - if retryCount >= 5 { - panic(err) - } - fmt.Printf("Insert failed, retrying in %v seconds...\n", 2*retryCount) - time.Sleep(time.Duration(2*retryCount) * time.Second) - retryCount++ - } else { - break - } - } - } - wg.Done() - }() - } - // Open file file, err := os.Open(path.Join(executableDir, txtPath, fileName)) if err != nil { - panic(err) - } - defer file.Close() + fmt.Printf("文件打开失败,文件名:%s,错误信息%v\n", fileName, err) + applogger.Info("文件打开失败,文件名:%s,错误信息%v", fileName, err) + } else { + defer file.Close() + db, _ := connectToDB() + // Insert data in batches using multiple goroutines + batchSize := 5000 //提交数据 + insertSize := 500 //一次性入库 + insertChanSize := 50 //通道缓冲数 + goSize := 150 //协程数 + var wg sync.WaitGroup + dataBatchChan := make(chan []BatcheData, insertChanSize) + for i := 0; i < goSize; i++ { + wg.Add(1) + go func() { + for batch := range dataBatchChan { + retryCount := 0 + for { + if err := db.CreateInBatches(batch, insertSize).Error; err != nil { + if retryCount >= 5 { + panic(err) + } + fmt.Printf("Insert failed, retrying in %v seconds...\n", 2*retryCount) + time.Sleep(time.Duration(2*retryCount) * time.Second) + retryCount++ + } else { + break + } - // Create and initialize hashset - hs := make(map[string]bool) - // Prepare batch data - dataBatch := make([]BatcheData, 0, batchSize) - dataBatchDuplicate := make([]BatchDataDuplicateLog, 0, batchSize) - // Parse file line by line and insert data in batches - scanner := bufio.NewScanner(file) - scanner.Split(bufio.ScanLines) - scanner.Scan() // skip first line - bi := 0 - duplicateCount := make(map[string]int) - insertsCount := make(map[string]int) - var count int - - for scanner.Scan() { - line := scanner.Text() - row, err := csv.NewReader(strings.NewReader(line)).Read() - if err != nil { - panic(err) + } + } + wg.Done() + }() } - reservedFields := map[string]string{ //合并个性化字段 - "ReservedField1": row[5], - "ReservedField2": row[6], - "ReservedField3": row[7], - "ReservedField4": row[8], - "ReservedField5": row[9], - "DataFileName": fileName, - } - reservedFieldsJson, err := json.Marshal(reservedFields) //个性化字段转json - if err != nil { - panic(err) - } - // Check if record exists in hashset - key := fmt.Sprintf("%s-%s", row[2], row[3]) - if _, exists := hs[key]; exists { - bi++ - // Increment duplicate count - if _, ok := duplicateCount[row[2]]; !ok { - duplicateCount[row[2]] = 0 + // Create and initialize hashset + hs := make(map[string]bool) + // Prepare batch data + dataBatch := make([]BatcheData, 0, batchSize) + dataBatchDuplicate := make([]BatchDataDuplicateLog, 0, batchSize) + // Parse file line by line and insert data in batches + scanner := bufio.NewScanner(file) + scanner.Split(bufio.ScanLines) + scanner.Scan() // skip first line + bi := 0 + duplicateCount := make(map[string]int) + insertsCount := make(map[string]int) + var count int + //ci := 1 + for scanner.Scan() { + line := scanner.Text() + row, err := csv.NewReader(strings.NewReader(line)).Read() + if err != nil { + fmt.Printf("文件按行读取失败,文件名:%s,错误信息%v\n", fileName, err) + applogger.Info("文件按行读取失败,文件名:%s,错误信息%v", fileName, err) + continue } - duplicateCount[row[2]]++ + reservedFields := map[string]string{ //合并个性化字段 + "ReservedField1": row[5], + "ReservedField2": row[6], + "ReservedField3": row[7], + "ReservedField4": row[8], + "ReservedField5": row[9], + "DataFileName": fileName, + } + reservedFieldsJson, err := json.Marshal(reservedFields) //个性化字段转json + if err != nil { + fmt.Printf("个性化字段合并失败,文件名:%s,错误信息%v\n", fileName, err) + applogger.Info("个性化字段合并失败,文件名:%s,错误信息%v", fileName, err) + continue + } + // Check if record exists in hashset + key := fmt.Sprintf("%s-%s", row[2], row[3]) + if _, exists := hs[key]; exists { //如果批次数据重复 + bi++ + // Increment duplicate count + if _, ok := duplicateCount[row[2]]; !ok { + duplicateCount[row[2]] = 0 + } + duplicateCount[row[2]]++ + dataBatchDuplicate = append(dataBatchDuplicate, BatchDataDuplicateLog{ + CommunicationChannelID: row[2], + Mobile: row[3], + FullName: row[4], + ReservedField: string(reservedFieldsJson), + }) + if len(dataBatchDuplicate) >= batchSize { + err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error + if err != nil { + fmt.Printf("插入重复数据失败,文件名:%s,错误信息%v\n", fileName, err) + applogger.Info("插入重复数据失败,文件名:%s,错误信息%v", fileName, err) + } else { + dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize) + } - dataBatchDuplicate = append(dataBatchDuplicate, BatchDataDuplicateLog{ + } + continue + } + // Add record to hashset + hs[key] = true + + dataBatch = append(dataBatch, BatcheData{ CommunicationChannelID: row[2], Mobile: row[3], FullName: row[4], ReservedField: string(reservedFieldsJson), }) - if len(dataBatchDuplicate) >= batchSize { - err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error - if err != nil { - panic(err) - } - - dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize) + if len(dataBatch) >= batchSize { + dataBatchChan <- dataBatch + dataBatch = make([]BatcheData, 0, batchSize) + //fmt.Print("提交通次数" + strconv.Itoa(ci)) + //ci++ } - - continue + if _, ok := insertsCount[row[2]]; !ok { + insertsCount[row[2]] = 0 + } + insertsCount[row[2]]++ + count++ } - // Add record to hashset - hs[key] = true - - dataBatch = append(dataBatch, BatcheData{ - CommunicationChannelID: row[2], - Mobile: row[3], - FullName: row[4], - ReservedField: string(reservedFieldsJson), - }) - - if len(dataBatch) >= batchSize { + if len(dataBatch) > 0 { dataBatchChan <- dataBatch dataBatch = make([]BatcheData, 0, batchSize) + //fmt.Print("文件读取完成,最后一批提交至通道") } - if _, ok := insertsCount[row[2]]; !ok { - insertsCount[row[2]] = 0 + + close(dataBatchChan) + + wg.Wait() //所有入库全部完成 + + //插入批次处理信息 + bpi := []BatchProcessingInformation{} + for key, value := range duplicateCount { + bpi = append(bpi, BatchProcessingInformation{ + CommunicationChannelID: key, + RepeatTargetsMember: value, + InsertsTargetsMember: insertsCount[key], + DataFileName: fileName, + }) } - insertsCount[row[2]]++ - count++ - } - - if len(dataBatch) > 0 { - dataBatchChan <- dataBatch - dataBatch = make([]BatcheData, 0, batchSize) - fmt.Print("文件读取完成,最后一批提交至通道") - } - - close(dataBatchChan) - - wg.Wait() //所有入库全部完成 - - //插入批次处理信息 - bpi := []BatchProcessingInformation{} - for key, value := range duplicateCount { - bpi = append(bpi, BatchProcessingInformation{ - CommunicationChannelID: key, - RepeatTargetsMember: value, - InsertsTargetsMember: insertsCount[key], - DataFileName: fileName, - }) - } - err = db.CreateInBatches(bpi, insertSize).Error - if err != nil { - panic(err) - } - - //插入批此重复数据 - if len(dataBatchDuplicate) > 0 { - err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error + err = db.CreateInBatches(bpi, insertSize).Error if err != nil { - panic(err) + fmt.Printf("插入批次处理信息失败,文件名:%s,错误信息%v\n", fileName, err) + applogger.Info("插入批次处理信息失败,文件名:%s,错误信息%v", fileName, err) } - dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize) + + //插入批此重复数据 + if len(dataBatchDuplicate) > 0 { + err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error + if err != nil { + fmt.Printf("插入重复数据失败,文件名:%s,错误信息%v\n", fileName, err) + applogger.Info("插入重复数据失败,文件名:%s,错误信息%v", fileName, err) + } else { + dataBatchDuplicate = nil + } + } + + elapsed := time.Since(start) + + //发送提醒邮件 + subject := "丝芙兰数据包处理完成" + body := "数据包:" + fileName + ";\n总数:" + strconv.Itoa(count+bi) + ";\n过滤重复数:" + strconv.Itoa(bi) + ";\n过滤后总数:" + strconv.Itoa(count) + ";\n处理完成,请前往管理平台查看处理。" + SendEmail(subject, body) //发送邮件 + applogger.Info(fmt.Sprintf(":%s(数据包) ,入库完成", fileName)) + fmt.Printf("%s(数据包) 入库完成\n", fileName) + fmt.Printf("执行时间:%s 插入数据:%d 过滤数:%d\n\n\n", elapsed, count, bi) + applogger.Info(fmt.Sprintf("执行时间:%s 插入数据:%d条 过滤数数:%d条", elapsed, count, bi)) } - - //发送提醒邮件 - subject := "丝芙兰数据包处理完成" - body := "数据包:" + fileName + ";\n总数:" + strconv.Itoa(count+bi) + ";\n过滤重复数:" + strconv.Itoa(bi) + ";\n过滤后总数:" + strconv.Itoa(count) + ";\n处理完成,请前往管理平台查看处理。" - err = SendEmail(subject, body) //发送邮件 - if err != nil { - applogger.Info("邮件发送失:%d", err) - fmt.Print(err) - } - - elapsed := time.Since(start) - - fmt.Printf("批次数据入库函数执行时间:%s;\n插入数:%d条\n过滤数:%d条\n", elapsed, count, bi) } func connectToDB() (*gorm.DB, error) { @@ -575,6 +565,8 @@ func connectToDB() (*gorm.DB, error) { break } if attempt >= maxAttempts { + fmt.Printf("数据库连接失败,错误信息%v\n", err) + applogger.Info("数据库连接失败,错误信息%v", err) return nil, err } time.Sleep(backoff) @@ -588,9 +580,11 @@ func SendEmail(subject string, body string) error { // 邮箱认证信息 smtpHost := "smtp.exmail.qq.com" smtpPort := 465 - from := "chejiulong@wemediacn.com" - password := "hdQfpav4x8LwbJPH" + from := "auto_system@wemediacn.com" + password := "yJJYYcy5NKx2y69r" + //to := []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"} to := []string{"chejiulong@wemediacn.com"} + // 邮件内容 m := gomail.NewMessage() m.SetHeader("From", from) @@ -602,8 +596,8 @@ func SendEmail(subject string, body string) error { d.TLSConfig = &tls.Config{InsecureSkipVerify: true} err := d.DialAndSend(m) if err != nil { - panic(err) - //return + fmt.Printf("邮件发送失败,错误信息%v\n", err) + applogger.Info("邮件发送失败,错误信息%v", err) } return nil }