diff --git a/iniDataForLinux b/iniDataForLinux index e8e27cb..110a4e2 100755 Binary files a/iniDataForLinux and b/iniDataForLinux differ diff --git a/iniDataForMacOs b/iniDataForMacOs index 9cd545a..8002227 100755 Binary files a/iniDataForMacOs and b/iniDataForMacOs differ diff --git a/main.go b/main.go index 99fb917..1c21e81 100644 --- a/main.go +++ b/main.go @@ -78,19 +78,17 @@ func main() { iniLog() fmt.Print("查询批次状态...\n") queryBatchState() - case <-notification.C: iniLog() - applogger.Info("开始清除历史数据") - delData() + go delData() notification.Reset(time.Until(nextNotificationTime())) - applogger.Info("清除历史数据完成") } } } // 删除历史数据和历史重复数据 func delData() { + applogger.Info("开始清除历史数据") // 删除15天前的批次数据 err := deleteOldData(&BatcheData{}, dataExpirationDays) handleError(err, "删除15天前的批次数据失败") @@ -98,7 +96,7 @@ func delData() { // 删除15天前的批次数据重复日志 err = deleteOldData(&BatchDataDuplicateLog{}, dataExpirationDays) handleError(err, "删除15天前的批次数据重复日志失败") - + applogger.Info("清除历史数据完成") } // 删除15天前的数据 @@ -107,14 +105,17 @@ func deleteOldData(model interface{}, daysAgo int) error { totalRowsAffected := int64(0) db, err := connectToDB() if err != nil { + applogger.Error("连接数据库失败: ", err) return fmt.Errorf("连接数据库失败: %w", err) } threshold := time.Now().AddDate(0, 0, -daysAgo) for { tx := db.Begin() + applogger.Info("开始事务") defer func() { if r := recover(); r != nil { + applogger.Error("异常,回滚事务: ", r) tx.Rollback() } }() @@ -130,21 +131,26 @@ func deleteOldData(model interface{}, daysAgo int) error { } if len(ids) == 0 { + applogger.Info("没有找到符合条件的数据,退出循环") break } result = tx.Where("id IN (?)", ids).Delete(model) if result.Error != nil { + applogger.Error("删除操作失败,回滚事务: ", result.Error) tx.Rollback() return result.Error } if err := tx.Commit().Error; err != nil { + applogger.Error("事务提交失败: ", err) return err } + applogger.Info("事务提交成功") totalRowsAffected += result.RowsAffected if len(ids) < batchSize { + applogger.Info("已处理所有符合条件的数据,退出循环") break } } @@ -153,6 +159,7 @@ func deleteOldData(model interface{}, daysAgo int) error { modelName := reflect.TypeOf(model).Elem().Name() applogger.Info(fmt.Sprintf("执行删除%v用时%s成功删除数据%d", modelName, elapsed.String(), totalRowsAffected)) + closeDb(db) return nil } @@ -392,7 +399,11 @@ func downloadDecompression() { applogger.Error(body) SendEmail(subject, body) //发送邮件 } - defer sftpClient.Close() + defer func() { + sftpClient.Close() + sshClient.Close() + }() + files, err := sftpClient.ReadDir(sftpDir) if err != nil { body := fmt.Sprintf("sftp目录%s不存在, 错误信息:%v", sftpDir, err) @@ -403,7 +414,8 @@ func downloadDecompression() { fmt.Printf("共%d个文件\n", len(files)) sort.Sort(FileSorter(files)) - //设置限速 + + // 限制下载速度为 15MB/s limiter := rate.NewLimiter(rate.Limit(rateLimiter*1024*1024), int(rateLimiter*1024*1024)) for _, file := range files { processingStatus := -1 @@ -559,6 +571,7 @@ func downloadDecompression() { } } +// 批次信息入库 func batchInsert(fileName string, isLastCall bool, excludedFilename string) int { start := time.Now() db, err := connectToDB() @@ -628,6 +641,7 @@ func batchInsert(fileName string, isLastCall bool, excludedFilename string) int applogger.Info(fmt.Sprintf("%s(批次文件)入库完成", fileName)) applogger.Info(fmt.Sprintf("%s(批次文件)执行时间%s 插入批次次数:%d", fileName, elapsed, batchRows)) + closeDb(db) return 0 } @@ -1002,9 +1016,11 @@ func batchDataInsert(fileName string, lastCallKeys map[string]bool, excludedFile SendEmail(subject, body) //发送邮件 applogger.Info(fmt.Sprintf("%s(数据包) 入库完成", fileName)) applogger.Info(fmt.Sprintf("%s(数据包)执行时间:%s 插入数据:%d条 过滤数数:%d条", fileName, elapsed, count, bi)) + closeDb(db) return 0 } else { //如果没有查询到批次信息,跳过处理 applogger.Error(fmt.Sprintf("未查询到批次数据,文件名:%s,截取批次标识:%s", fileName, dataFileName)) + closeDb(db) return -1 } } @@ -1055,6 +1071,7 @@ func queryBatchState() { handleError(fmt.Errorf("返回不为0"), fmt.Sprintf("查询批次状态失败:%s", string(jsonStr))) } } + closeDb(db) } func createUpdatesMap(retobj map[string]interface{}) map[string]interface{} { @@ -1093,30 +1110,6 @@ func handleError(err error, errMsg string) { } func connectToDB() (*gorm.DB, error) { - /** - dsn := "sqlserver://" + dbUser + ":" + dbPassword + "@" + dbAddress + "?database=" + dbName + "&charset=utf8mb4" - var db *gorm.DB - var err error - attempt := 1 - maxAttempts := 5 - backoff := time.Second - logger := logger.Default.LogMode(logger.Error) //Silent、Error、Warn、Info - for { - db, err = gorm.Open(sqlserver.Open(dsn), &gorm.Config{Logger: logger, SkipDefaultTransaction: true}) - if err == nil { - break - } - if attempt >= maxAttempts { - //applogger.Error(fmt.Sprintf("数据库连接失败,错误信息%v", err)) - handleError(err, "数据库连接失败,错误信息") - return nil, err - } - time.Sleep(backoff) - backoff *= 2 - attempt++ - } - */ - dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", dbUser, dbPassword, dbAddress, dbPort, dbName) var db *gorm.DB var err error @@ -1137,9 +1130,32 @@ func connectToDB() (*gorm.DB, error) { backoff *= 2 attempt++ } + // 获取底层 *sql.DB 实例 + sqlDB, err := db.DB() + if err != nil { + return nil, err + } + + // 设置最大连接数 + sqlDB.SetMaxOpenConns(dbMaxOpenConns) + + // 设置最大空闲连接数 + sqlDB.SetMaxIdleConns(dbMaxIdleConns) + + // 设置空闲连接的最长生命周期 + sqlDB.SetConnMaxLifetime(time.Duration(dbConnMaxLifetime) * time.Minute) return db, nil } +func closeDb(db *gorm.DB) { + sqlDB, err := db.DB() + if err != nil { + handleError(err, "关闭数据库连接失败") + return + } + sqlDB.Close() +} + func fileExists(filename string) bool { info, err := os.Stat(filename) if os.IsNotExist(err) { @@ -1225,6 +1241,9 @@ func iniConfi() { dataExpirationDays = 14 delDataSize = 60000 rateLimiter = 1 + dbMaxOpenConns = 500 + dbMaxIdleConns = 5 + dbConnMaxLifetime = 10 case "prod": //fmt.Print("正式环境配置已生效\n") @@ -1253,9 +1272,12 @@ func iniConfi() { token = "7100178600777091" //7100477930234217 lastCallPath = "RawData/LastCall" verifySignatureKey = "Lzxf6asgwFRTsANZ8gkXJnP3EmOyNO#&" - dataExpirationDays = 14 + dataExpirationDays = 1 delDataSize = 60000 - rateLimiter = 10 + rateLimiter = 15 + dbMaxOpenConns = 500 + dbMaxIdleConns = 5 + dbConnMaxLifetime = 60 default: panic(fmt.Errorf("无效的运行模式: %s", env)) @@ -1346,6 +1368,9 @@ var ( //初始化变量 dataExpirationDays int delDataSize int rateLimiter uint + dbMaxOpenConns int + dbMaxIdleConns int + dbConnMaxLifetime int ) type Batch struct {