diff --git a/go.mod b/go.mod index 35d7482..3523744 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/pkg/sftp v1.13.5 github.com/sirupsen/logrus v1.9.0 golang.org/x/crypto v0.7.0 + golang.org/x/time v0.3.0 gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df gopkg.in/natefinch/lumberjack.v2 v2.2.1 gorm.io/driver/mysql v1.4.7 diff --git a/go.sum b/go.sum index 47e6fe5..7b2ca26 100644 --- a/go.sum +++ b/go.sum @@ -96,6 +96,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= diff --git a/iniDataForLinux b/iniDataForLinux index 6018114..e8e27cb 100755 Binary files a/iniDataForLinux and b/iniDataForLinux differ diff --git a/iniDataForMacOs b/iniDataForMacOs index faf9b95..9cd545a 100755 Binary files a/iniDataForMacOs and b/iniDataForMacOs differ diff --git a/main.go b/main.go index cc10cc5..99fb917 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "archive/zip" "bufio" "bytes" + "context" "crypto/hmac" "crypto/sha256" "crypto/tls" @@ -32,6 +33,7 @@ import ( "github.com/pkg/sftp" "github.com/sirupsen/logrus" "golang.org/x/crypto/ssh" + "golang.org/x/time/rate" "gopkg.in/gomail.v2" "gopkg.in/natefinch/lumberjack.v2" "gorm.io/driver/mysql" @@ -401,7 +403,8 @@ func downloadDecompression() { fmt.Printf("共%d个文件\n", len(files)) sort.Sort(FileSorter(files)) - + //设置限速 + limiter := rate.NewLimiter(rate.Limit(rateLimiter*1024*1024), int(rateLimiter*1024*1024)) for _, file := range files { processingStatus := -1 fmt.Printf("第%d个文件处理中\n", it) @@ -432,11 +435,28 @@ func downloadDecompression() { continue } defer dstFile.Close() - if _, err := io.Copy(dstFile, srcFile); err != nil { - body := fmt.Sprintf("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err) - applogger.Error(body) - SendEmail(subject, body) //发送邮件 - continue + buf := make([]byte, 4096) + for { + n, err := srcFile.Read(buf) + if err != nil && err != io.EOF { + body := fmt.Sprintf("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err) + applogger.Error(body) + SendEmail(subject, body) //发送邮件 + break + } + if n == 0 { + break + } + + // 等待令牌桶中有足够的令牌可用 + limiter.WaitN(context.Background(), n) + + if _, err := dstFile.Write(buf[:n]); err != nil { + body := fmt.Sprintf("写入文件失败,文件名: %s,错误信息:%v", file.Name(), err) + applogger.Error(body) + SendEmail(subject, body) //发送邮件 + break + } } applogger.Info(fmt.Sprintf("%s(数据包)下载完成", file.Name())) // Unzip file @@ -499,11 +519,28 @@ func downloadDecompression() { continue } defer dstFile.Close() - if _, err := io.Copy(dstFile, srcFile); err != nil { - body := fmt.Sprintf("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err) - applogger.Error(body) - SendEmail(subject, body) //发送邮件 - continue + buf := make([]byte, 4096) + for { + n, err := srcFile.Read(buf) + if err != nil && err != io.EOF { + body := fmt.Sprintf("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err) + applogger.Error(body) + SendEmail(subject, body) //发送邮件 + break + } + if n == 0 { + break + } + + // 等待令牌桶中有足够的令牌可用 + limiter.WaitN(context.Background(), n) + + if _, err := dstFile.Write(buf[:n]); err != nil { + body := fmt.Sprintf("写入文件失败,文件名: %s,错误信息:%v", file.Name(), err) + applogger.Error(body) + SendEmail(subject, body) //发送邮件 + break + } } applogger.Info(fmt.Sprintf("%s(批次文件)下载完成", file.Name())) processingStatus = batchInsert(file.Name(), false, "") @@ -1187,6 +1224,7 @@ func iniConfi() { verifySignatureKey = "qZ6v1&H#Wjx+yRm2D@*sJF$tnfL83Ia" dataExpirationDays = 14 delDataSize = 60000 + rateLimiter = 1 case "prod": //fmt.Print("正式环境配置已生效\n") @@ -1217,6 +1255,7 @@ func iniConfi() { verifySignatureKey = "Lzxf6asgwFRTsANZ8gkXJnP3EmOyNO#&" dataExpirationDays = 14 delDataSize = 60000 + rateLimiter = 10 default: panic(fmt.Errorf("无效的运行模式: %s", env)) @@ -1306,6 +1345,7 @@ var ( //初始化变量 verifySignatureKey string dataExpirationDays int delDataSize int + rateLimiter uint ) type Batch struct {