添加正式环境环境变量调优插入速度

This commit is contained in:
chejiulong 2023-02-20 18:44:32 +08:00
parent 2e874375ef
commit f603577888
3 changed files with 62 additions and 29 deletions

Binary file not shown.

Binary file not shown.

83
main.go
View File

@ -10,6 +10,7 @@ import (
"io" "io"
"log" "log"
"os" "os"
"os/signal"
"path" "path"
"path/filepath" "path/filepath"
"strconv" "strconv"
@ -29,7 +30,9 @@ import (
"gorm.io/gorm/logger" "gorm.io/gorm/logger"
) )
/*
var ( var (
applogger *logrus.Logger applogger *logrus.Logger
redisClient *redis.Client redisClient *redis.Client
executableDir string executableDir string
@ -39,6 +42,7 @@ var (
sftpAddress = "192.168.10.86:49156" sftpAddress = "192.168.10.86:49156"
sftpUser = "demo" sftpUser = "demo"
sftpPassword = "demo" sftpPassword = "demo"
sftpDir = "/sftp/test"
dbAddress = "192.168.10.18:1433" dbAddress = "192.168.10.18:1433"
dbUser = "sa" dbUser = "sa"
dbPassword = "Aa123123" dbPassword = "Aa123123"
@ -50,6 +54,30 @@ var (
insertSize = 500 //一次性入库 insertSize = 500 //一次性入库
cSize = 10 //入库协程数 cSize = 10 //入库协程数
)
*/
var ( //正式环境
applogger *logrus.Logger
redisClient *redis.Client
executableDir string
redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379"
redisPassword = "3Nsb4Pmsl9bcLs24mL12l"
redisDB = 233
sftpAddress = "10.11.0.63:22"
sftpUser = "wyeth2018"
sftpPassword = "nYydNHOu"
sftpDir = "/wyeth2018/sephora_sms"
dbAddress = "rm-bp16l424ln96q1ouk.sqlserver.rds.aliyuncs.com:3433"
dbUser = "sephora_sms"
dbPassword = "5ORiiLmgkniC0EqF"
dbName = "sephora_sms"
zipPath = "RawData/Zip/"
txtPath = "RawData/Txt/"
logPath = "logs/"
batchSize = 5000 //提交数据
insertSize = 500 //一次性入库
cSize = 10 //入库协程数
) )
type Batch struct { type Batch struct {
@ -138,38 +166,42 @@ func init() {
} }
func main() { func main() {
// 打开一个文件作为锁 // 创建一个channel用于接收信号
lockFile, err := os.OpenFile(".lock", os.O_CREATE|os.O_RDWR, 0666) signalChan := make(chan os.Signal, 1)
if err != nil {
fmt.Println("打开锁文件失败:", err) // 注册SIGINT, SIGTERM信号处理函数
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
// 启动goroutine等待信号的到来
go func() {
for {
select {
case <-signalChan:
// 当信号到来时,执行特定的代码
fmt.Println("程序已经在运行,本程序无法同时运行多个")
os.Exit(1) os.Exit(1)
} }
defer lockFile.Close()
// 尝试获取文件的独占锁
err = syscall.Flock(int(lockFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
if err != nil {
fmt.Println("程序已经在运行,本程序无法同时运行多个")
os.Exit(1)
} }
}()
ticker := time.NewTicker(time.Minute) // 创建一个定时器,每隔一分钟触发一次任务函数 ticker := time.NewTicker(time.Minute) // 创建一个定时器,每隔一分钟触发一次任务函数
//tickCount := 1 //记录循环次数 tickCount := 1 //记录循环次数
defer ticker.Stop() // 延迟关闭定时器,确保所有的定时器事件都被处理完毕 defer ticker.Stop() // 延迟关闭定时器,确保所有的定时器事件都被处理完毕
// 循环处理任务 // 循环处理任务
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
// 定时器触发时执行的任务函数 // 定时器触发时执行的任务函数
//fmt.Printf("尝试第%d执行....\n", tickCount) fmt.Printf("尝试第%d执行....\n", tickCount)
go downloadDecompression() // 在新协程中异步执行 go downloadDecompression() // 在新协程中异步执行
//tickCount++ tickCount++
} }
} }
} }
func downloadDecompression() { func downloadDecompression() {
if exists, _ := redisClient.Exists("iniDataStatus").Result(); exists == 1 { if exists, _ := redisClient.Exists("iniDataStatus").Result(); exists == 1 {
//fmt.Println("批次执行中,跳过本次") fmt.Println("批次执行中,跳过本次")
} else { } else {
// 写入执行中标记 // 写入执行中标记
err := redisClient.Set("iniDataStatus", 1, 0).Err() err := redisClient.Set("iniDataStatus", 1, 0).Err()
@ -193,15 +225,15 @@ func downloadDecompression() {
panic(err) panic(err)
} }
defer sftpClient.Close() defer sftpClient.Close()
files, err := sftpClient.ReadDir("/sftp/test") files, err := sftpClient.ReadDir(sftpDir)
if err != nil { if err != nil {
applogger.Fatalf("Failed to read SFTP directory: %v", err) applogger.Fatalf("sftp目录不存在: %v", err)
} }
//it := 1 it := 1
//fmt.Printf("共%d个文件\n", len(files)) fmt.Printf("共%d个文件\n", len(files))
for _, file := range files { for _, file := range files {
//fmt.Printf("第%d个文件\n", it) fmt.Printf("第%d个文件\n", it)
//it++ it++
// Check if file has been downloaded before // Check if file has been downloaded before
fileKey := fmt.Sprintf("downloaded:%s", file.Name()) fileKey := fmt.Sprintf("downloaded:%s", file.Name())
if exists, _ := redisClient.Exists(fileKey).Result(); exists == 1 { if exists, _ := redisClient.Exists(fileKey).Result(); exists == 1 {
@ -211,7 +243,7 @@ func downloadDecompression() {
if filepath.Ext(file.Name()) == ".zip" { if filepath.Ext(file.Name()) == ".zip" {
//fmt.Println("下载开始(数据包):" + file.Name()) //fmt.Println("下载开始(数据包):" + file.Name())
// Download file // Download file
srcFile, err := sftpClient.Open(path.Join("/sftp/test", file.Name())) srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name()))
if err != nil { if err != nil {
applogger.Fatalf("Failed to download file: %v", err) applogger.Fatalf("Failed to download file: %v", err)
continue continue
@ -274,9 +306,9 @@ func downloadDecompression() {
fmt.Print("入库完成(数据包)::" + zipFile.Name) fmt.Print("入库完成(数据包)::" + zipFile.Name)
} }
} else if filepath.Ext(file.Name()) == ".txt" { } else if filepath.Ext(file.Name()) == ".txt" {
srcFile, err := sftpClient.Open(path.Join("/sftp/test", file.Name())) srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name()))
if err != nil { if err != nil {
applogger.Fatalf("Failed to download file: %v", err) applogger.Fatalf("sftp目录不存在: %v", err)
continue continue
} }
defer srcFile.Close() defer srcFile.Close()
@ -484,6 +516,7 @@ func batchDataInsert(fileName string) {
if len(dataBatch) > 0 { if len(dataBatch) > 0 {
dataBatchChan <- dataBatch dataBatchChan <- dataBatch
dataBatch = make([]BatcheData, 0, batchSize) dataBatch = make([]BatcheData, 0, batchSize)
fmt.Print("文件读取完成,最后一批提交至通道")
} }
close(dataBatchChan) close(dataBatchChan)
@ -535,9 +568,9 @@ func connectToDB() (*gorm.DB, error) {
attempt := 1 attempt := 1
maxAttempts := 5 maxAttempts := 5
backoff := time.Second backoff := time.Second
logger := logger.Default.LogMode(logger.Silent) logger := logger.Default.LogMode(logger.Error) //Silent、Error、Warn、Info
for { for {
db, err = gorm.Open(sqlserver.Open(dsn), &gorm.Config{Logger: logger}) db, err = gorm.Open(sqlserver.Open(dsn), &gorm.Config{Logger: logger, SkipDefaultTransaction: true})
if err == nil { if err == nil {
break break
} }