修改数据、sftp释放链接方式,采取手动释放,数据库设置连接池最大连接数

This commit is contained in:
chejiulong 2023-04-10 20:00:26 +08:00
parent 9e96fd10d1
commit dc6e64e0d1
3 changed files with 58 additions and 33 deletions

Binary file not shown.

Binary file not shown.

91
main.go
View File

@ -78,19 +78,17 @@ func main() {
iniLog() iniLog()
fmt.Print("查询批次状态...\n") fmt.Print("查询批次状态...\n")
queryBatchState() queryBatchState()
case <-notification.C: case <-notification.C:
iniLog() iniLog()
applogger.Info("开始清除历史数据") go delData()
delData()
notification.Reset(time.Until(nextNotificationTime())) notification.Reset(time.Until(nextNotificationTime()))
applogger.Info("清除历史数据完成")
} }
} }
} }
// 删除历史数据和历史重复数据 // 删除历史数据和历史重复数据
func delData() { func delData() {
applogger.Info("开始清除历史数据")
// 删除15天前的批次数据 // 删除15天前的批次数据
err := deleteOldData(&BatcheData{}, dataExpirationDays) err := deleteOldData(&BatcheData{}, dataExpirationDays)
handleError(err, "删除15天前的批次数据失败") handleError(err, "删除15天前的批次数据失败")
@ -98,7 +96,7 @@ func delData() {
// 删除15天前的批次数据重复日志 // 删除15天前的批次数据重复日志
err = deleteOldData(&BatchDataDuplicateLog{}, dataExpirationDays) err = deleteOldData(&BatchDataDuplicateLog{}, dataExpirationDays)
handleError(err, "删除15天前的批次数据重复日志失败") handleError(err, "删除15天前的批次数据重复日志失败")
applogger.Info("清除历史数据完成")
} }
// 删除15天前的数据 // 删除15天前的数据
@ -107,14 +105,17 @@ func deleteOldData(model interface{}, daysAgo int) error {
totalRowsAffected := int64(0) totalRowsAffected := int64(0)
db, err := connectToDB() db, err := connectToDB()
if err != nil { if err != nil {
applogger.Error("连接数据库失败: ", err)
return fmt.Errorf("连接数据库失败: %w", err) return fmt.Errorf("连接数据库失败: %w", err)
} }
threshold := time.Now().AddDate(0, 0, -daysAgo) threshold := time.Now().AddDate(0, 0, -daysAgo)
for { for {
tx := db.Begin() tx := db.Begin()
applogger.Info("开始事务")
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
applogger.Error("异常,回滚事务: ", r)
tx.Rollback() tx.Rollback()
} }
}() }()
@ -130,21 +131,26 @@ func deleteOldData(model interface{}, daysAgo int) error {
} }
if len(ids) == 0 { if len(ids) == 0 {
applogger.Info("没有找到符合条件的数据,退出循环")
break break
} }
result = tx.Where("id IN (?)", ids).Delete(model) result = tx.Where("id IN (?)", ids).Delete(model)
if result.Error != nil { if result.Error != nil {
applogger.Error("删除操作失败,回滚事务: ", result.Error)
tx.Rollback() tx.Rollback()
return result.Error return result.Error
} }
if err := tx.Commit().Error; err != nil { if err := tx.Commit().Error; err != nil {
applogger.Error("事务提交失败: ", err)
return err return err
} }
applogger.Info("事务提交成功")
totalRowsAffected += result.RowsAffected totalRowsAffected += result.RowsAffected
if len(ids) < batchSize { if len(ids) < batchSize {
applogger.Info("已处理所有符合条件的数据,退出循环")
break break
} }
} }
@ -153,6 +159,7 @@ func deleteOldData(model interface{}, daysAgo int) error {
modelName := reflect.TypeOf(model).Elem().Name() modelName := reflect.TypeOf(model).Elem().Name()
applogger.Info(fmt.Sprintf("执行删除%v用时%s成功删除数据%d", modelName, elapsed.String(), totalRowsAffected)) applogger.Info(fmt.Sprintf("执行删除%v用时%s成功删除数据%d", modelName, elapsed.String(), totalRowsAffected))
closeDb(db)
return nil return nil
} }
@ -392,7 +399,11 @@ func downloadDecompression() {
applogger.Error(body) applogger.Error(body)
SendEmail(subject, body) //发送邮件 SendEmail(subject, body) //发送邮件
} }
defer sftpClient.Close() defer func() {
sftpClient.Close()
sshClient.Close()
}()
files, err := sftpClient.ReadDir(sftpDir) files, err := sftpClient.ReadDir(sftpDir)
if err != nil { if err != nil {
body := fmt.Sprintf("sftp目录%s不存在, 错误信息:%v", sftpDir, err) body := fmt.Sprintf("sftp目录%s不存在, 错误信息:%v", sftpDir, err)
@ -403,7 +414,8 @@ func downloadDecompression() {
fmt.Printf("共%d个文件\n", len(files)) fmt.Printf("共%d个文件\n", len(files))
sort.Sort(FileSorter(files)) sort.Sort(FileSorter(files))
//设置限速
// 限制下载速度为 15MB/s
limiter := rate.NewLimiter(rate.Limit(rateLimiter*1024*1024), int(rateLimiter*1024*1024)) limiter := rate.NewLimiter(rate.Limit(rateLimiter*1024*1024), int(rateLimiter*1024*1024))
for _, file := range files { for _, file := range files {
processingStatus := -1 processingStatus := -1
@ -559,6 +571,7 @@ func downloadDecompression() {
} }
} }
// 批次信息入库
func batchInsert(fileName string, isLastCall bool, excludedFilename string) int { func batchInsert(fileName string, isLastCall bool, excludedFilename string) int {
start := time.Now() start := time.Now()
db, err := connectToDB() db, err := connectToDB()
@ -628,6 +641,7 @@ func batchInsert(fileName string, isLastCall bool, excludedFilename string) int
applogger.Info(fmt.Sprintf("%s批次文件入库完成", fileName)) applogger.Info(fmt.Sprintf("%s批次文件入库完成", fileName))
applogger.Info(fmt.Sprintf("%s批次文件执行时间%s 插入批次次数:%d", fileName, elapsed, batchRows)) applogger.Info(fmt.Sprintf("%s批次文件执行时间%s 插入批次次数:%d", fileName, elapsed, batchRows))
closeDb(db)
return 0 return 0
} }
@ -1002,9 +1016,11 @@ func batchDataInsert(fileName string, lastCallKeys map[string]bool, excludedFile
SendEmail(subject, body) //发送邮件 SendEmail(subject, body) //发送邮件
applogger.Info(fmt.Sprintf("%s数据包 入库完成", fileName)) applogger.Info(fmt.Sprintf("%s数据包 入库完成", fileName))
applogger.Info(fmt.Sprintf("%s数据包执行时间:%s 插入数据:%d条 过滤数数:%d条", fileName, elapsed, count, bi)) applogger.Info(fmt.Sprintf("%s数据包执行时间:%s 插入数据:%d条 过滤数数:%d条", fileName, elapsed, count, bi))
closeDb(db)
return 0 return 0
} else { //如果没有查询到批次信息,跳过处理 } else { //如果没有查询到批次信息,跳过处理
applogger.Error(fmt.Sprintf("未查询到批次数据,文件名:%s截取批次标识%s", fileName, dataFileName)) applogger.Error(fmt.Sprintf("未查询到批次数据,文件名:%s截取批次标识%s", fileName, dataFileName))
closeDb(db)
return -1 return -1
} }
} }
@ -1055,6 +1071,7 @@ func queryBatchState() {
handleError(fmt.Errorf("返回不为0"), fmt.Sprintf("查询批次状态失败:%s", string(jsonStr))) handleError(fmt.Errorf("返回不为0"), fmt.Sprintf("查询批次状态失败:%s", string(jsonStr)))
} }
} }
closeDb(db)
} }
func createUpdatesMap(retobj map[string]interface{}) map[string]interface{} { func createUpdatesMap(retobj map[string]interface{}) map[string]interface{} {
@ -1093,30 +1110,6 @@ func handleError(err error, errMsg string) {
} }
func connectToDB() (*gorm.DB, error) { func connectToDB() (*gorm.DB, error) {
/**
dsn := "sqlserver://" + dbUser + ":" + dbPassword + "@" + dbAddress + "?database=" + dbName + "&charset=utf8mb4"
var db *gorm.DB
var err error
attempt := 1
maxAttempts := 5
backoff := time.Second
logger := logger.Default.LogMode(logger.Error) //Silent、Error、Warn、Info
for {
db, err = gorm.Open(sqlserver.Open(dsn), &gorm.Config{Logger: logger, SkipDefaultTransaction: true})
if err == nil {
break
}
if attempt >= maxAttempts {
//applogger.Error(fmt.Sprintf("数据库连接失败,错误信息%v", err))
handleError(err, "数据库连接失败,错误信息")
return nil, err
}
time.Sleep(backoff)
backoff *= 2
attempt++
}
*/
dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", dbUser, dbPassword, dbAddress, dbPort, dbName) dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", dbUser, dbPassword, dbAddress, dbPort, dbName)
var db *gorm.DB var db *gorm.DB
var err error var err error
@ -1137,9 +1130,32 @@ func connectToDB() (*gorm.DB, error) {
backoff *= 2 backoff *= 2
attempt++ attempt++
} }
// 获取底层 *sql.DB 实例
sqlDB, err := db.DB()
if err != nil {
return nil, err
}
// 设置最大连接数
sqlDB.SetMaxOpenConns(dbMaxOpenConns)
// 设置最大空闲连接数
sqlDB.SetMaxIdleConns(dbMaxIdleConns)
// 设置空闲连接的最长生命周期
sqlDB.SetConnMaxLifetime(time.Duration(dbConnMaxLifetime) * time.Minute)
return db, nil return db, nil
} }
func closeDb(db *gorm.DB) {
sqlDB, err := db.DB()
if err != nil {
handleError(err, "关闭数据库连接失败")
return
}
sqlDB.Close()
}
func fileExists(filename string) bool { func fileExists(filename string) bool {
info, err := os.Stat(filename) info, err := os.Stat(filename)
if os.IsNotExist(err) { if os.IsNotExist(err) {
@ -1225,6 +1241,9 @@ func iniConfi() {
dataExpirationDays = 14 dataExpirationDays = 14
delDataSize = 60000 delDataSize = 60000
rateLimiter = 1 rateLimiter = 1
dbMaxOpenConns = 500
dbMaxIdleConns = 5
dbConnMaxLifetime = 10
case "prod": case "prod":
//fmt.Print("正式环境配置已生效\n") //fmt.Print("正式环境配置已生效\n")
@ -1253,9 +1272,12 @@ func iniConfi() {
token = "7100178600777091" //7100477930234217 token = "7100178600777091" //7100477930234217
lastCallPath = "RawData/LastCall" lastCallPath = "RawData/LastCall"
verifySignatureKey = "Lzxf6asgwFRTsANZ8gkXJnP3EmOyNO#&" verifySignatureKey = "Lzxf6asgwFRTsANZ8gkXJnP3EmOyNO#&"
dataExpirationDays = 14 dataExpirationDays = 1
delDataSize = 60000 delDataSize = 60000
rateLimiter = 10 rateLimiter = 15
dbMaxOpenConns = 500
dbMaxIdleConns = 5
dbConnMaxLifetime = 60
default: default:
panic(fmt.Errorf("无效的运行模式: %s", env)) panic(fmt.Errorf("无效的运行模式: %s", env))
@ -1346,6 +1368,9 @@ var ( //初始化变量
dataExpirationDays int dataExpirationDays int
delDataSize int delDataSize int
rateLimiter uint rateLimiter uint
dbMaxOpenConns int
dbMaxIdleConns int
dbConnMaxLifetime int
) )
type Batch struct { type Batch struct {