修改删除数据方式,小批量多次删除方式,避免一次性删除给数据库造成压力
This commit is contained in:
parent
7619139e8f
commit
f4277ab184
BIN
iniDataForLinux
BIN
iniDataForLinux
Binary file not shown.
BIN
iniDataForMacOs
BIN
iniDataForMacOs
Binary file not shown.
91
main.go
91
main.go
@ -18,6 +18,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"reflect"
|
||||||
"regexp"
|
"regexp"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -53,6 +54,7 @@ func init() {
|
|||||||
applogger.Info(fmt.Sprintf("程序启动,加载%s环境,尝试执行...", env))
|
applogger.Info(fmt.Sprintf("程序启动,加载%s环境,尝试执行...", env))
|
||||||
go downloadDecompression() // 启动立即执行一次数据下载、处理
|
go downloadDecompression() // 启动立即执行一次数据下载、处理
|
||||||
go queryBatchState()
|
go queryBatchState()
|
||||||
|
//go delData()
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@ -78,39 +80,78 @@ func main() {
|
|||||||
case <-notification.C:
|
case <-notification.C:
|
||||||
iniLog()
|
iniLog()
|
||||||
applogger.Info("开始清除历史数据")
|
applogger.Info("开始清除历史数据")
|
||||||
// 删除15天前的批次数据
|
delData()
|
||||||
rowsAffected, err := deleteOldData(&BatcheData{}, 15)
|
|
||||||
if err != nil {
|
|
||||||
handleError(err, "删除15天前的批次数据失败")
|
|
||||||
} else {
|
|
||||||
applogger.Info(fmt.Sprintf("成功删除 %d 行 15天前的批次数据", rowsAffected))
|
|
||||||
}
|
|
||||||
|
|
||||||
// 删除15天前的批次数据重复日志
|
|
||||||
rowsAffected, err = deleteOldData(&BatchDataDuplicateLog{}, 15)
|
|
||||||
if err != nil {
|
|
||||||
handleError(err, "删除15天前的批次数据重复日志失败")
|
|
||||||
} else {
|
|
||||||
applogger.Info(fmt.Sprintf("成功删除 %d 行 15天前的批次数据重复日志", rowsAffected))
|
|
||||||
}
|
|
||||||
notification.Reset(time.Until(nextNotificationTime()))
|
notification.Reset(time.Until(nextNotificationTime()))
|
||||||
applogger.Info("清除历史数据完成")
|
applogger.Info("清除历史数据完成")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 删除历史数据和历史重复数据
|
||||||
|
func delData() {
|
||||||
|
// 删除15天前的批次数据
|
||||||
|
err := deleteOldData(&BatcheData{}, dataExpirationDays)
|
||||||
|
handleError(err, "删除15天前的批次数据失败")
|
||||||
|
|
||||||
|
// 删除15天前的批次数据重复日志
|
||||||
|
err = deleteOldData(&BatchDataDuplicateLog{}, dataExpirationDays)
|
||||||
|
handleError(err, "删除15天前的批次数据重复日志失败")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
// 删除15天前的数据
|
// 删除15天前的数据
|
||||||
func deleteOldData(model interface{}, daysAgo int) (int64, error) {
|
func deleteOldData(model interface{}, daysAgo int) error {
|
||||||
|
start := time.Now()
|
||||||
|
totalRowsAffected := int64(0)
|
||||||
db, err := connectToDB()
|
db, err := connectToDB()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("连接数据库失败: %w", err)
|
return fmt.Errorf("连接数据库失败: %w", err)
|
||||||
}
|
}
|
||||||
threshold := time.Now().AddDate(0, 0, -daysAgo)
|
threshold := time.Now().AddDate(0, 0, -daysAgo)
|
||||||
result := db.Where("created_at < ?", threshold).Delete(model)
|
|
||||||
if result.Error != nil {
|
for {
|
||||||
return 0, result.Error
|
tx := db.Begin()
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
tx.Rollback()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
if err := tx.Error; err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var ids []int
|
||||||
|
result := tx.Model(model).Where("created_at < ?", threshold).Limit(delDataSize).Pluck("id", &ids)
|
||||||
|
if result.Error != nil {
|
||||||
|
tx.Rollback()
|
||||||
|
return result.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(ids) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
result = tx.Where("id IN (?)", ids).Delete(model)
|
||||||
|
if result.Error != nil {
|
||||||
|
tx.Rollback()
|
||||||
|
return result.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tx.Commit().Error; err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
totalRowsAffected += result.RowsAffected
|
||||||
|
if len(ids) < batchSize {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return result.RowsAffected, nil
|
time.Sleep(time.Second)
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
modelName := reflect.TypeOf(model).Elem().Name()
|
||||||
|
|
||||||
|
applogger.Info(fmt.Sprintf("执行删除%v用时%s成功删除数据%d", modelName, elapsed.String(), totalRowsAffected))
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// 计算下一次执行时间
|
// 计算下一次执行时间
|
||||||
@ -1144,6 +1185,8 @@ func iniConfi() {
|
|||||||
token = "7100477930234217"
|
token = "7100477930234217"
|
||||||
lastCallPath = "RawData/LastCall"
|
lastCallPath = "RawData/LastCall"
|
||||||
verifySignatureKey = "qZ6v1&H#Wjx+yRm2D@*sJF$tnfL83Ia"
|
verifySignatureKey = "qZ6v1&H#Wjx+yRm2D@*sJF$tnfL83Ia"
|
||||||
|
dataExpirationDays = 14
|
||||||
|
delDataSize = 60000
|
||||||
|
|
||||||
case "prod":
|
case "prod":
|
||||||
//fmt.Print("正式环境配置已生效\n")
|
//fmt.Print("正式环境配置已生效\n")
|
||||||
@ -1154,7 +1197,7 @@ func iniConfi() {
|
|||||||
sftpUser = "CHN-CRMTOWemedia-wemedia"
|
sftpUser = "CHN-CRMTOWemedia-wemedia"
|
||||||
sftpPassword = "uoWdMHEv39ZFjiOg"
|
sftpPassword = "uoWdMHEv39ZFjiOg"
|
||||||
sftpDir = "/CN-CRMTOWemedia/SMS"
|
sftpDir = "/CN-CRMTOWemedia/SMS"
|
||||||
dbAddress = "rm-bp1cb0x329c1dwid5.mysql.rds.aliyuncs.com"
|
dbAddress = "rds0yslqyg1iuze8txux545.mysql.rds.aliyuncs.com"
|
||||||
dbPort = "3306"
|
dbPort = "3306"
|
||||||
dbUser = "sephora"
|
dbUser = "sephora"
|
||||||
dbPassword = "YfbGJWsFkH4pXgPY"
|
dbPassword = "YfbGJWsFkH4pXgPY"
|
||||||
@ -1172,6 +1215,8 @@ func iniConfi() {
|
|||||||
token = "7100178600777091" //7100477930234217
|
token = "7100178600777091" //7100477930234217
|
||||||
lastCallPath = "RawData/LastCall"
|
lastCallPath = "RawData/LastCall"
|
||||||
verifySignatureKey = "Lzxf6asgwFRTsANZ8gkXJnP3EmOyNO#&"
|
verifySignatureKey = "Lzxf6asgwFRTsANZ8gkXJnP3EmOyNO#&"
|
||||||
|
dataExpirationDays = 14
|
||||||
|
delDataSize = 60000
|
||||||
|
|
||||||
default:
|
default:
|
||||||
panic(fmt.Errorf("无效的运行模式: %s", env))
|
panic(fmt.Errorf("无效的运行模式: %s", env))
|
||||||
@ -1259,6 +1304,8 @@ var ( //初始化变量
|
|||||||
token string
|
token string
|
||||||
lastCallPath string
|
lastCallPath string
|
||||||
verifySignatureKey string
|
verifySignatureKey string
|
||||||
|
dataExpirationDays int
|
||||||
|
delDataSize int
|
||||||
)
|
)
|
||||||
|
|
||||||
type Batch struct {
|
type Batch struct {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user