diff --git a/go.mod b/go.mod index f5d101f..dde88ab 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( github.com/go-redis/redis v6.15.9+incompatible github.com/pkg/sftp v1.13.5 github.com/sirupsen/logrus v1.9.0 - github.com/willf/bloom v2.0.3+incompatible golang.org/x/crypto v0.6.0 + gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df gorm.io/driver/sqlserver v1.4.2 gorm.io/gorm v1.24.5 ) @@ -21,8 +21,7 @@ require ( github.com/microsoft/go-mssqldb v0.20.0 // indirect github.com/onsi/ginkgo v1.16.5 // indirect github.com/onsi/gomega v1.26.0 // indirect - github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/stretchr/testify v1.8.1 // indirect - github.com/willf/bitset v1.1.11 // indirect golang.org/x/sys v0.5.0 // indirect + gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect ) diff --git a/go.sum b/go.sum index 37bd84c..85de0eb 100644 --- a/go.sum +++ b/go.sum @@ -77,8 +77,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= -github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -90,10 +88,6 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/willf/bitset v1.1.11 h1:N7Z7E9UvjW+sGsEl7k/SJrvY2reP1A07MrGuCjIOjRE= -github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= -github.com/willf/bloom v2.0.3+incompatible h1:QDacWdqcAUI1MPOwIQZRy9kOR7yxfyEmxX8Wdm2/JPA= -github.com/willf/bloom v2.0.3+incompatible/go.mod h1:MmAltL9pDMNTrvUkxdg0k0q5I0suxmuwp3KbyrZLOZ8= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -159,8 +153,12 @@ google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc h1:2gGKlE2+asNV9m7xrywl36YYNnBG5ZQ0r/BOOxqPpmk= +gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc/go.mod h1:m7x9LTH6d71AHyAX77c9yqWCCa3UKHcVEj9y7hAtKDk= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df h1:n7WqCuqOuCbNr617RXOY0AWRXxgwEyPp2z+p0+hgMuE= +gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df/go.mod h1:LRQQ+SO6ZHR7tOkpBDuZnXENFzX8qRjMDMyPD6BRkCw= gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce h1:+JknDZhAj8YMt7GC73Ei8pv4MzjDUNPHgQWJdtMAaDU= gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:5AcXVHNjg+BDxry382+8OKon8SEWiKktQR07RKPsv1c= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= diff --git a/main.go b/main.go index 5984b21..b81c939 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "archive/zip" "bufio" + "crypto/tls" "encoding/csv" "encoding/json" "fmt" @@ -21,6 +22,7 @@ import ( "github.com/pkg/sftp" "github.com/sirupsen/logrus" "golang.org/x/crypto/ssh" + "gopkg.in/gomail.v2" "gorm.io/driver/sqlserver" _ "gorm.io/driver/sqlserver" "gorm.io/gorm" @@ -99,7 +101,6 @@ func (BatchDataDuplicateLog) TableName() string { } func init() { - // 获取可执行文件所在的路径 executablePath, err := os.Executable() if err != nil { @@ -125,6 +126,7 @@ func init() { if err != nil { log.Fatal(err) } + //defer f.Close() applogger.SetOutput(f) redisClient = redis.NewClient(&redis.Options{ @@ -194,16 +196,14 @@ func downloadDecompression() { if err != nil { applogger.Fatalf("Failed to read SFTP directory: %v", err) } - it := 1 - fmt.Printf("共%d个文件\n", len(files)) + //it := 1 + //fmt.Printf("共%d个文件\n", len(files)) for _, file := range files { - fmt.Printf("第%d个文件\n", it) - it++ + //fmt.Printf("第%d个文件\n", it) + //it++ // Check if file has been downloaded before fileKey := fmt.Sprintf("downloaded:%s", file.Name()) if exists, _ := redisClient.Exists(fileKey).Result(); exists == 1 { - applogger.Info("跳过已处理文件:: %v", file.Name()) - fmt.Println("跳过已处理文件:" + file.Name()) continue } @@ -228,6 +228,7 @@ func downloadDecompression() { continue } fmt.Println("下载完成(数据包):" + file.Name()) + applogger.Info("下载完成(数据包):%d", file.Name()) // Unzip file zipReader, err := zip.OpenReader(path.Join(executableDir, zipPath, file.Name())) if err != nil { @@ -265,8 +266,11 @@ func downloadDecompression() { applogger.Fatalf("Failed to write unzip data to file %v", err) continue } - applogger.Info("Successfully unzipped file %v", zipFile.Name) + applogger.Info("解压完成(数据包):%d", zipFile.Name) + fmt.Print("解压完成(数据包):" + zipFile.Name) batchDataInsert(zipFile.Name) + applogger.Info("入库完成(数据包):%d", zipFile.Name) + fmt.Print("入库完成(数据包)::" + zipFile.Name) } } else if filepath.Ext(file.Name()) == ".txt" { srcFile, err := sftpClient.Open(path.Join("/sftp/test", file.Name())) @@ -286,14 +290,15 @@ func downloadDecompression() { continue } fmt.Println("下载完成(批次文件):" + file.Name()) + applogger.Info("下载完成(批次文件):%d", file.Name()) batchInsert(file.Name()) - fmt.Println("入库完成(批次文件):" + file.Name()) + fmt.Println("入库完成(批次文件)" + file.Name()) + applogger.Info("入库完成(批次文件):%d", file.Name()) } err = redisClient.Set(fileKey, 1, 0).Err() //写入下载、解压、入库完成标记 if err != nil { fmt.Println("写入标记失败:%", err) } - } redisClient.Del("iniDataStatus") //删除任务执行中标记 } @@ -309,6 +314,7 @@ func batchInsert(fileName string) { panic("failed to connect database") } file, err := os.Open(path.Join(executableDir, txtPath, fileName)) + defer file.Close() if err != nil { panic("failed to open file") applogger.Fatalf("ailed to open file: %v", fileName) @@ -339,10 +345,16 @@ func batchInsert(fileName string) { batchRows++ } time.Sleep(time.Second) - elapsed := time.Since(start) - fmt.Printf("执行时间%s\n插入批次数%d", elapsed, batchRows) - fmt.Print("批次处理完成") + elapsed := time.Since(start) + subject := "丝芙兰批次文件处理完成" + body := "批次文件:" + fileName + ";\n批次数:" + strconv.Itoa(batchRows) + ";\n处理完成,请前往管理平台查看处理。" + err = SendEmail(subject, body) //发送邮件 + if err != nil { + applogger.Info("邮件发送失:%d", err) + fmt.Print(err) + } + fmt.Printf("执行时间%s\n插入批次次数:%d\n", elapsed, batchRows) } func batchDataInsert(fileName string) { @@ -383,12 +395,12 @@ func batchDataInsert(fileName string) { panic(err) } defer file.Close() + // Create and initialize hashset hs := make(map[string]bool) // Prepare batch data dataBatch := make([]BatcheData, 0, batchSize) dataBatchDuplicate := make([]BatchDataDuplicateLog, 0, batchSize) - // Parse file line by line and insert data in batches scanner := bufio.NewScanner(file) scanner.Split(bufio.ScanLines) @@ -397,8 +409,6 @@ func batchDataInsert(fileName string) { bi2 := 0 //var filtered []string duplicateCount := make(map[string]int) - //duplicateData := make(map[string]map[string][]BatcheData) - // Define counter to keep track of how many records are inserted var count int64 for scanner.Scan() { @@ -432,21 +442,6 @@ func batchDataInsert(fileName string) { } duplicateCount[row[2]]++ - // 按批次记录重复数据 - /* - if _, ok := duplicateData[row[2]]; !ok { - duplicateData[row[2]] = make(map[string][]BatcheData) - } - - if _, ok := duplicateData[row[2]][row[3]]; !ok { - duplicateData[row[2]][row[3]] = []BatcheData{} - } - duplicateData[row[2]][row[3]] = append(duplicateData[row[2]][row[3]], BatcheData{ - CommunicationChannelID: row[2], - Mobile: row[3], - FullName: row[4], - ReservedField: string(reservedFieldsJson), - })*/ dataBatchDuplicate = append(dataBatchDuplicate, BatchDataDuplicateLog{ CommunicationChannelID: row[2], Mobile: row[3], @@ -506,7 +501,7 @@ func batchDataInsert(fileName string) { panic(err) } - //插入批此重复信息 + //插入批此重复数据 if len(dataBatchDuplicate) > 0 { err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error if err != nil { @@ -514,11 +509,16 @@ func batchDataInsert(fileName string) { } dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize) } - + subject := "丝芙兰数据包处理完成" + body := "数据包:" + fileName + ";\n总数据(过滤后):" + strconv.FormatInt(count, 10) + ";\n处理完成,请前往管理平台查看处理。" + err = SendEmail(subject, body) //发送邮件 + if err != nil { + applogger.Info("邮件发送失:%d", err) + fmt.Print(err) + } elapsed := time.Since(start) fmt.Printf("批次数据入库函数执行时间:%s;\n插入数:%d条\n过滤数:%d条\n", elapsed, count, bi) - fmt.Println("入库完成(数据包):" + file.Name()) } func connectToDB() (*gorm.DB, error) { @@ -543,3 +543,32 @@ func connectToDB() (*gorm.DB, error) { } return db, nil } + +func SendEmail(subject string, body string) error { + // 邮箱认证信息 + smtpHost := "smtp.exmail.qq.com" + smtpPort := 465 + from := "chejiulong@wemediacn.com" + password := "hdQfpav4x8LwbJPH" + //from := "auto_system@wemediacn.com" + //password := "EJkp39HCajDhpWsx" + to := []string{"chejiulong@wemediacn.com", "99779212@qq.com", "wangyuanbing@wemediacn.com"} + + // 邮件内容 + m := gomail.NewMessage() + m.SetHeader("From", from) + m.SetHeader("To", to...) + m.SetHeader("Subject", subject) + m.SetBody("text/plain", body) + + // 邮件发送 + d := gomail.NewDialer(smtpHost, smtpPort, from, password) + d.TLSConfig = &tls.Config{InsecureSkipVerify: true} + err := d.DialAndSend(m) + if err != nil { + panic(err) + //return + } + + return nil +}