// Command main polls an SFTP directory once a minute, downloads new .zip data
// packages and .txt batch files, loads their rows into SQL Server through GORM
// and sends a notification e-mail for every processed file.
//
// Redis is used for two kinds of markers: a "job running" key that prevents
// overlapping runs, and one "downloaded:<name>" key per processed remote file.
package main

import (
	"archive/zip"
	"bufio"
	"crypto/tls"
	"encoding/csv"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/go-redis/redis"
	"github.com/pkg/sftp"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
	"gopkg.in/gomail.v2"
	"gorm.io/driver/sqlserver"
	_ "gorm.io/driver/sqlserver"
	"gorm.io/gorm"
	"gorm.io/gorm/logger"
)

const (
	// remoteDir is the SFTP directory that is scanned for new files.
	remoteDir = "/sftp/test"
	// runningKey is the Redis key that marks a job as currently running.
	runningKey = "iniDataStatus"
	// maxInsertRetries is how many times a failed bulk insert is retried.
	maxInsertRetries = 5
	// batchFieldCount is the minimum number of CSV fields batchInsert reads.
	batchFieldCount = 5
	// dataFieldCount is the minimum number of CSV fields batchDataInsert reads.
	dataFieldCount = 10
)

// NOTE(review): the addresses and credentials below are hard-coded in source;
// they should be moved to configuration or the environment.
var (
	applogger     *logrus.Logger
	redisClient   *redis.Client
	executableDir string
	redisAddress  = "mysql5.weu.me:6379"
	redisPassword = ""
	redisDB       = 1
	sftpAddress   = "192.168.10.86:49156"
	sftpUser      = "demo"
	sftpPassword  = "demo"
	dbAddress     = "192.168.10.18:1433"
	dbUser        = "sa"
	dbPassword    = "Aa123123"
	dbName        = "sephora"
	zipPath       = "RawData/Zip/"
	txtPath       = "RawData/Txt/"
	logPath       = "logs/"
	batchSize     = 5000 // rows handed to an insert worker at a time
	insertSize    = 500  // rows per INSERT statement
	cSize         = 10   // number of insert worker goroutines
)

// Batch is one row of a batch (.txt) file.
type Batch struct {
	ID                     uint `gorm:"primary_key"`
	CommunicationChannelID uint
	CommunicationName      string `gorm:"type:varchar(255)"`
	TargetsMember          uint   `gorm:"type:int"`
	TemplateID             uint
	Content                string    `gorm:"type:text"`
	CreatedAt              time.Time `gorm:"default:getdate()"`
	UpdatedAt              time.Time `gorm:"default:getdate()"`
	Status                 uint      `gorm:"type:int"`
	DataFileName           string    `gorm:"type:text"`
}

// BatcheData is one de-duplicated row of a data package file.
type BatcheData struct {
	ID                     uint   `gorm:"primary_key"`
	CommunicationChannelID string `gorm:"column:communication_channel_id"`
	Mobile                 string `gorm:"column:mobile"`
	FullName               string `gorm:"column:full_name"`
	ReservedField          string `gorm:"column:reserved_field"`
}

// TableName returns the table that stores BatcheData rows.
func (BatcheData) TableName() string {
	return "batche_datas"
}

// BatchProcessingInformation records, per communication channel, how many
// duplicate rows were found in a data file.
type BatchProcessingInformation struct {
	ID                     uint   `gorm:"primaryKey;autoIncrement"`
	CommunicationChannelID string `gorm:"column:communication_channel_id"`
	RepeatTargetsMember    int    `gorm:"column:repeat_targets_member"`
	DataFileName           string `gorm:"column:data_file_name"`
}

// TableName returns the table that stores BatchProcessingInformation rows.
func (BatchProcessingInformation) TableName() string {
	return "batches_processing_informations"
}

