GO/main.go

1505 lines
48 KiB
Go
Raw Normal View History

2023-02-18 16:32:19 +08:00
package main
import (
"archive/zip"
"bufio"
2023-03-05 13:18:40 +08:00
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
2023-02-18 19:31:39 +08:00
"crypto/tls"
2023-02-18 16:32:19 +08:00
"encoding/csv"
"encoding/hex"
2023-02-18 16:32:19 +08:00
"encoding/json"
"flag"
2023-02-18 16:32:19 +08:00
"fmt"
"io"
"log"
2023-03-05 13:18:40 +08:00
"net/http"
2023-02-18 16:32:19 +08:00
"os"
"path"
"path/filepath"
"reflect"
2023-03-05 13:18:40 +08:00
"regexp"
"sort"
2023-02-18 16:32:19 +08:00
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/go-redis/redis"
2023-03-05 13:18:40 +08:00
"github.com/gorilla/websocket"
2023-02-18 16:32:19 +08:00
"github.com/pkg/sftp"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
"golang.org/x/time/rate"
2023-02-18 19:31:39 +08:00
"gopkg.in/gomail.v2"
2023-02-21 14:36:25 +08:00
"gopkg.in/natefinch/lumberjack.v2"
2023-04-04 12:07:58 +08:00
"gorm.io/driver/mysql"
2023-02-18 16:32:19 +08:00
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
2023-03-05 13:18:40 +08:00
func init() {
monopolize() //独占锁
iniConfi() //初始化环境变量
iniPath() //初始化路径
applogger = logrus.New() //创建日志
iniLog() //初始化日志配置
redisClient = redis.NewClient(&redis.Options{
Addr: redisAddress,
Password: redisPassword,
DB: redisDB,
})
go startWebSocket() // 启动 WebSocket 服务,使用协程方式运行
2023-03-05 13:18:40 +08:00
applogger.Info(fmt.Sprintf("程序启动,加载%s环境尝试执行...", env))
go downloadDecompression() // 启动立即执行一次数据下载、处理
go queryBatchState()
//go delData()
2023-02-18 16:32:19 +08:00
}
2023-04-11 10:47:53 +08:00
// 主函数
2023-03-05 13:18:40 +08:00
func main() {
2023-03-15 16:59:09 +08:00
ticker := time.NewTicker(time.Duration(taskTime) * time.Minute) // 数据下载、处理
ticker_merge := time.NewTicker(time.Duration(batchStatusTaskTime) * time.Minute) //查询批次
notification := time.NewTicker(time.Until(nextNotificationTime())) // 每天凌晨3点输出提示
2023-03-05 13:18:40 +08:00
defer ticker.Stop()
defer ticker_merge.Stop()
defer notification.Stop()
2023-03-05 13:18:40 +08:00
for {
select {
case <-ticker.C:
iniLog()
applogger.Info("尝试执行数据处理...")
go downloadDecompression()
2023-02-18 16:32:19 +08:00
2023-03-05 13:18:40 +08:00
case <-ticker_merge.C:
iniLog()
fmt.Print("查询批次状态...\n")
queryBatchState()
case <-notification.C:
iniLog()
go delData()
notification.Reset(time.Until(nextNotificationTime()))
2023-03-05 13:18:40 +08:00
}
}
2023-02-18 16:32:19 +08:00
}
// 删除历史数据和历史重复数据
func delData() {
applogger.Info("开始清除历史数据")
// 删除15天前的批次数据
err := deleteOldData(&BatcheData{}, dataExpirationDays)
handleError(err, "删除15天前的批次数据失败")
// 删除15天前的批次数据重复日志
err = deleteOldData(&BatchDataDuplicateLog{}, dataExpirationDays)
handleError(err, "删除15天前的批次数据重复日志失败")
applogger.Info("清除历史数据完成")
}
// 删除15天前的数据
func deleteOldData(model interface{}, daysAgo int) error {
start := time.Now()
totalRowsAffected := int64(0)
db, err := connectToDB()
if err != nil {
applogger.Error("连接数据库失败: ", err)
return fmt.Errorf("连接数据库失败: %w", err)
}
threshold := time.Now().AddDate(0, 0, -daysAgo)
for {
tx := db.Begin()
applogger.Info("开始事务")
defer func() {
if r := recover(); r != nil {
applogger.Error("异常,回滚事务: ", r)
tx.Rollback()
}
}()
if err := tx.Error; err != nil {
return err
}
var ids []int
result := tx.Model(model).Where("created_at < ?", threshold).Limit(delDataSize).Pluck("id", &ids)
if result.Error != nil {
tx.Rollback()
return result.Error
}
if len(ids) == 0 {
applogger.Info("没有找到符合条件的数据,退出循环")
break
}
result = tx.Where("id IN (?)", ids).Delete(model)
if result.Error != nil {
applogger.Error("删除操作失败,回滚事务: ", result.Error)
tx.Rollback()
return result.Error
}
if err := tx.Commit().Error; err != nil {
applogger.Error("事务提交失败: ", err)
return err
}
applogger.Info("事务提交成功")
totalRowsAffected += result.RowsAffected
if len(ids) < batchSize {
applogger.Info("已处理所有符合条件的数据,退出循环")
break
}
}
time.Sleep(time.Second)
elapsed := time.Since(start)
modelName := reflect.TypeOf(model).Elem().Name()
applogger.Info(fmt.Sprintf("执行删除%v用时%s成功删除数据%d", modelName, elapsed.String(), totalRowsAffected))
closeDb(db)
return nil
}
// 计算下一次执行时间
func nextNotificationTime() time.Time {
now := time.Now()
notificationTime := time.Date(now.Year(), now.Month(), now.Day(), 3, 0, 0, 0, now.Location())
if now.After(notificationTime) {
notificationTime = notificationTime.Add(24 * time.Hour)
}
return notificationTime
}
2023-03-05 13:18:40 +08:00
func startWebSocket() {
http.HandleFunc("/ws", handleWebSocket)
err := http.ListenAndServe(":8080", nil)
2023-02-18 16:32:19 +08:00
if err != nil {
2023-03-05 13:18:40 +08:00
fmt.Println("启动 WebSocket 服务失败:", err)
} else {
fmt.Println("启动 WebSocket 服务成功:")
}
2023-03-05 13:18:40 +08:00
}
// 客户端连接信息
2023-03-05 13:18:40 +08:00
func handleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
2023-03-05 13:18:40 +08:00
fmt.Println("升级连接失败:", err)
return
}
2023-03-05 13:18:40 +08:00
// 打印客户端地址
fmt.Printf("客户端 %s 连接成功\n", conn.RemoteAddr().String())
2023-03-05 13:18:40 +08:00
// 添加客户端连接信息
c := &client{conn}
clients[c] = true
// 接收消息
go func() {
for {
_, message, err := conn.ReadMessage()
if err != nil {
fmt.Println("接收消息失败:", err)
return
}
fmt.Println("收到消息:", string(message))
2023-03-06 19:38:28 +08:00
var returnMessage []byte
task := Task{}
2023-03-06 19:38:28 +08:00
err = json.Unmarshal([]byte(message), &task)
if err != nil {
returnMessage = []byte(`{"code": 2001,"err": "json指令解析失败"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
} else {
if !verify_signature(task.Signature.Signature, task.Signature.Nonce, task.Signature.Timestamp, task.TaskData) { // 签名验证失败或超时
returnMessage = []byte(`{"code": 2401,"err": "Unauthorized"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage)
conn.Close()
break
}
switch task.TaskData.Command {
2023-03-06 19:38:28 +08:00
case "lastCall":
2023-03-07 19:51:06 +08:00
//判断lastCall是否有执行中
if exists, _ := redisClient.Exists("iniLastCallDataStatus").Result(); exists == 1 {
returnMessage = []byte(`{"code": 2007,"err": "有lastCall执行中请稍后重试"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
break
}
//判断ExcludedFilename是否执行过
lastCallTask := fmt.Sprintf("lastCallTask:%s", task.TaskData.ExcludedFilename)
if exists, _ := redisClient.Exists(lastCallTask).Result(); exists == 1 {
returnMessage = []byte(`{"code": 2008,"err": "ExcludedFilename重复跳过执行"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
break
}
//判断ExcludedFilename是否存在
if !fileExists(path.Join(executableDir, lastCallPath, task.TaskData.ExcludedFilename)) {
2023-03-06 19:38:28 +08:00
returnMessage = []byte(`{"code": 2003,"err": "task.ExcludedFilename 不存在"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
break
}
2023-03-07 19:51:06 +08:00
//判断BatchFilename是否存在
if !fileExists(path.Join(executableDir, txtPath, task.TaskData.BatchFilename)) {
2023-03-06 19:38:28 +08:00
returnMessage = []byte(`{"code": 2004,"err": "task.BatchFilename 不存在"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
break
}
2023-03-07 19:51:06 +08:00
//判断DataFilename是否存在
if !fileExists(path.Join(executableDir, txtPath, task.TaskData.DataFilename)) {
2023-03-06 19:38:28 +08:00
returnMessage = []byte(`{"code": 2005,"err": "task.DataFilename 不存在"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
break
}
returnMessage = []byte(`{"code": 2000,"err": "开始处理lastCall"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
2023-03-07 19:51:06 +08:00
//读取排重文件
lastCallKeys, err := readExcludedFile(path.Join(executableDir, lastCallPath, task.TaskData.ExcludedFilename))
2023-03-06 19:38:28 +08:00
if err != nil {
returnMessage = []byte(`{"code": 2006,"err": "打开ExcludedFilename失败"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
break
} else {
returnMessage = []byte(`{"code": 2000,"err": "ExcludedFilename读取完成"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
}
2023-03-07 19:51:06 +08:00
//无错误,执行逻辑代码,
subject := "丝芙兰短信处理程序异常"
err = redisClient.Set("iniLastCallDataStatus", 1, 0).Err()
if err != nil {
body := fmt.Sprintf("写入lastCall任务执行中标记失败%v", err)
applogger.Error(body)
returnMessage = []byte(fmt.Sprintf(`{"code": 2008,"err": "写入lastCall任务执行中标记失败%v"}`, err))
2023-03-06 19:38:28 +08:00
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
2023-03-07 19:51:06 +08:00
SendEmail(subject, body) //发送邮件
2023-03-06 19:38:28 +08:00
} else {
2023-03-07 19:51:06 +08:00
batchInsert(task.TaskData.BatchFilename, true, task.TaskData.ExcludedFilename) //创建批次
returnMessage = []byte(`{"code": 2000,"err": "批次创建完成"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
batchDataInsert(task.TaskData.DataFilename, lastCallKeys, task.TaskData.ExcludedFilename) //添加数据
redisClient.Del("iniLastCallDataStatus") //删除任务执行中标记
returnMessage = []byte(`{"code": 2000,"err": "结束处理lastCall"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
redisClient.Set(lastCallTask, 1, 0).Err() //记录ExcludedFilename执行完成
2023-03-06 19:38:28 +08:00
}
default:
returnMessage = []byte(`{"code": 2002,"err": "task.Command 不存在"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
2023-03-05 13:18:40 +08:00
}
}
}
}()
// 设置连接关闭时的回调函数
conn.SetCloseHandler(func(code int, text string) error {
fmt.Println("连接已关闭code:", code, "text:", text)
fmt.Printf("客户端 %s 关闭连接\n", conn.RemoteAddr().String())
delete(clients, c)
return nil
2023-02-18 16:32:19 +08:00
})
2023-02-28 18:48:53 +08:00
2023-02-18 16:32:19 +08:00
}
// 验证签名
2023-03-07 16:50:12 +08:00
func verify_signature(signature string, nonce string, timestamp int64, data interface{}) bool {
fmt.Printf("Received signature: %s\n", signature)
fmt.Printf("Received timestamp: %d\n", timestamp)
2023-03-07 16:50:12 +08:00
fmt.Printf("Received nonce: %s\n", nonce)
fmt.Printf("Received data: %v\n", data)
received_signature := signature
received_timestamp, _ := strconv.ParseInt(fmt.Sprintf("%v", timestamp), 10, 64)
2023-03-07 16:50:12 +08:00
received_nonce := nonce //strconv.Atoi(fmt.Sprintf("%v", nonce))
if time.Now().Unix()-received_timestamp > 7200 {
fmt.Println("Timestamp expired")
return false
}
received_data_bytes, _ := json.Marshal(data)
received_data := string(received_data_bytes)
2023-03-07 16:50:12 +08:00
expected_data := fmt.Sprintf("%d|%s|%s", received_timestamp, received_nonce, received_data)
fmt.Printf("Expected data: %s\n", expected_data)
mac := hmac.New(sha256.New, []byte(verifySignatureKey))
mac.Write([]byte(expected_data))
expected_signature := hex.EncodeToString(mac.Sum(nil))
fmt.Printf("Expected signature: %s\n", expected_signature)
if received_signature != expected_signature {
fmt.Println("Signature does not match")
return false
}
return true
}
2023-03-06 19:38:28 +08:00
func readExcludedFile(filename string) (map[string]bool, error) {
file, err := os.Open(filename)
if err != nil {
return nil, err
}
defer file.Close()
scanner := bufio.NewScanner(file)
scanner.Split(bufio.ScanLines)
scanner.Scan() // skip first line
lastCallKeys := make(map[string]bool)
for scanner.Scan() {
line := scanner.Text()
fields := strings.Split(line, ",")
lastCallKey := fmt.Sprintf("%s-%s", fields[0], fields[1])
lastCallKeys[lastCallKey] = true
}
return lastCallKeys, nil
}
/*
2023-03-05 13:18:40 +08:00
// 聊天室消息广播
func broadcast(message []byte) {
for c := range clients {
err := c.conn.WriteMessage(websocket.TextMessage, message)
if err != nil {
fmt.Println("发送消息失败:", err)
delete(clients, c)
2023-02-18 16:32:19 +08:00
}
}
2023-03-06 19:38:28 +08:00
}*/
2023-02-18 16:32:19 +08:00
func downloadDecompression() {
if exists, _ := redisClient.Exists("iniDataStatus").Result(); exists == 1 {
2023-02-21 14:36:25 +08:00
fmt.Print("上一批次执行中,跳过本次\n")
2023-02-18 16:32:19 +08:00
} else {
subject := "丝芙兰短信处理程序异常"
2023-02-18 16:32:19 +08:00
// 写入执行中标记
err := redisClient.Set("iniDataStatus", 1, 0).Err()
if err != nil {
body := fmt.Sprintf("写入任务执行中标记失败:%v", err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
}
// Connect to SFTP server
sshConfig := &ssh.ClientConfig{
User: sftpUser,
Auth: []ssh.AuthMethod{
ssh.Password(sftpPassword),
},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
}
sshClient, err := ssh.Dial("tcp", sftpAddress, sshConfig)
if err != nil {
body := fmt.Sprintf("sshClient连接失败%v", err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
}
sftpClient, err := sftp.NewClient(sshClient)
if err != nil {
body := fmt.Sprintf("sftp连接失败%v", err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
}
defer func() {
sftpClient.Close()
sshClient.Close()
}()
files, err := sftpClient.ReadDir(sftpDir)
2023-02-18 16:32:19 +08:00
if err != nil {
body := fmt.Sprintf("sftp目录%s不存在, 错误信息:%v", sftpDir, err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
}
it := 1
fmt.Printf("共%d个文件\n", len(files))
2023-03-05 13:18:40 +08:00
sort.Sort(FileSorter(files))
// 限制下载速度为 15MB/s
limiter := rate.NewLimiter(rate.Limit(rateLimiter*1024*1024), int(rateLimiter*1024*1024))
2023-02-18 16:32:19 +08:00
for _, file := range files {
processingStatus := -1
2023-02-21 14:36:25 +08:00
fmt.Printf("第%d个文件处理中\n", it)
it++
2023-02-18 16:32:19 +08:00
// Check if file has been downloaded before
fileKey := fmt.Sprintf("downloaded:%s", file.Name())
if exists, _ := redisClient.Exists(fileKey).Result(); exists == 1 {
2023-02-21 14:36:25 +08:00
fmt.Println("跳过已处理过的文件:" + file.Name())
2023-02-18 16:32:19 +08:00
continue
}
if filepath.Ext(file.Name()) == ".zip" {
//fmt.Println("下载开始(数据包):" + file.Name())
// Download file
srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name()))
2023-02-18 16:32:19 +08:00
if err != nil {
body := fmt.Sprintf("打开sftp文件失败文件名:%s错误信息 %v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
defer srcFile.Close()
dstFile, err := os.Create(path.Join(executableDir, zipPath, file.Name()))
if err != nil {
body := fmt.Sprintf("创建本地文件失败,文件名: %s错误信息%v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
defer dstFile.Close()
buf := make([]byte, 4096)
for {
n, err := srcFile.Read(buf)
if err != nil && err != io.EOF {
body := fmt.Sprintf("下载文件失败,文件名: %s错误信息%v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
break
}
if n == 0 {
break
}
// 等待令牌桶中有足够的令牌可用
limiter.WaitN(context.Background(), n)
if _, err := dstFile.Write(buf[:n]); err != nil {
body := fmt.Sprintf("写入文件失败,文件名: %s错误信息%v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
break
}
2023-02-18 16:32:19 +08:00
}
2023-02-28 18:48:53 +08:00
applogger.Info(fmt.Sprintf("%s数据包下载完成", file.Name()))
2023-02-18 16:32:19 +08:00
// Unzip file
zipReader, err := zip.OpenReader(path.Join(executableDir, zipPath, file.Name()))
if err != nil {
body := fmt.Sprintf("解压文件失败: 文件名:%s错误信息%v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
defer zipReader.Close()
for _, zipFile := range zipReader.File {
zipFileReader, err := zipFile.Open()
if strings.Contains(zipFile.Name, "__MACOSX/._") {
continue
} else if filepath.Ext(zipFile.Name) != ".txt" {
applogger.Error(fmt.Sprintf("文件类型不正确,跳过处理: %v", zipFile.Name))
2023-02-18 16:32:19 +08:00
continue
}
if err != nil || zipFileReader == nil {
applogger.Error(fmt.Sprintf("Failed to open zip file: %v", err))
2023-02-18 16:32:19 +08:00
fmt.Print("压缩文件处理结束")
continue
}
defer zipFileReader.Close()
// Create the file
unzipFile, err := os.Create(path.Join(executableDir, txtPath, zipFile.Name))
if err != nil {
body := fmt.Sprintf("创建压缩后的文件失败,文件名:%s错误信息 %v", zipFile.Name, err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
defer unzipFile.Close()
// Write the unzip data to the file
_, err = io.Copy(unzipFile, zipFileReader)
if err != nil {
body := fmt.Sprintf("文件解压失败,文件名:%s错误信息 %v", zipFileReader, err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
2023-02-21 14:36:25 +08:00
applogger.Info(fmt.Sprintf("%s数据包解压完成", zipFile.Name))
processingStatus = batchDataInsert(zipFile.Name, nil, "")
2023-02-18 16:32:19 +08:00
}
} else if filepath.Ext(file.Name()) == ".txt" {
srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name()))
2023-02-18 16:32:19 +08:00
if err != nil {
body := fmt.Sprintf("打开sftp文件失败文件名:%s错误信息 %v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
defer srcFile.Close()
dstFile, err := os.Create(path.Join(executableDir, txtPath, file.Name()))
if err != nil {
body := fmt.Sprintf("创建本地文件失败,文件名: %s错误信息%v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
defer dstFile.Close()
buf := make([]byte, 4096)
for {
n, err := srcFile.Read(buf)
if err != nil && err != io.EOF {
body := fmt.Sprintf("下载文件失败,文件名: %s错误信息%v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
break
}
if n == 0 {
break
}
// 等待令牌桶中有足够的令牌可用
limiter.WaitN(context.Background(), n)
if _, err := dstFile.Write(buf[:n]); err != nil {
body := fmt.Sprintf("写入文件失败,文件名: %s错误信息%v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
break
}
2023-02-18 16:32:19 +08:00
}
2023-02-21 14:36:25 +08:00
applogger.Info(fmt.Sprintf("%s批次文件下载完成", file.Name()))
processingStatus = batchInsert(file.Name(), false, "")
2023-02-18 16:32:19 +08:00
}
if processingStatus != -1 {
err = redisClient.Set(fileKey, 1, 0).Err() //写入文件处理完成
if err != nil {
body := fmt.Sprintf("写入文件处理完成标记失败文件名:%s错误信息%v\n", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
}
2023-02-18 16:32:19 +08:00
}
2023-02-18 16:32:19 +08:00
}
redisClient.Del("iniDataStatus") //删除任务执行中标记
}
}
// 批次信息入库
func batchInsert(fileName string, isLastCall bool, excludedFilename string) int {
2023-02-18 16:32:19 +08:00
start := time.Now()
2023-03-22 15:10:42 +08:00
db, err := connectToDB()
handleError(err, "连接数据库失败")
2023-02-18 16:32:19 +08:00
file, err := os.Open(path.Join(executableDir, txtPath, fileName))
2023-03-22 15:10:42 +08:00
handleError(err, fmt.Sprintf("打开文件失败: %s", fileName))
defer file.Close()
2023-03-05 13:18:40 +08:00
2023-03-22 15:10:42 +08:00
reader := csv.NewReader(bufio.NewReader(file))
reader.Read()
batchRows := 0
for {
record, err := reader.Read()
if err != nil {
break
}
2023-02-21 14:36:25 +08:00
2023-03-22 15:10:42 +08:00
TargetsMember, _ := strconv.ParseUint(strings.TrimSpace(record[2]), 10, 32)
templateID, _ := strconv.ParseUint(strings.TrimSpace(record[3]), 10, 32)
status := 1
2023-03-06 19:38:28 +08:00
2023-03-22 15:10:42 +08:00
s := time.Now().Format("2006-01-02 15:04:05")
var batchName, dataFileName string
2023-03-06 19:38:28 +08:00
2023-03-22 15:10:42 +08:00
if isLastCall {
batchName = fmt.Sprintf("lastCall-%s-%s-%s", record[1], record[0], excludedFilename)
dataFileName = fmt.Sprintf("lastCall-%s", excludedFilename)
} else {
batchName = fmt.Sprintf("%s-%s", record[1], record[0])
dataFileName = fileName[:len(fileName)-10]
}
2023-03-05 13:18:40 +08:00
2023-03-22 15:10:42 +08:00
batchParams := BatchParams{
BatchName: batchName,
BatchDesc: record[4],
IsPersonal: 0,
Message: record[4],
IsInternational: 0,
IsSchedule: 0,
ScheduleTime: s,
Token: token,
}
2023-03-05 13:18:40 +08:00
2023-03-22 15:10:42 +08:00
sid, err := CreateBatch(batchParams)
handleError(err, "创建批次失败")
batch := Batch{
CommunicationChannelID: record[0],
CommunicationName: batchName,
TargetsMember: uint(TargetsMember),
TemplateID: uint(templateID),
Content: record[4],
Status: status,
DataFileName: dataFileName,
Sid: sid,
2023-02-18 16:32:19 +08:00
}
2023-03-22 15:10:42 +08:00
db.Create(&batch)
batchRows++
}
time.Sleep(time.Second)
elapsed := time.Since(start)
2023-03-22 15:10:42 +08:00
subject := "丝芙兰批次文件处理完成"
body := fmt.Sprintf("批次数:%d;\n批次文件%s;\n处理完成请前往管理平台查看处理。", batchRows, fileName)
SendEmail(subject, body)
applogger.Info(fmt.Sprintf("%s批次文件入库完成", fileName))
applogger.Info(fmt.Sprintf("%s批次文件执行时间%s 插入批次次数:%d", fileName, elapsed, batchRows))
closeDb(db)
2023-03-22 15:10:42 +08:00
return 0
2023-02-18 16:32:19 +08:00
}
2023-03-05 13:18:40 +08:00
func CreateBatch(batchParams BatchParams) (int, error) {
// 将请求参数编码为JSON字符串
jsonBytes, err := json.Marshal(batchParams)
if err != nil {
return -1, err
}
jsonStr := string(jsonBytes)
// 发送POST请求
url := "http://www.wemediacn.net/webservice/BatchService?service=sms.createbatch"
resp, err := http.Post(url, "application/json; charset=utf-8", bytes.NewBuffer([]byte(jsonStr)))
if err != nil {
return -1, err
}
defer resp.Body.Close()
// 解析响应数据
var retobj map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&retobj)
if err != nil {
return -1, err
}
fmt.Print(retobj)
code := int(retobj["code"].(float64))
fmt.Printf("批次创建API 返回:%d\n", code)
if code == 0 {
sid := int(retobj["sid"].(float64))
return sid, nil
} else {
return -1, fmt.Errorf("create batch failed, error code: %d", code)
}
}
func smsApi(method string, sendSMSDataJson string) (int, error) {
// 发送POST请求
url := "http://www.wemediacn.net/webservice/BatchService?service=sms." + method
resp, err := http.Post(url, "application/json; charset=utf-8", strings.NewReader(sendSMSDataJson))
if err != nil {
return -1, err
}
defer resp.Body.Close()
// 解析响应数据
var retobj map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&retobj)
if err != nil {
return -1, err
}
jsonStr, err := json.Marshal(retobj)
if err != nil {
fmt.Println(err)
return -1, err
}
fmt.Printf("API 返回:%s\n", string(jsonStr))
code := int(retobj["code"].(float64))
if code == 0 {
//sid := int(retobj["sid"].(float64))
fmt.Printf("提交批次成功code%d\n", code)
return code, nil
} else {
applogger.Error(string(jsonStr))
return -1, fmt.Errorf(string(jsonStr))
2023-03-05 13:18:40 +08:00
}
}
func batchDataInsert(fileName string, lastCallKeys map[string]bool, excludedFilename string) int {
2023-02-18 16:32:19 +08:00
start := time.Now()
2023-02-21 14:36:25 +08:00
// Open file
fileKey := fmt.Sprintf("downloaded:%s", strings.Replace(fileName, filepath.Ext(fileName), ".zip", 1))
2023-02-21 14:36:25 +08:00
file, err := os.Open(path.Join(executableDir, txtPath, fileName))
2023-02-18 16:32:19 +08:00
if err != nil {
redisClient.Del(fileKey) //删除文件处理完成的标志位
applogger.Error(fmt.Sprintf("文件打开失败,文件名:%s错误信息%v", fileName, err))
return -1
2023-02-21 14:36:25 +08:00
} else {
defer file.Close()
db, _ := connectToDB()
// Insert data in batches using multiple goroutines
var wg sync.WaitGroup
dataBatchChan := make(chan []BatcheData, insertChanSize)
for i := 0; i < goSize; i++ {
wg.Add(1)
go func() {
for batch := range dataBatchChan {
retryCount := 0
for {
if err := db.CreateInBatches(batch, insertSize).Error; err != nil {
if retryCount >= 5 {
panic(err)
}
fmt.Printf("Insert failed, retrying in %v seconds...\n", 2*retryCount)
time.Sleep(time.Duration(2*retryCount) * time.Second)
retryCount++
} else {
break
2023-02-18 16:32:19 +08:00
}
}
}
2023-02-21 14:36:25 +08:00
wg.Done()
}()
2023-02-18 16:32:19 +08:00
}
hs := make(map[string]bool) //排重
dataBatch := make([]BatcheData, 0, batchSize) //短信数据
dataBatchDuplicate := make([]BatchDataDuplicateLog, 0, batchSize) //重复短信数据
bi := 0 //重复总数
count := 0 // 总数
// Data file batches
batches := []Batch{} // Query batches
2023-03-06 19:38:28 +08:00
fileNameDate := ""
dataFileName := ""
fields := strings.Split(fileName, "_")
datetime := fields[len(fields)-1]
2023-03-06 19:38:28 +08:00
if lastCallKeys != nil {
2023-03-08 17:02:32 +08:00
fileNameDate = fmt.Sprintf("lastCall-%s", excludedFilename)
fmt.Printf("fileNameDate : %s\n", fileNameDate)
2023-03-08 17:02:32 +08:00
dataFileName = fmt.Sprintf("lastCall-%s", excludedFilename)
2023-03-06 19:38:28 +08:00
} else {
fileNameDate = datetime[:8]
2023-03-08 17:02:32 +08:00
dataFileName = strings.Replace(fileName, "targets", "definition", -1)
dataFileName = dataFileName[:len(dataFileName)-10]
2023-03-06 19:38:28 +08:00
}
db.Table("batches AS b1").
Select("b1.*").
Where("b1.data_file_name LIKE ?", "%"+fileNameDate+"%").
Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS max_created_at FROM batches GROUP BY communication_channel_id) AS b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.max_created_at").
Order("b1.created_at DESC").
Find(&batches)
batchCount := len(batches)
sendMobiles := make(map[string]SmsData, batchCount) //短信数据,用于提交发送平台
duplicateCount := make(map[string]int, batchCount) //按批次重复数
lastCallduplicateCount := make(map[string]int, batchCount) //按批次重复数
insertsCount := make(map[string]int, batchCount) //按批次插入数
ccids := make(map[string]bool, batchCount)
if batchCount > 0 { //如果查询到数据需要的批次信息,开始处理数据
// 定义一个名为result的map类型的变量并以CommunicationChannelID作为键
result := make(map[string][]Batch)
for _, batch := range batches {
2023-03-08 17:02:32 +08:00
cckdKey := batch.CommunicationChannelID
result[cckdKey] = append(result[cckdKey], batch)
print(batch.CommunicationChannelID, "\n")
}
2023-03-05 13:18:40 +08:00
scanner := bufio.NewScanner(file)
scanner.Split(bufio.ScanLines)
scanner.Scan() // skip first line
for scanner.Scan() {
line := scanner.Text()
row, err := csv.NewReader(strings.NewReader(line)).Read()
if err != nil {
applogger.Error(fmt.Sprintf("文件按行读取失败,文件名:%s错误信息%v", fileName, err))
continue
}
reservedFields := map[string]string{ //合并个性化字段
"ReservedField1": row[5],
"ReservedField2": row[6],
"ReservedField3": row[7],
"ReservedField4": row[8],
"ReservedField5": row[9],
"FullName": row[4],
}
reservedFieldsJson, err := json.Marshal(reservedFields) //个性化字段转json
if err != nil {
applogger.Error(fmt.Sprintf("个性化字段合并失败,文件名:%s错误信息%v", fileName, err))
continue
}
if _, ok := duplicateCount[row[2]]; !ok {
duplicateCount[row[2]] = 0
}
if _, ok := lastCallduplicateCount[row[2]]; !ok {
lastCallduplicateCount[row[2]] = 0
}
// Check if record exists in hashset
key := fmt.Sprintf("%s-%s-%s", row[2], row[3], row[5])
if _, exists := hs[key]; exists { //如果批次数据重复
bi++
// Increment duplicate count
duplicateCount[row[2]]++
dataBatchDuplicate = append(dataBatchDuplicate, BatchDataDuplicateLog{
CommunicationChannelID: row[2],
Mobile: row[3],
ReservedField: string(reservedFieldsJson),
})
if len(dataBatchDuplicate) >= batchSize {
err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error
if err != nil {
applogger.Error(fmt.Sprintf("插入重复数据失败,文件名:%s错误信息%v", fileName, err))
} else {
dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize)
}
}
continue
}
2023-03-06 19:38:28 +08:00
if lastCallKeys != nil && lastCallKeys[fmt.Sprintf("%s-%s", row[2], row[3])] {
lastCallduplicateCount[row[2]]++
2023-03-06 19:38:28 +08:00
continue
}
// Add record to hashset
hs[key] = true
ccids[row[2]] = true
dataBatch = append(dataBatch, BatcheData{
2023-02-21 14:36:25 +08:00
CommunicationChannelID: row[2],
Mobile: row[3],
ReservedField: string(reservedFieldsJson),
DataFileName: dataFileName,
2023-02-21 14:36:25 +08:00
})
2023-03-05 13:18:40 +08:00
if len(dataBatch) >= batchSize {
dataBatchChan <- dataBatch
dataBatch = make([]BatcheData, 0, batchSize)
}
if batches, ok := result[row[2]]; ok && len(batches) > 0 && batches[0].Content != "" {
2023-03-06 19:38:28 +08:00
content := fmt.Sprintf("%s /", batches[0].Content)
Sid := batches[0].Sid
pattern := regexp.MustCompile(`\{.*\}`)
matched := pattern.MatchString(content)
if matched { //个性化短信
smsData, ok := sendMobiles[row[2]]
if !ok {
smsData = SmsData{
Sid: Sid,
IsPersonalizedMsg: true,
SmsList: make([]SmsList, 0),
}
}
placeholderMap := map[string]string{
"{RESERVED_FIELD_1}": row[5],
"{RESERVED_FIELD_2}": row[6],
"{RESERVED_FIELD_3}": row[7],
"{RESERVED_FIELD_4}": row[8],
"{RESERVED_FIELD_5}": row[9],
}
for k, v := range placeholderMap {
content = strings.ReplaceAll(content, k, v)
}
smsData.SmsList = append(smsData.SmsList, SmsList{M: row[3], C: content, F: 8})
if len(smsData.SmsList) >= batchSize/2 {
sd := Sms{Sid: Sid, Data: smsData.SmsList, Token: token}
sendSMSDataJson, _ := json.Marshal(sd)
//
resp, err := smsApi("appendbatchdata", string(sendSMSDataJson))
if resp != 0 {
fmt.Printf("smsApi returned error: %d\n", err)
//发送提醒邮件
subject := "丝芙兰数据包数据提交异常"
body := fmt.Sprintf("数据包:%s;\nCommunicationChannelID%s;\n错误信息%s\n 请前往管理平台查看处理。", fileName, row[2], err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
}
smsData.SmsList = []SmsList{} // reset mobiles slice
}
sendMobiles[row[2]] = smsData
2023-02-21 14:36:25 +08:00
} else {
// 处理非个性化短信
smsData, ok := sendMobiles[row[2]]
if !ok {
smsData = SmsData{
Sid: Sid,
IsPersonalizedMsg: false,
Content: content,
Mobiles: make([]string, 0),
}
}
smsData.Mobiles = append(smsData.Mobiles, row[3])
if len(smsData.Mobiles) >= batchSize {
mobiles := smsData.Mobiles
mobileStr := strings.Join(mobiles, ",")
var sl []SmsList
sl = append(sl, SmsList{M: mobileStr, C: content, F: 8})
sd := Sms{Sid: Sid, Data: sl, Token: token}
sendSMSDataJson, _ := json.Marshal(sd)
resp, err := smsApi("appendbatchdata", string(sendSMSDataJson))
if resp != 0 {
fmt.Printf("smsApi returned error: %d\n", err)
//发送提醒邮件
subject := "丝芙兰数据包数据提交异常"
body := fmt.Sprintf("数据包:%s;\nCommunicationChannelID%s;\n错误信息%s\n 请前往管理平台查看处理。", fileName, row[2], err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
}
smsData.Mobiles = []string{} // reset mobiles slice
}
sendMobiles[row[2]] = smsData
2023-02-21 14:36:25 +08:00
}
}
if _, ok := insertsCount[row[2]]; !ok {
insertsCount[row[2]] = 0
}
insertsCount[row[2]]++
count++
2023-02-18 16:32:19 +08:00
}
if len(dataBatch) > 0 {
2023-02-21 14:36:25 +08:00
dataBatchChan <- dataBatch
2023-02-18 16:32:19 +08:00
}
close(dataBatchChan)
2023-03-05 13:18:40 +08:00
for ccid := range ccids {
smsData, ok := sendMobiles[ccid]
if !ok {
continue
}
if smsData.IsPersonalizedMsg { //个性化
smsList := smsData.SmsList
if len(smsList) > 0 {
sd := Sms{Sid: smsData.Sid, Data: smsList, Token: token}
2023-03-05 13:18:40 +08:00
sendSMSDataJson, _ := json.Marshal(sd)
smsApi("appendbatchdata", string(sendSMSDataJson))
smsData.SmsList = []SmsList{} // reset mobiles slice
2023-03-05 13:18:40 +08:00
}
} else { //非个性化
mobiles, ok := smsData.Mobiles, len(smsData.Mobiles) > 0 && true
if ok && len(mobiles) > 0 {
2023-03-05 13:18:40 +08:00
mobileStr := strings.Join(mobiles, ",")
var sl []SmsList
sl = append(sl, SmsList{M: mobileStr, C: smsData.Content, F: 8})
sd := Sms{Sid: smsData.Sid, Data: sl, Token: token}
2023-03-05 13:18:40 +08:00
sendSMSDataJson, _ := json.Marshal(sd)
smsApi("appendbatchdata", string(sendSMSDataJson))
smsData.Mobiles = []string{} // reset mobiles slice
2023-03-05 13:18:40 +08:00
}
}
sf := SmsFinish{Sid: smsData.Sid, Token: token}
sfJson, _ := json.Marshal(sf)
smsApi("finishbatch", string(sfJson))
bpi := []BatchProcessingInformation{}
bpi = append(bpi, BatchProcessingInformation{
CommunicationChannelID: ccid,
RepeatTargetsMember: duplicateCount[ccid],
LastCallRepeatTargetsMember: lastCallduplicateCount[ccid],
InsertsTargetsMember: insertsCount[ccid],
DataFileName: dataFileName,
})
err = db.CreateInBatches(bpi, insertSize).Error
if err != nil {
applogger.Error(fmt.Sprintf("插入批次处理信息失败,文件名:%s错误信息%v", fileName, err))
2023-03-05 13:18:40 +08:00
}
smsData = SmsData{}
sendMobiles[ccid] = smsData
}
2023-03-05 13:18:40 +08:00
wg.Wait() //所有入库全部完成
2023-02-18 16:32:19 +08:00
//插入批此重复数据
if len(dataBatchDuplicate) > 0 {
err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error
if err != nil {
applogger.Error(fmt.Sprintf("插入重复数据失败,文件名:%s错误信息%v", fileName, err))
} else {
dataBatchDuplicate = nil
2023-03-05 13:18:40 +08:00
}
}
elapsed := time.Since(start)
//发送提醒邮件
subject := "丝芙兰数据包处理完成"
body := fmt.Sprintf("总数:%d;\n数据包%s;\n过滤重复数%d;\n过滤后总数%d;\n处理完成请前往管理平台查看处理。", count+bi, fileName, bi, count)
SendEmail(subject, body) //发送邮件
applogger.Info(fmt.Sprintf("%s数据包 入库完成", fileName))
applogger.Info(fmt.Sprintf("%s数据包执行时间:%s 插入数据:%d条 过滤数数:%d条", fileName, elapsed, count, bi))
closeDb(db)
return 0
} else { //如果没有查询到批次信息,跳过处理
applogger.Error(fmt.Sprintf("未查询到批次数据,文件名:%s截取批次标识%s", fileName, dataFileName))
closeDb(db)
return -1
2023-02-21 14:36:25 +08:00
}
}
2023-02-18 16:32:19 +08:00
}
func queryBatchState() {
2023-03-15 16:59:09 +08:00
db, err := connectToDB()
if err != nil {
handleError(err, "数据库连接失败")
return
}
var batches []Batch
2023-03-15 16:59:09 +08:00
err = db.Where("status NOT IN (?)", []int{6, 8}).Find(&batches).Error
handleError(err, "查询批次状态失败")
if err != nil {
return
}
for _, batch := range batches {
sf := SmsFinish{Sid: batch.Sid, Token: token}
sfJson, _ := json.Marshal(sf)
url := "http://www.wemediacn.net/webservice/BatchService?service=sms.querybatchstate"
resp, err := http.Post(url, "application/json; charset=utf-8", strings.NewReader(string(sfJson)))
if err != nil {
handleError(err, "查询批次状态失败")
continue
}
defer resp.Body.Close()
var retobj map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&retobj)
if err != nil {
handleError(err, "解析响应数据失败")
continue
}
code := int(retobj["code"].(float64))
if code == 0 {
status := int(retobj["state"].(float64))
if batch.Status != status || status == 5 {
updates := createUpdatesMap(retobj)
err = db.Model(&batch).Updates(updates).Error
handleError(err, "修改批次状态失败")
}
2023-03-15 16:59:09 +08:00
} else {
jsonStr, _ := json.Marshal(retobj)
handleError(fmt.Errorf("返回不为0"), fmt.Sprintf("查询批次状态失败:%s", string(jsonStr)))
}
}
closeDb(db)
}
2023-03-15 16:59:09 +08:00
func createUpdatesMap(retobj map[string]interface{}) map[string]interface{} {
updates := map[string]interface{}{
"status": int(retobj["state"].(float64)),
}
if endTime, ok := retobj["endTime"].(string); ok {
if endTimeTime, err := time.Parse("2006-01-02 15:04:05", endTime); err == nil {
updates["end_time"] = &endTimeTime
}
}
if startTime, ok := retobj["startTime"].(string); ok {
if startTimeTime, err := time.Parse("2006-01-02 15:04:05", startTime); err == nil {
updates["start_time"] = &startTimeTime
}
}
if mc, ok := retobj["mc"].(float64); ok {
updates["mc"] = int(mc)
}
if rc, ok := retobj["rc"].(float64); ok {
updates["rc"] = int(rc)
}
if sc, ok := retobj["sc"].(float64); ok {
updates["sc"] = int(sc)
}
return updates
}
func handleError(err error, errMsg string) {
if err != nil {
applogger.Error(fmt.Sprintf("%s: %s", errMsg, err))
}
}
2023-02-18 16:32:19 +08:00
func connectToDB() (*gorm.DB, error) {
2023-04-04 12:07:58 +08:00
dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", dbUser, dbPassword, dbAddress, dbPort, dbName)
var db *gorm.DB
var err error
attempt := 1
maxAttempts := 5
backoff := time.Second
logger := logger.Default.LogMode(logger.Error) //Silent、Error、Warn、Info
for {
db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{Logger: logger, SkipDefaultTransaction: true})
if err == nil {
break
}
if attempt >= maxAttempts {
handleError(err, "数据库连接失败,错误信息")
return nil, err
}
time.Sleep(backoff)
backoff *= 2
attempt++
}
// 获取底层 *sql.DB 实例
sqlDB, err := db.DB()
if err != nil {
return nil, err
}
// 设置最大连接数
sqlDB.SetMaxOpenConns(dbMaxOpenConns)
// 设置最大空闲连接数
sqlDB.SetMaxIdleConns(dbMaxIdleConns)
// 设置空闲连接的最长生命周期
sqlDB.SetConnMaxLifetime(time.Duration(dbConnMaxLifetime) * time.Minute)
2023-02-18 16:32:19 +08:00
return db, nil
}
2023-02-18 19:31:39 +08:00
func closeDb(db *gorm.DB) {
sqlDB, err := db.DB()
if err != nil {
handleError(err, "关闭数据库连接失败")
return
}
sqlDB.Close()
}
2023-03-06 19:38:28 +08:00
func fileExists(filename string) bool {
info, err := os.Stat(filename)
if os.IsNotExist(err) {
return false
}
return !info.IsDir()
}
2023-03-05 13:18:40 +08:00
func iniPath() {
// 获取可执行文件所在的路径
executablePath, err := os.Executable()
if err != nil {
log.Fatal("failed to get executable path: ", err)
}
executableDir = filepath.Dir(executablePath)
//判断目录是否存在,不存在则创建
err = os.MkdirAll(filepath.Join(executableDir, zipPath), 0755)
if err != nil {
log.Fatal(err)
}
err = os.MkdirAll(filepath.Join(executableDir, txtPath), 0755)
if err != nil {
log.Fatal(err)
}
2023-03-06 19:38:28 +08:00
err = os.MkdirAll(filepath.Join(executableDir, lastCallPath), 0755)
if err != nil {
log.Fatal(err)
}
2023-03-05 13:18:40 +08:00
err = os.MkdirAll(filepath.Join(executableDir, logPath, time.Now().Format("2006_01")), 0755)
if err != nil {
log.Fatal(err)
}
}
func monopolize() {
// 打开一个文件作为锁
lockFile, err := os.OpenFile(filepath.Join(executableDir, ".lock"), os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
fmt.Println("打开锁文件失败:", err)
os.Exit(1)
}
defer lockFile.Close()
// 尝试获取文件的独占锁
err = syscall.Flock(int(lockFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
if err != nil {
fmt.Println("程序已经在运行,本程序无法同时运行多个")
os.Exit(1)
}
}
func iniConfi() {
flag.StringVar(&env, "env", "dev", "运行模式")
flag.Parse()
switch env {
case "dev":
//fmt.Print("测试环境配置已生效\n")
redisAddress = "mysql5.weu.me:6379"
redisPassword = ""
redisDB = 1
sftpAddress = "192.168.10.86:49156"
sftpUser = "demo"
sftpPassword = "demo"
sftpDir = "/sftp/test"
2023-04-04 12:07:58 +08:00
dbAddress = "mysql.weu.me"
dbPort = "3306"
dbUser = "root"
dbPassword = "root_123"
2023-03-05 13:18:40 +08:00
dbName = "sephora"
zipPath = "RawData/Zip/"
txtPath = "RawData/Txt/"
logPath = "logs/"
batchSize = 5000 //提交数据
insertSize = 5000 //一次性入库
insertChanSize = 100000 //通道缓冲数
goSize = 10 //协程数
2023-03-05 13:18:40 +08:00
taskTime = 1
2023-03-15 16:59:09 +08:00
batchStatusTaskTime = 1
2023-03-05 13:18:40 +08:00
to = []string{"chejiulong@wemediacn.com"}
token = "7100477930234217"
2023-03-06 19:38:28 +08:00
lastCallPath = "RawData/LastCall"
verifySignatureKey = "qZ6v1&H#Wjx+yRm2D@*sJF$tnfL83Ia"
dataExpirationDays = 14
delDataSize = 60000
rateLimiter = 1
dbMaxOpenConns = 500
dbMaxIdleConns = 5
dbConnMaxLifetime = 10
2023-03-05 13:18:40 +08:00
case "prod":
//fmt.Print("正式环境配置已生效\n")
redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379"
redisPassword = "3Nsb4Pmsl9bcLs24mL12l"
redisDB = 233
sftpAddress = "esftp.sephora.com.cn:20981"
sftpUser = "CHN-CRMTOWemedia-wemedia"
sftpPassword = "uoWdMHEv39ZFjiOg"
sftpDir = "/CN-CRMTOWemedia/SMS"
dbAddress = "rds0yslqyg1iuze8txux545.mysql.rds.aliyuncs.com"
dbPort = "3306"
dbUser = "sephora"
dbPassword = "YfbGJWsFkH4pXgPY"
dbName = "sephora"
2023-03-05 13:18:40 +08:00
zipPath = "RawData/Zip/"
txtPath = "RawData/Txt/"
logPath = "logs/"
batchSize = 5000 //提交数据
insertSize = 5000 //一次性入库
insertChanSize = 100000 //通道缓冲数
goSize = 50 //协程数
2023-03-05 13:18:40 +08:00
taskTime = 60
2023-03-15 16:59:09 +08:00
batchStatusTaskTime = 5
2023-03-05 13:18:40 +08:00
to = []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"}
2023-03-15 10:03:55 +08:00
token = "7100178600777091" //7100477930234217
2023-03-06 19:38:28 +08:00
lastCallPath = "RawData/LastCall"
verifySignatureKey = "Lzxf6asgwFRTsANZ8gkXJnP3EmOyNO#&"
dataExpirationDays = 1
delDataSize = 60000
rateLimiter = 15
dbMaxOpenConns = 500
dbMaxIdleConns = 5
dbConnMaxLifetime = 60
2023-03-05 13:18:40 +08:00
default:
panic(fmt.Errorf("无效的运行模式: %s", env))
}
}
func iniLog() {
logPath = filepath.Join(executableDir, "logs", time.Now().Format("2006_01"))
os.MkdirAll(logPath, 0755)
logFileName := "sms_processing_" + time.Now().Format("2006_01_02") + ".log"
logFileHook := &lumberjack.Logger{
Filename: filepath.Join(logPath, logFileName),
}
logOutput := io.MultiWriter(os.Stdout, logFileHook) // 将日志同时输出到控制台和文件
applogger.SetOutput(logOutput)
}
2023-02-18 19:31:39 +08:00
func SendEmail(subject string, body string) error {
// 邮箱认证信息
smtpHost := "smtp.exmail.qq.com"
smtpPort := 465
2023-02-21 14:36:25 +08:00
from := "auto_system@wemediacn.com"
password := "yJJYYcy5NKx2y69r"
2023-02-18 19:31:39 +08:00
// 邮件内容
m := gomail.NewMessage()
m.SetHeader("From", from)
m.SetHeader("To", to...)
m.SetHeader("Subject", subject)
m.SetBody("text/plain", body)
// 邮件发送
d := gomail.NewDialer(smtpHost, smtpPort, from, password)
d.TLSConfig = &tls.Config{InsecureSkipVerify: true}
err := d.DialAndSend(m)
if err != nil {
applogger.Warn(fmt.Sprintf("邮件发送失败,错误信息%v", err))
2023-02-18 19:31:39 +08:00
}
return nil
}
2023-03-05 13:18:40 +08:00
func (f FileSorter) Len() int {
return len(f)
}
func (f FileSorter) Swap(i, j int) {
f[i], f[j] = f[j], f[i]
}
func (f FileSorter) Less(i, j int) bool {
// 首先检查文件扩展名是否为“.txt”如果是则将其移到文件列表的开头
if strings.HasSuffix(f[i].Name(), ".txt") && !strings.HasSuffix(f[j].Name(), ".txt") {
return true
}
// 如果文件扩展名都是“.txt”或都不是“.txt”则按字母顺序排序
return f[i].Name() < f[j].Name()
}
var ( //初始化变量
2023-03-15 16:59:09 +08:00
env string
applogger *logrus.Logger
redisClient *redis.Client
executableDir string
redisAddress string
redisPassword string
redisDB int
sftpAddress string
sftpUser string
sftpPassword string
sftpDir string
dbAddress string
2023-04-04 12:07:58 +08:00
dbPort string
2023-03-15 16:59:09 +08:00
dbUser string
dbPassword string
dbName string
zipPath string
txtPath string
logPath string
batchSize int //提交数据
insertSize int //一次性入库
insertChanSize int //通道缓冲数
goSize int //协程数
taskTime int
batchStatusTaskTime int
to []string
token string
lastCallPath string
verifySignatureKey string
dataExpirationDays int
delDataSize int
rateLimiter uint
dbMaxOpenConns int
dbMaxIdleConns int
dbConnMaxLifetime int
2023-03-05 13:18:40 +08:00
)
type Batch struct {
ID uint `gorm:"primary_key"`
2023-03-08 13:51:15 +08:00
CommunicationChannelID string
2023-03-05 13:18:40 +08:00
CommunicationName string `gorm:"type:varchar(255)"`
TargetsMember uint `gorm:"type:int"`
TemplateID uint
Content string `gorm:"type:text"`
2023-04-04 12:07:58 +08:00
CreatedAt time.Time `gorm:"default:CURRENT_TIMESTAMP"`
UpdatedAt time.Time `gorm:"default:CURRENT_TIMESTAMP"`
2023-03-15 10:03:55 +08:00
Status int `gorm:"column:status"`
DataFileName string `gorm:"type:text"`
Sid int `gorm:"type:int"`
EndTime *time.Time `gorm:"column:end_time"`
StartTime *time.Time `gorm:"column:start_time"`
MC *int `gorm:"column:mc"`
RC *int `gorm:"column:rc"`
SC *int `gorm:"column:sc"`
2023-03-05 13:18:40 +08:00
}
type BatcheData struct {
ID uint `gorm:"primary_key"`
CommunicationChannelID string `gorm:"column:communication_channel_id"`
Mobile string `gorm:"column:mobile"`
ReservedField string `gorm:"column:reserved_field"`
DataFileName string `gorm:"column:data_file_name"`
CreatedAt time.Time `gorm:"default:CURRENT_TIMESTAMP"`
UpdatedAt time.Time `gorm:"default:CURRENT_TIMESTAMP"`
2023-03-05 13:18:40 +08:00
}
func (BatcheData) TableName() string {
return "batche_datas"
}
type BatchProcessingInformation struct {
ID uint `gorm:"primaryKey;autoIncrement"`
CommunicationChannelID string `gorm:"column:communication_channel_id"`
RepeatTargetsMember int `gorm:"column:repeat_targets_member"`
LastCallRepeatTargetsMember int `gorm:"column:last_call_repeat_targets_member"`
InsertsTargetsMember int `gorm:"column:inserts_targets_member"`
DataFileName string `gorm:"column:data_file_name"`
2023-03-05 13:18:40 +08:00
}
func (BatchProcessingInformation) TableName() string {
return "batches_processing_informations"
}
type BatchDataDuplicateLog struct {
ID int `gorm:"primaryKey;autoIncrement"`
CommunicationChannelID string `gorm:"column:communication_channel_id"`
Mobile string `gorm:"column:mobile"`
ReservedField string `gorm:"column:reserved_field"`
DataFileName string `gorm:"column:data_file_name"`
CreatedAt time.Time `gorm:"default:CURRENT_TIMESTAMP"`
UpdatedAt time.Time `gorm:"default:CURRENT_TIMESTAMP"`
2023-03-05 13:18:40 +08:00
}
func (BatchDataDuplicateLog) TableName() string {
return "batche_data_duplicate_logs"
}
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
// 客户端连接信息
type client struct {
conn *websocket.Conn
}
// 在线客户端列表
var clients = make(map[*client]bool)
// 定义一个FileSorter结构体
type FileSorter []os.FileInfo
type BatchParams struct {
BatchName string `json:"batchName"`
BatchDesc string `json:"batchDesc"`
IsPersonal int `json:"isPersonal"`
Message string `json:"message"`
IsInternational int `json:"isInternational"`
IsSchedule int `json:"isSchedule"`
ScheduleTime string `json:"scheduleTime"`
Token string `json:"token"`
}
type SmsList struct {
M string `json:"m"`
C string `json:"c"`
F int `json:"f"`
}
type Sms struct {
Sid int `json:"sid"`
Data []SmsList `json:"data"`
Token string `json:"token"`
}
type SmsFinish struct {
Sid int `json:"sid"`
Token string `json:"token"`
}
type SmsData struct {
Sid int
IsPersonalizedMsg bool
Content string
Mobiles []string
SmsList []SmsList
}
2023-03-06 19:38:28 +08:00
type Task struct {
TaskData TaskData
Signature Signature
}
type TaskData struct {
2023-03-06 19:38:28 +08:00
Command string `json:"command"`
ExcludedFilename string `json:"excluded_filename"`
BatchFilename string `json:"batch_filename"`
DataFilename string `json:"data_filename"`
}
type Signature struct {
Signature string `json:"signature"`
Timestamp int64 `json:"timestamp"`
2023-03-07 16:50:12 +08:00
Nonce string `json:"nonce"`
}