1208 lines
39 KiB
Go
1208 lines
39 KiB
Go
package main
|
||
|
||
import (
|
||
"archive/zip"
|
||
"bufio"
|
||
"bytes"
|
||
"crypto/hmac"
|
||
"crypto/sha256"
|
||
"crypto/tls"
|
||
"encoding/csv"
|
||
"encoding/hex"
|
||
"encoding/json"
|
||
"flag"
|
||
"fmt"
|
||
"io"
|
||
"log"
|
||
"net/http"
|
||
"os"
|
||
"path"
|
||
"path/filepath"
|
||
"regexp"
|
||
"sort"
|
||
"strconv"
|
||
"strings"
|
||
"sync"
|
||
"syscall"
|
||
"time"
|
||
|
||
"github.com/go-redis/redis"
|
||
"github.com/gorilla/websocket"
|
||
"github.com/pkg/sftp"
|
||
"github.com/sirupsen/logrus"
|
||
"golang.org/x/crypto/ssh"
|
||
"gopkg.in/gomail.v2"
|
||
"gopkg.in/natefinch/lumberjack.v2"
|
||
"gorm.io/driver/sqlserver"
|
||
"gorm.io/gorm"
|
||
"gorm.io/gorm/logger"
|
||
)
|
||
|
||
func init() {
|
||
monopolize() //独占锁
|
||
iniConfi() //初始化环境变量
|
||
iniPath() //初始化路径
|
||
applogger = logrus.New() //创建日志
|
||
iniLog() //初始化日志配置
|
||
redisClient = redis.NewClient(&redis.Options{
|
||
Addr: redisAddress,
|
||
Password: redisPassword,
|
||
DB: redisDB,
|
||
})
|
||
go startWebSocket() // 启动 WebSocket 服务,使用协程方式运行
|
||
applogger.Info(fmt.Sprintf("程序启动,加载%s环境,尝试执行...", env))
|
||
go downloadDecompression() // 启动立即执行一次数据下载、处理
|
||
}
|
||
|
||
func main() {
|
||
ticker := time.NewTicker(time.Duration(taskTime) * time.Minute) // 数据下载、处理
|
||
ticker_merge := time.NewTicker(time.Minute) //名单合并
|
||
defer ticker.Stop()
|
||
defer ticker_merge.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-ticker.C:
|
||
iniLog()
|
||
applogger.Info("尝试执行数据处理...")
|
||
go downloadDecompression()
|
||
|
||
case <-ticker_merge.C:
|
||
iniLog()
|
||
fmt.Print("尝试执行名单合并...\n")
|
||
}
|
||
}
|
||
}
|
||
|
||
func startWebSocket() {
|
||
http.HandleFunc("/ws", handleWebSocket)
|
||
err := http.ListenAndServe(":8080", nil)
|
||
if err != nil {
|
||
fmt.Println("启动 WebSocket 服务失败:", err)
|
||
} else {
|
||
fmt.Println("启动 WebSocket 服务成功:")
|
||
}
|
||
}
|
||
|
||
func handleWebSocket(w http.ResponseWriter, r *http.Request) {
|
||
conn, err := upgrader.Upgrade(w, r, nil)
|
||
if err != nil {
|
||
fmt.Println("升级连接失败:", err)
|
||
return
|
||
}
|
||
// 打印客户端地址
|
||
fmt.Printf("客户端 %s 连接成功\n", conn.RemoteAddr().String())
|
||
|
||
// 添加客户端连接信息
|
||
c := &client{conn}
|
||
clients[c] = true
|
||
|
||
// 接收消息
|
||
go func() {
|
||
for {
|
||
_, message, err := conn.ReadMessage()
|
||
if err != nil {
|
||
fmt.Println("接收消息失败:", err)
|
||
return
|
||
}
|
||
fmt.Println("收到消息:", string(message))
|
||
var returnMessage []byte
|
||
task := Task{}
|
||
|
||
err = json.Unmarshal([]byte(message), &task)
|
||
if err != nil {
|
||
returnMessage = []byte(`{"code": 2001,"err": "json指令解析失败"}`)
|
||
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
|
||
} else {
|
||
if !verify_signature(task.Signature.Signature, task.Signature.Nonce, task.Signature.Timestamp, task.TaskData) { // 签名验证失败或超时
|
||
returnMessage = []byte(`{"code": 2401,"err": "Unauthorized"}`)
|
||
conn.WriteMessage(websocket.TextMessage, returnMessage)
|
||
conn.Close()
|
||
break
|
||
}
|
||
switch task.TaskData.Command {
|
||
case "lastCall":
|
||
if !fileExists(path.Join(executableDir, lastCallPath, task.TaskData.ExcludedFilename)) {
|
||
returnMessage = []byte(`{"code": 2003,"err": "task.ExcludedFilename 不存在"}`)
|
||
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
|
||
break
|
||
}
|
||
if !fileExists(path.Join(executableDir, txtPath, task.TaskData.BatchFilename)) {
|
||
returnMessage = []byte(`{"code": 2004,"err": "task.BatchFilename 不存在"}`)
|
||
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
|
||
break
|
||
}
|
||
if !fileExists(path.Join(executableDir, txtPath, task.TaskData.DataFilename)) {
|
||
returnMessage = []byte(`{"code": 2005,"err": "task.DataFilename 不存在"}`)
|
||
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
|
||
break
|
||
}
|
||
returnMessage = []byte(`{"code": 2000,"err": "开始处理lastCall"}`)
|
||
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
|
||
lastCallKeys, err := readExcludedFile(path.Join(executableDir, lastCallPath, task.TaskData.ExcludedFilename))
|
||
if err != nil {
|
||
returnMessage = []byte(`{"code": 2006,"err": "打开ExcludedFilename失败"}`)
|
||
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
|
||
break
|
||
} else {
|
||
returnMessage = []byte(`{"code": 2000,"err": "ExcludedFilename读取完成"}`)
|
||
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
|
||
}
|
||
if exists, _ := redisClient.Exists("iniLastCallDataStatus").Result(); exists == 1 {
|
||
returnMessage = []byte(`{"code": 2007,"err": "有lastCall执行中,请稍后重试"}`)
|
||
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
|
||
|
||
} else {
|
||
//无错误,执行逻辑代码,
|
||
subject := "丝芙兰短信处理程序异常"
|
||
err := redisClient.Set("iniLastCallDataStatus", 1, 0).Err()
|
||
if err != nil {
|
||
body := fmt.Sprintf("写入lastCall任务执行中标记失败:%v", err)
|
||
applogger.Error(body)
|
||
returnMessage = []byte(fmt.Sprintf(`{"code": 2008,"err": "写入lastCall任务执行中标记失败%v"}`, err))
|
||
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
|
||
SendEmail(subject, body) //发送邮件
|
||
} else {
|
||
batchInsert(task.TaskData.BatchFilename, true) //创建批次
|
||
returnMessage = []byte(`{"code": 2000,"err": "批次创建完成"}`)
|
||
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
|
||
batchDataInsert(task.TaskData.DataFilename, lastCallKeys) //添加数据
|
||
redisClient.Del("iniLastCallDataStatus") //删除任务执行中标记
|
||
returnMessage = []byte(`{"code": 2000,"err": "结束处理lastCall"}`)
|
||
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
|
||
}
|
||
}
|
||
default:
|
||
returnMessage = []byte(`{"code": 2002,"err": "task.Command 不存在"}`)
|
||
conn.WriteMessage(websocket.TextMessage, returnMessage) //发送消息给客户端
|
||
}
|
||
}
|
||
}
|
||
}()
|
||
|
||
// 设置连接关闭时的回调函数
|
||
conn.SetCloseHandler(func(code int, text string) error {
|
||
fmt.Println("连接已关闭,code:", code, "text:", text)
|
||
fmt.Printf("客户端 %s 关闭连接\n", conn.RemoteAddr().String())
|
||
delete(clients, c)
|
||
return nil
|
||
})
|
||
|
||
}
|
||
|
||
// verify_signature reports whether signature is the hex-encoded HMAC-SHA256 of
// "<timestamp>|<nonce>|<json(data)>" under the shared key, and whether
// timestamp (Unix seconds) lies within 7200 seconds of the current time in
// either direction.
//
// NOTE(review): the key is hard-coded in source and the nonce is not checked
// for reuse, so a captured message can be replayed inside the time window.
func verify_signature(signature string, nonce string, timestamp int64, data interface{}) bool {
	const (
		key            = "qZ6v1&H#Wjx+yRm2D@*sJF$tnfL83Ia"
		maxSkewSeconds = 7200
	)

	fmt.Printf("Received signature: %s\n", signature)
	fmt.Printf("Received timestamp: %d\n", timestamp)
	fmt.Printf("Received nonce: %s\n", nonce)
	fmt.Printf("Received data: %v\n", data)

	// Reject stale timestamps and timestamps far in the future; accepting the
	// latter would let a signed message stay valid indefinitely.
	age := time.Now().Unix() - timestamp
	if age > maxSkewSeconds || age < -maxSkewSeconds {
		fmt.Println("Timestamp expired")
		return false
	}

	payload, err := json.Marshal(data)
	if err != nil {
		fmt.Println("Signature payload encoding failed:", err)
		return false
	}
	expectedData := fmt.Sprintf("%d|%s|%s", timestamp, nonce, payload)
	fmt.Printf("Expected data: %s\n", expectedData)

	mac := hmac.New(sha256.New, []byte(key))
	mac.Write([]byte(expectedData))
	expectedSignature := hex.EncodeToString(mac.Sum(nil))

	// The expected signature is deliberately not printed: it would let anyone
	// who can read the output forge a valid request. hmac.Equal compares in
	// constant time.
	if !hmac.Equal([]byte(signature), []byte(expectedSignature)) {
		fmt.Println("Signature does not match")
		return false
	}
	return true
}
|
||
|
||
// readExcludedFile reads a comma-separated file, skips its first line and
// returns the set of "<field0>-<field1>" keys built from the remaining lines.
// Lines with fewer than two fields (for example a blank trailing line) are
// ignored. An error is returned when the file cannot be opened or read.
func readExcludedFile(filename string) (map[string]bool, error) {
	file, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	scanner.Scan() // skip the header line

	lastCallKeys := make(map[string]bool)
	for scanner.Scan() {
		fields := strings.Split(scanner.Text(), ",")
		if len(fields) < 2 {
			// Indexing fields[1] on such a line would panic.
			continue
		}
		lastCallKeys[fields[0]+"-"+fields[1]] = true
	}
	// Scan also stops on read errors and over-long lines; report those
	// instead of returning a silently truncated set.
	if err := scanner.Err(); err != nil {
		return nil, err
	}
	return lastCallKeys, nil
}
|
||
|
||
/*
|
||
// 聊天室消息广播
|
||
func broadcast(message []byte) {
|
||
for c := range clients {
|
||
err := c.conn.WriteMessage(websocket.TextMessage, message)
|
||
if err != nil {
|
||
fmt.Println("发送消息失败:", err)
|
||
delete(clients, c)
|
||
}
|
||
}
|
||
}*/
|
||
|
||
func downloadDecompression() {
|
||
if exists, _ := redisClient.Exists("iniDataStatus").Result(); exists == 1 {
|
||
fmt.Print("上一批次执行中,跳过本次\n")
|
||
} else {
|
||
subject := "丝芙兰短信处理程序异常"
|
||
// 写入执行中标记
|
||
err := redisClient.Set("iniDataStatus", 1, 0).Err()
|
||
if err != nil {
|
||
body := fmt.Sprintf("写入任务执行中标记失败:%v", err)
|
||
applogger.Error(body)
|
||
SendEmail(subject, body) //发送邮件
|
||
}
|
||
// Connect to SFTP server
|
||
sshConfig := &ssh.ClientConfig{
|
||
User: sftpUser,
|
||
Auth: []ssh.AuthMethod{
|
||
ssh.Password(sftpPassword),
|
||
},
|
||
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
|
||
}
|
||
sshClient, err := ssh.Dial("tcp", sftpAddress, sshConfig)
|
||
if err != nil {
|
||
body := fmt.Sprintf("sshClient连接失败:%v", err)
|
||
applogger.Error(body)
|
||
SendEmail(subject, body) //发送邮件
|
||
}
|
||
sftpClient, err := sftp.NewClient(sshClient)
|
||
if err != nil {
|
||
body := fmt.Sprintf("sftp连接失败:%v", err)
|
||
applogger.Error(body)
|
||
SendEmail(subject, body) //发送邮件
|
||
}
|
||
defer sftpClient.Close()
|
||
files, err := sftpClient.ReadDir(sftpDir)
|
||
if err != nil {
|
||
body := fmt.Sprintf("sftp目录%s不存在, 错误信息:%v", sftpDir, err)
|
||
applogger.Error(body)
|
||
SendEmail(subject, body) //发送邮件
|
||
}
|
||
it := 1
|
||
fmt.Printf("共%d个文件\n", len(files))
|
||
|
||
sort.Sort(FileSorter(files))
|
||
|
||
for _, file := range files {
|
||
processingStatus := -1
|
||
fmt.Printf("第%d个文件处理中\n", it)
|
||
it++
|
||
// Check if file has been downloaded before
|
||
fileKey := fmt.Sprintf("downloaded:%s", file.Name())
|
||
if exists, _ := redisClient.Exists(fileKey).Result(); exists == 1 {
|
||
fmt.Println("跳过已处理过的文件:" + file.Name())
|
||
continue
|
||
}
|
||
if filepath.Ext(file.Name()) == ".zip" {
|
||
//fmt.Println("下载开始(数据包):" + file.Name())
|
||
// Download file
|
||
srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name()))
|
||
if err != nil {
|
||
body := fmt.Sprintf("打开sftp文件失败,文件名:%s,错误信息: %v", file.Name(), err)
|
||
applogger.Error(body)
|
||
SendEmail(subject, body) //发送邮件
|
||
continue
|
||
}
|
||
defer srcFile.Close()
|
||
dstFile, err := os.Create(path.Join(executableDir, zipPath, file.Name()))
|
||
|
||
if err != nil {
|
||
body := fmt.Sprintf("创建本地文件失败,文件名: %s,错误信息:%v", file.Name(), err)
|
||
applogger.Error(body)
|
||
SendEmail(subject, body) //发送邮件
|
||
continue
|
||
}
|
||
defer dstFile.Close()
|
||
if _, err := io.Copy(dstFile, srcFile); err != nil {
|
||
body := fmt.Sprintf("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err)
|
||
applogger.Error(body)
|
||
SendEmail(subject, body) //发送邮件
|
||
continue
|
||
}
|
||
applogger.Info(fmt.Sprintf("%s(数据包)下载完成", file.Name()))
|
||
// Unzip file
|
||
zipReader, err := zip.OpenReader(path.Join(executableDir, zipPath, file.Name()))
|
||
if err != nil {
|
||
body := fmt.Sprintf("解压文件失败: 文件名:%s,错误信息:%v", file.Name(), err)
|
||
applogger.Error(body)
|
||
SendEmail(subject, body) //发送邮件
|
||
continue
|
||
}
|
||
defer zipReader.Close()
|
||
for _, zipFile := range zipReader.File {
|
||
zipFileReader, err := zipFile.Open()
|
||
if strings.Contains(zipFile.Name, "__MACOSX/._") {
|
||
continue
|
||
} else if filepath.Ext(zipFile.Name) != ".txt" {
|
||
applogger.Error(fmt.Sprintf("文件类型不正确,跳过处理: %v", zipFile.Name))
|
||
continue
|
||
}
|
||
if err != nil || zipFileReader == nil {
|
||
applogger.Error(fmt.Sprintf("Failed to open zip file: %v", err))
|
||
fmt.Print("压缩文件处理结束")
|
||
continue
|
||
}
|
||
defer zipFileReader.Close()
|
||
// Create the file
|
||
unzipFile, err := os.Create(path.Join(executableDir, txtPath, zipFile.Name))
|
||
if err != nil {
|
||
body := fmt.Sprintf("创建压缩后的文件失败,文件名:%s,错误信息: %v", zipFile.Name, err)
|
||
applogger.Error(body)
|
||
SendEmail(subject, body) //发送邮件
|
||
continue
|
||
}
|
||
defer unzipFile.Close()
|
||
// Write the unzip data to the file
|
||
_, err = io.Copy(unzipFile, zipFileReader)
|
||
if err != nil {
|
||
body := fmt.Sprintf("文件解压失败,文件名:%s,错误信息: %v", zipFileReader, err)
|
||
applogger.Error(body)
|
||
SendEmail(subject, body) //发送邮件
|
||
continue
|
||
}
|
||
applogger.Info(fmt.Sprintf("%s(数据包)解压完成", zipFile.Name))
|
||
processingStatus = batchDataInsert(zipFile.Name, nil)
|
||
}
|
||
} else if filepath.Ext(file.Name()) == ".txt" {
|
||
srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name()))
|
||
if err != nil {
|
||
body := fmt.Sprintf("打开sftp文件失败,文件名:%s,错误信息: %v", file.Name(), err)
|
||
applogger.Error(body)
|
||
SendEmail(subject, body) //发送邮件
|
||
continue
|
||
}
|
||
defer srcFile.Close()
|
||
dstFile, err := os.Create(path.Join(executableDir, txtPath, file.Name()))
|
||
if err != nil {
|
||
body := fmt.Sprintf("创建本地文件失败,文件名: %s,错误信息:%v", file.Name(), err)
|
||
applogger.Error(body)
|
||
SendEmail(subject, body) //发送邮件
|
||
continue
|
||
}
|
||
defer dstFile.Close()
|
||
if _, err := io.Copy(dstFile, srcFile); err != nil {
|
||
body := fmt.Sprintf("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err)
|
||
applogger.Error(body)
|
||
SendEmail(subject, body) //发送邮件
|
||
continue
|
||
}
|
||
applogger.Info(fmt.Sprintf("%s(批次文件)下载完成", file.Name()))
|
||
processingStatus = batchInsert(file.Name(), false)
|
||
}
|
||
if processingStatus != -1 {
|
||
err = redisClient.Set(fileKey, 1, 0).Err() //写入文件处理完成
|
||
if err != nil {
|
||
body := fmt.Sprintf("写入文件处理完成标记失败文件名:%s,错误信息:%v\n", file.Name(), err)
|
||
applogger.Error(body)
|
||
SendEmail(subject, body) //发送邮件
|
||
}
|
||
}
|
||
|
||
}
|
||
redisClient.Del("iniDataStatus") //删除任务执行中标记
|
||
}
|
||
}
|
||
|
||
// 批次入库
|
||
func batchInsert(fileName string, isLastCall bool) int {
|
||
//fmt.Print("批次处理开始")
|
||
start := time.Now()
|
||
db, _ := connectToDB()
|
||
file, err := os.Open(path.Join(executableDir, txtPath, fileName))
|
||
if err != nil {
|
||
return -1
|
||
} else {
|
||
defer file.Close()
|
||
reader := csv.NewReader(bufio.NewReader(file))
|
||
reader.Read()
|
||
batchRows := 0
|
||
for {
|
||
record, err := reader.Read()
|
||
if err != nil {
|
||
//return -1
|
||
break
|
||
}
|
||
|
||
communicationChannelID, _ := strconv.ParseUint(record[0], 10, 32)
|
||
TargetsMember, _ := strconv.ParseUint(record[2], 10, 32)
|
||
templateID, _ := strconv.ParseUint(record[3], 10, 32)
|
||
status := uint(1)
|
||
|
||
t := time.Now()
|
||
s := t.Format("2006-01-02 15:04:05")
|
||
var batchName, dataFileName string
|
||
|
||
if isLastCall {
|
||
batchName = fmt.Sprintf("lastCall-%s-%s", record[1], strconv.Itoa(int(communicationChannelID)))
|
||
dataFileName = fmt.Sprintf("lastCall-%s", fileName)
|
||
} else {
|
||
batchName = fmt.Sprintf("%s-%s", record[1], strconv.Itoa(int(communicationChannelID)))
|
||
dataFileName = fileName
|
||
}
|
||
|
||
batchParams := BatchParams{
|
||
BatchName: batchName,
|
||
BatchDesc: record[4],
|
||
IsPersonal: 0,
|
||
Message: record[4],
|
||
IsInternational: 0,
|
||
IsSchedule: 0, //点发
|
||
ScheduleTime: s,
|
||
Token: token,
|
||
}
|
||
|
||
// 调用发送短信批次接口
|
||
sid, err := CreateBatch(batchParams)
|
||
if err != nil {
|
||
fmt.Println(err)
|
||
return -1
|
||
}
|
||
fmt.Println(sid)
|
||
|
||
batch := Batch{
|
||
CommunicationChannelID: uint(communicationChannelID),
|
||
CommunicationName: batchName,
|
||
TargetsMember: uint(TargetsMember),
|
||
TemplateID: uint(templateID),
|
||
Content: record[4],
|
||
Status: status,
|
||
DataFileName: dataFileName,
|
||
Sid: sid,
|
||
}
|
||
db.Create(&batch)
|
||
batchRows++
|
||
}
|
||
time.Sleep(time.Second)
|
||
elapsed := time.Since(start)
|
||
|
||
subject := "丝芙兰批次文件处理完成"
|
||
body := fmt.Sprintf("批次数:%d;\n批次文件:%s;\n处理完成,请前往管理平台查看处理。", batchRows, fileName)
|
||
SendEmail(subject, body) //发送邮件
|
||
|
||
applogger.Info(fmt.Sprintf("%s(批次文件)入库完成", fileName))
|
||
applogger.Info(fmt.Sprintf("%s(批次文件)执行时间%s 插入批次次数:%d", fileName, elapsed, batchRows))
|
||
return 0
|
||
}
|
||
}
|
||
|
||
func CreateBatch(batchParams BatchParams) (int, error) {
|
||
// 将请求参数编码为JSON字符串
|
||
jsonBytes, err := json.Marshal(batchParams)
|
||
if err != nil {
|
||
return -1, err
|
||
}
|
||
jsonStr := string(jsonBytes)
|
||
|
||
// 发送POST请求
|
||
url := "http://www.wemediacn.net/webservice/BatchService?service=sms.createbatch"
|
||
resp, err := http.Post(url, "application/json; charset=utf-8", bytes.NewBuffer([]byte(jsonStr)))
|
||
if err != nil {
|
||
return -1, err
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
// 解析响应数据
|
||
var retobj map[string]interface{}
|
||
err = json.NewDecoder(resp.Body).Decode(&retobj)
|
||
if err != nil {
|
||
return -1, err
|
||
}
|
||
fmt.Print(retobj)
|
||
|
||
code := int(retobj["code"].(float64))
|
||
fmt.Printf("批次创建API 返回:%d\n", code)
|
||
|
||
if code == 0 {
|
||
sid := int(retobj["sid"].(float64))
|
||
return sid, nil
|
||
} else {
|
||
return -1, fmt.Errorf("create batch failed, error code: %d", code)
|
||
}
|
||
}
|
||
|
||
func smsApi(method string, sendSMSDataJson string) (int, error) {
|
||
// 发送POST请求
|
||
url := "http://www.wemediacn.net/webservice/BatchService?service=sms." + method
|
||
resp, err := http.Post(url, "application/json; charset=utf-8", strings.NewReader(sendSMSDataJson))
|
||
if err != nil {
|
||
return -1, err
|
||
}
|
||
defer resp.Body.Close()
|
||
// 解析响应数据
|
||
var retobj map[string]interface{}
|
||
err = json.NewDecoder(resp.Body).Decode(&retobj)
|
||
if err != nil {
|
||
return -1, err
|
||
}
|
||
jsonStr, err := json.Marshal(retobj)
|
||
if err != nil {
|
||
fmt.Println(err)
|
||
return -1, err
|
||
}
|
||
fmt.Printf("API 返回:%s\n", string(jsonStr))
|
||
code := int(retobj["code"].(float64))
|
||
if code == 0 {
|
||
//sid := int(retobj["sid"].(float64))
|
||
fmt.Printf("提交批次成功code:%d\n", code)
|
||
return code, nil
|
||
} else {
|
||
applogger.Error(string(jsonStr))
|
||
return -1, fmt.Errorf(string(jsonStr))
|
||
}
|
||
}
|
||
|
||
func batchDataInsert(fileName string, lastCallKeys map[string]bool) int {
|
||
start := time.Now()
|
||
// Open file
|
||
fileKey := fmt.Sprintf("downloaded:%s", strings.Replace(fileName, filepath.Ext(fileName), ".zip", 1))
|
||
file, err := os.Open(path.Join(executableDir, txtPath, fileName))
|
||
if err != nil {
|
||
redisClient.Del(fileKey) //删除文件处理完成的标志位
|
||
applogger.Error(fmt.Sprintf("文件打开失败,文件名:%s,错误信息%v", fileName, err))
|
||
return -1
|
||
} else {
|
||
defer file.Close()
|
||
db, _ := connectToDB()
|
||
// Insert data in batches using multiple goroutines
|
||
var wg sync.WaitGroup
|
||
dataBatchChan := make(chan []BatcheData, insertChanSize)
|
||
for i := 0; i < goSize; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
for batch := range dataBatchChan {
|
||
retryCount := 0
|
||
for {
|
||
if err := db.CreateInBatches(batch, insertSize).Error; err != nil {
|
||
if retryCount >= 5 {
|
||
panic(err)
|
||
}
|
||
fmt.Printf("Insert failed, retrying in %v seconds...\n", 2*retryCount)
|
||
time.Sleep(time.Duration(2*retryCount) * time.Second)
|
||
retryCount++
|
||
} else {
|
||
break
|
||
}
|
||
}
|
||
}
|
||
wg.Done()
|
||
}()
|
||
}
|
||
|
||
hs := make(map[string]bool) //排重
|
||
dataBatch := make([]BatcheData, 0, batchSize) //短信数据
|
||
dataBatchDuplicate := make([]BatchDataDuplicateLog, 0, batchSize) //重复短信数据
|
||
bi := 0 //重复总数
|
||
count := 0 // 总数
|
||
//数据文件中批次
|
||
batches := []Batch{} //查询批次
|
||
fileNameDate := ""
|
||
dataFileName := ""
|
||
|
||
if lastCallKeys != nil {
|
||
fields := strings.Split(fileName, "_")
|
||
datetime := fields[len(fields)-1]
|
||
fileNameDate = datetime[:8]
|
||
// 模糊查询文件名包含“20230103”字符串的批次记录
|
||
db.Table("batches b1").
|
||
Select("b1.*").
|
||
Where("b1.data_file_name LIKE ?", "%"+fileNameDate+"%").
|
||
Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at").
|
||
Order("b1.created_at desc").
|
||
Find(&batches)
|
||
dataFileName = fmt.Sprintf("lastCall-%s", fileName)
|
||
} else {
|
||
fileName = fmt.Sprintf("lastCall-%s", fileName)
|
||
db.Table("batches b1").
|
||
Select("b1.*").
|
||
Where("b1.data_file_name = ?", fileNameDate).
|
||
Joins("INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS created_at FROM batches GROUP BY communication_channel_id) b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.created_at").
|
||
Order("b1.created_at desc").
|
||
Find(&batches)
|
||
dataFileName = fileName
|
||
}
|
||
|
||
batchCount := len(batches)
|
||
sendMobiles := make(map[string]SmsData, batchCount) //短信数据,用于提交发送平台
|
||
duplicateCount := make(map[string]int, batchCount) //按批次重复数
|
||
insertsCount := make(map[string]int, batchCount) //按批次插入数
|
||
ccids := make(map[string]bool, batchCount)
|
||
|
||
if batchCount > 0 { //如果查询到数据需要的批次信息,开始处理数据
|
||
// 定义一个名为result的map类型的变量,并以CommunicationChannelID作为键
|
||
result := make(map[string][]Batch)
|
||
for _, batch := range batches {
|
||
cckdKey := strconv.FormatUint(uint64(batch.CommunicationChannelID), 10)
|
||
result[cckdKey] = append(result[cckdKey], batch)
|
||
print(batch.CommunicationChannelID, "\n")
|
||
}
|
||
|
||
scanner := bufio.NewScanner(file)
|
||
scanner.Split(bufio.ScanLines)
|
||
scanner.Scan() // skip first line
|
||
for scanner.Scan() {
|
||
line := scanner.Text()
|
||
row, err := csv.NewReader(strings.NewReader(line)).Read()
|
||
if err != nil {
|
||
applogger.Error(fmt.Sprintf("文件按行读取失败,文件名:%s,错误信息%v", fileName, err))
|
||
continue
|
||
}
|
||
reservedFields := map[string]string{ //合并个性化字段
|
||
"ReservedField1": row[5],
|
||
"ReservedField2": row[6],
|
||
"ReservedField3": row[7],
|
||
"ReservedField4": row[8],
|
||
"ReservedField5": row[9],
|
||
"DataFileName": dataFileName,
|
||
"FullName": row[4],
|
||
}
|
||
reservedFieldsJson, err := json.Marshal(reservedFields) //个性化字段转json
|
||
if err != nil {
|
||
applogger.Error(fmt.Sprintf("个性化字段合并失败,文件名:%s,错误信息%v", fileName, err))
|
||
continue
|
||
}
|
||
if _, ok := duplicateCount[row[2]]; !ok {
|
||
duplicateCount[row[2]] = 0
|
||
}
|
||
|
||
// Check if record exists in hashset
|
||
key := fmt.Sprintf("%s-%s-%s", row[2], row[3], row[5])
|
||
if _, exists := hs[key]; exists { //如果批次数据重复
|
||
bi++
|
||
// Increment duplicate count
|
||
duplicateCount[row[2]]++
|
||
dataBatchDuplicate = append(dataBatchDuplicate, BatchDataDuplicateLog{
|
||
CommunicationChannelID: row[2],
|
||
Mobile: row[3],
|
||
ReservedField: string(reservedFieldsJson),
|
||
})
|
||
|
||
if len(dataBatchDuplicate) >= batchSize {
|
||
err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error
|
||
if err != nil {
|
||
applogger.Error(fmt.Sprintf("插入重复数据失败,文件名:%s,错误信息%v", fileName, err))
|
||
} else {
|
||
dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize)
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
if lastCallKeys != nil && lastCallKeys[fmt.Sprintf("%s-%s", row[2], row[3])] {
|
||
continue
|
||
}
|
||
|
||
// Add record to hashset
|
||
hs[key] = true
|
||
ccids[row[2]] = true
|
||
dataBatch = append(dataBatch, BatcheData{
|
||
CommunicationChannelID: row[2],
|
||
Mobile: row[3],
|
||
ReservedField: string(reservedFieldsJson),
|
||
})
|
||
|
||
if len(dataBatch) >= batchSize {
|
||
dataBatchChan <- dataBatch
|
||
dataBatch = make([]BatcheData, 0, batchSize)
|
||
}
|
||
|
||
if batches, ok := result[row[2]]; ok && len(batches) > 0 && batches[0].Content != "" {
|
||
content := fmt.Sprintf("%s /", batches[0].Content)
|
||
Sid := batches[0].Sid
|
||
pattern := regexp.MustCompile(`\{.*\}`)
|
||
matched := pattern.MatchString(content)
|
||
if matched { //个性化短信
|
||
smsData, ok := sendMobiles[row[2]]
|
||
if !ok {
|
||
smsData = SmsData{
|
||
Sid: Sid,
|
||
IsPersonalizedMsg: true,
|
||
SmsList: make([]SmsList, 0),
|
||
}
|
||
}
|
||
placeholderMap := map[string]string{
|
||
"{RESERVED_FIELD_1}": row[5],
|
||
"{RESERVED_FIELD_2}": row[6],
|
||
"{RESERVED_FIELD_3}": row[7],
|
||
"{RESERVED_FIELD_4}": row[8],
|
||
"{RESERVED_FIELD_5}": row[9],
|
||
}
|
||
for k, v := range placeholderMap {
|
||
content = strings.ReplaceAll(content, k, v)
|
||
}
|
||
smsData.SmsList = append(smsData.SmsList, SmsList{M: row[3], C: content, F: 8})
|
||
if len(smsData.SmsList) >= batchSize/2 {
|
||
sd := Sms{Sid: Sid, Data: smsData.SmsList, Token: token}
|
||
sendSMSDataJson, _ := json.Marshal(sd)
|
||
//
|
||
resp, err := smsApi("appendbatchdata", string(sendSMSDataJson))
|
||
if resp != 0 {
|
||
fmt.Printf("smsApi returned error: %d\n", err)
|
||
//发送提醒邮件
|
||
subject := "丝芙兰数据包数据提交异常"
|
||
body := fmt.Sprintf("数据包:%s;\nCommunicationChannelID:%s;\n错误信息:%s\n 请前往管理平台查看处理。", fileName, row[2], err)
|
||
applogger.Error(body)
|
||
SendEmail(subject, body) //发送邮件
|
||
}
|
||
smsData.SmsList = []SmsList{} // reset mobiles slice
|
||
}
|
||
sendMobiles[row[2]] = smsData
|
||
} else {
|
||
// 处理非个性化短信
|
||
smsData, ok := sendMobiles[row[2]]
|
||
if !ok {
|
||
smsData = SmsData{
|
||
Sid: Sid,
|
||
IsPersonalizedMsg: false,
|
||
Content: content,
|
||
Mobiles: make([]string, 0),
|
||
}
|
||
}
|
||
smsData.Mobiles = append(smsData.Mobiles, row[3])
|
||
if len(smsData.Mobiles) >= batchSize {
|
||
mobiles := smsData.Mobiles
|
||
mobileStr := strings.Join(mobiles, ",")
|
||
var sl []SmsList
|
||
sl = append(sl, SmsList{M: mobileStr, C: content, F: 8})
|
||
sd := Sms{Sid: Sid, Data: sl, Token: token}
|
||
sendSMSDataJson, _ := json.Marshal(sd)
|
||
resp, err := smsApi("appendbatchdata", string(sendSMSDataJson))
|
||
if resp != 0 {
|
||
fmt.Printf("smsApi returned error: %d\n", err)
|
||
//发送提醒邮件
|
||
subject := "丝芙兰数据包数据提交异常"
|
||
body := fmt.Sprintf("数据包:%s;\nCommunicationChannelID:%s;\n错误信息:%s\n 请前往管理平台查看处理。", fileName, row[2], err)
|
||
applogger.Error(body)
|
||
SendEmail(subject, body) //发送邮件
|
||
}
|
||
smsData.Mobiles = []string{} // reset mobiles slice
|
||
}
|
||
sendMobiles[row[2]] = smsData
|
||
}
|
||
}
|
||
if _, ok := insertsCount[row[2]]; !ok {
|
||
insertsCount[row[2]] = 0
|
||
}
|
||
insertsCount[row[2]]++
|
||
count++
|
||
}
|
||
|
||
if len(dataBatch) > 0 {
|
||
dataBatchChan <- dataBatch
|
||
}
|
||
close(dataBatchChan)
|
||
|
||
for ccid := range ccids {
|
||
smsData, ok := sendMobiles[ccid]
|
||
if !ok {
|
||
continue
|
||
}
|
||
if smsData.IsPersonalizedMsg { //个性化
|
||
smsList := smsData.SmsList
|
||
if len(smsList) > 0 {
|
||
sd := Sms{Sid: smsData.Sid, Data: smsList, Token: token}
|
||
sendSMSDataJson, _ := json.Marshal(sd)
|
||
smsApi("appendbatchdata", string(sendSMSDataJson))
|
||
smsData.SmsList = []SmsList{} // reset mobiles slice
|
||
}
|
||
} else { //非个性化
|
||
mobiles, ok := smsData.Mobiles, len(smsData.Mobiles) > 0 && true
|
||
if ok && len(mobiles) > 0 {
|
||
mobileStr := strings.Join(mobiles, ",")
|
||
var sl []SmsList
|
||
sl = append(sl, SmsList{M: mobileStr, C: smsData.Content, F: 8})
|
||
sd := Sms{Sid: smsData.Sid, Data: sl, Token: token}
|
||
sendSMSDataJson, _ := json.Marshal(sd)
|
||
smsApi("appendbatchdata", string(sendSMSDataJson))
|
||
smsData.Mobiles = []string{} // reset mobiles slice
|
||
}
|
||
}
|
||
sf := SmsFinish{Sid: smsData.Sid, Token: token}
|
||
sfJson, _ := json.Marshal(sf)
|
||
smsApi("finishbatch", string(sfJson))
|
||
bpi := []BatchProcessingInformation{}
|
||
bpi = append(bpi, BatchProcessingInformation{
|
||
CommunicationChannelID: ccid,
|
||
RepeatTargetsMember: duplicateCount[ccid],
|
||
InsertsTargetsMember: insertsCount[ccid],
|
||
DataFileName: fileName,
|
||
})
|
||
err = db.CreateInBatches(bpi, insertSize).Error
|
||
if err != nil {
|
||
applogger.Error(fmt.Sprintf("插入批次处理信息失败,文件名:%s,错误信息%v", fileName, err))
|
||
}
|
||
smsData = SmsData{}
|
||
sendMobiles[ccid] = smsData
|
||
}
|
||
|
||
wg.Wait() //所有入库全部完成
|
||
|
||
//插入批此重复数据
|
||
if len(dataBatchDuplicate) > 0 {
|
||
err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error
|
||
if err != nil {
|
||
applogger.Error(fmt.Sprintf("插入重复数据失败,文件名:%s,错误信息%v", fileName, err))
|
||
} else {
|
||
dataBatchDuplicate = nil
|
||
}
|
||
}
|
||
|
||
elapsed := time.Since(start)
|
||
|
||
//发送提醒邮件
|
||
subject := "丝芙兰数据包处理完成"
|
||
body := fmt.Sprintf("总数:%d;\n数据包:%s;\n过滤重复数:%d;\n过滤后总数:%d;\n处理完成,请前往管理平台查看处理。", count+bi, fileName, bi, count)
|
||
SendEmail(subject, body) //发送邮件
|
||
applogger.Info(fmt.Sprintf("%s(数据包) 入库完成", fileName))
|
||
applogger.Info(fmt.Sprintf("%s(数据包)执行时间:%s 插入数据:%d条 过滤数数:%d条", fileName, elapsed, count, bi))
|
||
return 0
|
||
} else { //如果没有查询到批次信息,跳过处理
|
||
return -1
|
||
}
|
||
}
|
||
}
|
||
|
||
func connectToDB() (*gorm.DB, error) {
|
||
dsn := "sqlserver://" + dbUser + ":" + dbPassword + "@" + dbAddress + "?database=" + dbName + "&charset=utf8mb4"
|
||
var db *gorm.DB
|
||
var err error
|
||
attempt := 1
|
||
maxAttempts := 5
|
||
backoff := time.Second
|
||
logger := logger.Default.LogMode(logger.Error) //Silent、Error、Warn、Info
|
||
for {
|
||
db, err = gorm.Open(sqlserver.Open(dsn), &gorm.Config{Logger: logger, SkipDefaultTransaction: true})
|
||
if err == nil {
|
||
break
|
||
}
|
||
if attempt >= maxAttempts {
|
||
applogger.Error(fmt.Sprintf("数据库连接失败,错误信息%v", err))
|
||
return nil, err
|
||
}
|
||
time.Sleep(backoff)
|
||
backoff *= 2
|
||
attempt++
|
||
}
|
||
return db, nil
|
||
}
|
||
|
||
// fileExists reports whether filename names an existing entry that is not a
// directory. Any Stat failure (missing file, permission denied, a path
// component that is not a directory, ...) yields false.
func fileExists(filename string) bool {
	info, err := os.Stat(filename)
	if err != nil {
		// info is nil for every Stat error, not only for "not exist".
		return false
	}
	return !info.IsDir()
}
|
||
|
||
func iniPath() {
|
||
// 获取可执行文件所在的路径
|
||
executablePath, err := os.Executable()
|
||
if err != nil {
|
||
log.Fatal("failed to get executable path: ", err)
|
||
}
|
||
executableDir = filepath.Dir(executablePath)
|
||
|
||
//判断目录是否存在,不存在则创建
|
||
err = os.MkdirAll(filepath.Join(executableDir, zipPath), 0755)
|
||
if err != nil {
|
||
log.Fatal(err)
|
||
}
|
||
err = os.MkdirAll(filepath.Join(executableDir, txtPath), 0755)
|
||
if err != nil {
|
||
log.Fatal(err)
|
||
}
|
||
err = os.MkdirAll(filepath.Join(executableDir, lastCallPath), 0755)
|
||
if err != nil {
|
||
log.Fatal(err)
|
||
}
|
||
err = os.MkdirAll(filepath.Join(executableDir, logPath, time.Now().Format("2006_01")), 0755)
|
||
if err != nil {
|
||
log.Fatal(err)
|
||
}
|
||
}
|
||
|
||
func monopolize() {
|
||
// 打开一个文件作为锁
|
||
lockFile, err := os.OpenFile(filepath.Join(executableDir, ".lock"), os.O_CREATE|os.O_RDWR, 0666)
|
||
if err != nil {
|
||
fmt.Println("打开锁文件失败:", err)
|
||
os.Exit(1)
|
||
}
|
||
defer lockFile.Close()
|
||
// 尝试获取文件的独占锁
|
||
err = syscall.Flock(int(lockFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
|
||
if err != nil {
|
||
fmt.Println("程序已经在运行,本程序无法同时运行多个")
|
||
os.Exit(1)
|
||
}
|
||
}
|
||
|
||
func iniConfi() {
|
||
flag.StringVar(&env, "env", "dev", "运行模式")
|
||
flag.Parse()
|
||
switch env {
|
||
case "dev":
|
||
//fmt.Print("测试环境配置已生效\n")
|
||
redisAddress = "mysql5.weu.me:6379"
|
||
redisPassword = ""
|
||
redisDB = 1
|
||
sftpAddress = "192.168.10.86:49156"
|
||
sftpUser = "demo"
|
||
sftpPassword = "demo"
|
||
sftpDir = "/sftp/test"
|
||
dbAddress = "192.168.10.18:1433"
|
||
dbUser = "sa"
|
||
dbPassword = "Aa123123"
|
||
dbName = "sephora"
|
||
zipPath = "RawData/Zip/"
|
||
txtPath = "RawData/Txt/"
|
||
logPath = "logs/"
|
||
batchSize = 5000 //提交数据
|
||
insertSize = 660 //一次性入库
|
||
insertChanSize = 10 //通道缓冲数
|
||
goSize = 10 //协程数
|
||
taskTime = 1
|
||
to = []string{"chejiulong@wemediacn.com"}
|
||
token = "7100477930234217"
|
||
lastCallPath = "RawData/LastCall"
|
||
case "prod":
|
||
//fmt.Print("正式环境配置已生效\n")
|
||
redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379"
|
||
redisPassword = "3Nsb4Pmsl9bcLs24mL12l"
|
||
redisDB = 233
|
||
sftpAddress = "esftp.sephora.com.cn:20981"
|
||
sftpUser = "CHN-CRMTOWemedia-wemedia"
|
||
sftpPassword = "uoWdMHEv39ZFjiOg"
|
||
sftpDir = "/CN-CRMTOWemedia/SMS"
|
||
dbAddress = "rm-bp16l424ln96q1ouk.sqlserver.rds.aliyuncs.com:3433"
|
||
dbUser = "sephora_sms"
|
||
dbPassword = "5ORiiLmgkniC0EqF"
|
||
dbName = "sephora_sms"
|
||
zipPath = "RawData/Zip/"
|
||
txtPath = "RawData/Txt/"
|
||
logPath = "logs/"
|
||
batchSize = 5000 //提交数据
|
||
insertSize = 660 //一次性入库
|
||
insertChanSize = 100 //通道缓冲数
|
||
goSize = 50 //协程数
|
||
taskTime = 60
|
||
to = []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"}
|
||
token = "7100477930234217"
|
||
lastCallPath = "RawData/LastCall"
|
||
default:
|
||
panic(fmt.Errorf("无效的运行模式: %s", env))
|
||
}
|
||
}
|
||
|
||
func iniLog() {
|
||
logPath = filepath.Join(executableDir, "logs", time.Now().Format("2006_01"))
|
||
os.MkdirAll(logPath, 0755)
|
||
logFileName := "sms_processing_" + time.Now().Format("2006_01_02") + ".log"
|
||
logFileHook := &lumberjack.Logger{
|
||
Filename: filepath.Join(logPath, logFileName),
|
||
}
|
||
logOutput := io.MultiWriter(os.Stdout, logFileHook) // 将日志同时输出到控制台和文件
|
||
applogger.SetOutput(logOutput)
|
||
}
|
||
|
||
func SendEmail(subject string, body string) error {
|
||
// 邮箱认证信息
|
||
smtpHost := "smtp.exmail.qq.com"
|
||
smtpPort := 465
|
||
from := "auto_system@wemediacn.com"
|
||
password := "yJJYYcy5NKx2y69r"
|
||
|
||
// 邮件内容
|
||
m := gomail.NewMessage()
|
||
m.SetHeader("From", from)
|
||
m.SetHeader("To", to...)
|
||
m.SetHeader("Subject", subject)
|
||
m.SetBody("text/plain", body)
|
||
// 邮件发送
|
||
d := gomail.NewDialer(smtpHost, smtpPort, from, password)
|
||
d.TLSConfig = &tls.Config{InsecureSkipVerify: true}
|
||
err := d.DialAndSend(m)
|
||
if err != nil {
|
||
applogger.Warn(fmt.Sprintf("邮件发送失败,错误信息%v", err))
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (f FileSorter) Len() int {
|
||
return len(f)
|
||
}
|
||
|
||
func (f FileSorter) Swap(i, j int) {
|
||
f[i], f[j] = f[j], f[i]
|
||
}
|
||
|
||
func (f FileSorter) Less(i, j int) bool {
|
||
// 首先检查文件扩展名是否为“.txt”,如果是,则将其移到文件列表的开头
|
||
if strings.HasSuffix(f[i].Name(), ".txt") && !strings.HasSuffix(f[j].Name(), ".txt") {
|
||
return true
|
||
}
|
||
// 如果文件扩展名都是“.txt”,或都不是“.txt”,则按字母顺序排序
|
||
return f[i].Name() < f[j].Name()
|
||
}
|
||
|
||
var ( //初始化变量
|
||
env string
|
||
applogger *logrus.Logger
|
||
redisClient *redis.Client
|
||
executableDir string
|
||
redisAddress string
|
||
redisPassword string
|
||
redisDB int
|
||
sftpAddress string
|
||
sftpUser string
|
||
sftpPassword string
|
||
sftpDir string
|
||
dbAddress string
|
||
dbUser string
|
||
dbPassword string
|
||
dbName string
|
||
zipPath string
|
||
txtPath string
|
||
logPath string
|
||
batchSize int //提交数据
|
||
insertSize int //一次性入库
|
||
insertChanSize int //通道缓冲数
|
||
goSize int //协程数
|
||
taskTime int
|
||
to []string
|
||
token string
|
||
lastCallPath string
|
||
)
|
||
|
||
// Batch is the GORM model describing one batch record. The struct tags carry
// the column type hints and defaults used by GORM.
type Batch struct {
	ID                     uint `gorm:"primary_key"`
	CommunicationChannelID uint
	CommunicationName      string `gorm:"type:varchar(255)"`
	TargetsMember          uint   `gorm:"type:int"`
	TemplateID             uint
	Content                string    `gorm:"type:text"`
	CreatedAt              time.Time `gorm:"default:getdate()"` // defaults to the database's getdate()
	UpdatedAt              time.Time `gorm:"default:getdate()"` // defaults to the database's getdate()
	Status                 uint      `gorm:"type:int"`
	DataFileName           string    `gorm:"type:text"`
	Sid                    int       `gorm:"type:int"`
}
|
||
|
||
// BatcheData is the GORM model for one data row of a batch; it is stored in
// the table named by its TableName method.
type BatcheData struct {
	ID                     uint   `gorm:"primary_key"`
	CommunicationChannelID string `gorm:"column:communication_channel_id"`
	Mobile                 string `gorm:"column:mobile"`
	ReservedField          string `gorm:"column:reserved_field"`
}
|
||
|
||
func (BatcheData) TableName() string {
|
||
return "batche_datas"
|
||
}
|
||
|
||
// BatchProcessingInformation is the GORM model holding per-file processing
// counters for a batch; it is stored in the table named by its TableName
// method.
type BatchProcessingInformation struct {
	ID                     uint   `gorm:"primaryKey;autoIncrement"`
	CommunicationChannelID string `gorm:"column:communication_channel_id"`
	RepeatTargetsMember    int    `gorm:"column:repeat_targets_member"`
	InsertsTargetsMember   int    `gorm:"column:inserts_targets_member"`
	DataFileName           string `gorm:"column:data_file_name"`
}
|
||
|
||
func (BatchProcessingInformation) TableName() string {
|
||
return "batches_processing_informations"
|
||
}
|
||
|
||
// BatchDataDuplicateLog is the GORM model for a logged duplicate data row; it
// is stored in the table named by its TableName method.
type BatchDataDuplicateLog struct {
	ID                     int    `gorm:"primaryKey;autoIncrement"`
	CommunicationChannelID string `gorm:"column:communication_channel_id"`
	Mobile                 string `gorm:"column:mobile"`
	ReservedField          string `gorm:"column:reserved_field"`
}
|
||
|
||
func (BatchDataDuplicateLog) TableName() string {
|
||
return "batche_data_duplicate_logs"
|
||
}
|
||
|
||
var upgrader = websocket.Upgrader{
|
||
ReadBufferSize: 1024,
|
||
WriteBufferSize: 1024,
|
||
}
|
||
|
||
// 客户端连接信息
|
||
type client struct {
|
||
conn *websocket.Conn
|
||
}
|
||
|
||
// 在线客户端列表
|
||
|
||
var clients = make(map[*client]bool)
|
||
|
||
// FileSorter is a slice of file entries that can be sorted through its Len,
// Swap and Less methods.
type FileSorter []os.FileInfo
|
||
|
||
// BatchParams is the JSON payload describing a batch; field names on the
// wire are given by the json tags.
type BatchParams struct {
	BatchName       string `json:"batchName"`
	BatchDesc       string `json:"batchDesc"`
	IsPersonal      int    `json:"isPersonal"`
	Message         string `json:"message"`
	IsInternational int    `json:"isInternational"`
	IsSchedule      int    `json:"isSchedule"`
	ScheduleTime    string `json:"scheduleTime"`
	Token           string `json:"token"`
}
|
||
|
||
// SmsList is one entry of an Sms payload, serialised with the single-letter
// JSON keys "m", "c" and "f".
type SmsList struct {
	M string `json:"m"`
	C string `json:"c"`
	F int    `json:"f"`
}
|
||
|
||
type Sms struct {
|
||
Sid int `json:"sid"`
|
||
Data []SmsList `json:"data"`
|
||
Token string `json:"token"`
|
||
}
|
||
|
||
// SmsFinish is the JSON payload made of a batch Sid and the access token.
type SmsFinish struct {
	Sid   int    `json:"sid"`
	Token string `json:"token"`
}
|
||
type SmsData struct {
|
||
Sid int
|
||
IsPersonalizedMsg bool
|
||
Content string
|
||
Mobiles []string
|
||
SmsList []SmsList
|
||
}
|
||
|
||
type Task struct {
|
||
TaskData TaskData
|
||
Signature Signature
|
||
}
|
||
|
||
// TaskData is the JSON body of a task: a command plus the file names it
// refers to.
type TaskData struct {
	Command          string `json:"command"`
	ExcludedFilename string `json:"excluded_filename"`
	BatchFilename    string `json:"batch_filename"`
	DataFilename     string `json:"data_filename"`
}
|
||
// Signature is the JSON structure carrying a signature string together with
// the timestamp and nonce sent alongside it.
type Signature struct {
	Signature string `json:"signature"`
	Timestamp int64  `json:"timestamp"`
	Nonce     string `json:"nonce"`
}
|