GO/main.go

1323 lines
43 KiB
Go
Raw Normal View History

2023-02-18 16:32:19 +08:00
package main
import (
"archive/zip"
"bufio"
2023-03-05 13:18:40 +08:00
"bytes"
"crypto/hmac"
"crypto/sha256"
2023-02-18 19:31:39 +08:00
"crypto/tls"
2023-02-18 16:32:19 +08:00
"encoding/csv"
"encoding/hex"
2023-02-18 16:32:19 +08:00
"encoding/json"
"flag"
2023-02-18 16:32:19 +08:00
"fmt"
"io"
"log"
2023-03-05 13:18:40 +08:00
"net/http"
2023-02-18 16:32:19 +08:00
"os"
"path"
"path/filepath"
2023-03-05 13:18:40 +08:00
"regexp"
"sort"
2023-02-18 16:32:19 +08:00
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/go-redis/redis"
2023-03-05 13:18:40 +08:00
"github.com/gorilla/websocket"
2023-02-18 16:32:19 +08:00
"github.com/pkg/sftp"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
2023-02-18 19:31:39 +08:00
"gopkg.in/gomail.v2"
2023-02-21 14:36:25 +08:00
"gopkg.in/natefinch/lumberjack.v2"
2023-02-18 16:32:19 +08:00
"gorm.io/driver/sqlserver"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
2023-03-05 13:18:40 +08:00
func init() {
monopolize() //独占锁
iniConfi() //初始化环境变量
iniPath() //初始化路径
applogger = logrus.New() //创建日志
iniLog() //初始化日志配置
redisClient = redis.NewClient(&redis.Options{
Addr: redisAddress,
Password: redisPassword,
DB: redisDB,
})
go startWebSocket() // 启动 WebSocket 服务,使用协程方式运行
2023-03-05 13:18:40 +08:00
applogger.Info(fmt.Sprintf("程序启动,加载%s环境尝试执行...", env))
go downloadDecompression() // 启动立即执行一次数据下载、处理
go queryBatchState()
2023-03-15 10:03:55 +08:00
2023-02-18 16:32:19 +08:00
}
2023-03-05 13:18:40 +08:00
func main() {
ticker := time.NewTicker(time.Duration(taskTime) * time.Minute) // 数据下载、处理
ticker_merge := time.NewTicker(5 * time.Minute) //名单合并
2023-03-05 13:18:40 +08:00
defer ticker.Stop()
defer ticker_merge.Stop()
2023-02-18 16:32:19 +08:00
2023-03-05 13:18:40 +08:00
for {
select {
case <-ticker.C:
iniLog()
applogger.Info("尝试执行数据处理...")
go downloadDecompression()
2023-02-18 16:32:19 +08:00
2023-03-05 13:18:40 +08:00
case <-ticker_merge.C:
iniLog()
fmt.Print("查询批次状态...\n")
queryBatchState()
2023-03-05 13:18:40 +08:00
}
}
2023-02-18 16:32:19 +08:00
}
2023-03-05 13:18:40 +08:00
func startWebSocket() {
http.HandleFunc("/ws", handleWebSocket)
err := http.ListenAndServe(":8080", nil)
2023-02-18 16:32:19 +08:00
if err != nil {
2023-03-05 13:18:40 +08:00
fmt.Println("启动 WebSocket 服务失败:", err)
} else {
fmt.Println("启动 WebSocket 服务成功:")
}
2023-03-05 13:18:40 +08:00
}
2023-03-05 13:18:40 +08:00
func handleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
2023-03-05 13:18:40 +08:00
fmt.Println("升级连接失败:", err)
return
}
2023-03-05 13:18:40 +08:00
// 打印客户端地址
fmt.Printf("客户端 %s 连接成功\n", conn.RemoteAddr().String())
2023-03-05 13:18:40 +08:00
// 添加客户端连接信息
c := &client{conn}
clients[c] = true
// 接收消息
go func() {
for {
_, message, err := conn.ReadMessage()
if err != nil {
fmt.Println("接收消息失败:", err)
return
}
fmt.Println("收到消息:", string(message))
2023-03-06 19:38:28 +08:00
var returnMessage []byte
task := Task{}
2023-03-06 19:38:28 +08:00
err = json.Unmarshal([]byte(message), &task)
if err != nil {
returnMessage = []byte(`{"code": 2001,"err": "json指令解析失败"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
} else {
if !verify_signature(task.Signature.Signature, task.Signature.Nonce, task.Signature.Timestamp, task.TaskData) { // 签名验证失败或超时
returnMessage = []byte(`{"code": 2401,"err": "Unauthorized"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage)
conn.Close()
break
}
switch task.TaskData.Command {
2023-03-06 19:38:28 +08:00
case "lastCall":
2023-03-07 19:51:06 +08:00
//判断lastCall是否有执行中
if exists, _ := redisClient.Exists("iniLastCallDataStatus").Result(); exists == 1 {
returnMessage = []byte(`{"code": 2007,"err": "有lastCall执行中请稍后重试"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
break
}
//判断ExcludedFilename是否执行过
lastCallTask := fmt.Sprintf("lastCallTask:%s", task.TaskData.ExcludedFilename)
if exists, _ := redisClient.Exists(lastCallTask).Result(); exists == 1 {
returnMessage = []byte(`{"code": 2008,"err": "ExcludedFilename重复跳过执行"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
break
}
//判断ExcludedFilename是否存在
if !fileExists(path.Join(executableDir, lastCallPath, task.TaskData.ExcludedFilename)) {
2023-03-06 19:38:28 +08:00
returnMessage = []byte(`{"code": 2003,"err": "task.ExcludedFilename 不存在"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
break
}
2023-03-07 19:51:06 +08:00
//判断BatchFilename是否存在
if !fileExists(path.Join(executableDir, txtPath, task.TaskData.BatchFilename)) {
2023-03-06 19:38:28 +08:00
returnMessage = []byte(`{"code": 2004,"err": "task.BatchFilename 不存在"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
break
}
2023-03-07 19:51:06 +08:00
//判断DataFilename是否存在
if !fileExists(path.Join(executableDir, txtPath, task.TaskData.DataFilename)) {
2023-03-06 19:38:28 +08:00
returnMessage = []byte(`{"code": 2005,"err": "task.DataFilename 不存在"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
break
}
returnMessage = []byte(`{"code": 2000,"err": "开始处理lastCall"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
2023-03-07 19:51:06 +08:00
//读取排重文件
lastCallKeys, err := readExcludedFile(path.Join(executableDir, lastCallPath, task.TaskData.ExcludedFilename))
2023-03-06 19:38:28 +08:00
if err != nil {
returnMessage = []byte(`{"code": 2006,"err": "打开ExcludedFilename失败"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
break
} else {
returnMessage = []byte(`{"code": 2000,"err": "ExcludedFilename读取完成"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
}
2023-03-07 19:51:06 +08:00
//无错误,执行逻辑代码,
subject := "丝芙兰短信处理程序异常"
err = redisClient.Set("iniLastCallDataStatus", 1, 0).Err()
if err != nil {
body := fmt.Sprintf("写入lastCall任务执行中标记失败%v", err)
applogger.Error(body)
returnMessage = []byte(fmt.Sprintf(`{"code": 2008,"err": "写入lastCall任务执行中标记失败%v"}`, err))
2023-03-06 19:38:28 +08:00
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
2023-03-07 19:51:06 +08:00
SendEmail(subject, body) //发送邮件
2023-03-06 19:38:28 +08:00
} else {
2023-03-07 19:51:06 +08:00
batchInsert(task.TaskData.BatchFilename, true, task.TaskData.ExcludedFilename) //创建批次
returnMessage = []byte(`{"code": 2000,"err": "批次创建完成"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
batchDataInsert(task.TaskData.DataFilename, lastCallKeys, task.TaskData.ExcludedFilename) //添加数据
redisClient.Del("iniLastCallDataStatus") //删除任务执行中标记
returnMessage = []byte(`{"code": 2000,"err": "结束处理lastCall"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
redisClient.Set(lastCallTask, 1, 0).Err() //记录ExcludedFilename执行完成
2023-03-06 19:38:28 +08:00
}
default:
returnMessage = []byte(`{"code": 2002,"err": "task.Command 不存在"}`)
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
2023-03-05 13:18:40 +08:00
}
}
}
}()
// 设置连接关闭时的回调函数
conn.SetCloseHandler(func(code int, text string) error {
fmt.Println("连接已关闭code:", code, "text:", text)
fmt.Printf("客户端 %s 关闭连接\n", conn.RemoteAddr().String())
delete(clients, c)
return nil
2023-02-18 16:32:19 +08:00
})
2023-02-28 18:48:53 +08:00
2023-02-18 16:32:19 +08:00
}
2023-03-07 16:50:12 +08:00
func verify_signature(signature string, nonce string, timestamp int64, data interface{}) bool {
fmt.Printf("Received signature: %s\n", signature)
fmt.Printf("Received timestamp: %d\n", timestamp)
2023-03-07 16:50:12 +08:00
fmt.Printf("Received nonce: %s\n", nonce)
fmt.Printf("Received data: %v\n", data)
received_signature := signature
received_timestamp, _ := strconv.ParseInt(fmt.Sprintf("%v", timestamp), 10, 64)
2023-03-07 16:50:12 +08:00
received_nonce := nonce //strconv.Atoi(fmt.Sprintf("%v", nonce))
if time.Now().Unix()-received_timestamp > 7200 {
fmt.Println("Timestamp expired")
return false
}
received_data_bytes, _ := json.Marshal(data)
received_data := string(received_data_bytes)
2023-03-07 16:50:12 +08:00
expected_data := fmt.Sprintf("%d|%s|%s", received_timestamp, received_nonce, received_data)
fmt.Printf("Expected data: %s\n", expected_data)
mac := hmac.New(sha256.New, []byte(verifySignatureKey))
mac.Write([]byte(expected_data))
expected_signature := hex.EncodeToString(mac.Sum(nil))
fmt.Printf("Expected signature: %s\n", expected_signature)
if received_signature != expected_signature {
fmt.Println("Signature does not match")
return false
}
return true
}
2023-03-06 19:38:28 +08:00
func readExcludedFile(filename string) (map[string]bool, error) {
file, err := os.Open(filename)
if err != nil {
return nil, err
}
defer file.Close()
scanner := bufio.NewScanner(file)
scanner.Split(bufio.ScanLines)
scanner.Scan() // skip first line
lastCallKeys := make(map[string]bool)
for scanner.Scan() {
line := scanner.Text()
fields := strings.Split(line, ",")
lastCallKey := fmt.Sprintf("%s-%s", fields[0], fields[1])
lastCallKeys[lastCallKey] = true
}
return lastCallKeys, nil
}
/*
2023-03-05 13:18:40 +08:00
// 聊天室消息广播
func broadcast(message []byte) {
for c := range clients {
err := c.conn.WriteMessage(websocket.TextMessage, message)
if err != nil {
fmt.Println("发送消息失败:", err)
delete(clients, c)
2023-02-18 16:32:19 +08:00
}
}
2023-03-06 19:38:28 +08:00
}*/
2023-02-18 16:32:19 +08:00
func downloadDecompression() {
if exists, _ := redisClient.Exists("iniDataStatus").Result(); exists == 1 {
2023-02-21 14:36:25 +08:00
fmt.Print("上一批次执行中,跳过本次\n")
2023-02-18 16:32:19 +08:00
} else {
subject := "丝芙兰短信处理程序异常"
2023-02-18 16:32:19 +08:00
// 写入执行中标记
err := redisClient.Set("iniDataStatus", 1, 0).Err()
if err != nil {
body := fmt.Sprintf("写入任务执行中标记失败:%v", err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
}
// Connect to SFTP server
sshConfig := &ssh.ClientConfig{
User: sftpUser,
Auth: []ssh.AuthMethod{
ssh.Password(sftpPassword),
},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
}
sshClient, err := ssh.Dial("tcp", sftpAddress, sshConfig)
if err != nil {
body := fmt.Sprintf("sshClient连接失败%v", err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
}
sftpClient, err := sftp.NewClient(sshClient)
if err != nil {
body := fmt.Sprintf("sftp连接失败%v", err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
}
defer sftpClient.Close()
files, err := sftpClient.ReadDir(sftpDir)
2023-02-18 16:32:19 +08:00
if err != nil {
body := fmt.Sprintf("sftp目录%s不存在, 错误信息:%v", sftpDir, err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
}
it := 1
fmt.Printf("共%d个文件\n", len(files))
2023-03-05 13:18:40 +08:00
sort.Sort(FileSorter(files))
2023-02-18 16:32:19 +08:00
for _, file := range files {
processingStatus := -1
2023-02-21 14:36:25 +08:00
fmt.Printf("第%d个文件处理中\n", it)
it++
2023-02-18 16:32:19 +08:00
// Check if file has been downloaded before
fileKey := fmt.Sprintf("downloaded:%s", file.Name())
if exists, _ := redisClient.Exists(fileKey).Result(); exists == 1 {
2023-02-21 14:36:25 +08:00
fmt.Println("跳过已处理过的文件:" + file.Name())
2023-02-18 16:32:19 +08:00
continue
}
if filepath.Ext(file.Name()) == ".zip" {
//fmt.Println("下载开始(数据包):" + file.Name())
// Download file
srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name()))
2023-02-18 16:32:19 +08:00
if err != nil {
body := fmt.Sprintf("打开sftp文件失败文件名:%s错误信息 %v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
defer srcFile.Close()
dstFile, err := os.Create(path.Join(executableDir, zipPath, file.Name()))
if err != nil {
body := fmt.Sprintf("创建本地文件失败,文件名: %s错误信息%v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
defer dstFile.Close()
if _, err := io.Copy(dstFile, srcFile); err != nil {
body := fmt.Sprintf("下载文件失败,文件名: %s错误信息%v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
2023-02-28 18:48:53 +08:00
applogger.Info(fmt.Sprintf("%s数据包下载完成", file.Name()))
2023-02-18 16:32:19 +08:00
// Unzip file
zipReader, err := zip.OpenReader(path.Join(executableDir, zipPath, file.Name()))
if err != nil {
body := fmt.Sprintf("解压文件失败: 文件名:%s错误信息%v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
defer zipReader.Close()
for _, zipFile := range zipReader.File {
zipFileReader, err := zipFile.Open()
if strings.Contains(zipFile.Name, "__MACOSX/._") {
continue
} else if filepath.Ext(zipFile.Name) != ".txt" {
applogger.Error(fmt.Sprintf("文件类型不正确,跳过处理: %v", zipFile.Name))
2023-02-18 16:32:19 +08:00
continue
}
if err != nil || zipFileReader == nil {
applogger.Error(fmt.Sprintf("Failed to open zip file: %v", err))
2023-02-18 16:32:19 +08:00
fmt.Print("压缩文件处理结束")
continue
}
defer zipFileReader.Close()
// Create the file
unzipFile, err := os.Create(path.Join(executableDir, txtPath, zipFile.Name))
if err != nil {
body := fmt.Sprintf("创建压缩后的文件失败,文件名:%s错误信息 %v", zipFile.Name, err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
defer unzipFile.Close()
// Write the unzip data to the file
_, err = io.Copy(unzipFile, zipFileReader)
if err != nil {
body := fmt.Sprintf("文件解压失败,文件名:%s错误信息 %v", zipFileReader, err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
2023-02-21 14:36:25 +08:00
applogger.Info(fmt.Sprintf("%s数据包解压完成", zipFile.Name))
processingStatus = batchDataInsert(zipFile.Name, nil, "")
2023-02-18 16:32:19 +08:00
}
} else if filepath.Ext(file.Name()) == ".txt" {
srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name()))
2023-02-18 16:32:19 +08:00
if err != nil {
body := fmt.Sprintf("打开sftp文件失败文件名:%s错误信息 %v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
defer srcFile.Close()
dstFile, err := os.Create(path.Join(executableDir, txtPath, file.Name()))
if err != nil {
body := fmt.Sprintf("创建本地文件失败,文件名: %s错误信息%v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
defer dstFile.Close()
if _, err := io.Copy(dstFile, srcFile); err != nil {
body := fmt.Sprintf("下载文件失败,文件名: %s错误信息%v", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
2023-02-21 14:36:25 +08:00
applogger.Info(fmt.Sprintf("%s批次文件下载完成", file.Name()))
processingStatus = batchInsert(file.Name(), false, "")
2023-02-18 16:32:19 +08:00
}
if processingStatus != -1 {
err = redisClient.Set(fileKey, 1, 0).Err() //写入文件处理完成
if err != nil {
body := fmt.Sprintf("写入文件处理完成标记失败文件名:%s错误信息%v\n", file.Name(), err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
}
2023-02-18 16:32:19 +08:00
}
2023-02-18 16:32:19 +08:00
}
redisClient.Del("iniDataStatus") //删除任务执行中标记
}
}
// 批次入库
func batchInsert(fileName string, isLastCall bool, excludedFilename string) int {
2023-02-18 16:32:19 +08:00
//fmt.Print("批次处理开始")
start := time.Now()
2023-02-21 14:36:25 +08:00
db, _ := connectToDB()
2023-02-18 16:32:19 +08:00
file, err := os.Open(path.Join(executableDir, txtPath, fileName))
if err != nil {
return -1
2023-02-21 14:36:25 +08:00
} else {
defer file.Close()
reader := csv.NewReader(bufio.NewReader(file))
reader.Read()
batchRows := 0
for {
record, err := reader.Read()
if err != nil {
//return -1
2023-02-21 14:36:25 +08:00
break
}
2023-03-05 13:18:40 +08:00
2023-03-08 13:51:15 +08:00
//communicationChannelID, _ := strconv.ParseUint(record[0], 10, 32)
2023-02-21 14:36:25 +08:00
TargetsMember, _ := strconv.ParseUint(record[2], 10, 32)
templateID, _ := strconv.ParseUint(record[3], 10, 32)
2023-03-15 10:03:55 +08:00
status := 1
2023-02-21 14:36:25 +08:00
2023-03-05 13:18:40 +08:00
t := time.Now()
s := t.Format("2006-01-02 15:04:05")
2023-03-06 19:38:28 +08:00
var batchName, dataFileName string
if isLastCall {
2023-03-08 17:02:32 +08:00
batchName = fmt.Sprintf("lastCall-%s-%s-%s", record[1], record[0], excludedFilename)
dataFileName = fmt.Sprintf("lastCall-%s", excludedFilename)
2023-03-06 19:38:28 +08:00
} else {
2023-03-08 13:51:15 +08:00
batchName = fmt.Sprintf("%s-%s", record[1], record[0])
dataFileName = fileName[:len(fileName)-10]
2023-03-06 19:38:28 +08:00
}
2023-03-05 13:18:40 +08:00
batchParams := BatchParams{
2023-03-06 19:38:28 +08:00
BatchName: batchName,
2023-03-05 13:18:40 +08:00
BatchDesc: record[4],
IsPersonal: 0,
Message: record[4],
IsInternational: 0,
IsSchedule: 0, //点发
ScheduleTime: s,
Token: token,
}
// 调用发送短信批次接口
sid, err := CreateBatch(batchParams)
if err != nil {
fmt.Println(err)
return -1
2023-03-05 13:18:40 +08:00
}
fmt.Println(sid)
2023-02-21 14:36:25 +08:00
batch := Batch{
2023-03-08 13:51:15 +08:00
CommunicationChannelID: record[0],
2023-03-06 19:38:28 +08:00
CommunicationName: batchName,
2023-02-21 14:36:25 +08:00
TargetsMember: uint(TargetsMember),
TemplateID: uint(templateID),
Content: record[4],
Status: status,
2023-03-06 19:38:28 +08:00
DataFileName: dataFileName,
2023-03-05 13:18:40 +08:00
Sid: sid,
2023-02-21 14:36:25 +08:00
}
db.Create(&batch)
batchRows++
2023-02-18 16:32:19 +08:00
}
2023-02-21 14:36:25 +08:00
time.Sleep(time.Second)
elapsed := time.Since(start)
2023-02-21 14:36:25 +08:00
subject := "丝芙兰批次文件处理完成"
body := fmt.Sprintf("批次数:%d;\n批次文件%s;\n处理完成请前往管理平台查看处理。", batchRows, fileName)
2023-02-21 14:36:25 +08:00
SendEmail(subject, body) //发送邮件
2023-02-21 14:36:25 +08:00
applogger.Info(fmt.Sprintf("%s批次文件入库完成", fileName))
applogger.Info(fmt.Sprintf("%s批次文件执行时间%s 插入批次次数:%d", fileName, elapsed, batchRows))
return 0
2023-02-18 16:32:19 +08:00
}
}
2023-03-05 13:18:40 +08:00
func CreateBatch(batchParams BatchParams) (int, error) {
// 将请求参数编码为JSON字符串
jsonBytes, err := json.Marshal(batchParams)
if err != nil {
return -1, err
}
jsonStr := string(jsonBytes)
// 发送POST请求
url := "http://www.wemediacn.net/webservice/BatchService?service=sms.createbatch"
resp, err := http.Post(url, "application/json; charset=utf-8", bytes.NewBuffer([]byte(jsonStr)))
if err != nil {
return -1, err
}
defer resp.Body.Close()
// 解析响应数据
var retobj map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&retobj)
if err != nil {
return -1, err
}
fmt.Print(retobj)
code := int(retobj["code"].(float64))
fmt.Printf("批次创建API 返回:%d\n", code)
if code == 0 {
sid := int(retobj["sid"].(float64))
return sid, nil
} else {
return -1, fmt.Errorf("create batch failed, error code: %d", code)
}
}
func smsApi(method string, sendSMSDataJson string) (int, error) {
// 发送POST请求
url := "http://www.wemediacn.net/webservice/BatchService?service=sms." + method
resp, err := http.Post(url, "application/json; charset=utf-8", strings.NewReader(sendSMSDataJson))
if err != nil {
return -1, err
}
defer resp.Body.Close()
// 解析响应数据
var retobj map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&retobj)
if err != nil {
return -1, err
}
jsonStr, err := json.Marshal(retobj)
if err != nil {
fmt.Println(err)
return -1, err
}
fmt.Printf("API 返回:%s\n", string(jsonStr))
code := int(retobj["code"].(float64))
if code == 0 {
//sid := int(retobj["sid"].(float64))
fmt.Printf("提交批次成功code%d\n", code)
return code, nil
} else {
applogger.Error(string(jsonStr))
return -1, fmt.Errorf(string(jsonStr))
2023-03-05 13:18:40 +08:00
}
}
func batchDataInsert(fileName string, lastCallKeys map[string]bool, excludedFilename string) int {
2023-02-18 16:32:19 +08:00
start := time.Now()
2023-02-21 14:36:25 +08:00
// Open file
fileKey := fmt.Sprintf("downloaded:%s", strings.Replace(fileName, filepath.Ext(fileName), ".zip", 1))
2023-02-21 14:36:25 +08:00
file, err := os.Open(path.Join(executableDir, txtPath, fileName))
2023-02-18 16:32:19 +08:00
if err != nil {
redisClient.Del(fileKey) //删除文件处理完成的标志位
applogger.Error(fmt.Sprintf("文件打开失败,文件名:%s错误信息%v", fileName, err))
return -1
2023-02-21 14:36:25 +08:00
} else {
defer file.Close()
db, _ := connectToDB()
// Insert data in batches using multiple goroutines
var wg sync.WaitGroup
dataBatchChan := make(chan []BatcheData, insertChanSize)
for i := 0; i < goSize; i++ {
wg.Add(1)
go func() {
for batch := range dataBatchChan {
retryCount := 0
for {
if err := db.CreateInBatches(batch, insertSize).Error; err != nil {
if retryCount >= 5 {
panic(err)
}
fmt.Printf("Insert failed, retrying in %v seconds...\n", 2*retryCount)
time.Sleep(time.Duration(2*retryCount) * time.Second)
retryCount++
} else {
break
2023-02-18 16:32:19 +08:00
}
}
}
2023-02-21 14:36:25 +08:00
wg.Done()
}()
2023-02-18 16:32:19 +08:00
}
hs := make(map[string]bool) //排重
dataBatch := make([]BatcheData, 0, batchSize) //短信数据
dataBatchDuplicate := make([]BatchDataDuplicateLog, 0, batchSize) //重复短信数据
bi := 0 //重复总数
count := 0 // 总数
//数据文件中批次
batches := []Batch{} //查询批次
2023-03-06 19:38:28 +08:00
fileNameDate := ""
dataFileName := ""
fields := strings.Split(fileName, "_")
datetime := fields[len(fields)-1]
2023-03-06 19:38:28 +08:00
if lastCallKeys != nil {
2023-03-08 17:02:32 +08:00
fileNameDate = fmt.Sprintf("lastCall-%s", excludedFilename)
fmt.Printf("fileNameDate : %s\n", fileNameDate)
2023-03-06 19:38:28 +08:00
// 模糊查询文件名包含“20230103”字符串的批次记录
db.Table("batches b1").
Select("b1.*").
Where("b1.data_file_name LIKE ?", "%"+fileNameDate+"%").
Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at").
Order("b1.created_at desc").
Find(&batches)
2023-03-08 17:02:32 +08:00
dataFileName = fmt.Sprintf("lastCall-%s", excludedFilename)
2023-03-06 19:38:28 +08:00
} else {
fileNameDate = datetime[:8]
2023-03-06 19:38:28 +08:00
db.Table("batches b1").
Select("b1.*").
Where("b1.data_file_name LIKE ?", "%"+fileNameDate+"%").
2023-03-06 19:38:28 +08:00
Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at").
Order("b1.created_at desc").
Find(&batches)
2023-03-08 17:02:32 +08:00
dataFileName = strings.Replace(fileName, "targets", "definition", -1)
dataFileName = dataFileName[:len(dataFileName)-10]
2023-03-06 19:38:28 +08:00
}
batchCount := len(batches)
sendMobiles := make(map[string]SmsData, batchCount) //短信数据,用于提交发送平台
duplicateCount := make(map[string]int, batchCount) //按批次重复数
lastCallduplicateCount := make(map[string]int, batchCount) //按批次重复数
insertsCount := make(map[string]int, batchCount) //按批次插入数
ccids := make(map[string]bool, batchCount)
if batchCount > 0 { //如果查询到数据需要的批次信息,开始处理数据
// 定义一个名为result的map类型的变量并以CommunicationChannelID作为键
result := make(map[string][]Batch)
for _, batch := range batches {
2023-03-08 17:02:32 +08:00
cckdKey := batch.CommunicationChannelID
result[cckdKey] = append(result[cckdKey], batch)
print(batch.CommunicationChannelID, "\n")
}
2023-03-05 13:18:40 +08:00
scanner := bufio.NewScanner(file)
scanner.Split(bufio.ScanLines)
scanner.Scan() // skip first line
for scanner.Scan() {
line := scanner.Text()
row, err := csv.NewReader(strings.NewReader(line)).Read()
if err != nil {
applogger.Error(fmt.Sprintf("文件按行读取失败,文件名:%s错误信息%v", fileName, err))
continue
}
reservedFields := map[string]string{ //合并个性化字段
"ReservedField1": row[5],
"ReservedField2": row[6],
"ReservedField3": row[7],
"ReservedField4": row[8],
"ReservedField5": row[9],
"FullName": row[4],
}
reservedFieldsJson, err := json.Marshal(reservedFields) //个性化字段转json
if err != nil {
applogger.Error(fmt.Sprintf("个性化字段合并失败,文件名:%s错误信息%v", fileName, err))
continue
}
if _, ok := duplicateCount[row[2]]; !ok {
duplicateCount[row[2]] = 0
}
if _, ok := lastCallduplicateCount[row[2]]; !ok {
lastCallduplicateCount[row[2]] = 0
}
// Check if record exists in hashset
key := fmt.Sprintf("%s-%s-%s", row[2], row[3], row[5])
if _, exists := hs[key]; exists { //如果批次数据重复
bi++
// Increment duplicate count
duplicateCount[row[2]]++
dataBatchDuplicate = append(dataBatchDuplicate, BatchDataDuplicateLog{
CommunicationChannelID: row[2],
Mobile: row[3],
ReservedField: string(reservedFieldsJson),
})
if len(dataBatchDuplicate) >= batchSize {
err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error
if err != nil {
applogger.Error(fmt.Sprintf("插入重复数据失败,文件名:%s错误信息%v", fileName, err))
} else {
dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize)
}
}
continue
}
2023-03-06 19:38:28 +08:00
if lastCallKeys != nil && lastCallKeys[fmt.Sprintf("%s-%s", row[2], row[3])] {
lastCallduplicateCount[row[2]]++
2023-03-06 19:38:28 +08:00
continue
}
// Add record to hashset
hs[key] = true
ccids[row[2]] = true
dataBatch = append(dataBatch, BatcheData{
2023-02-21 14:36:25 +08:00
CommunicationChannelID: row[2],
Mobile: row[3],
ReservedField: string(reservedFieldsJson),
DataFileName: dataFileName,
2023-02-21 14:36:25 +08:00
})
2023-03-05 13:18:40 +08:00
if len(dataBatch) >= batchSize {
dataBatchChan <- dataBatch
dataBatch = make([]BatcheData, 0, batchSize)
}
if batches, ok := result[row[2]]; ok && len(batches) > 0 && batches[0].Content != "" {
2023-03-06 19:38:28 +08:00
content := fmt.Sprintf("%s /", batches[0].Content)
Sid := batches[0].Sid
pattern := regexp.MustCompile(`\{.*\}`)
matched := pattern.MatchString(content)
if matched { //个性化短信
smsData, ok := sendMobiles[row[2]]
if !ok {
smsData = SmsData{
Sid: Sid,
IsPersonalizedMsg: true,
SmsList: make([]SmsList, 0),
}
}
placeholderMap := map[string]string{
"{RESERVED_FIELD_1}": row[5],
"{RESERVED_FIELD_2}": row[6],
"{RESERVED_FIELD_3}": row[7],
"{RESERVED_FIELD_4}": row[8],
"{RESERVED_FIELD_5}": row[9],
}
for k, v := range placeholderMap {
content = strings.ReplaceAll(content, k, v)
}
smsData.SmsList = append(smsData.SmsList, SmsList{M: row[3], C: content, F: 8})
if len(smsData.SmsList) >= batchSize/2 {
sd := Sms{Sid: Sid, Data: smsData.SmsList, Token: token}
sendSMSDataJson, _ := json.Marshal(sd)
//
resp, err := smsApi("appendbatchdata", string(sendSMSDataJson))
if resp != 0 {
fmt.Printf("smsApi returned error: %d\n", err)
//发送提醒邮件
subject := "丝芙兰数据包数据提交异常"
body := fmt.Sprintf("数据包:%s;\nCommunicationChannelID%s;\n错误信息%s\n 请前往管理平台查看处理。", fileName, row[2], err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
}
smsData.SmsList = []SmsList{} // reset mobiles slice
}
sendMobiles[row[2]] = smsData
2023-02-21 14:36:25 +08:00
} else {
// 处理非个性化短信
smsData, ok := sendMobiles[row[2]]
if !ok {
smsData = SmsData{
Sid: Sid,
IsPersonalizedMsg: false,
Content: content,
Mobiles: make([]string, 0),
}
}
smsData.Mobiles = append(smsData.Mobiles, row[3])
if len(smsData.Mobiles) >= batchSize {
mobiles := smsData.Mobiles
mobileStr := strings.Join(mobiles, ",")
var sl []SmsList
sl = append(sl, SmsList{M: mobileStr, C: content, F: 8})
sd := Sms{Sid: Sid, Data: sl, Token: token}
sendSMSDataJson, _ := json.Marshal(sd)
resp, err := smsApi("appendbatchdata", string(sendSMSDataJson))
if resp != 0 {
fmt.Printf("smsApi returned error: %d\n", err)
//发送提醒邮件
subject := "丝芙兰数据包数据提交异常"
body := fmt.Sprintf("数据包:%s;\nCommunicationChannelID%s;\n错误信息%s\n 请前往管理平台查看处理。", fileName, row[2], err)
applogger.Error(body)
SendEmail(subject, body) //发送邮件
}
smsData.Mobiles = []string{} // reset mobiles slice
}
sendMobiles[row[2]] = smsData
2023-02-21 14:36:25 +08:00
}
}
if _, ok := insertsCount[row[2]]; !ok {
insertsCount[row[2]] = 0
}
insertsCount[row[2]]++
count++
2023-02-18 16:32:19 +08:00
}
if len(dataBatch) > 0 {
2023-02-21 14:36:25 +08:00
dataBatchChan <- dataBatch
2023-02-18 16:32:19 +08:00
}
close(dataBatchChan)
2023-03-05 13:18:40 +08:00
for ccid := range ccids {
smsData, ok := sendMobiles[ccid]
if !ok {
continue
}
if smsData.IsPersonalizedMsg { //个性化
smsList := smsData.SmsList
if len(smsList) > 0 {
sd := Sms{Sid: smsData.Sid, Data: smsList, Token: token}
2023-03-05 13:18:40 +08:00
sendSMSDataJson, _ := json.Marshal(sd)
smsApi("appendbatchdata", string(sendSMSDataJson))
smsData.SmsList = []SmsList{} // reset mobiles slice
2023-03-05 13:18:40 +08:00
}
} else { //非个性化
mobiles, ok := smsData.Mobiles, len(smsData.Mobiles) > 0 && true
if ok && len(mobiles) > 0 {
2023-03-05 13:18:40 +08:00
mobileStr := strings.Join(mobiles, ",")
var sl []SmsList
sl = append(sl, SmsList{M: mobileStr, C: smsData.Content, F: 8})
sd := Sms{Sid: smsData.Sid, Data: sl, Token: token}
2023-03-05 13:18:40 +08:00
sendSMSDataJson, _ := json.Marshal(sd)
smsApi("appendbatchdata", string(sendSMSDataJson))
smsData.Mobiles = []string{} // reset mobiles slice
2023-03-05 13:18:40 +08:00
}
}
sf := SmsFinish{Sid: smsData.Sid, Token: token}
sfJson, _ := json.Marshal(sf)
smsApi("finishbatch", string(sfJson))
bpi := []BatchProcessingInformation{}
bpi = append(bpi, BatchProcessingInformation{
CommunicationChannelID: ccid,
RepeatTargetsMember: duplicateCount[ccid],
LastCallRepeatTargetsMember: lastCallduplicateCount[ccid],
InsertsTargetsMember: insertsCount[ccid],
DataFileName: dataFileName,
})
err = db.CreateInBatches(bpi, insertSize).Error
if err != nil {
applogger.Error(fmt.Sprintf("插入批次处理信息失败,文件名:%s错误信息%v", fileName, err))
2023-03-05 13:18:40 +08:00
}
smsData = SmsData{}
sendMobiles[ccid] = smsData
}
2023-03-05 13:18:40 +08:00
wg.Wait() //所有入库全部完成
2023-02-18 16:32:19 +08:00
//插入批此重复数据
if len(dataBatchDuplicate) > 0 {
err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error
if err != nil {
applogger.Error(fmt.Sprintf("插入重复数据失败,文件名:%s错误信息%v", fileName, err))
} else {
dataBatchDuplicate = nil
2023-03-05 13:18:40 +08:00
}
}
elapsed := time.Since(start)
//发送提醒邮件
subject := "丝芙兰数据包处理完成"
body := fmt.Sprintf("总数:%d;\n数据包%s;\n过滤重复数%d;\n过滤后总数%d;\n处理完成请前往管理平台查看处理。", count+bi, fileName, bi, count)
SendEmail(subject, body) //发送邮件
applogger.Info(fmt.Sprintf("%s数据包 入库完成", fileName))
applogger.Info(fmt.Sprintf("%s数据包执行时间:%s 插入数据:%d条 过滤数数:%d条", fileName, elapsed, count, bi))
return 0
} else { //如果没有查询到批次信息,跳过处理
applogger.Error(fmt.Sprintf("未查询到批次数据,文件名:%s截取批次标识%s", fileName, dataFileName))
return -1
2023-02-21 14:36:25 +08:00
}
}
2023-02-18 16:32:19 +08:00
}
func queryBatchState() {
db, _ := connectToDB()
2023-03-15 10:03:55 +08:00
//if err := db.AutoMigrate(&Batch{}); err != nil {
//applogger.Error(fmt.Sprintf("AutoMigrate失败%s", err))
//}
var batches []Batch
if err := db.Where("status NOT IN (?)", []int{6, 8}).Find(&batches).Error; err != nil {
applogger.Error(fmt.Sprintf("查询批次状态失败:%s", err))
} else {
for _, batch := range batches {
sf := SmsFinish{Sid: batch.Sid, Token: token}
sfJson, _ := json.Marshal(sf)
url := "http://www.wemediacn.net/webservice/BatchService?service=sms.querybatchstate"
resp, err := http.Post(url, "application/json; charset=utf-8", strings.NewReader(string(sfJson)))
if err != nil {
applogger.Error(fmt.Sprintf("查询批次状态失败:%s", err))
} else {
// 解析响应数据
var retobj map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&retobj)
if err != nil {
fmt.Println(err)
}
jsonStr, err := json.Marshal(retobj)
if err != nil {
fmt.Println(err)
}
fmt.Printf("查询批次信息API 返回:%s\n", string(jsonStr))
fmt.Print("\n")
fmt.Print("\n")
code := int(retobj["code"].(float64))
if code == 0 {
2023-03-15 10:03:55 +08:00
status := int(retobj["state"].(float64))
if batch.Status != status || status == 5 {
fmt.Println(batch)
fmt.Print("\n")
fmt.Print("状态不一致或者发送中,更新状态\n")
updates := map[string]interface{}{
"status": status,
}
if endTime, ok := retobj["endTime"].(string); ok {
if endTimeTime, err := time.Parse("2006-01-02 15:04:05", endTime); err == nil {
updates["end_time"] = &endTimeTime
}
} else {
updates["end_time"] = nil
}
if endTime, ok := retobj["startTime"].(string); ok {
if endTimeTime, err := time.Parse("2006-01-02 15:04:05", endTime); err == nil {
2023-03-15 10:03:55 +08:00
updates["start_time"] = &endTimeTime
}
} else {
2023-03-15 10:03:55 +08:00
updates["start_time"] = nil
}
if mc, ok := retobj["mc"].(float64); ok {
updates["mc"] = int(mc)
}
if rc, ok := retobj["rc"].(float64); ok {
updates["rc"] = int(rc)
}
if sc, ok := retobj["sc"].(float64); ok {
updates["sc"] = int(sc)
}
fmt.Print("\n")
fmt.Print(updates)
fmt.Print("\n")
fmt.Print("\n")
if err := db.Model(&batch).Updates(updates).Error; err != nil {
applogger.Error(fmt.Sprintf("修改批次状态失败:%s", err))
}
}
} else {
applogger.Error(fmt.Sprintf("查询批次状态失败返回不为0%s", string(jsonStr)))
}
}
defer resp.Body.Close()
}
}
}
2023-02-18 16:32:19 +08:00
func connectToDB() (*gorm.DB, error) {
dsn := "sqlserver://" + dbUser + ":" + dbPassword + "@" + dbAddress + "?database=" + dbName + "&charset=utf8mb4"
var db *gorm.DB
var err error
attempt := 1
maxAttempts := 5
backoff := time.Second
logger := logger.Default.LogMode(logger.Error) //Silent、Error、Warn、Info
2023-02-18 16:32:19 +08:00
for {
db, err = gorm.Open(sqlserver.Open(dsn), &gorm.Config{Logger: logger, SkipDefaultTransaction: true})
2023-02-18 16:32:19 +08:00
if err == nil {
break
}
if attempt >= maxAttempts {
applogger.Error(fmt.Sprintf("数据库连接失败,错误信息%v", err))
2023-02-18 16:32:19 +08:00
return nil, err
}
time.Sleep(backoff)
backoff *= 2
attempt++
}
return db, nil
}
2023-02-18 19:31:39 +08:00
2023-03-06 19:38:28 +08:00
func fileExists(filename string) bool {
info, err := os.Stat(filename)
if os.IsNotExist(err) {
return false
}
return !info.IsDir()
}
2023-03-05 13:18:40 +08:00
func iniPath() {
// 获取可执行文件所在的路径
executablePath, err := os.Executable()
if err != nil {
log.Fatal("failed to get executable path: ", err)
}
executableDir = filepath.Dir(executablePath)
//判断目录是否存在,不存在则创建
err = os.MkdirAll(filepath.Join(executableDir, zipPath), 0755)
if err != nil {
log.Fatal(err)
}
err = os.MkdirAll(filepath.Join(executableDir, txtPath), 0755)
if err != nil {
log.Fatal(err)
}
2023-03-06 19:38:28 +08:00
err = os.MkdirAll(filepath.Join(executableDir, lastCallPath), 0755)
if err != nil {
log.Fatal(err)
}
2023-03-05 13:18:40 +08:00
err = os.MkdirAll(filepath.Join(executableDir, logPath, time.Now().Format("2006_01")), 0755)
if err != nil {
log.Fatal(err)
}
}
func monopolize() {
// 打开一个文件作为锁
lockFile, err := os.OpenFile(filepath.Join(executableDir, ".lock"), os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
fmt.Println("打开锁文件失败:", err)
os.Exit(1)
}
defer lockFile.Close()
// 尝试获取文件的独占锁
err = syscall.Flock(int(lockFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
if err != nil {
fmt.Println("程序已经在运行,本程序无法同时运行多个")
os.Exit(1)
}
}
func iniConfi() {
flag.StringVar(&env, "env", "dev", "运行模式")
flag.Parse()
switch env {
case "dev":
//fmt.Print("测试环境配置已生效\n")
redisAddress = "mysql5.weu.me:6379"
redisPassword = ""
redisDB = 1
sftpAddress = "192.168.10.86:49156"
sftpUser = "demo"
sftpPassword = "demo"
sftpDir = "/sftp/test"
dbAddress = "192.168.10.18:1433"
dbUser = "sa"
dbPassword = "Aa123123"
dbName = "sephora"
zipPath = "RawData/Zip/"
txtPath = "RawData/Txt/"
logPath = "logs/"
batchSize = 5000 //提交数据
insertSize = 500 //一次性入库
2023-03-05 13:18:40 +08:00
insertChanSize = 10 //通道缓冲数
goSize = 10 //协程数
taskTime = 1
to = []string{"chejiulong@wemediacn.com"}
token = "7100477930234217"
2023-03-06 19:38:28 +08:00
lastCallPath = "RawData/LastCall"
verifySignatureKey = "qZ6v1&H#Wjx+yRm2D@*sJF$tnfL83Ia"
2023-03-05 13:18:40 +08:00
case "prod":
//fmt.Print("正式环境配置已生效\n")
redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379"
redisPassword = "3Nsb4Pmsl9bcLs24mL12l"
redisDB = 233
sftpAddress = "esftp.sephora.com.cn:20981"
sftpUser = "CHN-CRMTOWemedia-wemedia"
sftpPassword = "uoWdMHEv39ZFjiOg"
sftpDir = "/CN-CRMTOWemedia/SMS"
dbAddress = "rm-bp16l424ln96q1ouk.sqlserver.rds.aliyuncs.com:3433"
dbUser = "sephora_sms"
dbPassword = "5ORiiLmgkniC0EqF"
dbName = "sephora_sms"
zipPath = "RawData/Zip/"
txtPath = "RawData/Txt/"
logPath = "logs/"
batchSize = 5000 //提交数据
insertSize = 500 //一次性入库
insertChanSize = 100 //通道缓冲数
goSize = 50 //协程数
2023-03-05 13:18:40 +08:00
taskTime = 60
to = []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"}
2023-03-15 10:03:55 +08:00
token = "7100178600777091" //7100477930234217
2023-03-06 19:38:28 +08:00
lastCallPath = "RawData/LastCall"
verifySignatureKey = "Lzxf6asgwFRTsANZ8gkXJnP3EmOyNO#&"
2023-03-05 13:18:40 +08:00
default:
panic(fmt.Errorf("无效的运行模式: %s", env))
}
}
func iniLog() {
logPath = filepath.Join(executableDir, "logs", time.Now().Format("2006_01"))
os.MkdirAll(logPath, 0755)
logFileName := "sms_processing_" + time.Now().Format("2006_01_02") + ".log"
logFileHook := &lumberjack.Logger{
Filename: filepath.Join(logPath, logFileName),
}
logOutput := io.MultiWriter(os.Stdout, logFileHook) // 将日志同时输出到控制台和文件
applogger.SetOutput(logOutput)
}
2023-02-18 19:31:39 +08:00
func SendEmail(subject string, body string) error {
// 邮箱认证信息
smtpHost := "smtp.exmail.qq.com"
smtpPort := 465
2023-02-21 14:36:25 +08:00
from := "auto_system@wemediacn.com"
password := "yJJYYcy5NKx2y69r"
2023-02-18 19:31:39 +08:00
// 邮件内容
m := gomail.NewMessage()
m.SetHeader("From", from)
m.SetHeader("To", to...)
m.SetHeader("Subject", subject)
m.SetBody("text/plain", body)
// 邮件发送
d := gomail.NewDialer(smtpHost, smtpPort, from, password)
d.TLSConfig = &tls.Config{InsecureSkipVerify: true}
err := d.DialAndSend(m)
if err != nil {
applogger.Warn(fmt.Sprintf("邮件发送失败,错误信息%v", err))
2023-02-18 19:31:39 +08:00
}
return nil
}
2023-03-05 13:18:40 +08:00
func (f FileSorter) Len() int {
return len(f)
}
func (f FileSorter) Swap(i, j int) {
f[i], f[j] = f[j], f[i]
}
func (f FileSorter) Less(i, j int) bool {
// 首先检查文件扩展名是否为“.txt”如果是则将其移到文件列表的开头
if strings.HasSuffix(f[i].Name(), ".txt") && !strings.HasSuffix(f[j].Name(), ".txt") {
return true
}
// 如果文件扩展名都是“.txt”或都不是“.txt”则按字母顺序排序
return f[i].Name() < f[j].Name()
}
var ( //初始化变量
env string
applogger *logrus.Logger
redisClient *redis.Client
executableDir string
redisAddress string
redisPassword string
redisDB int
sftpAddress string
sftpUser string
sftpPassword string
sftpDir string
dbAddress string
dbUser string
dbPassword string
dbName string
zipPath string
txtPath string
logPath string
batchSize int //提交数据
insertSize int //一次性入库
insertChanSize int //通道缓冲数
goSize int //协程数
taskTime int
to []string
token string
lastCallPath string
verifySignatureKey string
2023-03-05 13:18:40 +08:00
)
type Batch struct {
ID uint `gorm:"primary_key"`
2023-03-08 13:51:15 +08:00
CommunicationChannelID string
2023-03-05 13:18:40 +08:00
CommunicationName string `gorm:"type:varchar(255)"`
TargetsMember uint `gorm:"type:int"`
TemplateID uint
Content string `gorm:"type:text"`
CreatedAt time.Time `gorm:"default:getdate()"`
UpdatedAt time.Time `gorm:"default:getdate()"`
2023-03-15 10:03:55 +08:00
Status int `gorm:"column:status"`
DataFileName string `gorm:"type:text"`
Sid int `gorm:"type:int"`
EndTime *time.Time `gorm:"column:end_time"`
StartTime *time.Time `gorm:"column:start_time"`
MC *int `gorm:"column:mc"`
RC *int `gorm:"column:rc"`
SC *int `gorm:"column:sc"`
2023-03-05 13:18:40 +08:00
}
type BatcheData struct {
ID uint `gorm:"primary_key"`
CommunicationChannelID string `gorm:"column:communication_channel_id"`
Mobile string `gorm:"column:mobile"`
ReservedField string `gorm:"column:reserved_field"`
DataFileName string `gorm:"column:data_file_name"`
2023-03-05 13:18:40 +08:00
}
func (BatcheData) TableName() string {
return "batche_datas"
}
type BatchProcessingInformation struct {
ID uint `gorm:"primaryKey;autoIncrement"`
CommunicationChannelID string `gorm:"column:communication_channel_id"`
RepeatTargetsMember int `gorm:"column:repeat_targets_member"`
LastCallRepeatTargetsMember int `gorm:"column:last_call_repeat_targets_member"`
InsertsTargetsMember int `gorm:"column:inserts_targets_member"`
DataFileName string `gorm:"column:data_file_name"`
2023-03-05 13:18:40 +08:00
}
func (BatchProcessingInformation) TableName() string {
return "batches_processing_informations"
}
type BatchDataDuplicateLog struct {
ID int `gorm:"primaryKey;autoIncrement"`
CommunicationChannelID string `gorm:"column:communication_channel_id"`
Mobile string `gorm:"column:mobile"`
ReservedField string `gorm:"column:reserved_field"`
DataFileName string `gorm:"column:data_file_name"`
2023-03-05 13:18:40 +08:00
}
func (BatchDataDuplicateLog) TableName() string {
return "batche_data_duplicate_logs"
}
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
// 客户端连接信息
type client struct {
conn *websocket.Conn
}
// 在线客户端列表
var clients = make(map[*client]bool)
// 定义一个FileSorter结构体
type FileSorter []os.FileInfo
type BatchParams struct {
BatchName string `json:"batchName"`
BatchDesc string `json:"batchDesc"`
IsPersonal int `json:"isPersonal"`
Message string `json:"message"`
IsInternational int `json:"isInternational"`
IsSchedule int `json:"isSchedule"`
ScheduleTime string `json:"scheduleTime"`
Token string `json:"token"`
}
type SmsList struct {
M string `json:"m"`
C string `json:"c"`
F int `json:"f"`
}
type Sms struct {
Sid int `json:"sid"`
Data []SmsList `json:"data"`
Token string `json:"token"`
}
type SmsFinish struct {
Sid int `json:"sid"`
Token string `json:"token"`
}
type SmsData struct {
Sid int
IsPersonalizedMsg bool
Content string
Mobiles []string
SmsList []SmsList
}
2023-03-06 19:38:28 +08:00
type Task struct {
TaskData TaskData
Signature Signature
}
type TaskData struct {
2023-03-06 19:38:28 +08:00
Command string `json:"command"`
ExcludedFilename string `json:"excluded_filename"`
BatchFilename string `json:"batch_filename"`
DataFilename string `json:"data_filename"`
}
type Signature struct {
Signature string `json:"signature"`
Timestamp int64 `json:"timestamp"`
2023-03-07 16:50:12 +08:00
Nonce string `json:"nonce"`
}