优化日志,增加依赖
This commit is contained in:
parent
f603577888
commit
7e5bd5923c
1
go.mod
1
go.mod
@ -8,6 +8,7 @@ require (
|
||||
github.com/sirupsen/logrus v1.9.0
|
||||
golang.org/x/crypto v0.6.0
|
||||
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1
|
||||
gorm.io/driver/sqlserver v1.4.2
|
||||
gorm.io/gorm v1.24.5
|
||||
)
|
||||
|
||||
2
go.sum
2
go.sum
@ -159,6 +159,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
|
||||
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
|
||||
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df h1:n7WqCuqOuCbNr617RXOY0AWRXxgwEyPp2z+p0+hgMuE=
|
||||
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df/go.mod h1:LRQQ+SO6ZHR7tOkpBDuZnXENFzX8qRjMDMyPD6BRkCw=
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
|
||||
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce h1:+JknDZhAj8YMt7GC73Ei8pv4MzjDUNPHgQWJdtMAaDU=
|
||||
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:5AcXVHNjg+BDxry382+8OKon8SEWiKktQR07RKPsv1c=
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
|
||||
|
||||
Binary file not shown.
BIN
iniDataForLinux
BIN
iniDataForLinux
Binary file not shown.
522
main.go
522
main.go
@ -10,7 +10,6 @@ import (
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
@ -24,6 +23,7 @@ import (
|
||||
"github.com/sirupsen/logrus"
|
||||
"golang.org/x/crypto/ssh"
|
||||
"gopkg.in/gomail.v2"
|
||||
"gopkg.in/natefinch/lumberjack.v2"
|
||||
"gorm.io/driver/sqlserver"
|
||||
_ "gorm.io/driver/sqlserver"
|
||||
"gorm.io/gorm"
|
||||
@ -32,7 +32,6 @@ import (
|
||||
|
||||
/*
|
||||
var (
|
||||
|
||||
applogger *logrus.Logger
|
||||
redisClient *redis.Client
|
||||
executableDir string
|
||||
@ -50,23 +49,21 @@ var (
|
||||
zipPath = "RawData/Zip/"
|
||||
txtPath = "RawData/Txt/"
|
||||
logPath = "logs/"
|
||||
batchSize = 5000 //提交数据
|
||||
insertSize = 500 //一次性入库
|
||||
cSize = 10 //入库协程数
|
||||
|
||||
)
|
||||
*/
|
||||
|
||||
var ( //正式环境
|
||||
|
||||
applogger *logrus.Logger
|
||||
redisClient *redis.Client
|
||||
executableDir string
|
||||
redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379"
|
||||
redisPassword = "3Nsb4Pmsl9bcLs24mL12l"
|
||||
redisDB = 233
|
||||
sftpAddress = "10.11.0.63:22"
|
||||
sftpUser = "wyeth2018"
|
||||
sftpPassword = "nYydNHOu"
|
||||
sftpDir = "/wyeth2018/sephora_sms"
|
||||
sftpAddress = "esftp.sephora.com.cn:20981"
|
||||
sftpUser = "CHN-CRMTOWemedia-wemedia"
|
||||
sftpPassword = "uoWdMHEv39ZFjiOg"
|
||||
sftpDir = "/CN-CRMTOWemedia/SMS"
|
||||
dbAddress = "rm-bp16l424ln96q1ouk.sqlserver.rds.aliyuncs.com:3433"
|
||||
dbUser = "sephora_sms"
|
||||
dbPassword = "5ORiiLmgkniC0EqF"
|
||||
@ -74,10 +71,6 @@ var ( //正式环境
|
||||
zipPath = "RawData/Zip/"
|
||||
txtPath = "RawData/Txt/"
|
||||
logPath = "logs/"
|
||||
batchSize = 5000 //提交数据
|
||||
insertSize = 500 //一次性入库
|
||||
cSize = 10 //入库协程数
|
||||
|
||||
)
|
||||
|
||||
type Batch struct {
|
||||
@ -143,20 +136,6 @@ func init() {
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
applogger = logrus.New()
|
||||
logFile := filepath.Join(executableDir, logPath, "download.log")
|
||||
_, err = os.Stat(logFile)
|
||||
if os.IsNotExist(err) {
|
||||
if _, err := os.Create(logFile); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
f, err := os.OpenFile(logFile, os.O_APPEND|os.O_WRONLY, 0666)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
//defer f.Close()
|
||||
applogger.SetOutput(f)
|
||||
|
||||
redisClient = redis.NewClient(&redis.Options{
|
||||
Addr: redisAddress,
|
||||
@ -166,33 +145,41 @@ func init() {
|
||||
}
|
||||
|
||||
func main() {
|
||||
// 创建一个channel用于接收信号
|
||||
signalChan := make(chan os.Signal, 1)
|
||||
// 打开一个文件作为锁
|
||||
lockFile, err := os.OpenFile(filepath.Join(executableDir, ".lock"), os.O_CREATE|os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
fmt.Println("打开锁文件失败:", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer lockFile.Close()
|
||||
// 尝试获取文件的独占锁
|
||||
err = syscall.Flock(int(lockFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
|
||||
if err != nil {
|
||||
fmt.Println("程序已经在运行,本程序无法同时运行多个")
|
||||
os.Exit(1)
|
||||
}
|
||||
applogger = logrus.New()
|
||||
logPath := filepath.Join(executableDir, "logs")
|
||||
logFileName := "sms_processing_" + time.Now().Format("2006-01-02") + ".log"
|
||||
logFileHook := &lumberjack.Logger{
|
||||
Filename: filepath.Join(logPath, logFileName),
|
||||
}
|
||||
applogger.SetOutput(logFileHook)
|
||||
|
||||
// 注册SIGINT, SIGTERM信号处理函数
|
||||
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
|
||||
go downloadDecompression() // 启动立即执行一次
|
||||
|
||||
// 启动goroutine等待信号的到来
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-signalChan:
|
||||
// 当信号到来时,执行特定的代码
|
||||
fmt.Println("程序已经在运行,本程序无法同时运行多个")
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
ticker := time.NewTicker(time.Minute) // 创建一个定时器,每隔一分钟触发一次任务函数
|
||||
tickCount := 1 //记录循环次数
|
||||
defer ticker.Stop() // 延迟关闭定时器,确保所有的定时器事件都被处理完毕
|
||||
ticker := time.NewTicker(60 * time.Minute) // 创建一个定时器,每隔一分钟触发一次任务函数
|
||||
tickCount := 1 //记录循环次数
|
||||
defer ticker.Stop() // 延迟关闭定时器,确保所有的定时器事件都被处理完毕
|
||||
// 循环处理任务
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
// 定时器触发时执行的任务函数
|
||||
fmt.Printf("尝试第%d执行....\n", tickCount)
|
||||
logFileName := "sms_processing_" + time.Now().Format("2006-01-02") + ".log"
|
||||
logFileHook.Filename = filepath.Join(logPath, logFileName)
|
||||
fmt.Printf("尝试第%d此执行....\n", tickCount)
|
||||
applogger.Info(fmt.Sprintf("尝试第%d此执行....\n", tickCount))
|
||||
go downloadDecompression() // 在新协程中异步执行
|
||||
tickCount++
|
||||
}
|
||||
@ -201,12 +188,13 @@ func main() {
|
||||
|
||||
func downloadDecompression() {
|
||||
if exists, _ := redisClient.Exists("iniDataStatus").Result(); exists == 1 {
|
||||
fmt.Println("批次执行中,跳过本次")
|
||||
fmt.Print("上一批次执行中,跳过本次\n")
|
||||
} else {
|
||||
// 写入执行中标记
|
||||
err := redisClient.Set("iniDataStatus", 1, 0).Err()
|
||||
if err != nil {
|
||||
fmt.Println("写入标记失败:%d", err)
|
||||
fmt.Printf("写入任务执行中标记失败:%s\n", err.Error())
|
||||
applogger.Info("写入任务执行中标记失败:%v", err)
|
||||
}
|
||||
// Connect to SFTP server
|
||||
sshConfig := &ssh.ClientConfig{
|
||||
@ -218,25 +206,29 @@ func downloadDecompression() {
|
||||
}
|
||||
sshClient, err := ssh.Dial("tcp", sftpAddress, sshConfig)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
fmt.Printf("写入任务执行中标记失败:%s\n", err.Error())
|
||||
applogger.Info("写入任务执行中标记失败:%v", err)
|
||||
}
|
||||
sftpClient, err := sftp.NewClient(sshClient)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
fmt.Printf("写入任务执行中标记失败:%s\n", err.Error())
|
||||
applogger.Info("写入任务执行中标记失败:%v", err)
|
||||
}
|
||||
defer sftpClient.Close()
|
||||
files, err := sftpClient.ReadDir(sftpDir)
|
||||
if err != nil {
|
||||
applogger.Fatalf("sftp目录不存在: %v", err)
|
||||
fmt.Printf("sftp目录不存在: 错误信息:%s,\n", err.Error())
|
||||
applogger.Info("sftp目录不存在: 错误信息:%v", err)
|
||||
}
|
||||
it := 1
|
||||
fmt.Printf("共%d个文件\n", len(files))
|
||||
for _, file := range files {
|
||||
fmt.Printf("第%d个文件\n", it)
|
||||
fmt.Printf("第%d个文件处理中\n", it)
|
||||
it++
|
||||
// Check if file has been downloaded before
|
||||
fileKey := fmt.Sprintf("downloaded:%s", file.Name())
|
||||
if exists, _ := redisClient.Exists(fileKey).Result(); exists == 1 {
|
||||
fmt.Println("跳过已处理过的文件:" + file.Name())
|
||||
continue
|
||||
}
|
||||
|
||||
@ -245,27 +237,27 @@ func downloadDecompression() {
|
||||
// Download file
|
||||
srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name()))
|
||||
if err != nil {
|
||||
applogger.Fatalf("Failed to download file: %v", err)
|
||||
applogger.Info("打开sftp文件失败,文件名:%s,错误信息: %v", file.Name(), err)
|
||||
continue
|
||||
}
|
||||
defer srcFile.Close()
|
||||
dstFile, err := os.Create(path.Join(executableDir, zipPath, file.Name()))
|
||||
|
||||
if err != nil {
|
||||
applogger.Fatalf("Failed to download file: %v", err)
|
||||
applogger.Info("创建本地文件失败,文件名: %s,错误信息:%v", file.Name(), err)
|
||||
continue
|
||||
}
|
||||
defer dstFile.Close()
|
||||
if _, err := io.Copy(dstFile, srcFile); err != nil {
|
||||
applogger.Fatalf("Failed to download file: %v", err)
|
||||
applogger.Info("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err)
|
||||
continue
|
||||
}
|
||||
fmt.Println("下载完成(数据包):" + file.Name())
|
||||
applogger.Info("下载完成(数据包):%d", file.Name())
|
||||
fmt.Printf("%s(数据包):下载完成\n", file.Name())
|
||||
applogger.Info(fmt.Sprintf("%s(数据包):下载完成", file.Name()))
|
||||
// Unzip file
|
||||
zipReader, err := zip.OpenReader(path.Join(executableDir, zipPath, file.Name()))
|
||||
if err != nil {
|
||||
applogger.Fatalf("Failed to download file: %v", err)
|
||||
applogger.Info("解压文件失败: 文件名:%s,错误信息:%v", file.Name(), err)
|
||||
continue
|
||||
}
|
||||
defer zipReader.Close()
|
||||
@ -280,7 +272,7 @@ func downloadDecompression() {
|
||||
continue
|
||||
}
|
||||
if err != nil || zipFileReader == nil {
|
||||
applogger.Fatalf("Failed to open zip file: %v", err)
|
||||
applogger.Info("Failed to open zip file: %v", err)
|
||||
fmt.Print("压缩文件处理结束")
|
||||
continue
|
||||
}
|
||||
@ -288,50 +280,46 @@ func downloadDecompression() {
|
||||
// Create the file
|
||||
unzipFile, err := os.Create(path.Join(executableDir, txtPath, zipFile.Name))
|
||||
if err != nil {
|
||||
|
||||
applogger.Fatalf("Failed to create unzip file: %v", err)
|
||||
applogger.Info("创建压缩后的文件失败,文件名:%s,错误信息: %v", zipFile.Name, err)
|
||||
continue
|
||||
}
|
||||
defer unzipFile.Close()
|
||||
// Write the unzip data to the file
|
||||
_, err = io.Copy(unzipFile, zipFileReader)
|
||||
if err != nil {
|
||||
applogger.Fatalf("Failed to write unzip data to file %v", err)
|
||||
applogger.Info("文件解压失败,文件名:%s,错误信息: %v", zipFileReader, err)
|
||||
continue
|
||||
}
|
||||
applogger.Info("解压完成(数据包):%d", zipFile.Name)
|
||||
fmt.Print("解压完成(数据包):" + zipFile.Name)
|
||||
applogger.Info(fmt.Sprintf("%s(数据包)解压完成", zipFile.Name))
|
||||
fmt.Printf("%s(数据包)解压完成\n", zipFile.Name)
|
||||
batchDataInsert(zipFile.Name)
|
||||
applogger.Info("入库完成(数据包):%d", zipFile.Name)
|
||||
fmt.Print("入库完成(数据包)::" + zipFile.Name)
|
||||
|
||||
}
|
||||
} else if filepath.Ext(file.Name()) == ".txt" {
|
||||
srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name()))
|
||||
if err != nil {
|
||||
applogger.Fatalf("sftp目录不存在: %v", err)
|
||||
applogger.Info("打开sftp文件失败,文件名:%s,错误信息: %v", file.Name(), err)
|
||||
continue
|
||||
}
|
||||
defer srcFile.Close()
|
||||
dstFile, err := os.Create(path.Join(executableDir, txtPath, file.Name()))
|
||||
if err != nil {
|
||||
applogger.Fatalf("Failed to download file: %v", err)
|
||||
applogger.Info("创建本地文件失败,文件名: %s,错误信息:%v", file.Name(), err)
|
||||
continue
|
||||
}
|
||||
defer dstFile.Close()
|
||||
if _, err := io.Copy(dstFile, srcFile); err != nil {
|
||||
applogger.Fatalf("Failed to download file: %v", err)
|
||||
applogger.Info("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err)
|
||||
continue
|
||||
}
|
||||
fmt.Printf("下载完成(批次文件):%s \n", file.Name())
|
||||
applogger.Info("下载完成(批次文件):%d", file.Name())
|
||||
fmt.Printf("%s(批次文件)下载完成 \n", file.Name())
|
||||
applogger.Info(fmt.Sprintf("%s(批次文件)下载完成", file.Name()))
|
||||
batchInsert(file.Name())
|
||||
fmt.Printf("入库完成(批次文件)%s \n", file.Name())
|
||||
|
||||
applogger.Info("入库完成(批次文件):%d\n", file.Name())
|
||||
}
|
||||
err = redisClient.Set(fileKey, 1, 0).Err() //写入下载、解压、入库完成标记
|
||||
if err != nil {
|
||||
fmt.Println("写入标记失败:%", err)
|
||||
fmt.Printf("写入文件处理完成标记失败文件名:%s,错误信息:%v\n", file.Name(), err)
|
||||
applogger.Info("写入文件处理完成标记失败文件名:%s,错误信息:v%\n", file.Name(), err)
|
||||
}
|
||||
}
|
||||
redisClient.Del("iniDataStatus") //删除任务执行中标记
|
||||
@ -343,222 +331,224 @@ func downloadDecompression() {
|
||||
func batchInsert(fileName string) {
|
||||
//fmt.Print("批次处理开始")
|
||||
start := time.Now()
|
||||
db, err := connectToDB()
|
||||
if err != nil {
|
||||
panic("failed to connect database")
|
||||
}
|
||||
db, _ := connectToDB()
|
||||
file, err := os.Open(path.Join(executableDir, txtPath, fileName))
|
||||
defer file.Close()
|
||||
if err != nil {
|
||||
panic("failed to open file")
|
||||
applogger.Fatalf("ailed to open file: %v", fileName)
|
||||
}
|
||||
reader := csv.NewReader(bufio.NewReader(file))
|
||||
reader.Read()
|
||||
batchRows := 0
|
||||
for {
|
||||
record, err := reader.Read()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
communicationChannelID, _ := strconv.ParseUint(record[0], 10, 32)
|
||||
TargetsMember, _ := strconv.ParseUint(record[2], 10, 32)
|
||||
templateID, _ := strconv.ParseUint(record[3], 10, 32)
|
||||
status := uint(1)
|
||||
fmt.Printf("文件打开失败文件名:%s,错误信息%v\n", fileName, err)
|
||||
applogger.Info("文件打开失败文件名:%s,错误信息%v", fileName, err)
|
||||
} else {
|
||||
defer file.Close()
|
||||
reader := csv.NewReader(bufio.NewReader(file))
|
||||
reader.Read()
|
||||
batchRows := 0
|
||||
for {
|
||||
record, err := reader.Read()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
communicationChannelID, _ := strconv.ParseUint(record[0], 10, 32)
|
||||
TargetsMember, _ := strconv.ParseUint(record[2], 10, 32)
|
||||
templateID, _ := strconv.ParseUint(record[3], 10, 32)
|
||||
status := uint(1)
|
||||
|
||||
batch := Batch{
|
||||
CommunicationChannelID: uint(communicationChannelID),
|
||||
CommunicationName: record[1],
|
||||
TargetsMember: uint(TargetsMember),
|
||||
TemplateID: uint(templateID),
|
||||
Content: record[4],
|
||||
Status: status,
|
||||
DataFileName: fileName,
|
||||
batch := Batch{
|
||||
CommunicationChannelID: uint(communicationChannelID),
|
||||
CommunicationName: record[1],
|
||||
TargetsMember: uint(TargetsMember),
|
||||
TemplateID: uint(templateID),
|
||||
Content: record[4],
|
||||
Status: status,
|
||||
DataFileName: fileName,
|
||||
}
|
||||
db.Create(&batch)
|
||||
batchRows++
|
||||
}
|
||||
db.Create(&batch)
|
||||
batchRows++
|
||||
time.Sleep(time.Second)
|
||||
elapsed := time.Since(start)
|
||||
subject := "丝芙兰批次文件处理完成"
|
||||
body := "批次文件:" + fileName + ";\n批次数:" + strconv.Itoa(batchRows) + ";\n处理完成,请前往管理平台查看处理。"
|
||||
SendEmail(subject, body) //发送邮件
|
||||
fmt.Printf("%s(批次文件)入库完成 \n", fileName)
|
||||
applogger.Info(fmt.Sprintf("%s(批次文件)入库完成", fileName))
|
||||
fmt.Printf("执行时间%s 插入批次次数:%d\n\n\n", elapsed, batchRows)
|
||||
applogger.Info(fmt.Sprintf("执行时间%s 插入批次次数:%d", elapsed, batchRows))
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
|
||||
elapsed := time.Since(start)
|
||||
subject := "丝芙兰批次文件处理完成"
|
||||
body := "批次文件:" + fileName + ";\n批次数:" + strconv.Itoa(batchRows) + ";\n处理完成,请前往管理平台查看处理。"
|
||||
err = SendEmail(subject, body) //发送邮件
|
||||
if err != nil {
|
||||
applogger.Info("邮件发送失:%d", err)
|
||||
fmt.Print(err)
|
||||
}
|
||||
fmt.Printf("执行时间%s\n插入批次次数:%d\n", elapsed, batchRows)
|
||||
}
|
||||
|
||||
func batchDataInsert(fileName string) {
|
||||
//fmt.Print("批次数据处理开始")
|
||||
start := time.Now()
|
||||
db, err := connectToDB()
|
||||
if err != nil {
|
||||
panic("failed to connect database")
|
||||
}
|
||||
// Insert data in batches using multiple goroutines
|
||||
var wg sync.WaitGroup
|
||||
dataBatchChan := make(chan []BatcheData, cSize)
|
||||
for i := 0; i < cSize; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
for batch := range dataBatchChan {
|
||||
retryCount := 0
|
||||
for {
|
||||
if err := db.CreateInBatches(batch, insertSize).Error; err != nil {
|
||||
if retryCount >= 5 {
|
||||
panic(err)
|
||||
}
|
||||
fmt.Printf("Insert failed, retrying in %v seconds...\n", 2*retryCount)
|
||||
time.Sleep(time.Duration(2*retryCount) * time.Second)
|
||||
retryCount++
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
// Open file
|
||||
file, err := os.Open(path.Join(executableDir, txtPath, fileName))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer file.Close()
|
||||
fmt.Printf("文件打开失败,文件名:%s,错误信息%v\n", fileName, err)
|
||||
applogger.Info("文件打开失败,文件名:%s,错误信息%v", fileName, err)
|
||||
} else {
|
||||
defer file.Close()
|
||||
db, _ := connectToDB()
|
||||
// Insert data in batches using multiple goroutines
|
||||
batchSize := 5000 //提交数据
|
||||
insertSize := 500 //一次性入库
|
||||
insertChanSize := 50 //通道缓冲数
|
||||
goSize := 150 //协程数
|
||||
var wg sync.WaitGroup
|
||||
dataBatchChan := make(chan []BatcheData, insertChanSize)
|
||||
for i := 0; i < goSize; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
for batch := range dataBatchChan {
|
||||
retryCount := 0
|
||||
for {
|
||||
if err := db.CreateInBatches(batch, insertSize).Error; err != nil {
|
||||
if retryCount >= 5 {
|
||||
panic(err)
|
||||
}
|
||||
fmt.Printf("Insert failed, retrying in %v seconds...\n", 2*retryCount)
|
||||
time.Sleep(time.Duration(2*retryCount) * time.Second)
|
||||
retryCount++
|
||||
} else {
|
||||
break
|
||||
}
|
||||
|
||||
// Create and initialize hashset
|
||||
hs := make(map[string]bool)
|
||||
// Prepare batch data
|
||||
dataBatch := make([]BatcheData, 0, batchSize)
|
||||
dataBatchDuplicate := make([]BatchDataDuplicateLog, 0, batchSize)
|
||||
// Parse file line by line and insert data in batches
|
||||
scanner := bufio.NewScanner(file)
|
||||
scanner.Split(bufio.ScanLines)
|
||||
scanner.Scan() // skip first line
|
||||
bi := 0
|
||||
duplicateCount := make(map[string]int)
|
||||
insertsCount := make(map[string]int)
|
||||
var count int
|
||||
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
row, err := csv.NewReader(strings.NewReader(line)).Read()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
reservedFields := map[string]string{ //合并个性化字段
|
||||
"ReservedField1": row[5],
|
||||
"ReservedField2": row[6],
|
||||
"ReservedField3": row[7],
|
||||
"ReservedField4": row[8],
|
||||
"ReservedField5": row[9],
|
||||
"DataFileName": fileName,
|
||||
}
|
||||
reservedFieldsJson, err := json.Marshal(reservedFields) //个性化字段转json
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// Check if record exists in hashset
|
||||
key := fmt.Sprintf("%s-%s", row[2], row[3])
|
||||
if _, exists := hs[key]; exists {
|
||||
bi++
|
||||
// Increment duplicate count
|
||||
if _, ok := duplicateCount[row[2]]; !ok {
|
||||
duplicateCount[row[2]] = 0
|
||||
// Create and initialize hashset
|
||||
hs := make(map[string]bool)
|
||||
// Prepare batch data
|
||||
dataBatch := make([]BatcheData, 0, batchSize)
|
||||
dataBatchDuplicate := make([]BatchDataDuplicateLog, 0, batchSize)
|
||||
// Parse file line by line and insert data in batches
|
||||
scanner := bufio.NewScanner(file)
|
||||
scanner.Split(bufio.ScanLines)
|
||||
scanner.Scan() // skip first line
|
||||
bi := 0
|
||||
duplicateCount := make(map[string]int)
|
||||
insertsCount := make(map[string]int)
|
||||
var count int
|
||||
//ci := 1
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
row, err := csv.NewReader(strings.NewReader(line)).Read()
|
||||
if err != nil {
|
||||
fmt.Printf("文件按行读取失败,文件名:%s,错误信息%v\n", fileName, err)
|
||||
applogger.Info("文件按行读取失败,文件名:%s,错误信息%v", fileName, err)
|
||||
continue
|
||||
}
|
||||
duplicateCount[row[2]]++
|
||||
reservedFields := map[string]string{ //合并个性化字段
|
||||
"ReservedField1": row[5],
|
||||
"ReservedField2": row[6],
|
||||
"ReservedField3": row[7],
|
||||
"ReservedField4": row[8],
|
||||
"ReservedField5": row[9],
|
||||
"DataFileName": fileName,
|
||||
}
|
||||
reservedFieldsJson, err := json.Marshal(reservedFields) //个性化字段转json
|
||||
if err != nil {
|
||||
fmt.Printf("个性化字段合并失败,文件名:%s,错误信息%v\n", fileName, err)
|
||||
applogger.Info("个性化字段合并失败,文件名:%s,错误信息%v", fileName, err)
|
||||
continue
|
||||
}
|
||||
// Check if record exists in hashset
|
||||
key := fmt.Sprintf("%s-%s", row[2], row[3])
|
||||
if _, exists := hs[key]; exists { //如果批次数据重复
|
||||
bi++
|
||||
// Increment duplicate count
|
||||
if _, ok := duplicateCount[row[2]]; !ok {
|
||||
duplicateCount[row[2]] = 0
|
||||
}
|
||||
duplicateCount[row[2]]++
|
||||
dataBatchDuplicate = append(dataBatchDuplicate, BatchDataDuplicateLog{
|
||||
CommunicationChannelID: row[2],
|
||||
Mobile: row[3],
|
||||
FullName: row[4],
|
||||
ReservedField: string(reservedFieldsJson),
|
||||
})
|
||||
if len(dataBatchDuplicate) >= batchSize {
|
||||
err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error
|
||||
if err != nil {
|
||||
fmt.Printf("插入重复数据失败,文件名:%s,错误信息%v\n", fileName, err)
|
||||
applogger.Info("插入重复数据失败,文件名:%s,错误信息%v", fileName, err)
|
||||
} else {
|
||||
dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize)
|
||||
}
|
||||
|
||||
dataBatchDuplicate = append(dataBatchDuplicate, BatchDataDuplicateLog{
|
||||
}
|
||||
continue
|
||||
}
|
||||
// Add record to hashset
|
||||
hs[key] = true
|
||||
|
||||
dataBatch = append(dataBatch, BatcheData{
|
||||
CommunicationChannelID: row[2],
|
||||
Mobile: row[3],
|
||||
FullName: row[4],
|
||||
ReservedField: string(reservedFieldsJson),
|
||||
})
|
||||
|
||||
if len(dataBatchDuplicate) >= batchSize {
|
||||
err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize)
|
||||
if len(dataBatch) >= batchSize {
|
||||
dataBatchChan <- dataBatch
|
||||
dataBatch = make([]BatcheData, 0, batchSize)
|
||||
//fmt.Print("提交通次数" + strconv.Itoa(ci))
|
||||
//ci++
|
||||
}
|
||||
|
||||
continue
|
||||
if _, ok := insertsCount[row[2]]; !ok {
|
||||
insertsCount[row[2]] = 0
|
||||
}
|
||||
insertsCount[row[2]]++
|
||||
count++
|
||||
}
|
||||
|
||||
// Add record to hashset
|
||||
hs[key] = true
|
||||
|
||||
dataBatch = append(dataBatch, BatcheData{
|
||||
CommunicationChannelID: row[2],
|
||||
Mobile: row[3],
|
||||
FullName: row[4],
|
||||
ReservedField: string(reservedFieldsJson),
|
||||
})
|
||||
|
||||
if len(dataBatch) >= batchSize {
|
||||
if len(dataBatch) > 0 {
|
||||
dataBatchChan <- dataBatch
|
||||
dataBatch = make([]BatcheData, 0, batchSize)
|
||||
//fmt.Print("文件读取完成,最后一批提交至通道")
|
||||
}
|
||||
if _, ok := insertsCount[row[2]]; !ok {
|
||||
insertsCount[row[2]] = 0
|
||||
|
||||
close(dataBatchChan)
|
||||
|
||||
wg.Wait() //所有入库全部完成
|
||||
|
||||
//插入批次处理信息
|
||||
bpi := []BatchProcessingInformation{}
|
||||
for key, value := range duplicateCount {
|
||||
bpi = append(bpi, BatchProcessingInformation{
|
||||
CommunicationChannelID: key,
|
||||
RepeatTargetsMember: value,
|
||||
InsertsTargetsMember: insertsCount[key],
|
||||
DataFileName: fileName,
|
||||
})
|
||||
}
|
||||
insertsCount[row[2]]++
|
||||
count++
|
||||
}
|
||||
|
||||
if len(dataBatch) > 0 {
|
||||
dataBatchChan <- dataBatch
|
||||
dataBatch = make([]BatcheData, 0, batchSize)
|
||||
fmt.Print("文件读取完成,最后一批提交至通道")
|
||||
}
|
||||
|
||||
close(dataBatchChan)
|
||||
|
||||
wg.Wait() //所有入库全部完成
|
||||
|
||||
//插入批次处理信息
|
||||
bpi := []BatchProcessingInformation{}
|
||||
for key, value := range duplicateCount {
|
||||
bpi = append(bpi, BatchProcessingInformation{
|
||||
CommunicationChannelID: key,
|
||||
RepeatTargetsMember: value,
|
||||
InsertsTargetsMember: insertsCount[key],
|
||||
DataFileName: fileName,
|
||||
})
|
||||
}
|
||||
err = db.CreateInBatches(bpi, insertSize).Error
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
//插入批此重复数据
|
||||
if len(dataBatchDuplicate) > 0 {
|
||||
err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error
|
||||
err = db.CreateInBatches(bpi, insertSize).Error
|
||||
if err != nil {
|
||||
panic(err)
|
||||
fmt.Printf("插入批次处理信息失败,文件名:%s,错误信息%v\n", fileName, err)
|
||||
applogger.Info("插入批次处理信息失败,文件名:%s,错误信息%v", fileName, err)
|
||||
}
|
||||
dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize)
|
||||
|
||||
//插入批此重复数据
|
||||
if len(dataBatchDuplicate) > 0 {
|
||||
err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error
|
||||
if err != nil {
|
||||
fmt.Printf("插入重复数据失败,文件名:%s,错误信息%v\n", fileName, err)
|
||||
applogger.Info("插入重复数据失败,文件名:%s,错误信息%v", fileName, err)
|
||||
} else {
|
||||
dataBatchDuplicate = nil
|
||||
}
|
||||
}
|
||||
|
||||
elapsed := time.Since(start)
|
||||
|
||||
//发送提醒邮件
|
||||
subject := "丝芙兰数据包处理完成"
|
||||
body := "数据包:" + fileName + ";\n总数:" + strconv.Itoa(count+bi) + ";\n过滤重复数:" + strconv.Itoa(bi) + ";\n过滤后总数:" + strconv.Itoa(count) + ";\n处理完成,请前往管理平台查看处理。"
|
||||
SendEmail(subject, body) //发送邮件
|
||||
applogger.Info(fmt.Sprintf(":%s(数据包) ,入库完成", fileName))
|
||||
fmt.Printf("%s(数据包) 入库完成\n", fileName)
|
||||
fmt.Printf("执行时间:%s 插入数据:%d 过滤数:%d\n\n\n", elapsed, count, bi)
|
||||
applogger.Info(fmt.Sprintf("执行时间:%s 插入数据:%d条 过滤数数:%d条", elapsed, count, bi))
|
||||
}
|
||||
|
||||
//发送提醒邮件
|
||||
subject := "丝芙兰数据包处理完成"
|
||||
body := "数据包:" + fileName + ";\n总数:" + strconv.Itoa(count+bi) + ";\n过滤重复数:" + strconv.Itoa(bi) + ";\n过滤后总数:" + strconv.Itoa(count) + ";\n处理完成,请前往管理平台查看处理。"
|
||||
err = SendEmail(subject, body) //发送邮件
|
||||
if err != nil {
|
||||
applogger.Info("邮件发送失:%d", err)
|
||||
fmt.Print(err)
|
||||
}
|
||||
|
||||
elapsed := time.Since(start)
|
||||
|
||||
fmt.Printf("批次数据入库函数执行时间:%s;\n插入数:%d条\n过滤数:%d条\n", elapsed, count, bi)
|
||||
}
|
||||
|
||||
func connectToDB() (*gorm.DB, error) {
|
||||
@ -575,6 +565,8 @@ func connectToDB() (*gorm.DB, error) {
|
||||
break
|
||||
}
|
||||
if attempt >= maxAttempts {
|
||||
fmt.Printf("数据库连接失败,错误信息%v\n", err)
|
||||
applogger.Info("数据库连接失败,错误信息%v", err)
|
||||
return nil, err
|
||||
}
|
||||
time.Sleep(backoff)
|
||||
@ -588,9 +580,11 @@ func SendEmail(subject string, body string) error {
|
||||
// 邮箱认证信息
|
||||
smtpHost := "smtp.exmail.qq.com"
|
||||
smtpPort := 465
|
||||
from := "chejiulong@wemediacn.com"
|
||||
password := "hdQfpav4x8LwbJPH"
|
||||
from := "auto_system@wemediacn.com"
|
||||
password := "yJJYYcy5NKx2y69r"
|
||||
//to := []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"}
|
||||
to := []string{"chejiulong@wemediacn.com"}
|
||||
|
||||
// 邮件内容
|
||||
m := gomail.NewMessage()
|
||||
m.SetHeader("From", from)
|
||||
@ -602,8 +596,8 @@ func SendEmail(subject string, body string) error {
|
||||
d.TLSConfig = &tls.Config{InsecureSkipVerify: true}
|
||||
err := d.DialAndSend(m)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
//return
|
||||
fmt.Printf("邮件发送失败,错误信息%v\n", err)
|
||||
applogger.Info("邮件发送失败,错误信息%v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user