GO/main.go
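// main.go: a scheduled worker that downloads batch files from an SFTP server,
// unpacks zip archives, de-duplicates the records, bulk-inserts them into a
// SQL Server database, coordinates runs through Redis flags, and emails a
// summary once each file has been processed.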

package main
import (
"archive/zip"
"bufio"
"crypto/tls"
"encoding/csv"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/go-redis/redis"
"github.com/pkg/sftp"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
"gopkg.in/gomail.v2"
"gopkg.in/natefinch/lumberjack.v2"
"gorm.io/driver/sqlserver"
_ "gorm.io/driver/sqlserver"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
var ( // package-level configuration, populated in init()
env string
applogger *logrus.Logger
redisClient *redis.Client
executableDir string
redisAddress string
redisPassword string
redisDB int
sftpAddress string
sftpUser string
sftpPassword string
sftpDir string
dbAddress string
dbUser string
dbPassword string
dbName string
zipPath string
txtPath string
logPath string
batchSize int // rows accumulated before a batch is submitted for insertion
insertSize int // rows per single bulk INSERT
insertChanSize int // buffer size of the insert channel
goSize int // number of insert worker goroutines
taskTime int // ticker interval in minutes
)
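// Batch maps one record of a batch definition file; GORM's default naming maps it to the batches table.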
type Batch struct {
ID uint `gorm:"primary_key"`
CommunicationChannelID uint
CommunicationName string `gorm:"type:varchar(255)"`
TargetsMember uint `gorm:"type:int"`
TemplateID uint
Content string `gorm:"type:text"`
CreatedAt time.Time `gorm:"default:getdate()"`
UpdatedAt time.Time `gorm:"default:getdate()"`
Status uint `gorm:"type:int"`
DataFileName string `gorm:"type:text"`
}
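// BatcheData is one de-duplicated recipient row from a data file; TableName below maps it to batche_datas.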
type BatcheData struct {
ID uint `gorm:"primary_key"`
CommunicationChannelID string `gorm:"column:communication_channel_id"`
Mobile string `gorm:"column:mobile"`
FullName string `gorm:"column:full_name"`
ReservedField string `gorm:"column:reserved_field"`
}
func (BatcheData) TableName() string {
return "batche_datas"
}
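// BatchProcessingInformation records, per communication channel, how many rows were inserted and how many duplicates were filtered from a data file.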
type BatchProcessingInformation struct {
ID uint `gorm:"primaryKey;autoIncrement"`
CommunicationChannelID string `gorm:"column:communication_channel_id"`
RepeatTargetsMember int `gorm:"column:repeat_targets_member"`
InsertsTargetsMember int `gorm:"column:inserts_targets_member"`
DataFileName string `gorm:"column:data_file_name"`
}
func (BatchProcessingInformation) TableName() string {
return "batches_processing_informations"
}
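// BatchDataDuplicateLog stores the duplicate rows filtered out of a data file so they can be audited later.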
type BatchDataDuplicateLog struct {
ID int `gorm:"primaryKey;autoIncrement"`
CommunicationChannelID string `gorm:"column:communication_channel_id"`
Mobile string `gorm:"column:mobile"`
FullName string `gorm:"column:full_name"`
ReservedField string `gorm:"column:reserved_field"`
}
func (BatchDataDuplicateLog) TableName() string {
return "batche_data_duplicate_logs"
}
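// init resolves the executable directory, loads the configuration for the selected
// environment (dev or prod), creates the working directories, and opens the Redis client.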
func init() {
// Resolve the directory containing the running executable
executablePath, err := os.Executable()
if err != nil {
log.Fatal("failed to get executable path: ", err)
}
executableDir = filepath.Dir(executablePath)
flag.StringVar(&env, "env", "dev", "运行模式")
flag.Parse()
switch env {
case "dev":
//fmt.Print("测试环境配置已生效\n")
redisAddress = "mysql5.weu.me:6379"
redisPassword = ""
redisDB = 1
sftpAddress = "192.168.10.86:49156"
sftpUser = "demo"
sftpPassword = "demo"
sftpDir = "/sftp/test"
dbAddress = "192.168.10.18:1433"
dbUser = "sa"
dbPassword = "Aa123123"
dbName = "sephora"
zipPath = "RawData/Zip/"
txtPath = "RawData/Txt/"
logPath = "logs/"
batchSize = 5000 // rows accumulated before a batch is submitted for insertion
insertSize = 500 // rows per single bulk INSERT
insertChanSize = 10 // buffer size of the insert channel
goSize = 10 // number of insert worker goroutines
taskTime = 1
case "prod":
//fmt.Print("正式环境配置已生效\n")
redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379"
redisPassword = "3Nsb4Pmsl9bcLs24mL12l"
redisDB = 233
sftpAddress = "esftp.sephora.com.cn:20981"
sftpUser = "CHN-CRMTOWemedia-wemedia"
sftpPassword = "uoWdMHEv39ZFjiOg"
sftpDir = "/CN-CRMTOWemedia/SMS"
dbAddress = "rm-bp16l424ln96q1ouk.sqlserver.rds.aliyuncs.com:3433"
dbUser = "sephora_sms"
dbPassword = "5ORiiLmgkniC0EqF"
dbName = "sephora_sms"
zipPath = "RawData/Zip/"
txtPath = "RawData/Txt/"
logPath = "logs/"
batchSize = 5000 // rows accumulated before a batch is submitted for insertion
insertSize = 500 // rows per single bulk INSERT
insertChanSize = 50 // buffer size of the insert channel
goSize = 10 // number of insert worker goroutines
taskTime = 60
default:
panic(fmt.Errorf("无效的运行模式: %s", env))
}
// Create the working directories if they do not already exist
for _, dir := range []string{
filepath.Join(executableDir, zipPath),
filepath.Join(executableDir, txtPath),
filepath.Join(executableDir, logPath, time.Now().Format("2006_01")),
} {
if err = os.MkdirAll(dir, 0755); err != nil {
log.Fatal(err)
}
}
redisClient = redis.NewClient(&redis.Options{
Addr: redisAddress,
Password: redisPassword,
DB: redisDB,
})
}
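// main takes an exclusive file lock to enforce a single running instance, sets up
// daily log files, runs one download pass immediately, and then repeats the pass
// on every ticker interval.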
func main() {
// Open a file to use as a lock
lockFile, err := os.OpenFile(filepath.Join(executableDir, ".lock"), os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
fmt.Println("打开锁文件失败:", err)
os.Exit(1)
}
defer lockFile.Close()
// Try to acquire an exclusive, non-blocking lock on the file
err = syscall.Flock(int(lockFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
if err != nil {
fmt.Println("程序已经在运行,本程序无法同时运行多个")
os.Exit(1)
}
applogger = logrus.New()
logPath = filepath.Join(executableDir, "logs", time.Now().Format("2006_01"))
if err = os.MkdirAll(logPath, 0755); err != nil {
log.Fatal(err)
}
logFileName := "sms_processing_" + time.Now().Format("2006_01_02") + ".log"
logFileHook := &lumberjack.Logger{
Filename: filepath.Join(logPath, logFileName),
}
logOutput := io.MultiWriter(os.Stdout, logFileHook) // write log output to both the console and the file
applogger.SetOutput(logOutput)
applogger.Info(fmt.Sprintf("程序启动,加载%s环境....", env))
go downloadDecompression() // run once immediately at startup
ticker := time.NewTicker(time.Duration(taskTime) * time.Minute) // timer that fires every taskTime minutes
tickCount := 2 // counts scheduled runs (the startup run above was the first)
defer ticker.Stop() // stop the ticker when main exits
// Main scheduling loop
for {
select {
case <-ticker.C:
// On each tick: switch to the current day's log file, then start another pass
logPath = filepath.Join(executableDir, "logs", time.Now().Format("2006_01"))
logFileName = "sms_processing_" + time.Now().Format("2006_01_02") + ".log"
logFileHook = &lumberjack.Logger{
Filename: filepath.Join(logPath, logFileName),
}
logOutput := io.MultiWriter(os.Stdout, logFileHook) // write log output to both the console and the file
applogger.SetOutput(logOutput)
//fmt.Printf("尝试第%d次执行....\n", tickCount)
applogger.Info(fmt.Sprintf("尝试第%d次执行....", tickCount))
go downloadDecompression() // run asynchronously in a new goroutine
tickCount++
}
}
}
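// downloadDecompression performs one full pass: it skips the run if another pass is
// still marked as in progress in Redis, connects to the SFTP server, downloads any
// files not yet seen, unzips .zip archives, hands .txt files to the insert routines,
// and records each processed file in Redis so it is not downloaded again.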
func downloadDecompression() {
if exists, _ := redisClient.Exists("iniDataStatus").Result(); exists == 1 {
fmt.Print("上一批次执行中,跳过本次\n")
} else {
// Set the "run in progress" flag in Redis
err := redisClient.Set("iniDataStatus", 1, 0).Err()
if err != nil {
fmt.Printf("写入任务执行中标记失败:%s\n", err.Error())
applogger.Infof("写入任务执行中标记失败:%v", err)
}
// Connect to SFTP server
sshConfig := &ssh.ClientConfig{
User: sftpUser,
Auth: []ssh.AuthMethod{
ssh.Password(sftpPassword),
},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
}
sshClient, err := ssh.Dial("tcp", sftpAddress, sshConfig)
if err != nil {
fmt.Printf("SSH连接失败:%s\n", err.Error())
applogger.Infof("SSH连接失败:%v", err)
redisClient.Del("iniDataStatus") // clear the flag so the next run is not blocked
return
}
sftpClient, err := sftp.NewClient(sshClient)
if err != nil {
fmt.Printf("创建SFTP客户端失败:%s\n", err.Error())
applogger.Infof("创建SFTP客户端失败:%v", err)
redisClient.Del("iniDataStatus") // clear the flag so the next run is not blocked
return
}
defer sftpClient.Close()
files, err := sftpClient.ReadDir(sftpDir)
if err != nil {
fmt.Printf("sftp目录不存在: 错误信息:%s\n", err.Error())
applogger.Info("sftp目录不存在: 错误信息:%v", err)
}
it := 1
fmt.Printf("共%d个文件\n", len(files))
for _, file := range files {
fmt.Printf("第%d个文件处理中\n", it)
it++
// Check if file has been downloaded before
fileKey := fmt.Sprintf("downloaded:%s", file.Name())
if exists, _ := redisClient.Exists(fileKey).Result(); exists == 1 {
fmt.Println("跳过已处理过的文件:" + file.Name())
//applogger.Info("跳过已处理过的文件:" + file.Name())
continue
}
if filepath.Ext(file.Name()) == ".zip" {
//fmt.Println("下载开始(数据包):" + file.Name())
// Download file
srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name()))
if err != nil {
applogger.Info("打开sftp文件失败文件名:%s错误信息 %v", file.Name(), err)
continue
}
defer srcFile.Close()
dstFile, err := os.Create(path.Join(executableDir, zipPath, file.Name()))
if err != nil {
applogger.Info("创建本地文件失败,文件名: %s错误信息%v", file.Name(), err)
continue
}
defer dstFile.Close()
if _, err := io.Copy(dstFile, srcFile); err != nil {
applogger.Info("下载文件失败,文件名: %s错误信息%v", file.Name(), err)
continue
}
//fmt.Printf("%s数据包下载完成\n", file.Name())
applogger.Info(fmt.Sprintf("%s数据包下载完成", file.Name()))
// Unzip file
zipReader, err := zip.OpenReader(path.Join(executableDir, zipPath, file.Name()))
if err != nil {
applogger.Info("解压文件失败: 文件名:%s错误信息%v", file.Name(), err)
continue
}
defer zipReader.Close()
//fmt.Println("压缩报文件数量:", len(zipReader.File))
for _, zipFile := range zipReader.File {
if strings.Contains(zipFile.Name, "__MACOSX/._") {
// macOS metadata entry, skip it
continue
} else if filepath.Ext(zipFile.Name) != ".txt" {
fmt.Print("文件类型不正确,跳过处理", zipFile.Name)
continue
}
zipFileReader, err := zipFile.Open()
if err != nil || zipFileReader == nil {
applogger.Infof("Failed to open zip file: %v", err)
fmt.Print("压缩文件处理结束")
continue
}
defer zipFileReader.Close()
// Create the file
unzipFile, err := os.Create(path.Join(executableDir, txtPath, zipFile.Name))
if err != nil {
applogger.Info("创建压缩后的文件失败,文件名:%s错误信息 %v", zipFile.Name, err)
continue
}
defer unzipFile.Close()
// Write the unzip data to the file
_, err = io.Copy(unzipFile, zipFileReader)
if err != nil {
applogger.Info("文件解压失败,文件名:%s错误信息 %v", zipFileReader, err)
continue
}
applogger.Info(fmt.Sprintf("%s数据包解压完成", zipFile.Name))
//fmt.Printf("%s数据包解压完成\n", zipFile.Name)
batchDataInsert(zipFile.Name)
}
} else if filepath.Ext(file.Name()) == ".txt" {
srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name()))
if err != nil {
applogger.Info("打开sftp文件失败文件名:%s错误信息 %v", file.Name(), err)
continue
}
defer srcFile.Close()
dstFile, err := os.Create(path.Join(executableDir, txtPath, file.Name()))
if err != nil {
applogger.Info("创建本地文件失败,文件名: %s错误信息%v", file.Name(), err)
continue
}
defer dstFile.Close()
if _, err := io.Copy(dstFile, srcFile); err != nil {
applogger.Info("下载文件失败,文件名: %s错误信息%v", file.Name(), err)
continue
}
fmt.Printf("%s批次文件下载完成 \n", file.Name())
applogger.Info(fmt.Sprintf("%s批次文件下载完成", file.Name()))
batchInsert(file.Name())
}
err = redisClient.Set(fileKey, 1, 0).Err() // mark the file as downloaded, unpacked, and inserted
if err != nil {
//fmt.Printf("写入文件处理完成标记失败文件名:%s错误信息%v\n", file.Name(), err)
applogger.Infof("写入文件处理完成标记失败文件名:%s错误信息%v", file.Name(), err)
}
}
redisClient.Del("iniDataStatus") //删除任务执行中标记
}
}
// batchInsert parses a batch definition .txt file (CSV), inserts one Batch row per record, and emails a summary.
func batchInsert(fileName string) {
//fmt.Print("批次处理开始")
start := time.Now()
db, _ := connectToDB()
file, err := os.Open(path.Join(executableDir, txtPath, fileName))
if err != nil {
//fmt.Printf("文件打开失败文件名:%s错误信息%v\n", fileName, err)
applogger.Info("文件打开失败文件名:%s错误信息%v", fileName, err)
} else {
defer file.Close()
reader := csv.NewReader(bufio.NewReader(file))
reader.Read() // skip the header row
batchRows := 0
for {
record, err := reader.Read()
if err != nil {
break
}
communicationChannelID, _ := strconv.ParseUint(record[0], 10, 32)
TargetsMember, _ := strconv.ParseUint(record[2], 10, 32)
templateID, _ := strconv.ParseUint(record[3], 10, 32)
status := uint(1)
batch := Batch{
CommunicationChannelID: uint(communicationChannelID),
CommunicationName: record[1],
TargetsMember: uint(TargetsMember),
TemplateID: uint(templateID),
Content: record[4],
Status: status,
DataFileName: fileName,
}
db.Create(&batch)
batchRows++
}
time.Sleep(time.Second)
elapsed := time.Since(start)
subject := "丝芙兰批次文件处理完成"
body := "批次文件:" + fileName + ";\n批次数" + strconv.Itoa(batchRows) + ";\n处理完成请前往管理平台查看处理。"
SendEmail(subject, body) //发送邮件
//fmt.Printf("%s批次文件入库完成 \n", fileName)
applogger.Info(fmt.Sprintf("%s批次文件入库完成", fileName))
//fmt.Printf("%s批次文件执行时间%s 插入批次次数:%d\n\n\n", fileName, elapsed, batchRows)
applogger.Info(fmt.Sprintf("%s批次文件执行时间%s 插入批次次数:%d", fileName, elapsed, batchRows))
}
}
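// batchDataInsert streams a data .txt file line by line, de-duplicates records on the
// (communication channel, mobile) pair, fans the unique rows out to a pool of worker
// goroutines for batched inserts, logs the duplicates, then writes per-channel
// processing statistics and sends a summary email.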
func batchDataInsert(fileName string) {
start := time.Now()
// Open file
file, err := os.Open(path.Join(executableDir, txtPath, fileName))
if err != nil {
//fmt.Printf("文件打开失败,文件名:%s错误信息%v\n", fileName, err)
applogger.Info(fmt.Sprintf("文件打开失败,文件名:%s错误信息%v", fileName, err))
} else {
defer file.Close()
db, _ := connectToDB()
// Insert data in batches using multiple goroutines
var wg sync.WaitGroup
dataBatchChan := make(chan []BatcheData, insertChanSize)
for i := 0; i < goSize; i++ {
wg.Add(1)
go func() {
for batch := range dataBatchChan {
retryCount := 0
for {
if err := db.CreateInBatches(batch, insertSize).Error; err != nil {
if retryCount >= 5 {
panic(err)
}
fmt.Printf("Insert failed, retrying in %v seconds...\n", 2*retryCount)
time.Sleep(time.Duration(2*retryCount) * time.Second)
retryCount++
} else {
break
}
}
}
wg.Done()
}()
}
// Create and initialize hashset
hs := make(map[string]bool)
// Prepare batch data
dataBatch := make([]BatcheData, 0, batchSize)
dataBatchDuplicate := make([]BatchDataDuplicateLog, 0, batchSize)
// Parse file line by line and insert data in batches
scanner := bufio.NewScanner(file)
scanner.Split(bufio.ScanLines)
scanner.Scan() // skip first line
bi := 0
duplicateCount := make(map[string]int)
insertsCount := make(map[string]int)
var count int
//ci := 1
for scanner.Scan() {
line := scanner.Text()
row, err := csv.NewReader(strings.NewReader(line)).Read()
if err != nil {
//fmt.Printf("文件按行读取失败,文件名:%s错误信息%v\n", fileName, err)
applogger.Info(fmt.Sprintf("文件按行读取失败,文件名:%s错误信息%v", fileName, err))
continue
}
reservedFields := map[string]string{ // merge the personalised (reserved) fields
"ReservedField1": row[5],
"ReservedField2": row[6],
"ReservedField3": row[7],
"ReservedField4": row[8],
"ReservedField5": row[9],
"DataFileName": fileName,
}
reservedFieldsJson, err := json.Marshal(reservedFields) // encode the reserved fields as JSON
if err != nil {
//fmt.Printf("个性化字段合并失败,文件名:%s错误信息%v\n", fileName, err)
applogger.Info(fmt.Sprintf("个性化字段合并失败,文件名:%s错误信息%v", fileName, err))
continue
}
if _, ok := duplicateCount[row[2]]; !ok {
duplicateCount[row[2]] = 0
}
// Check if record exists in hashset
key := fmt.Sprintf("%s-%s", row[2], row[3])
if _, exists := hs[key]; exists { // duplicate record within this file
bi++
// Increment duplicate count
duplicateCount[row[2]]++
dataBatchDuplicate = append(dataBatchDuplicate, BatchDataDuplicateLog{
CommunicationChannelID: row[2],
Mobile: row[3],
FullName: row[4],
ReservedField: string(reservedFieldsJson),
})
if len(dataBatchDuplicate) >= batchSize {
err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error
if err != nil {
//fmt.Printf("插入重复数据失败,文件名:%s错误信息%v\n", fileName, err)
applogger.Info(fmt.Sprintf("插入重复数据失败,文件名:%s错误信息%v", fileName, err))
} else {
dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize)
}
}
continue
}
// Add record to hashset
hs[key] = true
dataBatch = append(dataBatch, BatcheData{
CommunicationChannelID: row[2],
Mobile: row[3],
FullName: row[4],
ReservedField: string(reservedFieldsJson),
})
if len(dataBatch) >= batchSize {
dataBatchChan <- dataBatch
dataBatch = make([]BatcheData, 0, batchSize)
//fmt.Print("提交通次数" + strconv.Itoa(ci))
//ci++
}
if _, ok := insertsCount[row[2]]; !ok {
insertsCount[row[2]] = 0
}
insertsCount[row[2]]++
count++
}
if len(dataBatch) > 0 {
dataBatchChan <- dataBatch
dataBatch = make([]BatcheData, 0, batchSize)
//fmt.Print("文件读取完成,最后一批提交至通道")
}
close(dataBatchChan)
wg.Wait() // wait until all insert workers have finished
// Insert per-channel batch processing statistics
bpi := []BatchProcessingInformation{}
for key, value := range duplicateCount {
bpi = append(bpi, BatchProcessingInformation{
CommunicationChannelID: key,
RepeatTargetsMember: value,
InsertsTargetsMember: insertsCount[key],
DataFileName: fileName,
})
}
err = db.CreateInBatches(bpi, insertSize).Error
if err != nil {
//fmt.Printf("插入批次处理信息失败,文件名:%s错误信息%v\n", fileName, err)
applogger.Info(fmt.Sprintf("插入批次处理信息失败,文件名:%s错误信息%v", fileName, err))
}
// Insert any remaining duplicate rows
if len(dataBatchDuplicate) > 0 {
err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error
if err != nil {
//fmt.Printf("插入重复数据失败,文件名:%s错误信息%v\n", fileName, err)
applogger.Info(fmt.Sprintf("插入重复数据失败,文件名:%s错误信息%v", fileName, err))
} else {
dataBatchDuplicate = nil
}
}
elapsed := time.Since(start)
// Send the notification email
subject := "丝芙兰数据包处理完成"
body := "数据包:" + fileName + ";\n总数" + strconv.Itoa(count+bi) + ";\n过滤重复数" + strconv.Itoa(bi) + ";\n过滤后总数" + strconv.Itoa(count) + ";\n处理完成请前往管理平台查看处理。"
SendEmail(subject, body) //发送邮件
applogger.Info(fmt.Sprintf("%s数据包 ,入库完成", fileName))
//fmt.Printf("%s数据包 入库完成\n", fileName)
//fmt.Printf("%s数据包 执行时间:%s 插入数据:%d 过滤数:%d\n\n\n", fileName, elapsed, count, bi)
applogger.Info(fmt.Sprintf("%s数据包执行时间:%s 插入数据:%d条 过滤数数:%d条", fileName, elapsed, count, bi))
}
}
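// connectToDB opens a GORM connection to SQL Server, retrying up to five times with
// exponential backoff before giving up.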
func connectToDB() (*gorm.DB, error) {
dsn := "sqlserver://" + dbUser + ":" + dbPassword + "@" + dbAddress + "?database=" + dbName + "&charset=utf8mb4"
var db *gorm.DB
var err error
attempt := 1
maxAttempts := 5
backoff := time.Second
logger := logger.Default.LogMode(logger.Error) //Silent、Error、Warn、Info
for {
db, err = gorm.Open(sqlserver.Open(dsn), &gorm.Config{Logger: logger, SkipDefaultTransaction: true})
if err == nil {
break
}
if attempt >= maxAttempts {
fmt.Printf("数据库连接失败,错误信息%v\n", err)
applogger.Info("数据库连接失败,错误信息%v", err)
return nil, err
}
time.Sleep(backoff)
backoff *= 2
attempt++
}
return db, nil
}
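// SendEmail sends a plain-text notification to the configured recipients over SMTP,
// with TLS certificate verification disabled.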
func SendEmail(subject string, body string) error {
// SMTP account credentials
smtpHost := "smtp.exmail.qq.com"
smtpPort := 465
from := "auto_system@wemediacn.com"
password := "yJJYYcy5NKx2y69r"
to := []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"}
//to := []string{"chejiulong@wemediacn.com"}
// Build the message
m := gomail.NewMessage()
m.SetHeader("From", from)
m.SetHeader("To", to...)
m.SetHeader("Subject", subject)
m.SetBody("text/plain", body)
// Dial the SMTP server and send
d := gomail.NewDialer(smtpHost, smtpPort, from, password)
d.TLSConfig = &tls.Config{InsecureSkipVerify: true}
err := d.DialAndSend(m)
if err != nil {
fmt.Printf("邮件发送失败,错误信息%v\n", err)
applogger.Infof("邮件发送失败,错误信息%v", err)
}
return err
}