GO/main.go
2023-02-18 19:31:39 +08:00

575 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"archive/zip"
"bufio"
"crypto/tls"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"log"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/go-redis/redis"
"github.com/pkg/sftp"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
"gopkg.in/gomail.v2"
"gorm.io/driver/sqlserver"
_ "gorm.io/driver/sqlserver"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
// Package-wide state and configuration.
// NOTE(review): all endpoints and credentials are hard-coded; consider
// moving secrets to environment variables or a configuration file.
var (
	// Shared clients and paths, initialised in init().
	applogger   *logrus.Logger
	redisClient *redis.Client
	// Directory containing the running binary; all data/log paths below
	// are resolved relative to it.
	executableDir string

	// Redis connection settings (run-state flags and download markers).
	redisAddress  = "mysql5.weu.me:6379"
	redisPassword = ""
	redisDB       = 1

	// SFTP source server the batch/zip files are downloaded from.
	sftpAddress  = "192.168.10.86:49156"
	sftpUser     = "demo"
	sftpPassword = "demo"

	// Target SQL Server database.
	dbAddress  = "192.168.10.18:1433"
	dbUser     = "sa"
	dbPassword = "Aa123123"
	dbName     = "sephora"

	// Working directories, created under executableDir in init().
	zipPath = "RawData/Zip/"
	txtPath = "RawData/Txt/"
	logPath = "logs/"

	// Tuning knobs for the data-load pipeline.
	batchSize  = 5000 // rows accumulated before handing a batch to a worker
	insertSize = 500  // rows per single INSERT issued by gorm
	cSize      = 10   // number of concurrent insert goroutines
)
// Batch is one row of a batch header file: a single outbound communication
// batch as imported by batchInsert.
type Batch struct {
	ID                     uint `gorm:"primary_key"`
	CommunicationChannelID uint
	CommunicationName      string `gorm:"type:varchar(255)"`
	// TargetsMember is the targeted member count taken from the source file.
	TargetsMember uint `gorm:"type:int"`
	TemplateID    uint
	Content       string `gorm:"type:text"`
	// CreatedAt/UpdatedAt default to SQL Server's getdate().
	CreatedAt time.Time `gorm:"default:getdate()"`
	UpdatedAt time.Time `gorm:"default:getdate()"`
	// Status is set to 1 on import by batchInsert.
	Status uint `gorm:"type:int"`
	// DataFileName records which source file the row came from.
	DataFileName string `gorm:"type:text"`
}
// BatcheData is one de-duplicated member row loaded by batchDataInsert.
// (The "Batche" spelling matches the pre-existing table name.)
type BatcheData struct {
	ID                     uint   `gorm:"primary_key"`
	CommunicationChannelID string `gorm:"column:communication_channel_id"`
	Mobile                 string `gorm:"column:mobile"`
	FullName               string `gorm:"column:full_name"`
	// ReservedField holds a JSON blob of ReservedField1-5 plus DataFileName.
	ReservedField string `gorm:"column:reserved_field"`
}

// TableName maps BatcheData to its existing (misspelled) table.
func (BatcheData) TableName() string {
	return "batche_datas"
}
// BatchProcessingInformation is a per-channel summary row written after a
// data file is processed: how many duplicate rows were filtered for one
// communication channel in one file.
type BatchProcessingInformation struct {
	ID                     uint   `gorm:"primaryKey;autoIncrement"`
	CommunicationChannelID string `gorm:"column:communication_channel_id"`
	// RepeatTargetsMember is the count of duplicate rows filtered out.
	RepeatTargetsMember int `gorm:"column:repeat_targets_member"`
	// DataFileName records which source file the summary belongs to.
	DataFileName string `gorm:"column:data_file_name"`
}

// TableName maps BatchProcessingInformation to its existing table.
func (BatchProcessingInformation) TableName() string {
	return "batches_processing_informations"
}
// BatchDataDuplicateLog records one duplicate member row that was filtered
// out during data-file import (same channel id + mobile already seen).
type BatchDataDuplicateLog struct {
	ID                     int    `gorm:"primaryKey;autoIncrement"`
	CommunicationChannelID string `gorm:"column:communication_channel_id"`
	Mobile                 string `gorm:"column:mobile"`
	FullName               string `gorm:"column:full_name"`
	// ReservedField holds the same JSON blob as the accepted rows.
	ReservedField string `gorm:"column:reserved_field"`
}

