增加SFTP下载限速机制,避免下载过快导致服务器网络丢包

This commit is contained in:
chejiulong 2023-04-07 15:11:29 +08:00
parent f4277ab184
commit 9e96fd10d1
5 changed files with 54 additions and 11 deletions

1
go.mod
View File

@ -8,6 +8,7 @@ require (
github.com/pkg/sftp v1.13.5 github.com/pkg/sftp v1.13.5
github.com/sirupsen/logrus v1.9.0 github.com/sirupsen/logrus v1.9.0
golang.org/x/crypto v0.7.0 golang.org/x/crypto v0.7.0
golang.org/x/time v0.3.0
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/natefinch/lumberjack.v2 v2.2.1
gorm.io/driver/mysql v1.4.7 gorm.io/driver/mysql v1.4.7

2
go.sum
View File

@ -96,6 +96,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=

Binary file not shown.

Binary file not shown.

50
main.go
View File

@ -4,6 +4,7 @@ import (
"archive/zip" "archive/zip"
"bufio" "bufio"
"bytes" "bytes"
"context"
"crypto/hmac" "crypto/hmac"
"crypto/sha256" "crypto/sha256"
"crypto/tls" "crypto/tls"
@ -32,6 +33,7 @@ import (
"github.com/pkg/sftp" "github.com/pkg/sftp"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh"
"golang.org/x/time/rate"
"gopkg.in/gomail.v2" "gopkg.in/gomail.v2"
"gopkg.in/natefinch/lumberjack.v2" "gopkg.in/natefinch/lumberjack.v2"
"gorm.io/driver/mysql" "gorm.io/driver/mysql"
@ -401,7 +403,8 @@ func downloadDecompression() {
fmt.Printf("共%d个文件\n", len(files)) fmt.Printf("共%d个文件\n", len(files))
sort.Sort(FileSorter(files)) sort.Sort(FileSorter(files))
//设置限速
limiter := rate.NewLimiter(rate.Limit(rateLimiter*1024*1024), int(rateLimiter*1024*1024))
for _, file := range files { for _, file := range files {
processingStatus := -1 processingStatus := -1
fmt.Printf("第%d个文件处理中\n", it) fmt.Printf("第%d个文件处理中\n", it)
@ -432,11 +435,28 @@ func downloadDecompression() {
continue continue
} }
defer dstFile.Close() defer dstFile.Close()
if _, err := io.Copy(dstFile, srcFile); err != nil { buf := make([]byte, 4096)
for {
n, err := srcFile.Read(buf)
if err != nil && err != io.EOF {
body := fmt.Sprintf("下载文件失败,文件名: %s错误信息%v", file.Name(), err) body := fmt.Sprintf("下载文件失败,文件名: %s错误信息%v", file.Name(), err)
applogger.Error(body) applogger.Error(body)
SendEmail(subject, body) //发送邮件 SendEmail(subject, body) //发送邮件
continue break
}
if n == 0 {
break
}
// 等待令牌桶中有足够的令牌可用
limiter.WaitN(context.Background(), n)
if _, err := dstFile.Write(buf[:n]); err != nil {
body := fmt.Sprintf("写入文件失败,文件名: %s错误信息%v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
break
}
} }
applogger.Info(fmt.Sprintf("%s数据包下载完成", file.Name())) applogger.Info(fmt.Sprintf("%s数据包下载完成", file.Name()))
// Unzip file // Unzip file
@ -499,11 +519,28 @@ func downloadDecompression() {
continue continue
} }
defer dstFile.Close() defer dstFile.Close()
if _, err := io.Copy(dstFile, srcFile); err != nil { buf := make([]byte, 4096)
for {
n, err := srcFile.Read(buf)
if err != nil && err != io.EOF {
body := fmt.Sprintf("下载文件失败,文件名: %s错误信息%v", file.Name(), err) body := fmt.Sprintf("下载文件失败,文件名: %s错误信息%v", file.Name(), err)
applogger.Error(body) applogger.Error(body)
SendEmail(subject, body) //发送邮件 SendEmail(subject, body) //发送邮件
continue break
}
if n == 0 {
break
}
// 等待令牌桶中有足够的令牌可用
limiter.WaitN(context.Background(), n)
if _, err := dstFile.Write(buf[:n]); err != nil {
body := fmt.Sprintf("写入文件失败,文件名: %s错误信息%v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
break
}
} }
applogger.Info(fmt.Sprintf("%s批次文件下载完成", file.Name())) applogger.Info(fmt.Sprintf("%s批次文件下载完成", file.Name()))
processingStatus = batchInsert(file.Name(), false, "") processingStatus = batchInsert(file.Name(), false, "")
@ -1187,6 +1224,7 @@ func iniConfi() {
verifySignatureKey = "qZ6v1&H#Wjx+yRm2D@*sJF$tnfL83Ia" verifySignatureKey = "qZ6v1&H#Wjx+yRm2D@*sJF$tnfL83Ia"
dataExpirationDays = 14 dataExpirationDays = 14
delDataSize = 60000 delDataSize = 60000
rateLimiter = 1
case "prod": case "prod":
//fmt.Print("正式环境配置已生效\n") //fmt.Print("正式环境配置已生效\n")
@ -1217,6 +1255,7 @@ func iniConfi() {
verifySignatureKey = "Lzxf6asgwFRTsANZ8gkXJnP3EmOyNO#&" verifySignatureKey = "Lzxf6asgwFRTsANZ8gkXJnP3EmOyNO#&"
dataExpirationDays = 14 dataExpirationDays = 14
delDataSize = 60000 delDataSize = 60000
rateLimiter = 10
default: default:
panic(fmt.Errorf("无效的运行模式: %s", env)) panic(fmt.Errorf("无效的运行模式: %s", env))
@ -1306,6 +1345,7 @@ var ( //初始化变量
verifySignatureKey string verifySignatureKey string
dataExpirationDays int dataExpirationDays int
delDataSize int delDataSize int
rateLimiter uint
) )
type Batch struct { type Batch struct {