2023-02-18 16:32:19 +08:00
|
|
|
|
package main
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
|
"archive/zip"
|
|
|
|
|
|
"bufio"
|
2023-02-18 19:31:39 +08:00
|
|
|
|
"crypto/tls"
|
2023-02-18 16:32:19 +08:00
|
|
|
|
"encoding/csv"
|
|
|
|
|
|
"encoding/json"
|
2023-02-27 11:58:39 +08:00
|
|
|
|
"flag"
|
2023-02-18 16:32:19 +08:00
|
|
|
|
"fmt"
|
|
|
|
|
|
"io"
|
|
|
|
|
|
"log"
|
|
|
|
|
|
"os"
|
|
|
|
|
|
"path"
|
|
|
|
|
|
"path/filepath"
|
|
|
|
|
|
"strconv"
|
|
|
|
|
|
"strings"
|
|
|
|
|
|
"sync"
|
|
|
|
|
|
"syscall"
|
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
|
|
"github.com/go-redis/redis"
|
|
|
|
|
|
"github.com/pkg/sftp"
|
|
|
|
|
|
"github.com/sirupsen/logrus"
|
|
|
|
|
|
"golang.org/x/crypto/ssh"
|
2023-02-18 19:31:39 +08:00
|
|
|
|
"gopkg.in/gomail.v2"
|
2023-02-21 14:36:25 +08:00
|
|
|
|
"gopkg.in/natefinch/lumberjack.v2"
|
2023-02-18 16:32:19 +08:00
|
|
|
|
"gorm.io/driver/sqlserver"
|
|
|
|
|
|
_ "gorm.io/driver/sqlserver"
|
|
|
|
|
|
"gorm.io/gorm"
|
|
|
|
|
|
"gorm.io/gorm/logger"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
2023-02-27 11:58:39 +08:00
|
|
|
|
var ( //初始化变量
|
|
|
|
|
|
env string
|
|
|
|
|
|
applogger *logrus.Logger
|
|
|
|
|
|
redisClient *redis.Client
|
|
|
|
|
|
executableDir string
|
|
|
|
|
|
redisAddress string
|
|
|
|
|
|
redisPassword string
|
|
|
|
|
|
redisDB int
|
|
|
|
|
|
sftpAddress string
|
|
|
|
|
|
sftpUser string
|
|
|
|
|
|
sftpPassword string
|
|
|
|
|
|
sftpDir string
|
|
|
|
|
|
dbAddress string
|
|
|
|
|
|
dbUser string
|
|
|
|
|
|
dbPassword string
|
|
|
|
|
|
dbName string
|
|
|
|
|
|
zipPath string
|
|
|
|
|
|
txtPath string
|
|
|
|
|
|
logPath string
|
|
|
|
|
|
batchSize int //提交数据
|
|
|
|
|
|
insertSize int //一次性入库
|
|
|
|
|
|
insertChanSize int //通道缓冲数
|
|
|
|
|
|
goSize int //协程数
|
|
|
|
|
|
taskTime int
|
2023-02-27 19:09:43 +08:00
|
|
|
|
to []string
|
2023-02-18 16:32:19 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
// Batch is the GORM model for one row of a batch description file, as
// populated by batchInsert.
//
// The ID tag uses the `primaryKey` spelling that the other models in this file
// already use, instead of the legacy `primary_key` form.
type Batch struct {
	ID                     uint `gorm:"primaryKey"`
	CommunicationChannelID uint
	CommunicationName      string `gorm:"type:varchar(255)"`
	TargetsMember          uint   `gorm:"type:int"`
	TemplateID             uint
	Content                string    `gorm:"type:text"`
	CreatedAt              time.Time `gorm:"default:getdate()"`
	UpdatedAt              time.Time `gorm:"default:getdate()"`
	Status                 uint      `gorm:"type:int"`
	DataFileName           string    `gorm:"type:text"` // name of the file the row was read from
}
|
|
|
|
|
|
|
|
|
|
|
|
// BatcheData is the GORM model for one recipient row of a data package, as
// populated by batchDataInsert.
//
// The ID tag uses the `primaryKey` spelling that the other models in this file
// already use, instead of the legacy `primary_key` form.
type BatcheData struct {
	ID                     uint   `gorm:"primaryKey"`
	CommunicationChannelID string `gorm:"column:communication_channel_id"`
	Mobile                 string `gorm:"column:mobile"`
	FullName               string `gorm:"column:full_name"`
	ReservedField          string `gorm:"column:reserved_field"` // JSON-encoded reserved fields
}

// TableName returns the table that BatcheData rows are stored in.
func (BatcheData) TableName() string {
	return "batche_datas"
}
|
|
|
|
|
|
|
|
|
|
|
|
// BatchProcessingInformation is the GORM model for the per-channel summary
// that batchDataInsert writes after a data package has been imported.
type BatchProcessingInformation struct {
	ID                     uint   `gorm:"primaryKey;autoIncrement"`
	CommunicationChannelID string `gorm:"column:communication_channel_id"`
	RepeatTargetsMember    int    `gorm:"column:repeat_targets_member"`  // rows filtered out as duplicates
	InsertsTargetsMember   int    `gorm:"column:inserts_targets_member"` // rows queued for insertion
	DataFileName           string `gorm:"column:data_file_name"`
}

// TableName returns the table that BatchProcessingInformation rows are stored in.
func (BatchProcessingInformation) TableName() string {
	return "batches_processing_informations"
}
|
|
|
|
|
|
|
|
|
|
|
|
// BatchDataDuplicateLog is the GORM model for a recipient row that
// batchDataInsert filtered out as a duplicate within one data package.
type BatchDataDuplicateLog struct {
	ID                     int    `gorm:"primaryKey;autoIncrement"`
	CommunicationChannelID string `gorm:"column:communication_channel_id"`
	Mobile                 string `gorm:"column:mobile"`
	FullName               string `gorm:"column:full_name"`
	ReservedField          string `gorm:"column:reserved_field"` // JSON-encoded reserved fields
}

// TableName returns the table that BatchDataDuplicateLog rows are stored in.
func (BatchDataDuplicateLog) TableName() string {
	return "batche_data_duplicate_logs"
}
|
|
|
|
|
|
|
|
|
|
|
|
func init() {
|
|
|
|
|
|
// 获取可执行文件所在的路径
|
|
|
|
|
|
executablePath, err := os.Executable()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
log.Fatal("failed to get executable path: ", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
executableDir = filepath.Dir(executablePath)
|
|
|
|
|
|
|
2023-02-27 11:58:39 +08:00
|
|
|
|
flag.StringVar(&env, "env", "dev", "运行模式")
|
|
|
|
|
|
flag.Parse()
|
|
|
|
|
|
switch env {
|
|
|
|
|
|
case "dev":
|
2023-02-27 18:31:18 +08:00
|
|
|
|
//fmt.Print("测试环境配置已生效\n")
|
2023-02-27 11:58:39 +08:00
|
|
|
|
redisAddress = "mysql5.weu.me:6379"
|
|
|
|
|
|
redisPassword = ""
|
|
|
|
|
|
redisDB = 1
|
|
|
|
|
|
sftpAddress = "192.168.10.86:49156"
|
|
|
|
|
|
sftpUser = "demo"
|
|
|
|
|
|
sftpPassword = "demo"
|
|
|
|
|
|
sftpDir = "/sftp/test"
|
|
|
|
|
|
dbAddress = "192.168.10.18:1433"
|
|
|
|
|
|
dbUser = "sa"
|
|
|
|
|
|
dbPassword = "Aa123123"
|
|
|
|
|
|
dbName = "sephora"
|
|
|
|
|
|
zipPath = "RawData/Zip/"
|
|
|
|
|
|
txtPath = "RawData/Txt/"
|
|
|
|
|
|
logPath = "logs/"
|
|
|
|
|
|
batchSize = 5000 //提交数据
|
|
|
|
|
|
insertSize = 500 //一次性入库
|
|
|
|
|
|
insertChanSize = 10 //通道缓冲数
|
|
|
|
|
|
goSize = 10 //协程数
|
|
|
|
|
|
taskTime = 1
|
2023-02-27 19:09:43 +08:00
|
|
|
|
to = []string{"chejiulong@wemediacn.com"}
|
2023-02-27 11:58:39 +08:00
|
|
|
|
case "prod":
|
2023-02-27 18:31:18 +08:00
|
|
|
|
//fmt.Print("正式环境配置已生效\n")
|
2023-02-27 11:58:39 +08:00
|
|
|
|
redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379"
|
|
|
|
|
|
redisPassword = "3Nsb4Pmsl9bcLs24mL12l"
|
|
|
|
|
|
redisDB = 233
|
|
|
|
|
|
sftpAddress = "esftp.sephora.com.cn:20981"
|
|
|
|
|
|
sftpUser = "CHN-CRMTOWemedia-wemedia"
|
|
|
|
|
|
sftpPassword = "uoWdMHEv39ZFjiOg"
|
|
|
|
|
|
sftpDir = "/CN-CRMTOWemedia/SMS"
|
|
|
|
|
|
dbAddress = "rm-bp16l424ln96q1ouk.sqlserver.rds.aliyuncs.com:3433"
|
|
|
|
|
|
dbUser = "sephora_sms"
|
|
|
|
|
|
dbPassword = "5ORiiLmgkniC0EqF"
|
|
|
|
|
|
dbName = "sephora_sms"
|
|
|
|
|
|
zipPath = "RawData/Zip/"
|
|
|
|
|
|
txtPath = "RawData/Txt/"
|
|
|
|
|
|
logPath = "logs/"
|
|
|
|
|
|
batchSize = 5000 //提交数据
|
|
|
|
|
|
insertSize = 500 //一次性入库
|
|
|
|
|
|
insertChanSize = 50 //通道缓冲数
|
2023-02-28 18:48:53 +08:00
|
|
|
|
goSize = 50 //协程数
|
2023-02-27 18:34:17 +08:00
|
|
|
|
taskTime = 60
|
2023-02-27 19:09:43 +08:00
|
|
|
|
to = []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"}
|
2023-02-27 11:58:39 +08:00
|
|
|
|
default:
|
|
|
|
|
|
panic(fmt.Errorf("无效的运行模式: %s", env))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2023-02-27 15:36:18 +08:00
|
|
|
|
//判断目录是否存在,不存在则创建
|
|
|
|
|
|
err = os.MkdirAll(filepath.Join(executableDir, zipPath), 0755)
|
|
|
|
|
|
err = os.MkdirAll(filepath.Join(executableDir, txtPath), 0755)
|
|
|
|
|
|
err = os.MkdirAll(filepath.Join(executableDir, logPath, time.Now().Format("2006_01")), 0755)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
log.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2023-02-18 16:32:19 +08:00
|
|
|
|
redisClient = redis.NewClient(&redis.Options{
|
|
|
|
|
|
Addr: redisAddress,
|
|
|
|
|
|
Password: redisPassword,
|
|
|
|
|
|
DB: redisDB,
|
|
|
|
|
|
})
|
2023-02-28 18:48:53 +08:00
|
|
|
|
|
2023-02-18 16:32:19 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func main() {
|
2023-02-21 14:36:25 +08:00
|
|
|
|
// 打开一个文件作为锁
|
|
|
|
|
|
lockFile, err := os.OpenFile(filepath.Join(executableDir, ".lock"), os.O_CREATE|os.O_RDWR, 0666)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
fmt.Println("打开锁文件失败:", err)
|
|
|
|
|
|
os.Exit(1)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer lockFile.Close()
|
|
|
|
|
|
// 尝试获取文件的独占锁
|
|
|
|
|
|
err = syscall.Flock(int(lockFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
fmt.Println("程序已经在运行,本程序无法同时运行多个")
|
|
|
|
|
|
os.Exit(1)
|
|
|
|
|
|
}
|
|
|
|
|
|
applogger = logrus.New()
|
2023-02-27 16:47:07 +08:00
|
|
|
|
logPath = filepath.Join(executableDir, "logs", time.Now().Format("2006_01"))
|
|
|
|
|
|
err = os.MkdirAll(logPath, 0755)
|
2023-02-27 11:58:39 +08:00
|
|
|
|
logFileName := "sms_processing_" + time.Now().Format("2006_01_02") + ".log"
|
2023-02-21 14:36:25 +08:00
|
|
|
|
logFileHook := &lumberjack.Logger{
|
|
|
|
|
|
Filename: filepath.Join(logPath, logFileName),
|
|
|
|
|
|
}
|
2023-02-27 18:31:18 +08:00
|
|
|
|
logOutput := io.MultiWriter(os.Stdout, logFileHook) // 将日志同时输出到控制台和文件
|
|
|
|
|
|
applogger.SetOutput(logOutput)
|
2023-02-28 18:48:53 +08:00
|
|
|
|
applogger.Info(fmt.Sprintf("程序启动,加载%s环境,尝试执行...", env))
|
2023-02-21 14:36:25 +08:00
|
|
|
|
go downloadDecompression() // 启动立即执行一次
|
2023-02-18 16:32:19 +08:00
|
|
|
|
|
2023-02-27 18:34:17 +08:00
|
|
|
|
ticker := time.NewTicker(time.Duration(taskTime) * time.Minute) // 创建一个定时器
|
2023-02-27 11:58:39 +08:00
|
|
|
|
defer ticker.Stop() // 延迟关闭定时器,确保所有的定时器事件都被处理完毕
|
2023-02-18 16:32:19 +08:00
|
|
|
|
// 循环处理任务
|
|
|
|
|
|
for {
|
|
|
|
|
|
select {
|
|
|
|
|
|
case <-ticker.C:
|
|
|
|
|
|
// 定时器触发时执行的任务函数
|
2023-02-27 16:47:07 +08:00
|
|
|
|
logPath = filepath.Join(executableDir, "logs", time.Now().Format("2006_01"))
|
|
|
|
|
|
logFileName = "sms_processing_" + time.Now().Format("2006_01_02") + ".log"
|
|
|
|
|
|
logFileHook = &lumberjack.Logger{
|
2023-02-27 11:58:39 +08:00
|
|
|
|
Filename: filepath.Join(logPath, logFileName),
|
|
|
|
|
|
}
|
2023-02-27 18:31:18 +08:00
|
|
|
|
logOutput := io.MultiWriter(os.Stdout, logFileHook) // 将日志同时输出到控制台和文件
|
|
|
|
|
|
applogger.SetOutput(logOutput)
|
2023-02-28 18:48:53 +08:00
|
|
|
|
applogger.Info("尝试执行...")
|
|
|
|
|
|
go downloadDecompression()
|
2023-02-18 16:32:19 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func downloadDecompression() {
|
|
|
|
|
|
if exists, _ := redisClient.Exists("iniDataStatus").Result(); exists == 1 {
|
2023-02-21 14:36:25 +08:00
|
|
|
|
fmt.Print("上一批次执行中,跳过本次\n")
|
2023-02-18 16:32:19 +08:00
|
|
|
|
} else {
|
|
|
|
|
|
// 写入执行中标记
|
|
|
|
|
|
err := redisClient.Set("iniDataStatus", 1, 0).Err()
|
|
|
|
|
|
if err != nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("写入任务执行中标记失败:%v", err))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
}
|
|
|
|
|
|
// Connect to SFTP server
|
|
|
|
|
|
sshConfig := &ssh.ClientConfig{
|
|
|
|
|
|
User: sftpUser,
|
|
|
|
|
|
Auth: []ssh.AuthMethod{
|
|
|
|
|
|
ssh.Password(sftpPassword),
|
|
|
|
|
|
},
|
|
|
|
|
|
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
|
|
|
|
|
|
}
|
|
|
|
|
|
sshClient, err := ssh.Dial("tcp", sftpAddress, sshConfig)
|
|
|
|
|
|
if err != nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("写入任务执行中标记失败:%v", err))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
}
|
|
|
|
|
|
sftpClient, err := sftp.NewClient(sshClient)
|
|
|
|
|
|
if err != nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("写入任务执行中标记失败:%v", err))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
}
|
|
|
|
|
|
defer sftpClient.Close()
|
2023-02-20 18:44:32 +08:00
|
|
|
|
files, err := sftpClient.ReadDir(sftpDir)
|
2023-02-18 16:32:19 +08:00
|
|
|
|
if err != nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("sftp目录不存在: 错误信息:%v", err))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
}
|
2023-02-20 18:44:32 +08:00
|
|
|
|
it := 1
|
|
|
|
|
|
fmt.Printf("共%d个文件\n", len(files))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
for _, file := range files {
|
2023-02-21 14:36:25 +08:00
|
|
|
|
fmt.Printf("第%d个文件处理中\n", it)
|
2023-02-20 18:44:32 +08:00
|
|
|
|
it++
|
2023-02-18 16:32:19 +08:00
|
|
|
|
// Check if file has been downloaded before
|
|
|
|
|
|
fileKey := fmt.Sprintf("downloaded:%s", file.Name())
|
|
|
|
|
|
if exists, _ := redisClient.Exists(fileKey).Result(); exists == 1 {
|
2023-02-21 14:36:25 +08:00
|
|
|
|
fmt.Println("跳过已处理过的文件:" + file.Name())
|
2023-02-18 16:32:19 +08:00
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if filepath.Ext(file.Name()) == ".zip" {
|
|
|
|
|
|
//fmt.Println("下载开始(数据包):" + file.Name())
|
|
|
|
|
|
// Download file
|
2023-02-20 18:44:32 +08:00
|
|
|
|
srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name()))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
if err != nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("打开sftp文件失败,文件名:%s,错误信息: %v", file.Name(), err))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
defer srcFile.Close()
|
|
|
|
|
|
dstFile, err := os.Create(path.Join(executableDir, zipPath, file.Name()))
|
|
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("创建本地文件失败,文件名: %s,错误信息:%v", file.Name(), err))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
defer dstFile.Close()
|
|
|
|
|
|
if _, err := io.Copy(dstFile, srcFile); err != nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
continue
|
|
|
|
|
|
}
|
2023-02-27 18:31:18 +08:00
|
|
|
|
//fmt.Printf("%s(数据包)下载完成\n", file.Name())
|
2023-02-28 18:48:53 +08:00
|
|
|
|
applogger.Info(fmt.Sprintf("%s(数据包)下载完成", file.Name()))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
// Unzip file
|
|
|
|
|
|
zipReader, err := zip.OpenReader(path.Join(executableDir, zipPath, file.Name()))
|
|
|
|
|
|
if err != nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("解压文件失败: 文件名:%s,错误信息:%v", file.Name(), err))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
defer zipReader.Close()
|
|
|
|
|
|
//fmt.Println("压缩报文件数量:", len(zipReader.File))
|
|
|
|
|
|
for _, zipFile := range zipReader.File {
|
|
|
|
|
|
zipFileReader, err := zipFile.Open()
|
|
|
|
|
|
if strings.Contains(zipFile.Name, "__MACOSX/._") {
|
|
|
|
|
|
//fmt.Print("系统文件.DS_Store,跳过处理", zipFile.Name)
|
|
|
|
|
|
continue
|
|
|
|
|
|
} else if filepath.Ext(zipFile.Name) != ".txt" {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("文件类型不正确,跳过处理: %v", zipFile.Name))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
if err != nil || zipFileReader == nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("Failed to open zip file: %v", err))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
fmt.Print("压缩文件处理结束")
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
defer zipFileReader.Close()
|
|
|
|
|
|
// Create the file
|
|
|
|
|
|
unzipFile, err := os.Create(path.Join(executableDir, txtPath, zipFile.Name))
|
|
|
|
|
|
if err != nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("创建压缩后的文件失败,文件名:%s,错误信息: %v", zipFile.Name, err))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
defer unzipFile.Close()
|
|
|
|
|
|
// Write the unzip data to the file
|
|
|
|
|
|
_, err = io.Copy(unzipFile, zipFileReader)
|
|
|
|
|
|
if err != nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("文件解压失败,文件名:%s,错误信息: %v", zipFileReader, err))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
continue
|
|
|
|
|
|
}
|
2023-02-21 14:36:25 +08:00
|
|
|
|
applogger.Info(fmt.Sprintf("%s(数据包)解压完成", zipFile.Name))
|
2023-02-27 18:31:18 +08:00
|
|
|
|
//fmt.Printf("%s(数据包)解压完成\n", zipFile.Name)
|
2023-02-18 16:32:19 +08:00
|
|
|
|
batchDataInsert(zipFile.Name)
|
2023-02-21 14:36:25 +08:00
|
|
|
|
|
2023-02-18 16:32:19 +08:00
|
|
|
|
}
|
|
|
|
|
|
} else if filepath.Ext(file.Name()) == ".txt" {
|
2023-02-20 18:44:32 +08:00
|
|
|
|
srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name()))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
if err != nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("打开sftp文件失败,文件名:%s,错误信息: %v", file.Name(), err))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
defer srcFile.Close()
|
|
|
|
|
|
dstFile, err := os.Create(path.Join(executableDir, txtPath, file.Name()))
|
|
|
|
|
|
if err != nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("创建本地文件失败,文件名: %s,错误信息:%v", file.Name(), err))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
defer dstFile.Close()
|
|
|
|
|
|
if _, err := io.Copy(dstFile, srcFile); err != nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
continue
|
|
|
|
|
|
}
|
2023-02-27 19:09:43 +08:00
|
|
|
|
//fmt.Printf("%s(批次文件)下载完成 \n", file.Name())
|
2023-02-21 14:36:25 +08:00
|
|
|
|
applogger.Info(fmt.Sprintf("%s(批次文件)下载完成", file.Name()))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
batchInsert(file.Name())
|
|
|
|
|
|
}
|
|
|
|
|
|
err = redisClient.Set(fileKey, 1, 0).Err() //写入下载、解压、入库完成标记
|
|
|
|
|
|
if err != nil {
|
2023-02-27 18:31:18 +08:00
|
|
|
|
//fmt.Printf("写入文件处理完成标记失败文件名:%s,错误信息:%v\n", file.Name(), err)
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Info(fmt.Sprintf("写入文件处理完成标记失败文件名:%s,错误信息:v%\n", file.Name(), err))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
redisClient.Del("iniDataStatus") //删除任务执行中标记
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 批次入库
|
|
|
|
|
|
func batchInsert(fileName string) {
|
|
|
|
|
|
//fmt.Print("批次处理开始")
|
|
|
|
|
|
start := time.Now()
|
2023-02-21 14:36:25 +08:00
|
|
|
|
db, _ := connectToDB()
|
2023-02-18 16:32:19 +08:00
|
|
|
|
file, err := os.Open(path.Join(executableDir, txtPath, fileName))
|
|
|
|
|
|
if err != nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("文件打开失败文件名:%s,错误信息%v", fileName, err))
|
2023-02-21 14:36:25 +08:00
|
|
|
|
} else {
|
|
|
|
|
|
defer file.Close()
|
|
|
|
|
|
reader := csv.NewReader(bufio.NewReader(file))
|
|
|
|
|
|
reader.Read()
|
|
|
|
|
|
batchRows := 0
|
|
|
|
|
|
for {
|
|
|
|
|
|
record, err := reader.Read()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
break
|
|
|
|
|
|
}
|
|
|
|
|
|
communicationChannelID, _ := strconv.ParseUint(record[0], 10, 32)
|
|
|
|
|
|
TargetsMember, _ := strconv.ParseUint(record[2], 10, 32)
|
|
|
|
|
|
templateID, _ := strconv.ParseUint(record[3], 10, 32)
|
|
|
|
|
|
status := uint(1)
|
|
|
|
|
|
|
|
|
|
|
|
batch := Batch{
|
|
|
|
|
|
CommunicationChannelID: uint(communicationChannelID),
|
|
|
|
|
|
CommunicationName: record[1],
|
|
|
|
|
|
TargetsMember: uint(TargetsMember),
|
|
|
|
|
|
TemplateID: uint(templateID),
|
|
|
|
|
|
Content: record[4],
|
|
|
|
|
|
Status: status,
|
|
|
|
|
|
DataFileName: fileName,
|
|
|
|
|
|
}
|
|
|
|
|
|
db.Create(&batch)
|
|
|
|
|
|
batchRows++
|
2023-02-18 16:32:19 +08:00
|
|
|
|
}
|
2023-02-21 14:36:25 +08:00
|
|
|
|
time.Sleep(time.Second)
|
|
|
|
|
|
elapsed := time.Since(start)
|
|
|
|
|
|
subject := "丝芙兰批次文件处理完成"
|
|
|
|
|
|
body := "批次文件:" + fileName + ";\n批次数:" + strconv.Itoa(batchRows) + ";\n处理完成,请前往管理平台查看处理。"
|
|
|
|
|
|
SendEmail(subject, body) //发送邮件
|
|
|
|
|
|
applogger.Info(fmt.Sprintf("%s(批次文件)入库完成", fileName))
|
2023-02-27 11:58:39 +08:00
|
|
|
|
applogger.Info(fmt.Sprintf("%s(批次文件)执行时间%s 插入批次次数:%d", fileName, elapsed, batchRows))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func batchDataInsert(fileName string) {
|
|
|
|
|
|
start := time.Now()
|
2023-02-21 14:36:25 +08:00
|
|
|
|
// Open file
|
|
|
|
|
|
file, err := os.Open(path.Join(executableDir, txtPath, fileName))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
if err != nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("文件打开失败,文件名:%s,错误信息%v", fileName, err))
|
2023-02-21 14:36:25 +08:00
|
|
|
|
} else {
|
|
|
|
|
|
defer file.Close()
|
|
|
|
|
|
db, _ := connectToDB()
|
|
|
|
|
|
// Insert data in batches using multiple goroutines
|
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
|
dataBatchChan := make(chan []BatcheData, insertChanSize)
|
|
|
|
|
|
for i := 0; i < goSize; i++ {
|
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
|
go func() {
|
|
|
|
|
|
for batch := range dataBatchChan {
|
|
|
|
|
|
retryCount := 0
|
|
|
|
|
|
for {
|
|
|
|
|
|
if err := db.CreateInBatches(batch, insertSize).Error; err != nil {
|
|
|
|
|
|
if retryCount >= 5 {
|
|
|
|
|
|
panic(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
fmt.Printf("Insert failed, retrying in %v seconds...\n", 2*retryCount)
|
|
|
|
|
|
time.Sleep(time.Duration(2*retryCount) * time.Second)
|
|
|
|
|
|
retryCount++
|
|
|
|
|
|
} else {
|
|
|
|
|
|
break
|
2023-02-18 16:32:19 +08:00
|
|
|
|
}
|
2023-02-21 14:36:25 +08:00
|
|
|
|
|
2023-02-18 16:32:19 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
2023-02-21 14:36:25 +08:00
|
|
|
|
wg.Done()
|
|
|
|
|
|
}()
|
2023-02-18 16:32:19 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2023-02-21 14:36:25 +08:00
|
|
|
|
// Create and initialize hashset
|
|
|
|
|
|
hs := make(map[string]bool)
|
|
|
|
|
|
// Prepare batch data
|
|
|
|
|
|
dataBatch := make([]BatcheData, 0, batchSize)
|
|
|
|
|
|
dataBatchDuplicate := make([]BatchDataDuplicateLog, 0, batchSize)
|
|
|
|
|
|
// Parse file line by line and insert data in batches
|
|
|
|
|
|
scanner := bufio.NewScanner(file)
|
|
|
|
|
|
scanner.Split(bufio.ScanLines)
|
|
|
|
|
|
scanner.Scan() // skip first line
|
|
|
|
|
|
bi := 0
|
|
|
|
|
|
duplicateCount := make(map[string]int)
|
|
|
|
|
|
insertsCount := make(map[string]int)
|
|
|
|
|
|
var count int
|
|
|
|
|
|
//ci := 1
|
|
|
|
|
|
for scanner.Scan() {
|
|
|
|
|
|
line := scanner.Text()
|
|
|
|
|
|
row, err := csv.NewReader(strings.NewReader(line)).Read()
|
|
|
|
|
|
if err != nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("文件按行读取失败,文件名:%s,错误信息%v", fileName, err))
|
2023-02-21 14:36:25 +08:00
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
reservedFields := map[string]string{ //合并个性化字段
|
|
|
|
|
|
"ReservedField1": row[5],
|
|
|
|
|
|
"ReservedField2": row[6],
|
|
|
|
|
|
"ReservedField3": row[7],
|
|
|
|
|
|
"ReservedField4": row[8],
|
|
|
|
|
|
"ReservedField5": row[9],
|
|
|
|
|
|
"DataFileName": fileName,
|
|
|
|
|
|
}
|
|
|
|
|
|
reservedFieldsJson, err := json.Marshal(reservedFields) //个性化字段转json
|
|
|
|
|
|
if err != nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("个性化字段合并失败,文件名:%s,错误信息%v", fileName, err))
|
2023-02-21 14:36:25 +08:00
|
|
|
|
continue
|
|
|
|
|
|
}
|
2023-02-27 11:58:39 +08:00
|
|
|
|
if _, ok := duplicateCount[row[2]]; !ok {
|
|
|
|
|
|
duplicateCount[row[2]] = 0
|
|
|
|
|
|
}
|
2023-02-21 14:36:25 +08:00
|
|
|
|
// Check if record exists in hashset
|
2023-02-28 18:48:53 +08:00
|
|
|
|
key := fmt.Sprintf("%s-%s-%s", row[2], row[3], row[5])
|
2023-02-21 14:36:25 +08:00
|
|
|
|
if _, exists := hs[key]; exists { //如果批次数据重复
|
|
|
|
|
|
bi++
|
|
|
|
|
|
// Increment duplicate count
|
|
|
|
|
|
duplicateCount[row[2]]++
|
|
|
|
|
|
dataBatchDuplicate = append(dataBatchDuplicate, BatchDataDuplicateLog{
|
|
|
|
|
|
CommunicationChannelID: row[2],
|
|
|
|
|
|
Mobile: row[3],
|
|
|
|
|
|
FullName: row[4],
|
|
|
|
|
|
ReservedField: string(reservedFieldsJson),
|
|
|
|
|
|
})
|
|
|
|
|
|
if len(dataBatchDuplicate) >= batchSize {
|
|
|
|
|
|
err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error
|
|
|
|
|
|
if err != nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("插入重复数据失败,文件名:%s,错误信息%v", fileName, err))
|
2023-02-21 14:36:25 +08:00
|
|
|
|
} else {
|
|
|
|
|
|
dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
continue
|
2023-02-18 16:32:19 +08:00
|
|
|
|
}
|
2023-02-21 14:36:25 +08:00
|
|
|
|
// Add record to hashset
|
|
|
|
|
|
hs[key] = true
|
2023-02-18 16:32:19 +08:00
|
|
|
|
|
2023-02-21 14:36:25 +08:00
|
|
|
|
dataBatch = append(dataBatch, BatcheData{
|
2023-02-18 16:32:19 +08:00
|
|
|
|
CommunicationChannelID: row[2],
|
|
|
|
|
|
Mobile: row[3],
|
|
|
|
|
|
FullName: row[4],
|
|
|
|
|
|
ReservedField: string(reservedFieldsJson),
|
|
|
|
|
|
})
|
|
|
|
|
|
|
2023-02-21 14:36:25 +08:00
|
|
|
|
if len(dataBatch) >= batchSize {
|
|
|
|
|
|
dataBatchChan <- dataBatch
|
|
|
|
|
|
dataBatch = make([]BatcheData, 0, batchSize)
|
|
|
|
|
|
//fmt.Print("提交通次数" + strconv.Itoa(ci))
|
|
|
|
|
|
//ci++
|
2023-02-18 16:32:19 +08:00
|
|
|
|
}
|
2023-02-21 14:36:25 +08:00
|
|
|
|
if _, ok := insertsCount[row[2]]; !ok {
|
|
|
|
|
|
insertsCount[row[2]] = 0
|
|
|
|
|
|
}
|
|
|
|
|
|
insertsCount[row[2]]++
|
|
|
|
|
|
count++
|
2023-02-18 16:32:19 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2023-02-21 14:36:25 +08:00
|
|
|
|
if len(dataBatch) > 0 {
|
2023-02-18 16:32:19 +08:00
|
|
|
|
dataBatchChan <- dataBatch
|
|
|
|
|
|
dataBatch = make([]BatcheData, 0, batchSize)
|
2023-02-21 14:36:25 +08:00
|
|
|
|
//fmt.Print("文件读取完成,最后一批提交至通道")
|
2023-02-18 16:32:19 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2023-02-21 14:36:25 +08:00
|
|
|
|
close(dataBatchChan)
|
2023-02-18 16:32:19 +08:00
|
|
|
|
|
2023-02-21 14:36:25 +08:00
|
|
|
|
wg.Wait() //所有入库全部完成
|
2023-02-18 16:32:19 +08:00
|
|
|
|
|
2023-02-21 14:36:25 +08:00
|
|
|
|
//插入批次处理信息
|
|
|
|
|
|
bpi := []BatchProcessingInformation{}
|
|
|
|
|
|
for key, value := range duplicateCount {
|
|
|
|
|
|
bpi = append(bpi, BatchProcessingInformation{
|
|
|
|
|
|
CommunicationChannelID: key,
|
|
|
|
|
|
RepeatTargetsMember: value,
|
|
|
|
|
|
InsertsTargetsMember: insertsCount[key],
|
|
|
|
|
|
DataFileName: fileName,
|
|
|
|
|
|
})
|
|
|
|
|
|
}
|
|
|
|
|
|
err = db.CreateInBatches(bpi, insertSize).Error
|
2023-02-18 16:32:19 +08:00
|
|
|
|
if err != nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("插入批次处理信息失败,文件名:%s,错误信息%v", fileName, err))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
}
|
2023-02-20 12:35:41 +08:00
|
|
|
|
|
2023-02-21 14:36:25 +08:00
|
|
|
|
//插入批此重复数据
|
|
|
|
|
|
if len(dataBatchDuplicate) > 0 {
|
|
|
|
|
|
err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error
|
|
|
|
|
|
if err != nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("插入重复数据失败,文件名:%s,错误信息%v", fileName, err))
|
2023-02-21 14:36:25 +08:00
|
|
|
|
} else {
|
|
|
|
|
|
dataBatchDuplicate = nil
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2023-02-20 12:35:41 +08:00
|
|
|
|
|
2023-02-21 14:36:25 +08:00
|
|
|
|
elapsed := time.Since(start)
|
2023-02-18 16:32:19 +08:00
|
|
|
|
|
2023-02-21 14:36:25 +08:00
|
|
|
|
//发送提醒邮件
|
|
|
|
|
|
subject := "丝芙兰数据包处理完成"
|
|
|
|
|
|
body := "数据包:" + fileName + ";\n总数:" + strconv.Itoa(count+bi) + ";\n过滤重复数:" + strconv.Itoa(bi) + ";\n过滤后总数:" + strconv.Itoa(count) + ";\n处理完成,请前往管理平台查看处理。"
|
|
|
|
|
|
SendEmail(subject, body) //发送邮件
|
2023-02-28 18:48:53 +08:00
|
|
|
|
applogger.Info(fmt.Sprintf(fmt.Sprintf("%s(数据包) 入库完成", fileName)))
|
2023-02-27 11:58:39 +08:00
|
|
|
|
applogger.Info(fmt.Sprintf("%s(数据包)执行时间:%s 插入数据:%d条 过滤数数:%d条", fileName, elapsed, count, bi))
|
2023-02-21 14:36:25 +08:00
|
|
|
|
}
|
2023-02-18 16:32:19 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func connectToDB() (*gorm.DB, error) {
|
|
|
|
|
|
dsn := "sqlserver://" + dbUser + ":" + dbPassword + "@" + dbAddress + "?database=" + dbName + "&charset=utf8mb4"
|
|
|
|
|
|
var db *gorm.DB
|
|
|
|
|
|
var err error
|
|
|
|
|
|
attempt := 1
|
|
|
|
|
|
maxAttempts := 5
|
|
|
|
|
|
backoff := time.Second
|
2023-02-20 18:44:32 +08:00
|
|
|
|
logger := logger.Default.LogMode(logger.Error) //Silent、Error、Warn、Info
|
2023-02-18 16:32:19 +08:00
|
|
|
|
for {
|
2023-02-20 18:44:32 +08:00
|
|
|
|
db, err = gorm.Open(sqlserver.Open(dsn), &gorm.Config{Logger: logger, SkipDefaultTransaction: true})
|
2023-02-18 16:32:19 +08:00
|
|
|
|
if err == nil {
|
|
|
|
|
|
break
|
|
|
|
|
|
}
|
|
|
|
|
|
if attempt >= maxAttempts {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Error(fmt.Sprintf("数据库连接失败,错误信息%v", err))
|
2023-02-18 16:32:19 +08:00
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
time.Sleep(backoff)
|
|
|
|
|
|
backoff *= 2
|
|
|
|
|
|
attempt++
|
|
|
|
|
|
}
|
|
|
|
|
|
return db, nil
|
|
|
|
|
|
}
|
2023-02-18 19:31:39 +08:00
|
|
|
|
|
|
|
|
|
|
func SendEmail(subject string, body string) error {
|
|
|
|
|
|
// 邮箱认证信息
|
|
|
|
|
|
smtpHost := "smtp.exmail.qq.com"
|
|
|
|
|
|
smtpPort := 465
|
2023-02-21 14:36:25 +08:00
|
|
|
|
from := "auto_system@wemediacn.com"
|
|
|
|
|
|
password := "yJJYYcy5NKx2y69r"
|
|
|
|
|
|
|
2023-02-18 19:31:39 +08:00
|
|
|
|
// 邮件内容
|
|
|
|
|
|
m := gomail.NewMessage()
|
|
|
|
|
|
m.SetHeader("From", from)
|
|
|
|
|
|
m.SetHeader("To", to...)
|
|
|
|
|
|
m.SetHeader("Subject", subject)
|
|
|
|
|
|
m.SetBody("text/plain", body)
|
|
|
|
|
|
// 邮件发送
|
|
|
|
|
|
d := gomail.NewDialer(smtpHost, smtpPort, from, password)
|
|
|
|
|
|
d.TLSConfig = &tls.Config{InsecureSkipVerify: true}
|
|
|
|
|
|
err := d.DialAndSend(m)
|
|
|
|
|
|
if err != nil {
|
2023-02-27 19:09:43 +08:00
|
|
|
|
applogger.Warn(fmt.Sprintf("邮件发送失败,错误信息%v", err))
|
2023-02-18 19:31:39 +08:00
|
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|