// TableName maps BatchDataDuplicateLog to its existing (misspelled) table.
func (BatchDataDuplicateLog) TableName() string {
	return "batche_data_duplicate_logs"
}
// init prepares the runtime environment before main runs: it resolves the
// directory containing the executable, creates the working directories,
// opens the application log file, and constructs the Redis client.
func init() {
	// Resolve the directory the binary lives in; all data/log paths are
	// relative to it so the program works regardless of the cwd.
	executablePath, err := os.Executable()
	if err != nil {
		log.Fatal("failed to get executable path: ", err)
	}
	executableDir = filepath.Dir(executablePath)

	// Create the working directories if they do not exist.
	// BUG FIX: the original only checked the error of the last MkdirAll,
	// silently ignoring failures of the first two calls.
	for _, dir := range []string{zipPath, txtPath, logPath} {
		if err := os.MkdirAll(filepath.Join(executableDir, dir), 0755); err != nil {
			log.Fatal(err)
		}
	}

	// Open the log file in append mode, creating it if necessary.
	// O_CREATE makes the original's separate Stat/Create dance redundant.
	applogger = logrus.New()
	logFile := filepath.Join(executableDir, logPath, "download.log")
	f, err := os.OpenFile(logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
	if err != nil {
		log.Fatal(err)
	}
	// The handle is intentionally kept open for the process lifetime:
	// logrus writes to it until the program exits.
	applogger.SetOutput(f)

	// Redis stores run-state flags and per-file "already downloaded"
	// markers. NewClient does not dial; connections are made lazily.
	redisClient = redis.NewClient(&redis.Options{
		Addr:     redisAddress,
		Password: redisPassword,
		DB:       redisDB,
	})
}
// main enforces single-instance execution via an exclusive flock on a lock
// file, then fires downloadDecompression in a new goroutine once a minute.
func main() {
	// Open (or create) the lock file used for single-instance enforcement.
	lockFile, err := os.OpenFile(".lock", os.O_CREATE|os.O_RDWR, 0666)
	if err != nil {
		fmt.Println("打开锁文件失败:", err)
		os.Exit(1)
	}
	defer lockFile.Close()

	// Non-blocking exclusive lock: fails immediately when another instance
	// of this program already holds it.
	if err := syscall.Flock(int(lockFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil {
		fmt.Println("程序已经在运行,本程序无法同时运行多个")
		os.Exit(1)
	}

	// Trigger the worker every minute; downloadDecompression itself guards
	// against overlapping runs with a Redis flag.
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	// IDIOM FIX: a for/select with a single case is simply a channel range.
	for range ticker.C {
		go downloadDecompression()
	}
}
func downloadDecompression() {
if exists, _ := redisClient.Exists("iniDataStatus").Result(); exists == 1 {
//fmt.Println("批次执行中,跳过本次")
} else {
// 写入执行中标记
err := redisClient.Set("iniDataStatus", 1, 0).Err()
if err != nil {
fmt.Println("写入标记失败:%d", err)
}
// Connect to SFTP server
sshConfig := &ssh.ClientConfig{
User: sftpUser,
Auth: []ssh.AuthMethod{
ssh.Password(sftpPassword),
},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
}
sshClient, err := ssh.Dial("tcp", sftpAddress, sshConfig)
if err != nil {
panic(err)
}
sftpClient, err := sftp.NewClient(sshClient)
if err != nil {
panic(err)
}
defer sftpClient.Close()
files, err := sftpClient.ReadDir("/sftp/test")
if err != nil {
applogger.Fatalf("Failed to read SFTP directory: %v", err)
}
//it := 1
//fmt.Printf("共%d个文件\n", len(files))
for _, file := range files {
//fmt.Printf("第%d个文件\n", it)
//it++
// Check if file has been downloaded before
fileKey := fmt.Sprintf("downloaded:%s", file.Name())
if exists, _ := redisClient.Exists(fileKey).Result(); exists == 1 {
continue
}
if filepath.Ext(file.Name()) == ".zip" {
//fmt.Println("下载开始(数据包):" + file.Name())
// Download file
srcFile, err := sftpClient.Open(path.Join("/sftp/test", file.Name()))
if err != nil {
applogger.Fatalf("Failed to download file: %v", err)
continue
}
defer srcFile.Close()
dstFile, err := os.Create(path.Join(executableDir, zipPath, file.Name()))
if err != nil {
applogger.Fatalf("Failed to download file: %v", err)
continue
}
defer dstFile.Close()
if _, err := io.Copy(dstFile, srcFile); err != nil {
applogger.Fatalf("Failed to download file: %v", err)
continue
}
fmt.Println("下载完成(数据包):" + file.Name())
applogger.Info("下载完成(数据包):%d", file.Name())
// Unzip file
zipReader, err := zip.OpenReader(path.Join(executableDir, zipPath, file.Name()))
if err != nil {
applogger.Fatalf("Failed to download file: %v", err)
continue
}
defer zipReader.Close()
//fmt.Println("压缩报文件数量:", len(zipReader.File))
for _, zipFile := range zipReader.File {
zipFileReader, err := zipFile.Open()
if strings.Contains(zipFile.Name, "__MACOSX/._") {
//fmt.Print("系统文件.DS_Store,跳过处理", zipFile.Name)
continue
} else if filepath.Ext(zipFile.Name) != ".txt" {
fmt.Print("文件类型不正确,跳过处理", zipFile.Name)
continue
}
if err != nil || zipFileReader == nil {
applogger.Fatalf("Failed to open zip file: %v", err)
fmt.Print("压缩文件处理结束")
continue
}
defer zipFileReader.Close()
// Create the file
unzipFile, err := os.Create(path.Join(executableDir, txtPath, zipFile.Name))
if err != nil {
applogger.Fatalf("Failed to create unzip file: %v", err)
continue
}
defer unzipFile.Close()
// Write the unzip data to the file
_, err = io.Copy(unzipFile, zipFileReader)
if err != nil {
applogger.Fatalf("Failed to write unzip data to file %v", err)
continue
}
applogger.Info("解压完成(数据包):%d", zipFile.Name)
fmt.Print("解压完成(数据包):" + zipFile.Name)
batchDataInsert(zipFile.Name)
applogger.Info("入库完成(数据包):%d", zipFile.Name)
fmt.Print("入库完成(数据包)::" + zipFile.Name)
}
} else if filepath.Ext(file.Name()) == ".txt" {
srcFile, err := sftpClient.Open(path.Join("/sftp/test", file.Name()))
if err != nil {
applogger.Fatalf("Failed to download file: %v", err)
continue
}
defer srcFile.Close()
dstFile, err := os.Create(path.Join(executableDir, txtPath, file.Name()))
if err != nil {
applogger.Fatalf("Failed to download file: %v", err)
continue
}
defer dstFile.Close()
if _, err := io.Copy(dstFile, srcFile); err != nil {
applogger.Fatalf("Failed to download file: %v", err)
continue
}
fmt.Println("下载完成(批次文件):" + file.Name())
applogger.Info("下载完成(批次文件):%d", file.Name())
batchInsert(file.Name())
fmt.Println("入库完成(批次文件)" + file.Name())
applogger.Info("入库完成(批次文件):%d", file.Name())
}
err = redisClient.Set(fileKey, 1, 0).Err() //写入下载、解压、入库完成标记
if err != nil {
fmt.Println("写入标记失败:%", err)
}
}
redisClient.Del("iniDataStatus") //删除任务执行中标记
}
}
// batchInsert parses a batch header file (CSV with one header row) from the
// txt working directory, inserts one Batch row per record, and emails a
// completion notice.
//
// Expected columns: 0=communication channel id, 1=communication name,
// 2=targets member count, 3=template id, 4=content.
func batchInsert(fileName string) {
	start := time.Now()
	db, err := connectToDB()
	if err != nil {
		panic("failed to connect database")
	}
	file, err := os.Open(path.Join(executableDir, txtPath, fileName))
	// BUG FIX: the original deferred file.Close() before checking the error
	// and logged *after* panicking (unreachable); check first, then defer.
	if err != nil {
		applogger.Errorf("failed to open file: %v", fileName)
		panic("failed to open file")
	}
	defer file.Close()

	reader := csv.NewReader(bufio.NewReader(file))
	if _, err := reader.Read(); err != nil { // skip the header row
		applogger.Errorf("failed to read header of %s: %v", fileName, err)
		return
	}
	batchRows := 0
	for {
		record, err := reader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			// BUG FIX: the original treated *any* read error as end-of-file
			// and stopped silently; log malformed input before stopping.
			applogger.Errorf("failed to parse %s: %v", fileName, err)
			break
		}
		// Parse errors fall back to 0, matching the original's behavior.
		communicationChannelID, _ := strconv.ParseUint(record[0], 10, 32)
		targetsMember, _ := strconv.ParseUint(record[2], 10, 32)
		templateID, _ := strconv.ParseUint(record[3], 10, 32)
		batch := Batch{
			CommunicationChannelID: uint(communicationChannelID),
			CommunicationName:      record[1],
			TargetsMember:          uint(targetsMember),
			TemplateID:             uint(templateID),
			Content:                record[4],
			Status:                 1, // newly imported
			DataFileName:           fileName,
		}
		// BUG FIX: the insert result was ignored entirely; log failures.
		if err := db.Create(&batch).Error; err != nil {
			applogger.Errorf("failed to insert batch row: %v", err)
		}
		batchRows++
	}
	// (The original slept one second here with no stated purpose; removed.)
	elapsed := time.Since(start)
	subject := "丝芙兰批次文件处理完成"
	body := "批次文件:" + fileName + ";\n批次数" + strconv.Itoa(batchRows) + ";\n处理完成请前往管理平台查看处理。"
	if err := SendEmail(subject, body); err != nil {
		// BUG FIX: logrus Info does not format; %v is the verb for errors.
		applogger.Infof("邮件发送失败:%v", err)
		fmt.Print(err)
	}
	fmt.Printf("执行时间%s\n插入批次次数%d\n", elapsed, batchRows)
}
// batchDataInsert loads one extracted member data file into the database.
// Rows are de-duplicated in memory on the (communication channel id, mobile)
// pair: unique rows are inserted into batche_datas by a pool of cSize worker
// goroutines, duplicates are recorded in batche_data_duplicate_logs, and a
// per-channel duplicate summary goes to batches_processing_informations.
// A completion email is sent at the end.
//
// Expected CSV columns: 2=channel id, 3=mobile, 4=full name, 5-9=reserved
// personalization fields (merged into one JSON blob with the file name).
func batchDataInsert(fileName string) {
	start := time.Now()
	db, err := connectToDB()
	if err != nil {
		panic("failed to connect database")
	}

	// Worker pool: each worker drains batches from the channel and inserts
	// them with bounded retries and linear backoff.
	var wg sync.WaitGroup
	dataBatchChan := make(chan []BatcheData, cSize)
	for i := 0; i < cSize; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done() // ensure Wait() unblocks even if a batch panics
			for batch := range dataBatchChan {
				for retryCount := 0; ; retryCount++ {
					err := db.CreateInBatches(batch, insertSize).Error
					if err == nil {
						break
					}
					if retryCount >= 5 {
						// NOTE(review): a panic in a worker goroutine takes
						// the whole process down; preserved from the
						// original — consider surfacing the error instead.
						panic(err)
					}
					// BUG FIX: the original slept 2*retryCount seconds, so
					// the first retry waited 0s; start the backoff at 2s.
					wait := time.Duration(2*(retryCount+1)) * time.Second
					fmt.Printf("Insert failed, retrying in %v...\n", wait)
					time.Sleep(wait)
				}
			}
		}()
	}

	file, err := os.Open(path.Join(executableDir, txtPath, fileName))
	if err != nil {
		panic(err)
	}
	defer file.Close()

	// seen tracks (channel id, mobile) pairs already accepted in this file.
	seen := make(map[string]bool)
	dataBatch := make([]BatcheData, 0, batchSize)
	dataBatchDuplicate := make([]BatchDataDuplicateLog, 0, batchSize)
	scanner := bufio.NewScanner(file)
	scanner.Split(bufio.ScanLines)
	scanner.Scan() // skip the header row
	duplicateRows := 0
	duplicateCount := make(map[string]int) // channel id -> duplicate rows
	var count int64                        // unique rows queued for insert
	for scanner.Scan() {
		line := scanner.Text()
		row, err := csv.NewReader(strings.NewReader(line)).Read()
		if err != nil {
			panic(err)
		}
		// Merge the personalization columns into one JSON document.
		reservedFields := map[string]string{
			"ReservedField1": row[5],
			"ReservedField2": row[6],
			"ReservedField3": row[7],
			"ReservedField4": row[8],
			"ReservedField5": row[9],
			"DataFileName":   fileName,
		}
		reservedFieldsJson, err := json.Marshal(reservedFields)
		if err != nil {
			panic(err)
		}
		key := fmt.Sprintf("%s-%s", row[2], row[3])
		if seen[key] {
			// Duplicate: log the row and bump the per-channel counter.
			duplicateRows++
			duplicateCount[row[2]]++
			dataBatchDuplicate = append(dataBatchDuplicate, BatchDataDuplicateLog{
				CommunicationChannelID: row[2],
				Mobile:                 row[3],
				FullName:               row[4],
				ReservedField:          string(reservedFieldsJson),
			})
			// Flush the duplicate buffer when full.
			if len(dataBatchDuplicate) >= batchSize {
				if err := db.CreateInBatches(dataBatchDuplicate, insertSize).Error; err != nil {
					panic(err)
				}
				dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize)
			}
			continue
		}
		seen[key] = true
		dataBatch = append(dataBatch, BatcheData{
			CommunicationChannelID: row[2],
			Mobile:                 row[3],
			FullName:               row[4],
			ReservedField:          string(reservedFieldsJson),
		})
		// Hand a full batch to the worker pool.
		if len(dataBatch) >= batchSize {
			dataBatchChan <- dataBatch
			dataBatch = make([]BatcheData, 0, batchSize)
		}
		count++
	}
	// BUG FIX: scanner errors (e.g. over-long lines) were silently ignored.
	if err := scanner.Err(); err != nil {
		applogger.Errorf("failed to scan %s: %v", fileName, err)
	}
	if len(dataBatch) > 0 {
		dataBatchChan <- dataBatch
	}
	close(dataBatchChan)
	wg.Wait()

	// Per-channel duplicate summary.
	bpi := make([]BatchProcessingInformation, 0, len(duplicateCount))
	for channelID, repeats := range duplicateCount {
		bpi = append(bpi, BatchProcessingInformation{
			CommunicationChannelID: channelID,
			RepeatTargetsMember:    repeats,
			DataFileName:           fileName,
		})
	}
	// BUG FIX: gorm's CreateInBatches returns an error for a zero-length
	// slice, which the original turned into a panic whenever a file had no
	// duplicates at all; guard on length.
	if len(bpi) > 0 {
		if err := db.CreateInBatches(bpi, insertSize).Error; err != nil {
			panic(err)
		}
	}
	// Flush any remaining duplicate rows.
	if len(dataBatchDuplicate) > 0 {
		if err := db.CreateInBatches(dataBatchDuplicate, insertSize).Error; err != nil {
			panic(err)
		}
	}
	subject := "丝芙兰数据包处理完成"
	body := "数据包:" + fileName + ";\n总数据(过滤后)" + strconv.FormatInt(count, 10) + ";\n处理完成请前往管理平台查看处理。"
	if err := SendEmail(subject, body); err != nil {
		// BUG FIX: logrus Info does not format; %v is the verb for errors.
		applogger.Infof("邮件发送失败:%v", err)
		fmt.Print(err)
	}
	elapsed := time.Since(start)
	fmt.Printf("批次数据入库函数执行时间:%s;\n插入数:%d条\n过滤数%d条\n", elapsed, count, duplicateRows)
}
// connectToDB opens a GORM connection to the SQL Server database, retrying
// up to five times with exponential backoff (1s, 2s, 4s, 8s between
// attempts). GORM's own SQL logging is silenced. Returns the last error if
// all attempts fail.
func connectToDB() (*gorm.DB, error) {
	// NOTE(review): the charset parameter is a MySQL-ism and is not a
	// sqlserver driver option; left in place to keep the DSN unchanged.
	dsn := "sqlserver://" + dbUser + ":" + dbPassword + "@" + dbAddress + "?database=" + dbName + "&charset=utf8mb4"
	// FIX: the original named this local "logger", shadowing the imported
	// gorm.io/gorm/logger package; renamed to avoid the confusion.
	gormLogger := logger.Default.LogMode(logger.Silent)
	const maxAttempts = 5
	backoff := time.Second
	var (
		db  *gorm.DB
		err error
	)
	for attempt := 1; ; attempt++ {
		db, err = gorm.Open(sqlserver.Open(dsn), &gorm.Config{Logger: gormLogger})
		if err == nil {
			return db, nil
		}
		if attempt >= maxAttempts {
			return nil, err
		}
		time.Sleep(backoff)
		backoff *= 2
	}
}
// SendEmail sends a plain-text notification email to the fixed recipient
// list via the exmail SMTP server over implicit TLS (port 465). The send
// error, if any, is returned to the caller.
//
// BUG FIX: the original panicked on a send failure even though the function
// declares an error return (and both callers already check it); the error is
// now returned instead of crashing the process over a mail hiccup.
func SendEmail(subject string, body string) error {
	// NOTE(review): SMTP credentials are hard-coded; move to configuration.
	smtpHost := "smtp.exmail.qq.com"
	smtpPort := 465
	from := "chejiulong@wemediacn.com"
	password := "hdQfpav4x8LwbJPH"
	to := []string{"chejiulong@wemediacn.com", "99779212@qq.com", "wangyuanbing@wemediacn.com"}

	// Compose the message.
	m := gomail.NewMessage()
	m.SetHeader("From", from)
	m.SetHeader("To", to...)
	m.SetHeader("Subject", subject)
	m.SetBody("text/plain", body)

	// Dial and send.
	d := gomail.NewDialer(smtpHost, smtpPort, from, password)
	// NOTE(review): certificate verification is disabled; prefer proper TLS
	// validation unless the endpoint genuinely cannot present a valid cert.
	d.TLSConfig = &tls.Config{InsecureSkipVerify: true}
	return d.DialAndSend(m)
}