// GO/main.go

567 lines
16 KiB
Go
Raw Normal View History

2023-02-18 16:32:19 +08:00
package main
import (
"archive/zip"
"bufio"
"crypto/tls"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"log"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"

"github.com/go-redis/redis"
"github.com/pkg/sftp"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
"gopkg.in/gomail.v2"
"gorm.io/driver/sqlserver"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
var (
applogger *logrus.Logger
redisClient *redis.Client
executableDir string
redisAddress = "mysql5.weu.me:6379"
redisPassword = ""
redisDB = 1
sftpAddress = "192.168.10.86:49156"
sftpUser = "demo"
sftpPassword = "demo"
dbAddress = "192.168.10.18:1433"
dbUser = "sa"
dbPassword = "Aa123123"
dbName = "sephora"
zipPath = "RawData/Zip/"
txtPath = "RawData/Txt/"
logPath = "logs/"
batchSize = 5000 // rows buffered before a slice is handed to an insert worker
insertSize = 500 // rows per INSERT (CreateInBatches chunk size)
cSize = 10 // number of insert worker goroutines
)
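// Batch is the GORM model for one row of a batch header file. No TableName
// override is provided, so GORM's default naming strategy maps it to the
// "batches" table.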
type Batch struct {
ID uint `gorm:"primary_key"`
CommunicationChannelID uint
CommunicationName string `gorm:"type:varchar(255)"`
TargetsMember uint `gorm:"type:int"`
TemplateID uint
Content string `gorm:"type:text"`
CreatedAt time.Time `gorm:"default:getdate()"`
UpdatedAt time.Time `gorm:"default:getdate()"`
Status uint `gorm:"type:int"`
DataFileName string `gorm:"type:text"`
}
type BatcheData struct {
ID uint `gorm:"primary_key"`
CommunicationChannelID string `gorm:"column:communication_channel_id"`
Mobile string `gorm:"column:mobile"`
FullName string `gorm:"column:full_name"`
ReservedField string `gorm:"column:reserved_field"`
}
func (BatcheData) TableName() string {
return "batche_datas"
}
type BatchProcessingInformation struct {
ID uint `gorm:"primaryKey;autoIncrement"`
CommunicationChannelID string `gorm:"column:communication_channel_id"`
RepeatTargetsMember int `gorm:"column:repeat_targets_member"`
DataFileName string `gorm:"column:data_file_name"`
}
func (BatchProcessingInformation) TableName() string {
return "batches_processing_informations"
}
type BatchDataDuplicateLog struct {
ID int `gorm:"primaryKey;autoIncrement"`
CommunicationChannelID string `gorm:"column:communication_channel_id"`
Mobile string `gorm:"column:mobile"`
FullName string `gorm:"column:full_name"`
ReservedField string `gorm:"column:reserved_field"`
}
func (BatchDataDuplicateLog) TableName() string {
return "batche_data_duplicate_logs"
}
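// init resolves the directory the executable runs from, creates the working
// directories for downloaded zips, extracted txt files and logs, points the
// logrus logger at logs/download.log and creates the Redis client used for
// the run-state and downloaded-file flags.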
func init() {
// Resolve the directory the executable lives in.
executablePath, err := os.Executable()
if err != nil {
log.Fatal("failed to get executable path: ", err)
}
executableDir = filepath.Dir(executablePath)
// Create the working directories under the executable's directory if they
// do not already exist, checking each MkdirAll call individually.
for _, dir := range []string{zipPath, txtPath, logPath} {
if err := os.MkdirAll(filepath.Join(executableDir, dir), 0755); err != nil {
log.Fatal(err)
}
}
applogger = logrus.New()
logFile := filepath.Join(executableDir, logPath, "download.log")
f, err := os.OpenFile(logFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
if err != nil {
log.Fatal(err)
}
// f is intentionally left open for the lifetime of the process so that
// applogger can keep writing to it.
applogger.SetOutput(f)
redisClient = redis.NewClient(&redis.Options{
Addr: redisAddress,
Password: redisPassword,
DB: redisDB,
})
}
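// main takes an exclusive flock on a ".lock" file so only one instance of the
// program can run at a time, then triggers downloadDecompression once a
// minute from a ticker loop.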
func main() {
// Open a file to use as the lock.
lockFile, err := os.OpenFile(".lock", os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
fmt.Println("打开锁文件失败:", err)
os.Exit(1)
}
defer lockFile.Close()
// Try to take an exclusive, non-blocking lock on the file.
err = syscall.Flock(int(lockFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
if err != nil {
fmt.Println("程序已经在运行,本程序无法同时运行多个")
os.Exit(1)
}
ticker := time.NewTicker(time.Minute) // fire the task once every minute
//tickCount := 1 // loop counter
defer ticker.Stop() // stop the ticker when main returns
// Process tasks in a loop.
for {
select {
case <-ticker.C:
// Task executed on every tick.
//fmt.Printf("attempt %d...\n", tickCount)
go downloadDecompression() // run asynchronously in a new goroutine
//tickCount++
}
}
}
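// downloadDecompression runs once per ticker tick. It uses the Redis key
// "iniDataStatus" as a simple in-progress flag so overlapping ticks skip the
// work, then scans /sftp/test on the SFTP server: .zip files are downloaded,
// unzipped and fed to batchDataInsert, .txt files are downloaded and fed to
// batchInsert, and each processed file name is remembered in Redis so it is
// only handled once.
//
// Note: the Exists/Set pair below is not atomic. If stronger protection were
// ever needed, a SetNX-based guard could be used instead (a sketch only, not
// wired in; the 30-minute expiry is an assumption):
//
//	ok, err := redisClient.SetNX("iniDataStatus", 1, 30*time.Minute).Result()
//	if err != nil || !ok {
//	    return // another run is still in progress
//	}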
func downloadDecompression() {
if exists, _ := redisClient.Exists("iniDataStatus").Result(); exists == 1 {
//fmt.Println("批次执行中,跳过本次")
} else {
// 写入执行中标记
err := redisClient.Set("iniDataStatus", 1, 0).Err()
if err != nil {
fmt.Println("写入标记失败:%d", err)
}
// Connect to SFTP server
sshConfig := &ssh.ClientConfig{
User: sftpUser,
Auth: []ssh.AuthMethod{
ssh.Password(sftpPassword),
},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
}
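// Note: InsecureIgnoreHostKey skips SSH host key verification. If the
// server's key is known, pinning it (for example via
// golang.org/x/crypto/ssh/knownhosts) would be the safer choice.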
sshClient, err := ssh.Dial("tcp", sftpAddress, sshConfig)
if err != nil {
panic(err)
}
sftpClient, err := sftp.NewClient(sshClient)
if err != nil {
panic(err)
}
defer sftpClient.Close()
files, err := sftpClient.ReadDir("/sftp/test")
if err != nil {
applogger.Fatalf("Failed to read SFTP directory: %v", err)
}
//it := 1
//fmt.Printf("%d files in total\n", len(files))
for _, file := range files {
//fmt.Printf("file #%d\n", it)
//it++
// Check if file has been downloaded before
fileKey := fmt.Sprintf("downloaded:%s", file.Name())
if exists, _ := redisClient.Exists(fileKey).Result(); exists == 1 {
continue
}
if filepath.Ext(file.Name()) == ".zip" {
//fmt.Println("download started (data package): " + file.Name())
// Download file
srcFile, err := sftpClient.Open(path.Join("/sftp/test", file.Name()))
if err != nil {
applogger.Errorf("Failed to open remote file: %v", err)
continue
}
defer srcFile.Close()
dstFile, err := os.Create(path.Join(executableDir, zipPath, file.Name()))
if err != nil {
applogger.Errorf("Failed to create local file: %v", err)
continue
}
defer dstFile.Close()
if _, err := io.Copy(dstFile, srcFile); err != nil {
applogger.Errorf("Failed to download file: %v", err)
continue
}
fmt.Println("download complete (data package): " + file.Name())
applogger.Infof("download complete (data package): %s", file.Name())
// Unzip file
zipReader, err := zip.OpenReader(path.Join(executableDir, zipPath, file.Name()))
if err != nil {
applogger.Errorf("Failed to open zip archive: %v", err)
continue
}
defer zipReader.Close()
//fmt.Println("压缩报文件数量:", len(zipReader.File))
for _, zipFile := range zipReader.File {
zipFileReader, err := zipFile.Open()
if strings.Contains(zipFile.Name, "__MACOSX/._") {
//fmt.Print("系统文件.DS_Store,跳过处理", zipFile.Name)
continue
} else if filepath.Ext(zipFile.Name) != ".txt" {
fmt.Print("文件类型不正确,跳过处理", zipFile.Name)
continue
}
if err != nil || zipFileReader == nil {
applogger.Fatalf("Failed to open zip file: %v", err)
fmt.Print("压缩文件处理结束")
continue
}
defer zipFileReader.Close()
// Create the file
unzipFile, err := os.Create(path.Join(executableDir, txtPath, zipFile.Name))
if err != nil {
applogger.Fatalf("Failed to create unzip file: %v", err)
continue
}
defer unzipFile.Close()
// Write the unzip data to the file
_, err = io.Copy(unzipFile, zipFileReader)
if err != nil {
applogger.Fatalf("Failed to write unzip data to file %v", err)
continue
}
2023-02-18 19:31:39 +08:00
applogger.Info("解压完成(数据包):%d", zipFile.Name)
fmt.Print("解压完成(数据包):" + zipFile.Name)
2023-02-18 16:32:19 +08:00
batchDataInsert(zipFile.Name)
2023-02-18 19:31:39 +08:00
applogger.Info("入库完成(数据包):%d", zipFile.Name)
fmt.Print("入库完成(数据包)::" + zipFile.Name)
2023-02-18 16:32:19 +08:00
}
} else if filepath.Ext(file.Name()) == ".txt" {
srcFile, err := sftpClient.Open(path.Join("/sftp/test", file.Name()))
if err != nil {
applogger.Errorf("Failed to open remote file: %v", err)
continue
}
defer srcFile.Close()
dstFile, err := os.Create(path.Join(executableDir, txtPath, file.Name()))
if err != nil {
applogger.Errorf("Failed to create local file: %v", err)
continue
}
defer dstFile.Close()
if _, err := io.Copy(dstFile, srcFile); err != nil {
applogger.Errorf("Failed to download file: %v", err)
continue
}
fmt.Println("download complete (batch file): " + file.Name())
applogger.Infof("download complete (batch file): %s", file.Name())
batchInsert(file.Name())
fmt.Println("ingest complete (batch file): " + file.Name())
applogger.Infof("ingest complete (batch file): %s", file.Name())
}
err = redisClient.Set(fileKey, 1, 0).Err() // mark the file as downloaded, unzipped and ingested
if err != nil {
fmt.Printf("failed to set the completion flag: %v\n", err)
}
}
redisClient.Del("iniDataStatus") // clear the in-progress flag
}
}
// Batch ingestion: one header file per batch drop.
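// Each data row is parsed as CSV with the columns consumed below: channel ID,
// communication name, target member count, template ID and message content.
// Every row becomes one Batch record, and a notification email is sent once
// the whole file has been processed.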
func batchInsert(fileName string) {
//fmt.Print("batch processing started")
start := time.Now()
db, err := connectToDB()
if err != nil {
panic("failed to connect database")
}
file, err := os.Open(path.Join(executableDir, txtPath, fileName))
if err != nil {
applogger.Errorf("failed to open file %s: %v", fileName, err)
panic("failed to open file")
}
defer file.Close()
reader := csv.NewReader(bufio.NewReader(file))
reader.Read() // skip the header line
batchRows := 0
for {
record, err := reader.Read()
if err != nil {
break // io.EOF (or the first malformed row) ends the read loop
}
communicationChannelID, _ := strconv.ParseUint(record[0], 10, 32)
TargetsMember, _ := strconv.ParseUint(record[2], 10, 32)
templateID, _ := strconv.ParseUint(record[3], 10, 32)
status := uint(1)
batch := Batch{
CommunicationChannelID: uint(communicationChannelID),
CommunicationName: record[1],
TargetsMember: uint(TargetsMember),
TemplateID: uint(templateID),
Content: record[4],
Status: status,
DataFileName: fileName,
}
db.Create(&batch)
batchRows++
}
time.Sleep(time.Second)
elapsed := time.Since(start)
subject := "Sephora batch file processing complete"
body := "Batch file: " + fileName + ";\nnumber of batches: " + strconv.Itoa(batchRows) + ";\nprocessing is complete, please review it on the management platform."
err = SendEmail(subject, body) // send the notification email
if err != nil {
applogger.Errorf("failed to send email: %v", err)
fmt.Print(err)
}
fmt.Printf("elapsed time: %s\nbatches inserted: %d\n", elapsed, batchRows)
}
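// batchDataInsert ingests a data package file. It starts cSize worker
// goroutines that drain dataBatchChan and write each slice with
// CreateInBatches, then streams the file line by line: rows are deduplicated
// in memory on the (channel ID, mobile) pair, duplicates are written to
// batche_data_duplicate_logs with per-channel counts recorded in
// batches_processing_informations, and unique rows are buffered into slices
// of batchSize before being handed to the workers. A summary email is sent
// when the file has been processed.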
func batchDataInsert(fileName string) {
//fmt.Print("batch data processing started")
start := time.Now()
db, err := connectToDB()
if err != nil {
panic("failed to connect database")
}
// Insert data in batches using multiple goroutines
var wg sync.WaitGroup
dataBatchChan := make(chan []BatcheData, cSize)
for i := 0; i < cSize; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for batch := range dataBatchChan {
retryCount := 0
for {
if err := db.CreateInBatches(batch, insertSize).Error; err != nil {
if retryCount >= 5 {
panic(err)
}
retryCount++ // back off 2, 4, 6, ... seconds between attempts
fmt.Printf("Insert failed, retrying in %d seconds...\n", 2*retryCount)
time.Sleep(time.Duration(2*retryCount) * time.Second)
} else {
break
}
}
}
}()
}
// Open file
file, err := os.Open(path.Join(executableDir, txtPath, fileName))
if err != nil {
panic(err)
}
defer file.Close()
// Create and initialize hashset
hs := make(map[string]bool)
// Prepare batch data
dataBatch := make([]BatcheData, 0, batchSize)
dataBatchDuplicate := make([]BatchDataDuplicateLog, 0, batchSize)
// Parse file line by line and insert data in batches
scanner := bufio.NewScanner(file)
scanner.Split(bufio.ScanLines)
scanner.Scan() // skip first line
bi := 0
duplicateCount := make(map[string]int)
var count int
for scanner.Scan() {
line := scanner.Text()
row, err := csv.NewReader(strings.NewReader(line)).Read()
if err != nil {
panic(err)
}
reservedFields := map[string]string{ // collect the personalised/reserved fields
"ReservedField1": row[5],
"ReservedField2": row[6],
"ReservedField3": row[7],
"ReservedField4": row[8],
"ReservedField5": row[9],
"DataFileName": fileName,
}
reservedFieldsJson, err := json.Marshal(reservedFields) // serialise the reserved fields to JSON
if err != nil {
panic(err)
}
// Check if record exists in hashset
key := fmt.Sprintf("%s-%s", row[2], row[3])
if _, exists := hs[key]; exists {
bi++
//filtered = append(filtered, key)
// Increment duplicate count
if _, ok := duplicateCount[row[2]]; !ok {
duplicateCount[row[2]] = 0
}
duplicateCount[row[2]]++
dataBatchDuplicate = append(dataBatchDuplicate, BatchDataDuplicateLog{
CommunicationChannelID: row[2],
Mobile: row[3],
FullName: row[4],
ReservedField: string(reservedFieldsJson),
})
if len(dataBatchDuplicate) >= batchSize {
err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error
if err != nil {
panic(err)
}
dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize)
}
continue
}
// Add record to hashset
hs[key] = true
dataBatch = append(dataBatch, BatcheData{
CommunicationChannelID: row[2],
Mobile: row[3],
FullName: row[4],
ReservedField: string(reservedFieldsJson),
})
if len(dataBatch) >= batchSize {
dataBatchChan <- dataBatch
dataBatch = make([]BatcheData, 0, batchSize)
}
count++
}
if len(dataBatch) > 0 {
dataBatchChan <- dataBatch
dataBatch = make([]BatcheData, 0, batchSize)
}
close(dataBatchChan)
wg.Wait()
// Insert the per-channel batch processing information (duplicate counts).
bpi := []BatchProcessingInformation{}
for key, value := range duplicateCount {
bpi = append(bpi, BatchProcessingInformation{
CommunicationChannelID: key,
RepeatTargetsMember: value,
DataFileName: fileName,
})
}
err = db.CreateInBatches(bpi, insertSize).Error
if err != nil {
panic(err)
}
// Insert any remaining duplicate rows.
if len(dataBatchDuplicate) > 0 {
err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error
if err != nil {
panic(err)
}
dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize)
}
subject := "Sephora data package processing complete"
body := "Data package: " + fileName + ";\ntotal rows: " + strconv.Itoa(count+bi) + ";\nduplicates filtered: " + strconv.Itoa(bi) + ";\nrows after filtering: " + strconv.Itoa(count) + ";\nprocessing is complete, please review it on the management platform."
err = SendEmail(subject, body) // send the notification email
if err != nil {
applogger.Errorf("failed to send email: %v", err)
fmt.Print(err)
}
elapsed := time.Since(start)
fmt.Printf("批次数据入库函数执行时间:%s;\n插入数:%d条\n过滤数%d条\n", elapsed, count, bi)
}
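// connectToDB opens a GORM connection to the SQL Server instance, retrying up
// to maxAttempts times with exponential backoff (1s, 2s, 4s, ...) and with
// GORM's own SQL logging silenced.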
func connectToDB() (*gorm.DB, error) {
dsn := "sqlserver://" + dbUser + ":" + dbPassword + "@" + dbAddress + "?database=" + dbName + "&charset=utf8mb4"
var db *gorm.DB
var err error
attempt := 1
maxAttempts := 5
backoff := time.Second
gormLogger := logger.Default.LogMode(logger.Silent)
for {
db, err = gorm.Open(sqlserver.Open(dsn), &gorm.Config{Logger: gormLogger})
if err == nil {
break
}
if attempt >= maxAttempts {
return nil, err
}
time.Sleep(backoff)
backoff *= 2
attempt++
}
return db, nil
}
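// SendEmail sends a plain-text notification to the hard-coded recipient list
// through the Exmail SMTP server on port 465; TLS certificate verification is
// disabled for that connection.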
func SendEmail(subject string, body string) error {
// SMTP account credentials
smtpHost := "smtp.exmail.qq.com"
smtpPort := 465
from := "chejiulong@wemediacn.com"
password := "hdQfpav4x8LwbJPH"
to := []string{"chejiulong@wemediacn.com", "wangyuanbing@wemediacn.com"}
// Build the message.
m := gomail.NewMessage()
m.SetHeader("From", from)
m.SetHeader("To", to...)
m.SetHeader("Subject", subject)
m.SetBody("text/plain", body)
// Dial the SMTP server and send.
d := gomail.NewDialer(smtpHost, smtpPort, from, password)
d.TLSConfig = &tls.Config{InsecureSkipVerify: true} // certificate verification is disabled
if err := d.DialAndSend(m); err != nil {
return err
}
return nil
}