增加短信推送至发送平台gonna

This commit is contained in:
chejiulong 2023-03-05 13:18:40 +08:00
parent 3c9a16cf2f
commit 34ab8962e0
4 changed files with 578 additions and 197 deletions

1
go.mod
View File

@ -16,6 +16,7 @@ require (
require (
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
github.com/golang-sql/sqlexp v0.1.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/kr/fs v0.1.0 // indirect

2
go.sum
View File

@ -37,6 +37,8 @@ github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=

Binary file not shown.

772
main.go
View File

@ -3,6 +3,7 @@ package main
import (
"archive/zip"
"bufio"
"bytes"
"crypto/tls"
"encoding/csv"
"encoding/json"
@ -10,9 +11,12 @@ import (
"fmt"
"io"
"log"
"net/http"
"os"
"path"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"sync"
@ -20,210 +24,124 @@ import (
"time"
"github.com/go-redis/redis"
"github.com/gorilla/websocket"
"github.com/pkg/sftp"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
"gopkg.in/gomail.v2"
"gopkg.in/natefinch/lumberjack.v2"
"gorm.io/driver/sqlserver"
_ "gorm.io/driver/sqlserver"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
var ( //初始化变量
env string
applogger *logrus.Logger
redisClient *redis.Client
executableDir string
redisAddress string
redisPassword string
redisDB int
sftpAddress string
sftpUser string
sftpPassword string
sftpDir string
dbAddress string
dbUser string
dbPassword string
dbName string
zipPath string
txtPath string
logPath string
batchSize int //提交数据
insertSize int //一次性入库
insertChanSize int //通道缓冲数
goSize int //协程数
taskTime int
to []string
)
type Batch struct {
ID uint `gorm:"primary_key"`
CommunicationChannelID uint
CommunicationName string `gorm:"type:varchar(255)"`
TargetsMember uint `gorm:"type:int"`
TemplateID uint
Content string `gorm:"type:text"`
CreatedAt time.Time `gorm:"default:getdate()"`
UpdatedAt time.Time `gorm:"default:getdate()"`
Status uint `gorm:"type:int"`
DataFileName string `gorm:"type:text"`
}
type BatcheData struct {
ID uint `gorm:"primary_key"`
CommunicationChannelID string `gorm:"column:communication_channel_id"`
Mobile string `gorm:"column:mobile"`
FullName string `gorm:"column:full_name"`
ReservedField string `gorm:"column:reserved_field"`
}
func (BatcheData) TableName() string {
return "batche_datas"
}
type BatchProcessingInformation struct {
ID uint `gorm:"primaryKey;autoIncrement"`
CommunicationChannelID string `gorm:"column:communication_channel_id"`
RepeatTargetsMember int `gorm:"column:repeat_targets_member"`
InsertsTargetsMember int `gorm:"column:inserts_targets_member"`
DataFileName string `gorm:"column:data_file_name"`
}
func (BatchProcessingInformation) TableName() string {
return "batches_processing_informations"
}
type BatchDataDuplicateLog struct {
ID int `gorm:"primaryKey;autoIncrement"`
CommunicationChannelID string `gorm:"column:communication_channel_id"`
Mobile string `gorm:"column:mobile"`
FullName string `gorm:"column:full_name"`
ReservedField string `gorm:"column:reserved_field"`
}
func (BatchDataDuplicateLog) TableName() string {
return "batche_data_duplicate_logs"
}
func init() {
// 获取可执行文件所在的路径
executablePath, err := os.Executable()
if err != nil {
log.Fatal("failed to get executable path: ", err)
}
executableDir = filepath.Dir(executablePath)
flag.StringVar(&env, "env", "dev", "运行模式")
flag.Parse()
switch env {
case "dev":
//fmt.Print("测试环境配置已生效\n")
redisAddress = "mysql5.weu.me:6379"
redisPassword = ""
redisDB = 1
sftpAddress = "192.168.10.86:49156"
sftpUser = "demo"
sftpPassword = "demo"
sftpDir = "/sftp/test"
dbAddress = "192.168.10.18:1433"
dbUser = "sa"
dbPassword = "Aa123123"
dbName = "sephora"
zipPath = "RawData/Zip/"
txtPath = "RawData/Txt/"
logPath = "logs/"
batchSize = 5000 //提交数据
insertSize = 500 //一次性入库
insertChanSize = 10 //通道缓冲数
goSize = 10 //协程数
taskTime = 1
to = []string{"chejiulong@wemediacn.com"}
case "prod":
//fmt.Print("正式环境配置已生效\n")
redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379"
redisPassword = "3Nsb4Pmsl9bcLs24mL12l"
redisDB = 233
sftpAddress = "esftp.sephora.com.cn:20981"
sftpUser = "CHN-CRMTOWemedia-wemedia"
sftpPassword = "uoWdMHEv39ZFjiOg"
sftpDir = "/CN-CRMTOWemedia/SMS"
dbAddress = "rm-bp16l424ln96q1ouk.sqlserver.rds.aliyuncs.com:3433"
dbUser = "sephora_sms"
dbPassword = "5ORiiLmgkniC0EqF"
dbName = "sephora_sms"
zipPath = "RawData/Zip/"
txtPath = "RawData/Txt/"
logPath = "logs/"
batchSize = 5000 //提交数据
insertSize = 500 //一次性入库
insertChanSize = 50 //通道缓冲数
goSize = 50 //协程数
taskTime = 60
to = []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"}
default:
panic(fmt.Errorf("无效的运行模式: %s", env))
}
//判断目录是否存在,不存在则创建
err = os.MkdirAll(filepath.Join(executableDir, zipPath), 0755)
err = os.MkdirAll(filepath.Join(executableDir, txtPath), 0755)
err = os.MkdirAll(filepath.Join(executableDir, logPath, time.Now().Format("2006_01")), 0755)
if err != nil {
log.Fatal(err)
}
monopolize() //独占锁
iniConfi() //初始化环境变量
iniPath() //初始化路径
applogger = logrus.New() //创建日志
iniLog() //初始化日志配置
//链接Redis
redisClient = redis.NewClient(&redis.Options{
Addr: redisAddress,
Password: redisPassword,
DB: redisDB,
})
//go startWebSocket() // 启动 WebSocket 服务,使用协程方式运行
applogger.Info(fmt.Sprintf("程序启动,加载%s环境尝试执行...", env))
go downloadDecompression() // 启动立即执行一次数据下载、处理
}
func main() {
// 打开一个文件作为锁
lockFile, err := os.OpenFile(filepath.Join(executableDir, ".lock"), os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
fmt.Println("打开锁文件失败:", err)
os.Exit(1)
}
defer lockFile.Close()
// 尝试获取文件的独占锁
err = syscall.Flock(int(lockFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
if err != nil {
fmt.Println("程序已经在运行,本程序无法同时运行多个")
os.Exit(1)
}
applogger = logrus.New()
logPath = filepath.Join(executableDir, "logs", time.Now().Format("2006_01"))
err = os.MkdirAll(logPath, 0755)
logFileName := "sms_processing_" + time.Now().Format("2006_01_02") + ".log"
logFileHook := &lumberjack.Logger{
Filename: filepath.Join(logPath, logFileName),
}
logOutput := io.MultiWriter(os.Stdout, logFileHook) // 将日志同时输出到控制台和文件
applogger.SetOutput(logOutput)
applogger.Info(fmt.Sprintf("程序启动,加载%s环境尝试执行...", env))
go downloadDecompression() // 启动立即执行一次
ticker := time.NewTicker(time.Duration(taskTime) * time.Minute) // 数据下载、处理
ticker_merge := time.NewTicker(time.Minute) //名单合并
defer ticker.Stop()
defer ticker_merge.Stop()
ticker := time.NewTicker(time.Duration(taskTime) * time.Minute) // 创建一个定时器
defer ticker.Stop() // 延迟关闭定时器,确保所有的定时器事件都被处理完毕
// 循环处理任务
for {
select {
case <-ticker.C:
// 定时器触发时执行的任务函数
logPath = filepath.Join(executableDir, "logs", time.Now().Format("2006_01"))
logFileName = "sms_processing_" + time.Now().Format("2006_01_02") + ".log"
logFileHook = &lumberjack.Logger{
Filename: filepath.Join(logPath, logFileName),
}
logOutput := io.MultiWriter(os.Stdout, logFileHook) // 将日志同时输出到控制台和文件
applogger.SetOutput(logOutput)
applogger.Info("尝试执行...")
iniLog()
applogger.Info("尝试执行数据处理...")
go downloadDecompression()
case <-ticker_merge.C:
iniLog()
fmt.Print("尝试执行名单合并...\n")
//go downloadDecompression()
}
}
}
func startWebSocket() {
http.HandleFunc("/ws", handleWebSocket)
err := http.ListenAndServe(":8080", nil)
if err != nil {
fmt.Println("启动 WebSocket 服务失败:", err)
} else {
fmt.Println("启动 WebSocket 服务成功:")
}
}
func handleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
fmt.Println("升级连接失败:", err)
return
}
// 打印客户端地址
fmt.Printf("客户端 %s 连接成功\n", conn.RemoteAddr().String())
// 添加客户端连接信息
c := &client{conn}
clients[c] = true
// 接收消息
go func() {
for {
_, message, err := conn.ReadMessage()
if err != nil {
fmt.Println("接收消息失败:", err)
return
}
fmt.Println("收到消息:", string(message))
if string(message) == "a" {
fmt.Println("触发合并操作")
message := []byte("触发合并操作")
err := conn.WriteMessage(websocket.TextMessage, message) //发送消息给客户端
if err != nil {
fmt.Println("发送消息失败:", err)
delete(clients, c)
return
}
} else if string(message) == "c" {
for c := range clients {
fmt.Printf("客户端 %s 在线\n", c.conn.RemoteAddr().String())
}
} else if string(message) == "all" {
message = []byte("所有的客户端们你们好!")
go broadcast(message)
}
}
}()
// 设置连接关闭时的回调函数
conn.SetCloseHandler(func(code int, text string) error {
fmt.Println("连接已关闭code:", code, "text:", text)
fmt.Printf("客户端 %s 关闭连接\n", conn.RemoteAddr().String())
delete(clients, c)
return nil
})
}
// 聊天室消息广播
func broadcast(message []byte) {
for c := range clients {
err := c.conn.WriteMessage(websocket.TextMessage, message)
if err != nil {
fmt.Println("发送消息失败:", err)
delete(clients, c)
}
}
}
@ -260,6 +178,9 @@ func downloadDecompression() {
}
it := 1
fmt.Printf("共%d个文件\n", len(files))
sort.Sort(FileSorter(files))
for _, file := range files {
fmt.Printf("第%d个文件处理中\n", it)
it++
@ -356,8 +277,7 @@ func downloadDecompression() {
}
err = redisClient.Set(fileKey, 1, 0).Err() //写入下载、解压、入库完成标记
if err != nil {
//fmt.Printf("写入文件处理完成标记失败文件名:%s错误信息%v\n", file.Name(), err)
applogger.Info(fmt.Sprintf("写入文件处理完成标记失败文件名:%s错误信息v%\n", file.Name(), err))
applogger.Info(fmt.Sprintf("写入文件处理完成标记失败文件名:%s错误信息%v\n", file.Name(), err))
}
}
redisClient.Del("iniDataStatus") //删除任务执行中标记
@ -383,11 +303,34 @@ func batchInsert(fileName string) {
if err != nil {
break
}
communicationChannelID, _ := strconv.ParseUint(record[0], 10, 32)
TargetsMember, _ := strconv.ParseUint(record[2], 10, 32)
templateID, _ := strconv.ParseUint(record[3], 10, 32)
status := uint(1)
t := time.Now()
s := t.Format("2006-01-02 15:04:05")
//key :=
batchParams := BatchParams{
BatchName: fmt.Sprintf("%s-%s", record[1], strconv.Itoa(int(communicationChannelID))),
BatchDesc: record[4],
IsPersonal: 0,
Message: record[4],
IsInternational: 0,
IsSchedule: 0, //点发
ScheduleTime: s,
Token: token,
}
// 调用发送短信批次接口
sid, err := CreateBatch(batchParams)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(sid)
batch := Batch{
CommunicationChannelID: uint(communicationChannelID),
CommunicationName: record[1],
@ -396,6 +339,7 @@ func batchInsert(fileName string) {
Content: record[4],
Status: status,
DataFileName: fileName,
Sid: sid,
}
db.Create(&batch)
batchRows++
@ -410,6 +354,78 @@ func batchInsert(fileName string) {
}
}
func CreateBatch(batchParams BatchParams) (int, error) {
// 将请求参数编码为JSON字符串
jsonBytes, err := json.Marshal(batchParams)
if err != nil {
return -1, err
}
jsonStr := string(jsonBytes)
// 发送POST请求
url := "http://www.wemediacn.net/webservice/BatchService?service=sms.createbatch"
resp, err := http.Post(url, "application/json; charset=utf-8", bytes.NewBuffer([]byte(jsonStr)))
if err != nil {
return -1, err
}
defer resp.Body.Close()
// 解析响应数据
var retobj map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&retobj)
if err != nil {
return -1, err
}
fmt.Print(retobj)
code := int(retobj["code"].(float64))
fmt.Printf("批次创建API 返回:%d\n", code)
if code == 0 {
sid := int(retobj["sid"].(float64))
return sid, nil
} else {
return -1, fmt.Errorf("create batch failed, error code: %d", code)
}
}
func smsApi(method string, sendSMSDataJson string) (int, error) {
// 发送POST请求
url := "http://www.wemediacn.net/webservice/BatchService?service=sms." + method
resp, err := http.Post(url, "application/json; charset=utf-8", strings.NewReader(sendSMSDataJson))
if err != nil {
return -1, err
}
defer resp.Body.Close()
// 解析响应数据
var retobj map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&retobj)
if err != nil {
return -1, err
}
fmt.Print(retobj)
jsonStr, err := json.Marshal(retobj)
if err != nil {
fmt.Println(err)
return -1, err
}
fmt.Printf("API 返回:%s\n", string(jsonStr))
code := int(retobj["code"].(float64))
//fmt.Print("code", code)
if code == 0 {
//sid := int(retobj["sid"].(float64))
fmt.Printf("提交批次成功code%d\n", code)
return code, nil
} else {
return -1, fmt.Errorf("create batch failed, error code: %d", code)
}
}
func batchDataInsert(fileName string) {
start := time.Now()
// Open file
@ -457,7 +473,30 @@ func batchDataInsert(fileName string) {
bi := 0
duplicateCount := make(map[string]int)
insertsCount := make(map[string]int)
//sendSMSData := make(map[string][]Sms)
sendMobiles := make(map[string]map[string]interface{})
var count int
ccids := make(map[string]bool)
// 通过下划线将文件名拆分为多个字段
fields := strings.Split(fileName, "_")
// 最后一个字段是日期和时间,我们只需要日期部分
datetime := fields[len(fields)-1]
fileNameDate := datetime[:8]
var batches []Batch
// 模糊查询包含“20230103”字符串的记录
db.Where("data_file_name LIKE ?", "%"+fileNameDate+"%").Find(&batches)
// 定义一个名为result的map类型的变量并以CommunicationChannelID作为键
result := make(map[string][]Batch)
for _, batch := range batches {
cckdKey := strconv.FormatUint(uint64(batch.CommunicationChannelID), 10)
result[cckdKey] = append(result[cckdKey], batch)
print(batch.CommunicationChannelID, "\n")
}
//ci := 1
for scanner.Scan() {
line := scanner.Text()
@ -482,6 +521,7 @@ func batchDataInsert(fileName string) {
if _, ok := duplicateCount[row[2]]; !ok {
duplicateCount[row[2]] = 0
}
// Check if record exists in hashset
key := fmt.Sprintf("%s-%s-%s", row[2], row[3], row[5])
if _, exists := hs[key]; exists { //如果批次数据重复
@ -494,6 +534,7 @@ func batchDataInsert(fileName string) {
FullName: row[4],
ReservedField: string(reservedFieldsJson),
})
if len(dataBatchDuplicate) >= batchSize {
err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error
if err != nil {
@ -507,7 +548,8 @@ func batchDataInsert(fileName string) {
}
// Add record to hashset
hs[key] = true
//tccid, _ := strconv.ParseUint(row[2], 10, 32)
ccids[row[2]] = true
dataBatch = append(dataBatch, BatcheData{
CommunicationChannelID: row[2],
Mobile: row[3],
@ -521,6 +563,67 @@ func batchDataInsert(fileName string) {
//fmt.Print("提交通次数" + strconv.Itoa(ci))
//ci++
}
if batches, ok := result[row[2]]; ok && len(batches) > 0 && batches[0].Content != "" {
content := batches[0].Content
Sid := batches[0].Sid
pattern := regexp.MustCompile(`\{.*\}`)
matched := pattern.MatchString(content)
if matched { //个性化短信
if _, ok := sendMobiles[row[2]]; !ok {
sendMobiles[row[2]] = make(map[string]interface{})
sendMobiles[row[2]]["sid"] = Sid
sendMobiles[row[2]]["isPersonalizedMsg"] = true
sendMobiles[row[2]]["mobiles"] = make([]SmsList, 0)
}
placeholderMap := map[string]string{
"{RESERVED_FIELD_1}": row[5],
"{RESERVED_FIELD_2}": row[6],
"{RESERVED_FIELD_3}": row[7],
"{RESERVED_FIELD_4}": row[8],
"{RESERVED_FIELD_5}": row[9],
}
for k, v := range placeholderMap {
content = strings.ReplaceAll(content, k, v)
}
sendMobiles[row[2]]["mobiles"] = append(sendMobiles[row[2]]["mobiles"].([]SmsList), SmsList{M: row[3], C: content, F: 8})
if len(sendMobiles[row[2]]["mobiles"].([]SmsList)) >= batchSize/2 {
sd := Sms{Sid: Sid, Data: sendMobiles[row[2]]["mobiles"].([]SmsList), Token: token}
sendSMSDataJson, _ := json.Marshal(sd)
smsApi("appendbatchdata", string(sendSMSDataJson))
//fmt.Print(string(sendSMSDataJson))
sendMobiles[row[2]]["mobiles"] = []SmsList{} // reset mobiles slice
}
} else {
// 处理非个性化短信
if _, ok := sendMobiles[row[2]]; !ok {
sendMobiles[row[2]] = make(map[string]interface{})
sendMobiles[row[2]]["sid"] = Sid
sendMobiles[row[2]]["isPersonalizedMsg"] = false
sendMobiles[row[2]]["content"] = content
sendMobiles[row[2]]["mobiles"] = []string{}
}
sendMobiles[row[2]]["mobiles"] = append(sendMobiles[row[2]]["mobiles"].([]string), row[3])
if len(sendMobiles[row[2]]["mobiles"].([]string)) >= batchSize {
mobiles := sendMobiles[row[2]]["mobiles"].([]string)
mobileStr := strings.Join(mobiles, ",")
var sl []SmsList
sl = append(sl, SmsList{M: mobileStr, C: content, F: 8})
sd := Sms{Sid: Sid, Data: sl, Token: token}
sendSMSDataJson, _ := json.Marshal(sd)
smsApi("appendbatchdata", string(sendSMSDataJson))
sendMobiles[row[2]]["mobiles"] = []string{} // reset mobiles slice
}
}
}
if _, ok := insertsCount[row[2]]; !ok {
insertsCount[row[2]] = 0
}
@ -530,28 +633,58 @@ func batchDataInsert(fileName string) {
if len(dataBatch) > 0 {
dataBatchChan <- dataBatch
dataBatch = make([]BatcheData, 0, batchSize)
//fmt.Print("文件读取完成,最后一批提交至通道")
}
close(dataBatchChan)
wg.Wait() //所有入库全部完成
//fmt.Println("结束批次v%\n", ccids)
//fmt.Print("ccids 长度", len(ccids), "\n")
for ccid := range ccids { // 处理各个批次剩余数据同时处理批次结束api
//插入批次处理信息
bpi := []BatchProcessingInformation{}
for key, value := range duplicateCount {
fmt.Println("循环处理批次结束\n", ccid)
//batches := result[ccid]
if sendMobiles[ccid]["isPersonalizedMsg"].(bool) { //个性化
smsList := sendMobiles[ccid]["mobiles"].([]SmsList)
if len(smsList) > 0 {
sd := Sms{Sid: sendMobiles[ccid]["sid"].(int), Data: sendMobiles[ccid]["mobiles"].([]SmsList), Token: token}
sendSMSDataJson, _ := json.Marshal(sd)
smsApi("appendbatchdata", string(sendSMSDataJson))
//fmt.Print(string(sendSMSDataJson))
sendMobiles[ccid]["mobiles"] = []SmsList{} // reset mobiles slice
}
} else { //非个性化
if mobiles, ok := sendMobiles[ccid]["mobiles"].([]string); ok && len(mobiles) > 0 {
mobileStr := strings.Join(sendMobiles[ccid]["mobiles"].([]string), ",")
var sl []SmsList
sl = append(sl, SmsList{M: mobileStr, C: sendMobiles[ccid]["content"].(string), F: 8})
sd := Sms{Sid: sendMobiles[ccid]["sid"].(int), Data: sl, Token: token}
sendSMSDataJson, _ := json.Marshal(sd)
smsApi("appendbatchdata", string(sendSMSDataJson))
}
}
//关闭批次
sf := SmsFinish{Sid: sendMobiles[ccid]["sid"].(int), Token: token}
sfJson, _ := json.Marshal(sf)
smsApi("finishbatch", string(sfJson))
//插入批次处理数据
bpi := []BatchProcessingInformation{}
bpi = append(bpi, BatchProcessingInformation{
CommunicationChannelID: key,
RepeatTargetsMember: value,
InsertsTargetsMember: insertsCount[key],
CommunicationChannelID: ccid,
RepeatTargetsMember: duplicateCount[ccid],
InsertsTargetsMember: insertsCount[ccid],
DataFileName: fileName,
})
err = db.CreateInBatches(bpi, insertSize).Error
if err != nil {
applogger.Error(fmt.Sprintf("插入批次处理信息失败,文件名:%s错误信息%v", fileName, err))
}
}
err = db.CreateInBatches(bpi, insertSize).Error
if err != nil {
applogger.Error(fmt.Sprintf("插入批次处理信息失败,文件名:%s错误信息%v", fileName, err))
}
wg.Wait() //所有入库全部完成
//插入批此重复数据
if len(dataBatchDuplicate) > 0 {
@ -569,7 +702,7 @@ func batchDataInsert(fileName string) {
subject := "丝芙兰数据包处理完成"
body := "数据包:" + fileName + ";\n总数" + strconv.Itoa(count+bi) + ";\n过滤重复数" + strconv.Itoa(bi) + ";\n过滤后总数" + strconv.Itoa(count) + ";\n处理完成请前往管理平台查看处理。"
SendEmail(subject, body) //发送邮件
applogger.Info(fmt.Sprintf(fmt.Sprintf("%s数据包 入库完成", fileName)))
applogger.Info(fmt.Sprintf("%s数据包 入库完成", fileName))
applogger.Info(fmt.Sprintf("%s数据包执行时间:%s 插入数据:%d条 过滤数数:%d条", fileName, elapsed, count, bi))
}
}
@ -598,6 +731,111 @@ func connectToDB() (*gorm.DB, error) {
return db, nil
}
func iniPath() {
// 获取可执行文件所在的路径
executablePath, err := os.Executable()
if err != nil {
log.Fatal("failed to get executable path: ", err)
}
executableDir = filepath.Dir(executablePath)
//判断目录是否存在,不存在则创建
err = os.MkdirAll(filepath.Join(executableDir, zipPath), 0755)
if err != nil {
log.Fatal(err)
}
err = os.MkdirAll(filepath.Join(executableDir, txtPath), 0755)
if err != nil {
log.Fatal(err)
}
err = os.MkdirAll(filepath.Join(executableDir, logPath, time.Now().Format("2006_01")), 0755)
if err != nil {
log.Fatal(err)
}
}
func monopolize() {
// 打开一个文件作为锁
lockFile, err := os.OpenFile(filepath.Join(executableDir, ".lock"), os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
fmt.Println("打开锁文件失败:", err)
os.Exit(1)
}
defer lockFile.Close()
// 尝试获取文件的独占锁
err = syscall.Flock(int(lockFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
if err != nil {
fmt.Println("程序已经在运行,本程序无法同时运行多个")
os.Exit(1)
}
}
func iniConfi() {
flag.StringVar(&env, "env", "dev", "运行模式")
flag.Parse()
switch env {
case "dev":
//fmt.Print("测试环境配置已生效\n")
redisAddress = "mysql5.weu.me:6379"
redisPassword = ""
redisDB = 1
sftpAddress = "192.168.10.86:49156"
sftpUser = "demo"
sftpPassword = "demo"
sftpDir = "/sftp/test"
dbAddress = "192.168.10.18:1433"
dbUser = "sa"
dbPassword = "Aa123123"
dbName = "sephora"
zipPath = "RawData/Zip/"
txtPath = "RawData/Txt/"
logPath = "logs/"
batchSize = 5000 //提交数据
insertSize = 500 //一次性入库
insertChanSize = 10 //通道缓冲数
goSize = 10 //协程数
taskTime = 1
to = []string{"chejiulong@wemediacn.com"}
token = "7100477930234217"
case "prod":
//fmt.Print("正式环境配置已生效\n")
redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379"
redisPassword = "3Nsb4Pmsl9bcLs24mL12l"
redisDB = 233
sftpAddress = "esftp.sephora.com.cn:20981"
sftpUser = "CHN-CRMTOWemedia-wemedia"
sftpPassword = "uoWdMHEv39ZFjiOg"
sftpDir = "/CN-CRMTOWemedia/SMS"
dbAddress = "rm-bp16l424ln96q1ouk.sqlserver.rds.aliyuncs.com:3433"
dbUser = "sephora_sms"
dbPassword = "5ORiiLmgkniC0EqF"
dbName = "sephora_sms"
zipPath = "RawData/Zip/"
txtPath = "RawData/Txt/"
logPath = "logs/"
batchSize = 5000 //提交数据
insertSize = 500 //一次性入库
insertChanSize = 50 //通道缓冲数
goSize = 50 //协程数
taskTime = 60
to = []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"}
token = "7100477930234217"
default:
panic(fmt.Errorf("无效的运行模式: %s", env))
}
}
func iniLog() {
logPath = filepath.Join(executableDir, "logs", time.Now().Format("2006_01"))
os.MkdirAll(logPath, 0755)
logFileName := "sms_processing_" + time.Now().Format("2006_01_02") + ".log"
logFileHook := &lumberjack.Logger{
Filename: filepath.Join(logPath, logFileName),
}
logOutput := io.MultiWriter(os.Stdout, logFileHook) // 将日志同时输出到控制台和文件
applogger.SetOutput(logOutput)
}
func SendEmail(subject string, body string) error {
// 邮箱认证信息
smtpHost := "smtp.exmail.qq.com"
@ -620,3 +858,143 @@ func SendEmail(subject string, body string) error {
}
return nil
}
func (f FileSorter) Len() int {
return len(f)
}
func (f FileSorter) Swap(i, j int) {
f[i], f[j] = f[j], f[i]
}
func (f FileSorter) Less(i, j int) bool {
// 首先检查文件扩展名是否为“.txt”如果是则将其移到文件列表的开头
if strings.HasSuffix(f[i].Name(), ".txt") && !strings.HasSuffix(f[j].Name(), ".txt") {
return true
}
// 如果文件扩展名都是“.txt”或都不是“.txt”则按字母顺序排序
return f[i].Name() < f[j].Name()
}
var ( //初始化变量
env string
applogger *logrus.Logger
redisClient *redis.Client
executableDir string
redisAddress string
redisPassword string
redisDB int
sftpAddress string
sftpUser string
sftpPassword string
sftpDir string
dbAddress string
dbUser string
dbPassword string
dbName string
zipPath string
txtPath string
logPath string
batchSize int //提交数据
insertSize int //一次性入库
insertChanSize int //通道缓冲数
goSize int //协程数
taskTime int
to []string
token string
)
type Batch struct {
ID uint `gorm:"primary_key"`
CommunicationChannelID uint
CommunicationName string `gorm:"type:varchar(255)"`
TargetsMember uint `gorm:"type:int"`
TemplateID uint
Content string `gorm:"type:text"`
CreatedAt time.Time `gorm:"default:getdate()"`
UpdatedAt time.Time `gorm:"default:getdate()"`
Status uint `gorm:"type:int"`
DataFileName string `gorm:"type:text"`
Sid int `gorm:"type:int"`
}
type BatcheData struct {
ID uint `gorm:"primary_key"`
CommunicationChannelID string `gorm:"column:communication_channel_id"`
Mobile string `gorm:"column:mobile"`
FullName string `gorm:"column:full_name"`
ReservedField string `gorm:"column:reserved_field"`
}
func (BatcheData) TableName() string {
return "batche_datas"
}
type BatchProcessingInformation struct {
ID uint `gorm:"primaryKey;autoIncrement"`
CommunicationChannelID string `gorm:"column:communication_channel_id"`
RepeatTargetsMember int `gorm:"column:repeat_targets_member"`
InsertsTargetsMember int `gorm:"column:inserts_targets_member"`
DataFileName string `gorm:"column:data_file_name"`
}
func (BatchProcessingInformation) TableName() string {
return "batches_processing_informations"
}
type BatchDataDuplicateLog struct {
ID int `gorm:"primaryKey;autoIncrement"`
CommunicationChannelID string `gorm:"column:communication_channel_id"`
Mobile string `gorm:"column:mobile"`
FullName string `gorm:"column:full_name"`
ReservedField string `gorm:"column:reserved_field"`
}
func (BatchDataDuplicateLog) TableName() string {
return "batche_data_duplicate_logs"
}
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
// 客户端连接信息
type client struct {
conn *websocket.Conn
}
// 在线客户端列表
var clients = make(map[*client]bool)
// 定义一个FileSorter结构体
type FileSorter []os.FileInfo
type BatchParams struct {
BatchName string `json:"batchName"`
BatchDesc string `json:"batchDesc"`
IsPersonal int `json:"isPersonal"`
Message string `json:"message"`
IsInternational int `json:"isInternational"`
IsSchedule int `json:"isSchedule"`
ScheduleTime string `json:"scheduleTime"`
Token string `json:"token"`
}
type SmsList struct {
M string `json:"m"`
C string `json:"c"`
F int `json:"f"`
}
type Sms struct {
Sid int `json:"sid"`
Data []SmsList `json:"data"`
Token string `json:"token"`
}
type SmsFinish struct {
Sid int `json:"sid"`
Token string `json:"token"`
}