// BatchDataDuplicateLog is a data row that was filtered out as a duplicate.
type BatchDataDuplicateLog struct {
	ID                     int    `gorm:"primaryKey;autoIncrement"`
	CommunicationChannelID string `gorm:"column:communication_channel_id"`
	Mobile                 string `gorm:"column:mobile"`
	FullName               string `gorm:"column:full_name"`
	ReservedField          string `gorm:"column:reserved_field"`
}

// TableName returns the table that stores BatchDataDuplicateLog rows.
func (BatchDataDuplicateLog) TableName() string {
	return "batche_data_duplicate_logs"
}

// init resolves the executable directory, creates the working directories
// below it, points the application logger at logs/download.log and creates
// the Redis client.
func init() {
	executablePath, err := os.Executable()
	if err != nil {
		log.Fatal("failed to get executable path: ", err)
	}
	executableDir = filepath.Dir(executablePath)

	// Create every working directory and check each result individually.
	for _, dir := range []string{zipPath, txtPath, logPath} {
		if err := os.MkdirAll(filepath.Join(executableDir, dir), 0755); err != nil {
			log.Fatal(err)
		}
	}

	applogger = logrus.New()
	logFile := filepath.Join(executableDir, logPath, "download.log")
	// O_CREATE makes a separate existence check (and its extra handle) unnecessary.
	f, err := os.OpenFile(logFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
	if err != nil {
		log.Fatal(err)
	}
	// f is intentionally left open: the logger writes to it for the whole
	// lifetime of the process.
	applogger.SetOutput(f)

	redisClient = redis.NewClient(&redis.Options{
		Addr:     redisAddress,
		Password: redisPassword,
		DB:       redisDB,
	})
}

// main takes an exclusive, non-blocking flock on ".lock" so that only one
// instance runs, then starts downloadDecompression once per minute.
func main() {
	lockFile, err := os.OpenFile(".lock", os.O_CREATE|os.O_RDWR, 0666)
	if err != nil {
		fmt.Println("打开锁文件失败:", err)
		os.Exit(1)
	}
	defer lockFile.Close()

	if err := syscall.Flock(int(lockFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil {
		fmt.Println("程序已经在运行,本程序无法同时运行多个")
		os.Exit(1)
	}

	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for range ticker.C {
		// Run asynchronously; overlapping runs are rejected by the Redis
		// marker inside downloadDecompression.
		go downloadDecompression()
	}
}

// downloadDecompression performs one polling run: it lists remoteDir over
// SFTP and processes every file that has no "downloaded:<name>" marker in
// Redis. A run is skipped when another run holds the runningKey marker.
//
// Failures are logged and the affected file is left unmarked so that it is
// picked up again on a later run; they no longer terminate the process.
func downloadDecompression() {
	// SetNX sets the marker only if it is absent, so check-and-set is atomic.
	acquired, err := redisClient.SetNX(runningKey, 1, 0).Result()
	if err != nil {
		applogger.Errorf("Failed to write running marker: %v", err)
		fmt.Printf("写入标记失败:%v\n", err)
		return
	}
	if !acquired {
		return // another run is in progress
	}
	// Always release the marker, otherwise a single failure would block
	// every subsequent run.
	// NOTE(review): the marker has no expiry, so a hard crash still leaves it
	// behind — consider a TTL once the maximum job duration is known.
	defer func() {
		if err := redisClient.Del(runningKey).Err(); err != nil {
			applogger.Errorf("Failed to delete running marker: %v", err)
		}
	}()

	sshConfig := &ssh.ClientConfig{
		User: sftpUser,
		Auth: []ssh.AuthMethod{
			ssh.Password(sftpPassword),
		},
		// NOTE(review): host key verification is disabled, which allows
		// man-in-the-middle attacks; pin the server key when it is known.
		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
	}
	sshClient, err := ssh.Dial("tcp", sftpAddress, sshConfig)
	if err != nil {
		applogger.Errorf("Failed to connect to SFTP server: %v", err)
		return
	}
	defer sshClient.Close()

	sftpClient, err := sftp.NewClient(sshClient)
	if err != nil {
		applogger.Errorf("Failed to create SFTP client: %v", err)
		return
	}
	defer sftpClient.Close()

	files, err := sftpClient.ReadDir(remoteDir)
	if err != nil {
		applogger.Errorf("Failed to read SFTP directory: %v", err)
		return
	}

	for _, file := range files {
		name := file.Name()
		fileKey := fmt.Sprintf("downloaded:%s", name)

		exists, err := redisClient.Exists(fileKey).Result()
		if err != nil {
			// Without the marker state we cannot tell whether the file was
			// already imported, so skip it rather than risk a double import.
			applogger.Errorf("Failed to check marker for %s: %v", name, err)
			continue
		}
		if exists == 1 {
			continue
		}

		var procErr error
		switch filepath.Ext(name) {
		case ".zip":
			procErr = processZipPackage(sftpClient, name)
		case ".txt":
			procErr = processBatchFile(sftpClient, name)
		}
		if procErr != nil {
			applogger.Errorf("Failed to process file %s: %v", name, procErr)
			continue
		}

		// Mark download, extraction and import of this file as finished.
		if err := redisClient.Set(fileKey, 1, 0).Err(); err != nil {
			applogger.Errorf("Failed to write marker for %s: %v", name, err)
			fmt.Printf("写入标记失败:%v\n", err)
		}
	}
}

// processZipPackage downloads the remote zip package name, extracts every
// .txt entry into the txt directory and imports it with batchDataInsert.
//
// A download or open failure is returned. A failure of an individual entry
// is logged and the remaining entries are still processed.
func processZipPackage(client *sftp.Client, name string) error {
	localZip := filepath.Join(executableDir, zipPath, name)
	if err := downloadFile(client, path.Join(remoteDir, name), localZip); err != nil {
		return err
	}
	fmt.Println("下载完成(数据包):" + name)
	applogger.Infof("下载完成(数据包):%s", name)

	zipReader, err := zip.OpenReader(localZip)
	if err != nil {
		return fmt.Errorf("open zip %s: %w", name, err)
	}
	defer zipReader.Close()

	for _, zipFile := range zipReader.File {
		// Decide whether to skip before opening the entry, so skipped
		// entries never hold a reader.
		switch {
		case strings.Contains(zipFile.Name, "__MACOSX/._"):
			continue
		case filepath.Ext(zipFile.Name) != ".txt":
			fmt.Println("文件类型不正确,跳过处理", zipFile.Name)
			continue
		}

		// Only the base name is used locally: entry names come from the
		// archive and must not be able to escape the txt directory.
		entryName := path.Base(zipFile.Name)
		localTxt := filepath.Join(executableDir, txtPath, entryName)
		if err := extractZipEntry(zipFile, localTxt); err != nil {
			applogger.Errorf("Failed to extract %s from %s: %v", zipFile.Name, name, err)
			continue
		}
		applogger.Infof("解压完成(数据包):%s", entryName)
		fmt.Println("解压完成(数据包):" + entryName)

		if err := batchDataInsert(entryName); err != nil {
			applogger.Errorf("Failed to import %s from %s: %v", entryName, name, err)
			continue
		}
		applogger.Infof("入库完成(数据包):%s", entryName)
		fmt.Println("入库完成(数据包)::" + entryName)
	}
	return nil
}

// processBatchFile downloads the remote batch file name into the txt
// directory and imports it with batchInsert.
func processBatchFile(client *sftp.Client, name string) error {
	localTxt := filepath.Join(executableDir, txtPath, name)
	if err := downloadFile(client, path.Join(remoteDir, name), localTxt); err != nil {
		return err
	}
	fmt.Println("下载完成(批次文件):" + name)
	applogger.Infof("下载完成(批次文件):%s", name)

	if err := batchInsert(name); err != nil {
		return err
	}
	fmt.Println("入库完成(批次文件)" + name)
	applogger.Infof("入库完成(批次文件):%s", name)
	return nil
}

// downloadFile copies the remote file at remotePath to localPath.
func downloadFile(client *sftp.Client, remotePath, localPath string) error {
	src, err := client.Open(remotePath)
	if err != nil {
		return fmt.Errorf("open remote file %s: %w", remotePath, err)
	}
	defer src.Close()

	if err := saveStream(localPath, src); err != nil {
		return fmt.Errorf("download %s: %w", remotePath, err)
	}
	return nil
}

// extractZipEntry writes the uncompressed content of entry to localPath.
func extractZipEntry(entry *zip.File, localPath string) error {
	src, err := entry.Open()
	if err != nil {
		return fmt.Errorf("open zip entry %s: %w", entry.Name, err)
	}
	defer src.Close()

	if err := saveStream(localPath, src); err != nil {
		return fmt.Errorf("write zip entry %s: %w", entry.Name, err)
	}
	return nil
}

// saveStream creates (or truncates) localPath and copies src into it. The
// file is closed before returning and a close error is reported.
func saveStream(localPath string, src io.Reader) (err error) {
	dst, err := os.Create(localPath)
	if err != nil {
		return err
	}
	defer func() {
		if cerr := dst.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	_, err = io.Copy(dst, src)
	return err
}

// batchInsert imports the batch file fileName from the txt directory: every
// CSV row after the header becomes one Batch row. All rows are parsed first
// and then inserted in a single transaction, so a failed import leaves no
// partial data behind. A notification e-mail is sent after a successful
// import; an e-mail failure is logged but not returned.
func batchInsert(fileName string) error {
	start := time.Now()

	batches, err := readBatchFile(fileName)
	if err != nil {
		return err
	}

	db, err := connectToDB()
	if err != nil {
		return fmt.Errorf("connect database: %w", err)
	}
	defer closeDB(db)

	err = db.Transaction(func(tx *gorm.DB) error {
		for i := range batches {
			if err := tx.Create(&batches[i]).Error; err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("insert batches of %s: %w", fileName, err)
	}
	batchRows := len(batches)

	// NOTE(review): this pause is kept from the previous implementation; its
	// purpose is not visible in this file — confirm whether it is needed.
	time.Sleep(time.Second)
	elapsed := time.Since(start)

	subject := "丝芙兰批次文件处理完成"
	body := "批次文件:" + fileName + ";\n批次数:" + strconv.Itoa(batchRows) + ";\n处理完成,请前往管理平台查看处理。"
	if err := SendEmail(subject, body); err != nil {
		applogger.Errorf("邮件发送失:%v", err)
		fmt.Println(err)
	}
	fmt.Printf("执行时间%s\n插入批次次数:%d\n", elapsed, batchRows)
	return nil
}

// readBatchFile parses the batch file fileName from the txt directory and
// returns one Batch per data row. The first CSV record is treated as a
// header and skipped. Rows with fewer than batchFieldCount fields are logged
// and skipped.
func readBatchFile(fileName string) ([]Batch, error) {
	file, err := os.Open(filepath.Join(executableDir, txtPath, fileName))
	if err != nil {
		return nil, fmt.Errorf("open batch file %s: %w", fileName, err)
	}
	defer file.Close()

	reader := csv.NewReader(bufio.NewReader(file))
	// Skip the header; an empty file simply yields no rows.
	if _, err := reader.Read(); err != nil && !errors.Is(err, io.EOF) {
		return nil, fmt.Errorf("read header of %s: %w", fileName, err)
	}

	var batches []Batch
	for {
		record, err := reader.Read()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("read batch file %s: %w", fileName, err)
		}
		if len(record) < batchFieldCount {
			applogger.Warnf("Skipping row with %d fields in %s", len(record), fileName)
			continue
		}
		batches = append(batches, Batch{
			CommunicationChannelID: parseUintField(record[0], "communication channel id", fileName),
			CommunicationName:      record[1],
			TargetsMember:          parseUintField(record[2], "targets member", fileName),
			TemplateID:             parseUintField(record[3], "template id", fileName),
			Content:                record[4],
			Status:                 uint(1),
			DataFileName:           fileName,
		})
	}
	return batches, nil
}

// parseUintField parses value as a base-10 unsigned 32-bit number. On a
// parse failure it logs a warning and returns whatever strconv.ParseUint
// returned alongside the error (0 for a syntax error), keeping the lenient
// behaviour of the import.
func parseUintField(value, field, fileName string) uint {
	n, err := strconv.ParseUint(value, 10, 32)
	if err != nil {
		applogger.Warnf("Invalid %s %q in %s: %v", field, value, fileName, err)
	}
	return uint(n)
}

// dataFileSummary is the result of scanning one data file.
type dataFileSummary struct {
	inserted          int                     // unique rows sent to the insert workers
	duplicates        int                     // rows filtered out as duplicates
	duplicateCount    map[string]int          // duplicates per communication channel ID
	pendingDuplicates []BatchDataDuplicateLog // duplicate rows not yet written
}

// batchDataInsert imports the data file fileName from the txt directory.
//
// Rows are de-duplicated on the pair (field 2, field 3). Unique rows are
// inserted as BatcheData by cSize worker goroutines; duplicates are written
// as BatchDataDuplicateLog, and one BatchProcessingInformation row per
// communication channel records the duplicate count. A notification e-mail
// is sent after a successful import; an e-mail failure is logged but not
// returned.
//
// On error, rows that were already inserted stay in the database.
func batchDataInsert(fileName string) error {
	start := time.Now()

	db, err := connectToDB()
	if err != nil {
		return fmt.Errorf("connect database: %w", err)
	}
	defer closeDB(db)

	// Open the file before starting workers so that an early return cannot
	// leave goroutines behind.
	file, err := os.Open(filepath.Join(executableDir, txtPath, fileName))
	if err != nil {
		return fmt.Errorf("open data file %s: %w", fileName, err)
	}
	defer file.Close()

	var (
		wg        sync.WaitGroup
		mu        sync.Mutex
		insertErr error
	)
	// firstInsertErr reports the first error recorded by any worker.
	firstInsertErr := func() error {
		mu.Lock()
		defer mu.Unlock()
		return insertErr
	}

	dataBatchChan := make(chan []BatcheData, cSize)
	for i := 0; i < cSize; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for batch := range dataBatchChan {
				// After a failure keep draining the channel so the producer
				// can never block, but stop hitting the database.
				if firstInsertErr() != nil {
					continue
				}
				if err := insertWithRetry(db, batch); err != nil {
					mu.Lock()
					if insertErr == nil {
						insertErr = err
					}
					mu.Unlock()
				}
			}
		}()
	}

	summary, scanErr := scanDataFile(db, file, fileName, dataBatchChan, firstInsertErr)
	close(dataBatchChan)
	wg.Wait()

	if scanErr != nil {
		return scanErr
	}
	if err := firstInsertErr(); err != nil {
		return fmt.Errorf("insert data of %s: %w", fileName, err)
	}

	// Store the per-channel duplicate counts.
	if len(summary.duplicateCount) > 0 {
		bpi := make([]BatchProcessingInformation, 0, len(summary.duplicateCount))
		for channelID, repeats := range summary.duplicateCount {
			bpi = append(bpi, BatchProcessingInformation{
				CommunicationChannelID: channelID,
				RepeatTargetsMember:    repeats,
				DataFileName:           fileName,
			})
		}
		if err := db.CreateInBatches(bpi, insertSize).Error; err != nil {
			return fmt.Errorf("insert processing information of %s: %w", fileName, err)
		}
	}

	// Store the duplicate rows that have not been flushed yet.
	if len(summary.pendingDuplicates) > 0 {
		if err := db.CreateInBatches(summary.pendingDuplicates, insertSize).Error; err != nil {
			return fmt.Errorf("insert duplicate log of %s: %w", fileName, err)
		}
	}

	count, bi := summary.inserted, summary.duplicates
	subject := "丝芙兰数据包处理完成"
	// count excludes duplicates, so the overall total is count + bi.
	body := "数据包:" + fileName + ";\n总数:" + strconv.Itoa(count+bi) + ";\n过滤重复数:" + strconv.Itoa(bi) + ";\n过滤后总数:" + strconv.Itoa(count) + "\n处理完成,请前往管理平台查看处理。"
	if err := SendEmail(subject, body); err != nil {
		applogger.Errorf("邮件发送失:%v", err)
		fmt.Println(err)
	}

	elapsed := time.Since(start)
	fmt.Printf("批次数据入库函数执行时间:%s;\n插入数:%d条\n过滤数:%d条\n", elapsed, count, bi)
	return nil
}

// scanDataFile reads the CSV data in src, skipping its first line, and
// splits the rows into unique rows, sent to out in slices of up to batchSize,
// and duplicate rows, written through db whenever batchSize of them have
// accumulated. Rows with fewer than dataFieldCount fields are logged and
// skipped. failed is polled before each send so that scanning stops early
// once an insert worker has reported an error.
//
// scanDataFile does not close out.
func scanDataFile(db *gorm.DB, src io.Reader, fileName string, out chan<- []BatcheData, failed func() error) (*dataFileSummary, error) {
	br := bufio.NewReader(src)
	// Skip the raw header line; an empty file simply yields no rows.
	if _, err := br.ReadString('\n'); err != nil && !errors.Is(err, io.EOF) {
		return nil, fmt.Errorf("read header of %s: %w", fileName, err)
	}

	reader := csv.NewReader(br)
	// Rows are validated by length below; do not require a uniform width.
	reader.FieldsPerRecord = -1

	summary := &dataFileSummary{duplicateCount: make(map[string]int)}
	seen := make(map[string]struct{})
	dataBatch := make([]BatcheData, 0, batchSize)
	duplicates := make([]BatchDataDuplicateLog, 0, batchSize)

	for {
		row, err := reader.Read()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("parse data file %s: %w", fileName, err)
		}
		if len(row) < dataFieldCount {
			applogger.Warnf("Skipping row with %d fields in %s", len(row), fileName)
			continue
		}

		// Merge the personalisation fields into one JSON document.
		reservedFieldsJSON, err := json.Marshal(map[string]string{
			"ReservedField1": row[5],
			"ReservedField2": row[6],
			"ReservedField3": row[7],
			"ReservedField4": row[8],
			"ReservedField5": row[9],
			"DataFileName":   fileName,
		})
		if err != nil {
			return nil, fmt.Errorf("encode reserved fields of %s: %w", fileName, err)
		}

		key := fmt.Sprintf("%s-%s", row[2], row[3])
		if _, dup := seen[key]; dup {
			summary.duplicates++
			summary.duplicateCount[row[2]]++
			duplicates = append(duplicates, BatchDataDuplicateLog{
				CommunicationChannelID: row[2],
				Mobile:                 row[3],
				FullName:               row[4],
				ReservedField:          string(reservedFieldsJSON),
			})
			if len(duplicates) >= batchSize {
				if err := db.CreateInBatches(duplicates, insertSize).Error; err != nil {
					return nil, fmt.Errorf("insert duplicate log of %s: %w", fileName, err)
				}
				// The insert is synchronous, so the buffer can be reused.
				duplicates = duplicates[:0]
			}
			continue
		}

		seen[key] = struct{}{}
		dataBatch = append(dataBatch, BatcheData{
			CommunicationChannelID: row[2],
			Mobile:                 row[3],
			FullName:               row[4],
			ReservedField:          string(reservedFieldsJSON),
		})
		summary.inserted++

		if len(dataBatch) >= batchSize {
			if err := failed(); err != nil {
				return nil, fmt.Errorf("insert data of %s: %w", fileName, err)
			}
			out <- dataBatch
			// Ownership of the slice moved to a worker; start a fresh one.
			dataBatch = make([]BatcheData, 0, batchSize)
		}
	}

	if len(dataBatch) > 0 {
		out <- dataBatch
	}
	summary.pendingDuplicates = duplicates
	return summary, nil
}

// insertWithRetry inserts batch in chunks of insertSize. A failed insert is
// retried up to maxInsertRetries times, waiting 2s, 4s, ... between
// attempts; the last error is returned when all attempts fail.
func insertWithRetry(db *gorm.DB, batch []BatcheData) error {
	for retry := 0; ; retry++ {
		err := db.CreateInBatches(batch, insertSize).Error
		if err == nil {
			return nil
		}
		if retry >= maxInsertRetries {
			return err
		}
		seconds := 2 * (retry + 1)
		fmt.Printf("Insert failed, retrying in %v seconds...\n", seconds)
		time.Sleep(time.Duration(seconds) * time.Second)
	}
}

// closeDB closes the connection pool behind db and logs any failure.
func closeDB(db *gorm.DB) {
	sqlDB, err := db.DB()
	if err != nil {
		applogger.Warnf("Failed to get database handle: %v", err)
		return
	}
	if err := sqlDB.Close(); err != nil {
		applogger.Warnf("Failed to close database: %v", err)
	}
}

// connectToDB opens a GORM connection to the configured SQL Server database
// with query logging silenced. A failed attempt is retried with a doubling
// delay starting at one second; after five failed attempts the last error is
// returned. The caller owns the returned handle and should close it.
func connectToDB() (*gorm.DB, error) {
	const maxAttempts = 5

	query := url.Values{}
	query.Set("database", dbName)
	// NOTE(review): "charset" is kept from the previous DSN; it looks like a
	// MySQL option — confirm that the SQL Server driver needs it.
	query.Set("charset", "utf8mb4")
	// Building the DSN as a URL escapes special characters in credentials.
	dsn := (&url.URL{
		Scheme:   "sqlserver",
		User:     url.UserPassword(dbUser, dbPassword),
		Host:     dbAddress,
		RawQuery: query.Encode(),
	}).String()

	gormLogger := logger.Default.LogMode(logger.Silent)
	backoff := time.Second
	for attempt := 1; ; attempt++ {
		db, err := gorm.Open(sqlserver.Open(dsn), &gorm.Config{Logger: gormLogger})
		if err == nil {
			return db, nil
		}
		if attempt >= maxAttempts {
			return nil, err
		}
		time.Sleep(backoff)
		backoff *= 2
	}
}

// SendEmail sends a plain-text message with the given subject and body to
// the fixed recipient list and returns any dial or delivery error.
func SendEmail(subject string, body string) error {
	// SMTP account.
	// NOTE(review): credentials are hard-coded in source; move them to
	// configuration.
	smtpHost := "smtp.exmail.qq.com"
	smtpPort := 465
	from := "chejiulong@wemediacn.com"
	password := "hdQfpav4x8LwbJPH"
	//from := "auto_system@wemediacn.com"
	//password := "EJkp39HCajDhpWsx"
	to := []string{"chejiulong@wemediacn.com", "99779212@qq.com", "wangyuanbing@wemediacn.com"}

	// Message.
	m := gomail.NewMessage()
	m.SetHeader("From", from)
	m.SetHeader("To", to...)
	m.SetHeader("Subject", subject)
	m.SetBody("text/plain", body)

	// Delivery.
	d := gomail.NewDialer(smtpHost, smtpPort, from, password)
	// NOTE(review): certificate verification is disabled, which exposes the
	// SMTP password to man-in-the-middle attacks — confirm whether this is
	// still required.
	d.TLSConfig = &tls.Config{InsecureSkipVerify: true}
	if err := d.DialAndSend(m); err != nil {
		return fmt.Errorf("send email: %w", err)
	}
	return nil
}