2023-02-18 16:32:19 +08:00
package main
import (
"archive/zip"
"bufio"
2023-03-05 13:18:40 +08:00
"bytes"
2023-04-07 15:11:29 +08:00
"context"
2023-03-07 10:18:46 +08:00
"crypto/hmac"
"crypto/sha256"
2023-02-18 19:31:39 +08:00
"crypto/tls"
2023-02-18 16:32:19 +08:00
"encoding/csv"
2023-03-07 10:18:46 +08:00
"encoding/hex"
2023-02-18 16:32:19 +08:00
"encoding/json"
2023-02-27 11:58:39 +08:00
"flag"
2023-02-18 16:32:19 +08:00
"fmt"
"io"
"log"
2023-03-05 13:18:40 +08:00
"net/http"
2023-02-18 16:32:19 +08:00
"os"
"path"
"path/filepath"
2023-03-05 13:18:40 +08:00
"regexp"
"sort"
2023-02-18 16:32:19 +08:00
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/go-redis/redis"
2023-03-05 13:18:40 +08:00
"github.com/gorilla/websocket"
2023-02-18 16:32:19 +08:00
"github.com/pkg/sftp"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
2023-04-07 15:11:29 +08:00
"golang.org/x/time/rate"
2023-02-18 19:31:39 +08:00
"gopkg.in/gomail.v2"
2023-02-21 14:36:25 +08:00
"gopkg.in/natefinch/lumberjack.v2"
2023-04-04 12:07:58 +08:00
"gorm.io/driver/mysql"
2023-02-18 16:32:19 +08:00
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
2023-03-05 13:18:40 +08:00
func init ( ) {
monopolize ( ) //独占锁
iniConfi ( ) //初始化环境变量
iniPath ( ) //初始化路径
applogger = logrus . New ( ) //创建日志
iniLog ( ) //初始化日志配置
redisClient = redis . NewClient ( & redis . Options {
Addr : redisAddress ,
Password : redisPassword ,
DB : redisDB ,
} )
2023-03-05 17:51:04 +08:00
go startWebSocket ( ) // 启动 WebSocket 服务,使用协程方式运行
2023-03-05 13:18:40 +08:00
applogger . Info ( fmt . Sprintf ( "程序启动,加载%s环境, 尝试执行..." , env ) )
go downloadDecompression ( ) // 启动立即执行一次数据下载、处理
2023-03-14 11:31:30 +08:00
go queryBatchState ( )
2023-04-04 18:21:54 +08:00
//go delData()
2023-02-18 16:32:19 +08:00
}
2023-04-11 10:47:53 +08:00
// 主函数
2023-03-05 13:18:40 +08:00
func main ( ) {
2023-03-15 16:59:09 +08:00
ticker := time . NewTicker ( time . Duration ( taskTime ) * time . Minute ) // 数据下载、处理
ticker_merge := time . NewTicker ( time . Duration ( batchStatusTaskTime ) * time . Minute ) //查询批次
2023-04-04 16:02:28 +08:00
notification := time . NewTicker ( time . Until ( nextNotificationTime ( ) ) ) // 每天凌晨3点输出提示
2023-03-05 13:18:40 +08:00
defer ticker . Stop ( )
defer ticker_merge . Stop ( )
2023-04-04 16:02:28 +08:00
defer notification . Stop ( )
2023-03-05 13:18:40 +08:00
for {
select {
case <- ticker . C :
iniLog ( )
applogger . Info ( "尝试执行数据处理..." )
go downloadDecompression ( )
2023-02-18 16:32:19 +08:00
2023-03-05 13:18:40 +08:00
case <- ticker_merge . C :
iniLog ( )
2023-03-14 11:31:30 +08:00
fmt . Print ( "查询批次状态...\n" )
queryBatchState ( )
2023-04-04 16:02:28 +08:00
case <- notification . C :
iniLog ( )
2023-04-10 20:00:26 +08:00
go delData ( )
2023-04-04 16:02:28 +08:00
notification . Reset ( time . Until ( nextNotificationTime ( ) ) )
2023-03-05 13:18:40 +08:00
}
}
2023-02-18 16:32:19 +08:00
}
2023-04-04 18:21:54 +08:00
// 删除历史数据和历史重复数据
func delData ( ) {
2023-04-10 20:00:26 +08:00
applogger . Info ( "开始清除历史数据" )
2023-04-04 18:21:54 +08:00
// 删除15天前的批次数据
err := deleteOldData ( & BatcheData { } , dataExpirationDays )
handleError ( err , "删除15天前的批次数据失败" )
// 删除15天前的批次数据重复日志
err = deleteOldData ( & BatchDataDuplicateLog { } , dataExpirationDays )
handleError ( err , "删除15天前的批次数据重复日志失败" )
2023-04-10 20:00:26 +08:00
applogger . Info ( "清除历史数据完成" )
2023-04-04 18:21:54 +08:00
}
2023-04-04 16:02:28 +08:00
// 删除15天前的数据
2023-04-04 18:21:54 +08:00
func deleteOldData ( model interface { } , daysAgo int ) error {
start := time . Now ( )
totalRowsAffected := int64 ( 0 )
2023-04-04 16:02:28 +08:00
db , err := connectToDB ( )
if err != nil {
2023-04-10 20:00:26 +08:00
applogger . Error ( "连接数据库失败: " , err )
2023-04-04 18:21:54 +08:00
return fmt . Errorf ( "连接数据库失败: %w" , err )
2023-04-04 16:02:28 +08:00
}
threshold := time . Now ( ) . AddDate ( 0 , 0 , - daysAgo )
2023-04-04 18:21:54 +08:00
2023-08-07 09:51:31 +08:00
tx := db . Begin ( )
applogger . Info ( "开始事务" )
defer func ( ) {
if r := recover ( ) ; r != nil {
applogger . Error ( "异常,回滚事务: " , r )
tx . Rollback ( )
2023-04-04 18:21:54 +08:00
}
2023-08-07 09:51:31 +08:00
} ( )
if err := tx . Error ; err != nil {
return err
}
2023-04-04 18:21:54 +08:00
2023-08-07 09:51:31 +08:00
for {
2023-04-04 18:21:54 +08:00
var ids [ ] int
result := tx . Model ( model ) . Where ( "created_at < ?" , threshold ) . Limit ( delDataSize ) . Pluck ( "id" , & ids )
if result . Error != nil {
tx . Rollback ( )
return result . Error
}
if len ( ids ) == 0 {
2023-04-10 20:00:26 +08:00
applogger . Info ( "没有找到符合条件的数据,退出循环" )
2023-04-04 18:21:54 +08:00
break
}
result = tx . Where ( "id IN (?)" , ids ) . Delete ( model )
if result . Error != nil {
2023-04-10 20:00:26 +08:00
applogger . Error ( "删除操作失败,回滚事务: " , result . Error )
2023-04-04 18:21:54 +08:00
tx . Rollback ( )
return result . Error
}
totalRowsAffected += result . RowsAffected
2023-08-07 09:51:31 +08:00
applogger . Info ( fmt . Sprintf ( "删除 %d 条数据" , result . RowsAffected ) )
2023-04-04 16:02:28 +08:00
}
2023-04-04 18:21:54 +08:00
2023-08-07 09:51:31 +08:00
if err := tx . Commit ( ) . Error ; err != nil {
applogger . Error ( "提交事务失败: " , err )
return fmt . Errorf ( "提交事务失败: %w" , err )
}
elapsed := time . Since ( start )
applogger . Info ( fmt . Sprintf ( "删除 %d 条数据,共耗时 %s" , totalRowsAffected , elapsed ) )
2023-04-04 18:21:54 +08:00
return nil
2023-04-04 16:02:28 +08:00
}
// 计算下一次执行时间
func nextNotificationTime ( ) time . Time {
now := time . Now ( )
notificationTime := time . Date ( now . Year ( ) , now . Month ( ) , now . Day ( ) , 3 , 0 , 0 , 0 , now . Location ( ) )
if now . After ( notificationTime ) {
notificationTime = notificationTime . Add ( 24 * time . Hour )
}
return notificationTime
}
2023-03-05 13:18:40 +08:00
func startWebSocket ( ) {
http . HandleFunc ( "/ws" , handleWebSocket )
err := http . ListenAndServe ( ":8080" , nil )
2023-02-18 16:32:19 +08:00
if err != nil {
2023-03-05 13:18:40 +08:00
fmt . Println ( "启动 WebSocket 服务失败:" , err )
} else {
fmt . Println ( "启动 WebSocket 服务成功:" )
2023-02-27 11:58:39 +08:00
}
2023-03-05 13:18:40 +08:00
}
2023-02-27 11:58:39 +08:00
2023-04-04 16:02:28 +08:00
// 客户端连接信息
2023-03-05 13:18:40 +08:00
func handleWebSocket ( w http . ResponseWriter , r * http . Request ) {
conn , err := upgrader . Upgrade ( w , r , nil )
2023-02-27 15:36:18 +08:00
if err != nil {
2023-03-05 13:18:40 +08:00
fmt . Println ( "升级连接失败:" , err )
return
2023-02-27 15:36:18 +08:00
}
2023-03-05 13:18:40 +08:00
// 打印客户端地址
fmt . Printf ( "客户端 %s 连接成功\n" , conn . RemoteAddr ( ) . String ( ) )
2023-02-27 15:36:18 +08:00
2023-03-05 13:18:40 +08:00
// 添加客户端连接信息
c := & client { conn }
clients [ c ] = true
// 接收消息
go func ( ) {
for {
_ , message , err := conn . ReadMessage ( )
if err != nil {
fmt . Println ( "接收消息失败:" , err )
return
}
fmt . Println ( "收到消息:" , string ( message ) )
2023-03-06 19:38:28 +08:00
var returnMessage [ ] byte
2023-03-07 10:18:46 +08:00
task := Task { }
2023-03-06 19:38:28 +08:00
err = json . Unmarshal ( [ ] byte ( message ) , & task )
if err != nil {
returnMessage = [ ] byte ( ` { "code": 2001,"err": "json指令解析失败"} ` )
conn . WriteMessage ( websocket . TextMessage , returnMessage ) //发送消息给客户端
} else {
2023-03-07 10:18:46 +08:00
if ! verify_signature ( task . Signature . Signature , task . Signature . Nonce , task . Signature . Timestamp , task . TaskData ) { // 签名验证失败或超时
returnMessage = [ ] byte ( ` { "code": 2401,"err": "Unauthorized"} ` )
conn . WriteMessage ( websocket . TextMessage , returnMessage )
conn . Close ( )
break
}
switch task . TaskData . Command {
2023-03-06 19:38:28 +08:00
case "lastCall" :
2023-03-07 19:51:06 +08:00
//判断lastCall是否有执行中
if exists , _ := redisClient . Exists ( "iniLastCallDataStatus" ) . Result ( ) ; exists == 1 {
returnMessage = [ ] byte ( ` { "code": 2007,"err": "有lastCall执行中, 请稍后重试"} ` )
conn . WriteMessage ( websocket . TextMessage , returnMessage ) //发送消息给客户端
break
}
//判断ExcludedFilename是否执行过
lastCallTask := fmt . Sprintf ( "lastCallTask:%s" , task . TaskData . ExcludedFilename )
if exists , _ := redisClient . Exists ( lastCallTask ) . Result ( ) ; exists == 1 {
returnMessage = [ ] byte ( ` { "code": 2008,"err": "ExcludedFilename重复跳过执行"} ` )
conn . WriteMessage ( websocket . TextMessage , returnMessage ) //发送消息给客户端
break
}
//判断ExcludedFilename是否存在
2023-03-07 10:18:46 +08:00
if ! fileExists ( path . Join ( executableDir , lastCallPath , task . TaskData . ExcludedFilename ) ) {
2023-03-06 19:38:28 +08:00
returnMessage = [ ] byte ( ` { "code": 2003,"err": "task.ExcludedFilename 不存在"} ` )
conn . WriteMessage ( websocket . TextMessage , returnMessage ) //发送消息给客户端
break
}
2023-03-07 19:51:06 +08:00
//判断BatchFilename是否存在
2023-03-07 10:18:46 +08:00
if ! fileExists ( path . Join ( executableDir , txtPath , task . TaskData . BatchFilename ) ) {
2023-03-06 19:38:28 +08:00
returnMessage = [ ] byte ( ` { "code": 2004,"err": "task.BatchFilename 不存在"} ` )
conn . WriteMessage ( websocket . TextMessage , returnMessage ) //发送消息给客户端
break
}
2023-03-07 19:51:06 +08:00
//判断DataFilename是否存在
2023-03-07 10:18:46 +08:00
if ! fileExists ( path . Join ( executableDir , txtPath , task . TaskData . DataFilename ) ) {
2023-03-06 19:38:28 +08:00
returnMessage = [ ] byte ( ` { "code": 2005,"err": "task.DataFilename 不存在"} ` )
conn . WriteMessage ( websocket . TextMessage , returnMessage ) //发送消息给客户端
break
}
returnMessage = [ ] byte ( ` { "code": 2000,"err": "开始处理lastCall"} ` )
conn . WriteMessage ( websocket . TextMessage , returnMessage ) //发送消息给客户端
2023-03-07 19:51:06 +08:00
//读取排重文件
2023-03-07 10:18:46 +08:00
lastCallKeys , err := readExcludedFile ( path . Join ( executableDir , lastCallPath , task . TaskData . ExcludedFilename ) )
2023-03-06 19:38:28 +08:00
if err != nil {
returnMessage = [ ] byte ( ` { "code": 2006,"err": "打开ExcludedFilename失败"} ` )
conn . WriteMessage ( websocket . TextMessage , returnMessage ) //发送消息给客户端
break
} else {
returnMessage = [ ] byte ( ` { "code": 2000,"err": "ExcludedFilename读取完成"} ` )
conn . WriteMessage ( websocket . TextMessage , returnMessage ) //发送消息给客户端
}
2023-03-07 19:51:06 +08:00
//无错误,执行逻辑代码,
subject := "丝芙兰短信处理程序异常"
err = redisClient . Set ( "iniLastCallDataStatus" , 1 , 0 ) . Err ( )
if err != nil {
body := fmt . Sprintf ( "写入lastCall任务执行中标记失败: %v" , err )
applogger . Error ( body )
returnMessage = [ ] byte ( fmt . Sprintf ( ` { "code": 2008,"err": "写入lastCall任务执行中标记失败%v"} ` , err ) )
2023-03-06 19:38:28 +08:00
conn . WriteMessage ( websocket . TextMessage , returnMessage ) //发送消息给客户端
2023-03-07 19:51:06 +08:00
SendEmail ( subject , body ) //发送邮件
2023-03-06 19:38:28 +08:00
} else {
2023-03-07 19:51:06 +08:00
batchInsert ( task . TaskData . BatchFilename , true , task . TaskData . ExcludedFilename ) //创建批次
returnMessage = [ ] byte ( ` { "code": 2000,"err": "批次创建完成"} ` )
conn . WriteMessage ( websocket . TextMessage , returnMessage ) //发送消息给客户端
batchDataInsert ( task . TaskData . DataFilename , lastCallKeys , task . TaskData . ExcludedFilename ) //添加数据
redisClient . Del ( "iniLastCallDataStatus" ) //删除任务执行中标记
returnMessage = [ ] byte ( ` { "code": 2000,"err": "结束处理lastCall"} ` )
conn . WriteMessage ( websocket . TextMessage , returnMessage ) //发送消息给客户端
redisClient . Set ( lastCallTask , 1 , 0 ) . Err ( ) //记录ExcludedFilename执行完成
2023-03-06 19:38:28 +08:00
}
default :
returnMessage = [ ] byte ( ` { "code": 2002,"err": "task.Command 不存在"} ` )
conn . WriteMessage ( websocket . TextMessage , returnMessage ) //发送消息给客户端
2023-03-05 13:18:40 +08:00
}
}
}
} ( )
// 设置连接关闭时的回调函数
conn . SetCloseHandler ( func ( code int , text string ) error {
fmt . Println ( "连接已关闭, code:" , code , "text:" , text )
fmt . Printf ( "客户端 %s 关闭连接\n" , conn . RemoteAddr ( ) . String ( ) )
delete ( clients , c )
return nil
2023-02-18 16:32:19 +08:00
} )
2023-02-28 18:48:53 +08:00
2023-02-18 16:32:19 +08:00
}
2023-04-04 16:02:28 +08:00
// 验证签名
2023-03-07 16:50:12 +08:00
func verify_signature ( signature string , nonce string , timestamp int64 , data interface { } ) bool {
2023-03-07 10:18:46 +08:00
fmt . Printf ( "Received signature: %s\n" , signature )
fmt . Printf ( "Received timestamp: %d\n" , timestamp )
2023-03-07 16:50:12 +08:00
fmt . Printf ( "Received nonce: %s\n" , nonce )
2023-03-07 10:18:46 +08:00
fmt . Printf ( "Received data: %v\n" , data )
received_signature := signature
received_timestamp , _ := strconv . ParseInt ( fmt . Sprintf ( "%v" , timestamp ) , 10 , 64 )
2023-03-07 16:50:12 +08:00
received_nonce := nonce //strconv.Atoi(fmt.Sprintf("%v", nonce))
2023-03-07 10:18:46 +08:00
if time . Now ( ) . Unix ( ) - received_timestamp > 7200 {
fmt . Println ( "Timestamp expired" )
return false
}
received_data_bytes , _ := json . Marshal ( data )
received_data := string ( received_data_bytes )
2023-03-07 16:50:12 +08:00
expected_data := fmt . Sprintf ( "%d|%s|%s" , received_timestamp , received_nonce , received_data )
2023-03-07 10:18:46 +08:00
fmt . Printf ( "Expected data: %s\n" , expected_data )
2023-03-07 18:23:07 +08:00
mac := hmac . New ( sha256 . New , [ ] byte ( verifySignatureKey ) )
2023-03-07 10:18:46 +08:00
mac . Write ( [ ] byte ( expected_data ) )
expected_signature := hex . EncodeToString ( mac . Sum ( nil ) )
fmt . Printf ( "Expected signature: %s\n" , expected_signature )
if received_signature != expected_signature {
fmt . Println ( "Signature does not match" )
return false
}
return true
}
2023-03-06 19:38:28 +08:00
func readExcludedFile ( filename string ) ( map [ string ] bool , error ) {
file , err := os . Open ( filename )
if err != nil {
return nil , err
}
defer file . Close ( )
scanner := bufio . NewScanner ( file )
scanner . Split ( bufio . ScanLines )
scanner . Scan ( ) // skip first line
lastCallKeys := make ( map [ string ] bool )
for scanner . Scan ( ) {
line := scanner . Text ( )
fields := strings . Split ( line , "," )
lastCallKey := fmt . Sprintf ( "%s-%s" , fields [ 0 ] , fields [ 1 ] )
lastCallKeys [ lastCallKey ] = true
}
return lastCallKeys , nil
}
/ *
2023-03-05 13:18:40 +08:00
// 聊天室消息广播
func broadcast ( message [ ] byte ) {
for c := range clients {
err := c . conn . WriteMessage ( websocket . TextMessage , message )
if err != nil {
fmt . Println ( "发送消息失败:" , err )
delete ( clients , c )
2023-02-18 16:32:19 +08:00
}
}
2023-03-06 19:38:28 +08:00
} * /
2023-02-18 16:32:19 +08:00
func downloadDecompression ( ) {
if exists , _ := redisClient . Exists ( "iniDataStatus" ) . Result ( ) ; exists == 1 {
2023-02-21 14:36:25 +08:00
fmt . Print ( "上一批次执行中,跳过本次\n" )
2023-02-18 16:32:19 +08:00
} else {
2023-03-05 17:51:04 +08:00
subject := "丝芙兰短信处理程序异常"
2023-02-18 16:32:19 +08:00
// 写入执行中标记
err := redisClient . Set ( "iniDataStatus" , 1 , 0 ) . Err ( )
if err != nil {
2023-03-05 17:51:04 +08:00
body := fmt . Sprintf ( "写入任务执行中标记失败:%v" , err )
applogger . Error ( body )
SendEmail ( subject , body ) //发送邮件
2023-02-18 16:32:19 +08:00
}
// Connect to SFTP server
sshConfig := & ssh . ClientConfig {
User : sftpUser ,
Auth : [ ] ssh . AuthMethod {
ssh . Password ( sftpPassword ) ,
} ,
HostKeyCallback : ssh . InsecureIgnoreHostKey ( ) ,
}
sshClient , err := ssh . Dial ( "tcp" , sftpAddress , sshConfig )
if err != nil {
2023-03-05 17:51:04 +08:00
body := fmt . Sprintf ( "sshClient连接失败: %v" , err )
applogger . Error ( body )
SendEmail ( subject , body ) //发送邮件
2023-02-18 16:32:19 +08:00
}
sftpClient , err := sftp . NewClient ( sshClient )
if err != nil {
2023-03-05 17:51:04 +08:00
body := fmt . Sprintf ( "sftp连接失败: %v" , err )
applogger . Error ( body )
SendEmail ( subject , body ) //发送邮件
2023-02-18 16:32:19 +08:00
}
2023-04-10 20:00:26 +08:00
defer func ( ) {
sftpClient . Close ( )
sshClient . Close ( )
} ( )
2023-02-20 18:44:32 +08:00
files , err := sftpClient . ReadDir ( sftpDir )
2023-02-18 16:32:19 +08:00
if err != nil {
2023-03-05 17:51:04 +08:00
body := fmt . Sprintf ( "sftp目录%s不存在, 错误信息:%v" , sftpDir , err )
applogger . Error ( body )
SendEmail ( subject , body ) //发送邮件
2023-02-18 16:32:19 +08:00
}
2023-02-20 18:44:32 +08:00
it := 1
fmt . Printf ( "共%d个文件\n" , len ( files ) )
2023-03-05 13:18:40 +08:00
sort . Sort ( FileSorter ( files ) )
2023-04-10 20:00:26 +08:00
// 限制下载速度为 15MB/s
2023-04-07 15:11:29 +08:00
limiter := rate . NewLimiter ( rate . Limit ( rateLimiter * 1024 * 1024 ) , int ( rateLimiter * 1024 * 1024 ) )
2023-02-18 16:32:19 +08:00
for _ , file := range files {
2023-03-06 11:49:44 +08:00
processingStatus := - 1
2023-02-21 14:36:25 +08:00
fmt . Printf ( "第%d个文件处理中\n" , it )
2023-02-20 18:44:32 +08:00
it ++
2023-02-18 16:32:19 +08:00
// Check if file has been downloaded before
fileKey := fmt . Sprintf ( "downloaded:%s" , file . Name ( ) )
if exists , _ := redisClient . Exists ( fileKey ) . Result ( ) ; exists == 1 {
2023-02-21 14:36:25 +08:00
fmt . Println ( "跳过已处理过的文件:" + file . Name ( ) )
2023-02-18 16:32:19 +08:00
continue
}
if filepath . Ext ( file . Name ( ) ) == ".zip" {
//fmt.Println("下载开始(数据包):" + file.Name())
// Download file
2023-02-20 18:44:32 +08:00
srcFile , err := sftpClient . Open ( path . Join ( sftpDir , file . Name ( ) ) )
2023-02-18 16:32:19 +08:00
if err != nil {
2023-03-05 17:51:04 +08:00
body := fmt . Sprintf ( "打开sftp文件失败, 文件名:%s, 错误信息: %v" , file . Name ( ) , err )
applogger . Error ( body )
SendEmail ( subject , body ) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
defer srcFile . Close ( )
dstFile , err := os . Create ( path . Join ( executableDir , zipPath , file . Name ( ) ) )
if err != nil {
2023-03-05 17:51:04 +08:00
body := fmt . Sprintf ( "创建本地文件失败,文件名: %s, 错误信息: %v" , file . Name ( ) , err )
applogger . Error ( body )
SendEmail ( subject , body ) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
defer dstFile . Close ( )
2023-04-07 15:11:29 +08:00
buf := make ( [ ] byte , 4096 )
for {
n , err := srcFile . Read ( buf )
if err != nil && err != io . EOF {
body := fmt . Sprintf ( "下载文件失败,文件名: %s, 错误信息: %v" , file . Name ( ) , err )
applogger . Error ( body )
SendEmail ( subject , body ) //发送邮件
break
}
if n == 0 {
break
}
// 等待令牌桶中有足够的令牌可用
limiter . WaitN ( context . Background ( ) , n )
if _ , err := dstFile . Write ( buf [ : n ] ) ; err != nil {
body := fmt . Sprintf ( "写入文件失败,文件名: %s, 错误信息: %v" , file . Name ( ) , err )
applogger . Error ( body )
SendEmail ( subject , body ) //发送邮件
break
}
2023-02-18 16:32:19 +08:00
}
2023-02-28 18:48:53 +08:00
applogger . Info ( fmt . Sprintf ( "%s( 数据包) 下载完成" , file . Name ( ) ) )
2023-02-18 16:32:19 +08:00
// Unzip file
zipReader , err := zip . OpenReader ( path . Join ( executableDir , zipPath , file . Name ( ) ) )
if err != nil {
2023-03-05 17:51:04 +08:00
body := fmt . Sprintf ( "解压文件失败: 文件名:%s, 错误信息: %v" , file . Name ( ) , err )
applogger . Error ( body )
SendEmail ( subject , body ) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
defer zipReader . Close ( )
for _ , zipFile := range zipReader . File {
zipFileReader , err := zipFile . Open ( )
if strings . Contains ( zipFile . Name , "__MACOSX/._" ) {
continue
} else if filepath . Ext ( zipFile . Name ) != ".txt" {
2023-02-27 19:09:43 +08:00
applogger . Error ( fmt . Sprintf ( "文件类型不正确,跳过处理: %v" , zipFile . Name ) )
2023-02-18 16:32:19 +08:00
continue
}
if err != nil || zipFileReader == nil {
2023-02-27 19:09:43 +08:00
applogger . Error ( fmt . Sprintf ( "Failed to open zip file: %v" , err ) )
2023-02-18 16:32:19 +08:00
fmt . Print ( "压缩文件处理结束" )
continue
}
defer zipFileReader . Close ( )
// Create the file
unzipFile , err := os . Create ( path . Join ( executableDir , txtPath , zipFile . Name ) )
if err != nil {
2023-03-05 17:51:04 +08:00
body := fmt . Sprintf ( "创建压缩后的文件失败,文件名:%s, 错误信息: %v" , zipFile . Name , err )
applogger . Error ( body )
SendEmail ( subject , body ) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
defer unzipFile . Close ( )
// Write the unzip data to the file
_ , err = io . Copy ( unzipFile , zipFileReader )
if err != nil {
2023-03-05 17:51:04 +08:00
body := fmt . Sprintf ( "文件解压失败,文件名:%s, 错误信息: %v" , zipFileReader , err )
applogger . Error ( body )
SendEmail ( subject , body ) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
2023-02-21 14:36:25 +08:00
applogger . Info ( fmt . Sprintf ( "%s( 数据包) 解压完成" , zipFile . Name ) )
2023-03-07 18:23:07 +08:00
processingStatus = batchDataInsert ( zipFile . Name , nil , "" )
2023-02-18 16:32:19 +08:00
}
} else if filepath . Ext ( file . Name ( ) ) == ".txt" {
2023-02-20 18:44:32 +08:00
srcFile , err := sftpClient . Open ( path . Join ( sftpDir , file . Name ( ) ) )
2023-02-18 16:32:19 +08:00
if err != nil {
2023-03-05 17:51:04 +08:00
body := fmt . Sprintf ( "打开sftp文件失败, 文件名:%s, 错误信息: %v" , file . Name ( ) , err )
applogger . Error ( body )
SendEmail ( subject , body ) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
defer srcFile . Close ( )
dstFile , err := os . Create ( path . Join ( executableDir , txtPath , file . Name ( ) ) )
if err != nil {
2023-03-05 17:51:04 +08:00
body := fmt . Sprintf ( "创建本地文件失败,文件名: %s, 错误信息: %v" , file . Name ( ) , err )
applogger . Error ( body )
SendEmail ( subject , body ) //发送邮件
2023-02-18 16:32:19 +08:00
continue
}
defer dstFile . Close ( )
2023-04-07 15:11:29 +08:00
buf := make ( [ ] byte , 4096 )
for {
n , err := srcFile . Read ( buf )
if err != nil && err != io . EOF {
body := fmt . Sprintf ( "下载文件失败,文件名: %s, 错误信息: %v" , file . Name ( ) , err )
applogger . Error ( body )
SendEmail ( subject , body ) //发送邮件
break
}
if n == 0 {
break
}
// 等待令牌桶中有足够的令牌可用
limiter . WaitN ( context . Background ( ) , n )
if _ , err := dstFile . Write ( buf [ : n ] ) ; err != nil {
body := fmt . Sprintf ( "写入文件失败,文件名: %s, 错误信息: %v" , file . Name ( ) , err )
applogger . Error ( body )
SendEmail ( subject , body ) //发送邮件
break
}
2023-02-18 16:32:19 +08:00
}
2023-02-21 14:36:25 +08:00
applogger . Info ( fmt . Sprintf ( "%s( 批次文件) 下载完成" , file . Name ( ) ) )
2023-03-07 18:23:07 +08:00
processingStatus = batchInsert ( file . Name ( ) , false , "" )
2023-02-18 16:32:19 +08:00
}
2023-03-06 11:49:44 +08:00
if processingStatus != - 1 {
2023-03-05 17:51:04 +08:00
err = redisClient . Set ( fileKey , 1 , 0 ) . Err ( ) //写入文件处理完成
if err != nil {
body := fmt . Sprintf ( "写入文件处理完成标记失败文件名:%s, 错误信息: %v\n" , file . Name ( ) , err )
applogger . Error ( body )
SendEmail ( subject , body ) //发送邮件
}
2023-02-18 16:32:19 +08:00
}
2023-03-05 17:51:04 +08:00
2023-02-18 16:32:19 +08:00
}
redisClient . Del ( "iniDataStatus" ) //删除任务执行中标记
}
}
2023-04-10 20:00:26 +08:00
// 批次信息入库
2023-03-07 18:23:07 +08:00
func batchInsert ( fileName string , isLastCall bool , excludedFilename string ) int {
2023-02-18 16:32:19 +08:00
start := time . Now ( )
2023-03-22 15:10:42 +08:00
db , err := connectToDB ( )
handleError ( err , "连接数据库失败" )
2023-02-18 16:32:19 +08:00
file , err := os . Open ( path . Join ( executableDir , txtPath , fileName ) )
2023-03-22 15:10:42 +08:00
handleError ( err , fmt . Sprintf ( "打开文件失败: %s" , fileName ) )
defer file . Close ( )
2023-03-05 13:18:40 +08:00
2023-03-22 15:10:42 +08:00
reader := csv . NewReader ( bufio . NewReader ( file ) )
reader . Read ( )
batchRows := 0
for {
record , err := reader . Read ( )
if err != nil {
break
}
2023-02-21 14:36:25 +08:00
2023-03-22 15:10:42 +08:00
TargetsMember , _ := strconv . ParseUint ( strings . TrimSpace ( record [ 2 ] ) , 10 , 32 )
templateID , _ := strconv . ParseUint ( strings . TrimSpace ( record [ 3 ] ) , 10 , 32 )
status := 1
2023-03-06 19:38:28 +08:00
2023-03-22 15:10:42 +08:00
s := time . Now ( ) . Format ( "2006-01-02 15:04:05" )
var batchName , dataFileName string
2023-03-06 19:38:28 +08:00
2023-03-22 15:10:42 +08:00
if isLastCall {
batchName = fmt . Sprintf ( "lastCall-%s-%s-%s" , record [ 1 ] , record [ 0 ] , excludedFilename )
dataFileName = fmt . Sprintf ( "lastCall-%s" , excludedFilename )
} else {
batchName = fmt . Sprintf ( "%s-%s" , record [ 1 ] , record [ 0 ] )
dataFileName = fileName [ : len ( fileName ) - 10 ]
}
2023-03-05 13:18:40 +08:00
2023-03-22 15:10:42 +08:00
batchParams := BatchParams {
BatchName : batchName ,
BatchDesc : record [ 4 ] ,
IsPersonal : 0 ,
Message : record [ 4 ] ,
IsInternational : 0 ,
IsSchedule : 0 ,
ScheduleTime : s ,
Token : token ,
}
2023-03-05 13:18:40 +08:00
2023-03-22 15:10:42 +08:00
sid , err := CreateBatch ( batchParams )
handleError ( err , "创建批次失败" )
batch := Batch {
CommunicationChannelID : record [ 0 ] ,
CommunicationName : batchName ,
TargetsMember : uint ( TargetsMember ) ,
TemplateID : uint ( templateID ) ,
Content : record [ 4 ] ,
Status : status ,
DataFileName : dataFileName ,
Sid : sid ,
2023-02-18 16:32:19 +08:00
}
2023-03-22 15:10:42 +08:00
db . Create ( & batch )
batchRows ++
}
time . Sleep ( time . Second )
elapsed := time . Since ( start )
2023-03-05 17:51:04 +08:00
2023-03-22 15:10:42 +08:00
subject := "丝芙兰批次文件处理完成"
body := fmt . Sprintf ( "批次数:%d;\n批次文件: %s;\n处理完成, 请前往管理平台查看处理。" , batchRows , fileName )
SendEmail ( subject , body )
applogger . Info ( fmt . Sprintf ( "%s( 批次文件) 入库完成" , fileName ) )
applogger . Info ( fmt . Sprintf ( "%s( 批次文件) 执行时间%s 插入批次次数:%d" , fileName , elapsed , batchRows ) )
2023-04-10 20:00:26 +08:00
closeDb ( db )
2023-03-22 15:10:42 +08:00
return 0
2023-03-05 17:51:04 +08:00
2023-02-18 16:32:19 +08:00
}
2023-03-05 13:18:40 +08:00
func CreateBatch ( batchParams BatchParams ) ( int , error ) {
// 将请求参数编码为JSON字符串
jsonBytes , err := json . Marshal ( batchParams )
if err != nil {
return - 1 , err
}
jsonStr := string ( jsonBytes )
// 发送POST请求
url := "http://www.wemediacn.net/webservice/BatchService?service=sms.createbatch"
resp , err := http . Post ( url , "application/json; charset=utf-8" , bytes . NewBuffer ( [ ] byte ( jsonStr ) ) )
if err != nil {
return - 1 , err
}
defer resp . Body . Close ( )
// 解析响应数据
var retobj map [ string ] interface { }
err = json . NewDecoder ( resp . Body ) . Decode ( & retobj )
if err != nil {
return - 1 , err
}
fmt . Print ( retobj )
code := int ( retobj [ "code" ] . ( float64 ) )
fmt . Printf ( "批次创建API 返回:%d\n" , code )
if code == 0 {
sid := int ( retobj [ "sid" ] . ( float64 ) )
return sid , nil
} else {
return - 1 , fmt . Errorf ( "create batch failed, error code: %d" , code )
}
}
func smsApi ( method string , sendSMSDataJson string ) ( int , error ) {
// 发送POST请求
url := "http://www.wemediacn.net/webservice/BatchService?service=sms." + method
resp , err := http . Post ( url , "application/json; charset=utf-8" , strings . NewReader ( sendSMSDataJson ) )
if err != nil {
return - 1 , err
}
defer resp . Body . Close ( )
// 解析响应数据
var retobj map [ string ] interface { }
err = json . NewDecoder ( resp . Body ) . Decode ( & retobj )
if err != nil {
return - 1 , err
}
jsonStr , err := json . Marshal ( retobj )
if err != nil {
fmt . Println ( err )
return - 1 , err
}
fmt . Printf ( "API 返回:%s\n" , string ( jsonStr ) )
code := int ( retobj [ "code" ] . ( float64 ) )
if code == 0 {
//sid := int(retobj["sid"].(float64))
fmt . Printf ( "提交批次成功code: %d\n" , code )
return code , nil
} else {
2023-03-05 17:51:04 +08:00
applogger . Error ( string ( jsonStr ) )
return - 1 , fmt . Errorf ( string ( jsonStr ) )
2023-03-05 13:18:40 +08:00
}
}
2023-03-07 18:23:07 +08:00
func batchDataInsert ( fileName string , lastCallKeys map [ string ] bool , excludedFilename string ) int {
2023-02-18 16:32:19 +08:00
start := time . Now ( )
2023-02-21 14:36:25 +08:00
// Open file
2023-03-05 17:51:04 +08:00
fileKey := fmt . Sprintf ( "downloaded:%s" , strings . Replace ( fileName , filepath . Ext ( fileName ) , ".zip" , 1 ) )
2023-02-21 14:36:25 +08:00
file , err := os . Open ( path . Join ( executableDir , txtPath , fileName ) )
2023-02-18 16:32:19 +08:00
if err != nil {
2023-03-05 17:51:04 +08:00
redisClient . Del ( fileKey ) //删除文件处理完成的标志位
2023-02-27 19:09:43 +08:00
applogger . Error ( fmt . Sprintf ( "文件打开失败,文件名:%s, 错误信息%v" , fileName , err ) )
2023-03-05 17:51:04 +08:00
return - 1
2023-02-21 14:36:25 +08:00
} else {
defer file . Close ( )
db , _ := connectToDB ( )
// Insert data in batches using multiple goroutines
var wg sync . WaitGroup
dataBatchChan := make ( chan [ ] BatcheData , insertChanSize )
for i := 0 ; i < goSize ; i ++ {
wg . Add ( 1 )
go func ( ) {
for batch := range dataBatchChan {
retryCount := 0
for {
if err := db . CreateInBatches ( batch , insertSize ) . Error ; err != nil {
if retryCount >= 5 {
panic ( err )
}
fmt . Printf ( "Insert failed, retrying in %v seconds...\n" , 2 * retryCount )
time . Sleep ( time . Duration ( 2 * retryCount ) * time . Second )
retryCount ++
} else {
break
2023-02-18 16:32:19 +08:00
}
}
}
2023-02-21 14:36:25 +08:00
wg . Done ( )
} ( )
2023-02-18 16:32:19 +08:00
}
2023-03-05 17:51:04 +08:00
hs := make ( map [ string ] bool ) //排重
dataBatch := make ( [ ] BatcheData , 0 , batchSize ) //短信数据
dataBatchDuplicate := make ( [ ] BatchDataDuplicateLog , 0 , batchSize ) //重复短信数据
bi := 0 //重复总数
count := 0 // 总数
2023-04-04 16:02:28 +08:00
// Data file batches
batches := [ ] Batch { } // Query batches
2023-03-06 19:38:28 +08:00
fileNameDate := ""
dataFileName := ""
2023-03-07 18:23:07 +08:00
fields := strings . Split ( fileName , "_" )
datetime := fields [ len ( fields ) - 1 ]
2023-04-04 16:02:28 +08:00
2023-03-06 19:38:28 +08:00
if lastCallKeys != nil {
2023-03-08 17:02:32 +08:00
fileNameDate = fmt . Sprintf ( "lastCall-%s" , excludedFilename )
2023-03-07 18:23:07 +08:00
fmt . Printf ( "fileNameDate : %s\n" , fileNameDate )
2023-03-08 17:02:32 +08:00
dataFileName = fmt . Sprintf ( "lastCall-%s" , excludedFilename )
2023-03-06 19:38:28 +08:00
} else {
2023-03-07 18:23:07 +08:00
fileNameDate = datetime [ : 8 ]
2023-03-08 17:02:32 +08:00
dataFileName = strings . Replace ( fileName , "targets" , "definition" , - 1 )
2023-03-08 17:48:33 +08:00
dataFileName = dataFileName [ : len ( dataFileName ) - 10 ]
2023-03-06 19:38:28 +08:00
}
2023-03-05 17:51:04 +08:00
2023-04-04 16:02:28 +08:00
db . Table ( "batches AS b1" ) .
Select ( "b1.*" ) .
Where ( "b1.data_file_name LIKE ?" , "%" + fileNameDate + "%" ) .
Joins ( "INNER JOIN (SELECT communication_channel_id, MAX(created_at) AS max_created_at FROM batches GROUP BY communication_channel_id) AS b2 ON b1.communication_channel_id = b2.communication_channel_id AND b1.created_at = b2.max_created_at" ) .
Order ( "b1.created_at DESC" ) .
Find ( & batches )
2023-03-05 17:51:04 +08:00
batchCount := len ( batches )
2023-03-07 18:23:07 +08:00
sendMobiles := make ( map [ string ] SmsData , batchCount ) //短信数据,用于提交发送平台
duplicateCount := make ( map [ string ] int , batchCount ) //按批次重复数
lastCallduplicateCount := make ( map [ string ] int , batchCount ) //按批次重复数
insertsCount := make ( map [ string ] int , batchCount ) //按批次插入数
2023-03-05 17:51:04 +08:00
ccids := make ( map [ string ] bool , batchCount )
if batchCount > 0 { //如果查询到数据需要的批次信息,开始处理数据
// 定义一个名为result的map类型的变量, 并以CommunicationChannelID作为键
result := make ( map [ string ] [ ] Batch )
for _ , batch := range batches {
2023-03-08 17:02:32 +08:00
cckdKey := batch . CommunicationChannelID
2023-03-05 17:51:04 +08:00
result [ cckdKey ] = append ( result [ cckdKey ] , batch )
print ( batch . CommunicationChannelID , "\n" )
2023-02-27 11:58:39 +08:00
}
2023-03-05 13:18:40 +08:00
2023-03-05 17:51:04 +08:00
scanner := bufio . NewScanner ( file )
scanner . Split ( bufio . ScanLines )
scanner . Scan ( ) // skip first line
for scanner . Scan ( ) {
line := scanner . Text ( )
row , err := csv . NewReader ( strings . NewReader ( line ) ) . Read ( )
if err != nil {
applogger . Error ( fmt . Sprintf ( "文件按行读取失败,文件名:%s, 错误信息%v" , fileName , err ) )
continue
}
reservedFields := map [ string ] string { //合并个性化字段
"ReservedField1" : row [ 5 ] ,
"ReservedField2" : row [ 6 ] ,
"ReservedField3" : row [ 7 ] ,
"ReservedField4" : row [ 8 ] ,
"ReservedField5" : row [ 9 ] ,
2023-03-06 11:49:44 +08:00
"FullName" : row [ 4 ] ,
2023-03-05 17:51:04 +08:00
}
reservedFieldsJson , err := json . Marshal ( reservedFields ) //个性化字段转json
if err != nil {
applogger . Error ( fmt . Sprintf ( "个性化字段合并失败,文件名:%s, 错误信息%v" , fileName , err ) )
continue
}
if _ , ok := duplicateCount [ row [ 2 ] ] ; ! ok {
duplicateCount [ row [ 2 ] ] = 0
}
2023-03-07 18:23:07 +08:00
if _ , ok := lastCallduplicateCount [ row [ 2 ] ] ; ! ok {
lastCallduplicateCount [ row [ 2 ] ] = 0
}
2023-03-05 17:51:04 +08:00
// Check if record exists in hashset
key := fmt . Sprintf ( "%s-%s-%s" , row [ 2 ] , row [ 3 ] , row [ 5 ] )
if _ , exists := hs [ key ] ; exists { //如果批次数据重复
bi ++
// Increment duplicate count
duplicateCount [ row [ 2 ] ] ++
dataBatchDuplicate = append ( dataBatchDuplicate , BatchDataDuplicateLog {
CommunicationChannelID : row [ 2 ] ,
Mobile : row [ 3 ] ,
ReservedField : string ( reservedFieldsJson ) ,
} )
if len ( dataBatchDuplicate ) >= batchSize {
err = db . CreateInBatches ( dataBatchDuplicate , insertSize ) . Error
if err != nil {
applogger . Error ( fmt . Sprintf ( "插入重复数据失败,文件名:%s, 错误信息%v" , fileName , err ) )
} else {
dataBatchDuplicate = make ( [ ] BatchDataDuplicateLog , 0 , batchSize )
}
}
continue
}
2023-03-06 19:38:28 +08:00
if lastCallKeys != nil && lastCallKeys [ fmt . Sprintf ( "%s-%s" , row [ 2 ] , row [ 3 ] ) ] {
2023-03-07 18:23:07 +08:00
lastCallduplicateCount [ row [ 2 ] ] ++
2023-03-06 19:38:28 +08:00
continue
}
2023-03-05 17:51:04 +08:00
// Add record to hashset
hs [ key ] = true
ccids [ row [ 2 ] ] = true
dataBatch = append ( dataBatch , BatcheData {
2023-02-21 14:36:25 +08:00
CommunicationChannelID : row [ 2 ] ,
Mobile : row [ 3 ] ,
ReservedField : string ( reservedFieldsJson ) ,
2023-03-08 17:48:33 +08:00
DataFileName : dataFileName ,
2023-02-21 14:36:25 +08:00
} )
2023-03-05 13:18:40 +08:00
2023-03-05 17:51:04 +08:00
if len ( dataBatch ) >= batchSize {
dataBatchChan <- dataBatch
dataBatch = make ( [ ] BatcheData , 0 , batchSize )
}
if batches , ok := result [ row [ 2 ] ] ; ok && len ( batches ) > 0 && batches [ 0 ] . Content != "" {
2023-03-06 19:38:28 +08:00
content := fmt . Sprintf ( "%s /" , batches [ 0 ] . Content )
2023-03-05 17:51:04 +08:00
Sid := batches [ 0 ] . Sid
pattern := regexp . MustCompile ( ` \ { .*\} ` )
matched := pattern . MatchString ( content )
if matched { //个性化短信
smsData , ok := sendMobiles [ row [ 2 ] ]
if ! ok {
smsData = SmsData {
Sid : Sid ,
IsPersonalizedMsg : true ,
SmsList : make ( [ ] SmsList , 0 ) ,
}
}
placeholderMap := map [ string ] string {
"{RESERVED_FIELD_1}" : row [ 5 ] ,
"{RESERVED_FIELD_2}" : row [ 6 ] ,
"{RESERVED_FIELD_3}" : row [ 7 ] ,
"{RESERVED_FIELD_4}" : row [ 8 ] ,
"{RESERVED_FIELD_5}" : row [ 9 ] ,
}
for k , v := range placeholderMap {
content = strings . ReplaceAll ( content , k , v )
}
smsData . SmsList = append ( smsData . SmsList , SmsList { M : row [ 3 ] , C : content , F : 8 } )
if len ( smsData . SmsList ) >= batchSize / 2 {
sd := Sms { Sid : Sid , Data : smsData . SmsList , Token : token }
sendSMSDataJson , _ := json . Marshal ( sd )
//
resp , err := smsApi ( "appendbatchdata" , string ( sendSMSDataJson ) )
if resp != 0 {
fmt . Printf ( "smsApi returned error: %d\n" , err )
//发送提醒邮件
subject := "丝芙兰数据包数据提交异常"
body := fmt . Sprintf ( "数据包:%s;\nCommunicationChannelID: %s;\n错误信息: %s\n 请前往管理平台查看处理。" , fileName , row [ 2 ] , err )
applogger . Error ( body )
SendEmail ( subject , body ) //发送邮件
}
smsData . SmsList = [ ] SmsList { } // reset mobiles slice
}
sendMobiles [ row [ 2 ] ] = smsData
2023-02-21 14:36:25 +08:00
} else {
2023-03-05 17:51:04 +08:00
// 处理非个性化短信
smsData , ok := sendMobiles [ row [ 2 ] ]
if ! ok {
smsData = SmsData {
Sid : Sid ,
IsPersonalizedMsg : false ,
Content : content ,
Mobiles : make ( [ ] string , 0 ) ,
}
}
smsData . Mobiles = append ( smsData . Mobiles , row [ 3 ] )
if len ( smsData . Mobiles ) >= batchSize {
mobiles := smsData . Mobiles
mobileStr := strings . Join ( mobiles , "," )
var sl [ ] SmsList
sl = append ( sl , SmsList { M : mobileStr , C : content , F : 8 } )
sd := Sms { Sid : Sid , Data : sl , Token : token }
sendSMSDataJson , _ := json . Marshal ( sd )
resp , err := smsApi ( "appendbatchdata" , string ( sendSMSDataJson ) )
if resp != 0 {
fmt . Printf ( "smsApi returned error: %d\n" , err )
//发送提醒邮件
subject := "丝芙兰数据包数据提交异常"
body := fmt . Sprintf ( "数据包:%s;\nCommunicationChannelID: %s;\n错误信息: %s\n 请前往管理平台查看处理。" , fileName , row [ 2 ] , err )
applogger . Error ( body )
SendEmail ( subject , body ) //发送邮件
}
smsData . Mobiles = [ ] string { } // reset mobiles slice
}
sendMobiles [ row [ 2 ] ] = smsData
2023-02-21 14:36:25 +08:00
}
}
2023-03-05 17:51:04 +08:00
if _ , ok := insertsCount [ row [ 2 ] ] ; ! ok {
insertsCount [ row [ 2 ] ] = 0
}
insertsCount [ row [ 2 ] ] ++
count ++
2023-02-18 16:32:19 +08:00
}
2023-03-05 17:51:04 +08:00
if len ( dataBatch ) > 0 {
2023-02-21 14:36:25 +08:00
dataBatchChan <- dataBatch
2023-02-18 16:32:19 +08:00
}
2023-03-05 17:51:04 +08:00
close ( dataBatchChan )
2023-03-05 13:18:40 +08:00
2023-03-05 17:51:04 +08:00
for ccid := range ccids {
smsData , ok := sendMobiles [ ccid ]
if ! ok {
continue
}
if smsData . IsPersonalizedMsg { //个性化
smsList := smsData . SmsList
if len ( smsList ) > 0 {
sd := Sms { Sid : smsData . Sid , Data : smsList , Token : token }
2023-03-05 13:18:40 +08:00
sendSMSDataJson , _ := json . Marshal ( sd )
smsApi ( "appendbatchdata" , string ( sendSMSDataJson ) )
2023-03-05 17:51:04 +08:00
smsData . SmsList = [ ] SmsList { } // reset mobiles slice
2023-03-05 13:18:40 +08:00
}
2023-03-05 17:51:04 +08:00
} else { //非个性化
mobiles , ok := smsData . Mobiles , len ( smsData . Mobiles ) > 0 && true
if ok && len ( mobiles ) > 0 {
2023-03-05 13:18:40 +08:00
mobileStr := strings . Join ( mobiles , "," )
var sl [ ] SmsList
2023-03-05 17:51:04 +08:00
sl = append ( sl , SmsList { M : mobileStr , C : smsData . Content , F : 8 } )
sd := Sms { Sid : smsData . Sid , Data : sl , Token : token }
2023-03-05 13:18:40 +08:00
sendSMSDataJson , _ := json . Marshal ( sd )
smsApi ( "appendbatchdata" , string ( sendSMSDataJson ) )
2023-03-05 17:51:04 +08:00
smsData . Mobiles = [ ] string { } // reset mobiles slice
2023-03-05 13:18:40 +08:00
}
}
2023-03-05 17:51:04 +08:00
sf := SmsFinish { Sid : smsData . Sid , Token : token }
sfJson , _ := json . Marshal ( sf )
smsApi ( "finishbatch" , string ( sfJson ) )
bpi := [ ] BatchProcessingInformation { }
bpi = append ( bpi , BatchProcessingInformation {
2023-03-07 18:23:07 +08:00
CommunicationChannelID : ccid ,
RepeatTargetsMember : duplicateCount [ ccid ] ,
LastCallRepeatTargetsMember : lastCallduplicateCount [ ccid ] ,
InsertsTargetsMember : insertsCount [ ccid ] ,
DataFileName : dataFileName ,
2023-03-05 17:51:04 +08:00
} )
err = db . CreateInBatches ( bpi , insertSize ) . Error
if err != nil {
applogger . Error ( fmt . Sprintf ( "插入批次处理信息失败,文件名:%s, 错误信息%v" , fileName , err ) )
2023-03-05 13:18:40 +08:00
}
2023-03-05 17:51:04 +08:00
smsData = SmsData { }
sendMobiles [ ccid ] = smsData
}
2023-03-05 13:18:40 +08:00
2023-03-05 17:51:04 +08:00
wg . Wait ( ) //所有入库全部完成
2023-02-18 16:32:19 +08:00
2023-03-05 17:51:04 +08:00
//插入批此重复数据
if len ( dataBatchDuplicate ) > 0 {
err = db . CreateInBatches ( dataBatchDuplicate , insertSize ) . Error
if err != nil {
applogger . Error ( fmt . Sprintf ( "插入重复数据失败,文件名:%s, 错误信息%v" , fileName , err ) )
} else {
dataBatchDuplicate = nil
2023-03-05 13:18:40 +08:00
}
}
2023-03-05 17:51:04 +08:00
elapsed := time . Since ( start )
//发送提醒邮件
subject := "丝芙兰数据包处理完成"
body := fmt . Sprintf ( "总数:%d;\n数据包: %s;\n过滤重复数: %d;\n过滤后总数: %d;\n处理完成, 请前往管理平台查看处理。" , count + bi , fileName , bi , count )
SendEmail ( subject , body ) //发送邮件
applogger . Info ( fmt . Sprintf ( "%s( 数据包) 入库完成" , fileName ) )
applogger . Info ( fmt . Sprintf ( "%s( 数据包) 执行时间:%s 插入数据:%d条 过滤数数:%d条" , fileName , elapsed , count , bi ) )
2023-04-10 20:00:26 +08:00
closeDb ( db )
2023-03-05 17:51:04 +08:00
return 0
} else { //如果没有查询到批次信息,跳过处理
2023-03-14 11:31:30 +08:00
applogger . Error ( fmt . Sprintf ( "未查询到批次数据,文件名:%s, 截取批次标识: %s" , fileName , dataFileName ) )
2023-04-10 20:00:26 +08:00
closeDb ( db )
2023-03-05 17:51:04 +08:00
return - 1
2023-02-21 14:36:25 +08:00
}
}
2023-02-18 16:32:19 +08:00
}
2023-03-14 11:31:30 +08:00
func queryBatchState ( ) {
2023-03-15 16:59:09 +08:00
db , err := connectToDB ( )
if err != nil {
handleError ( err , "数据库连接失败" )
return
}
2023-03-14 11:31:30 +08:00
var batches [ ] Batch
2023-03-15 16:59:09 +08:00
err = db . Where ( "status NOT IN (?)" , [ ] int { 6 , 8 } ) . Find ( & batches ) . Error
handleError ( err , "查询批次状态失败" )
if err != nil {
return
}
for _ , batch := range batches {
sf := SmsFinish { Sid : batch . Sid , Token : token }
sfJson , _ := json . Marshal ( sf )
url := "http://www.wemediacn.net/webservice/BatchService?service=sms.querybatchstate"
resp , err := http . Post ( url , "application/json; charset=utf-8" , strings . NewReader ( string ( sfJson ) ) )
if err != nil {
handleError ( err , "查询批次状态失败" )
continue
}
defer resp . Body . Close ( )
var retobj map [ string ] interface { }
err = json . NewDecoder ( resp . Body ) . Decode ( & retobj )
if err != nil {
handleError ( err , "解析响应数据失败" )
continue
}
code := int ( retobj [ "code" ] . ( float64 ) )
if code == 0 {
status := int ( retobj [ "state" ] . ( float64 ) )
if batch . Status != status || status == 5 {
updates := createUpdatesMap ( retobj )
err = db . Model ( & batch ) . Updates ( updates ) . Error
handleError ( err , "修改批次状态失败" )
2023-03-14 11:31:30 +08:00
}
2023-03-15 16:59:09 +08:00
} else {
jsonStr , _ := json . Marshal ( retobj )
handleError ( fmt . Errorf ( "返回不为0" ) , fmt . Sprintf ( "查询批次状态失败:%s" , string ( jsonStr ) ) )
2023-03-14 11:31:30 +08:00
}
}
2023-04-10 20:00:26 +08:00
closeDb ( db )
2023-03-14 11:31:30 +08:00
}
2023-03-15 16:59:09 +08:00
func createUpdatesMap ( retobj map [ string ] interface { } ) map [ string ] interface { } {
updates := map [ string ] interface { } {
"status" : int ( retobj [ "state" ] . ( float64 ) ) ,
}
if endTime , ok := retobj [ "endTime" ] . ( string ) ; ok {
if endTimeTime , err := time . Parse ( "2006-01-02 15:04:05" , endTime ) ; err == nil {
updates [ "end_time" ] = & endTimeTime
}
}
if startTime , ok := retobj [ "startTime" ] . ( string ) ; ok {
if startTimeTime , err := time . Parse ( "2006-01-02 15:04:05" , startTime ) ; err == nil {
updates [ "start_time" ] = & startTimeTime
}
}
if mc , ok := retobj [ "mc" ] . ( float64 ) ; ok {
updates [ "mc" ] = int ( mc )
}
if rc , ok := retobj [ "rc" ] . ( float64 ) ; ok {
updates [ "rc" ] = int ( rc )
}
if sc , ok := retobj [ "sc" ] . ( float64 ) ; ok {
updates [ "sc" ] = int ( sc )
}
return updates
}
func handleError ( err error , errMsg string ) {
if err != nil {
applogger . Error ( fmt . Sprintf ( "%s: %s" , errMsg , err ) )
}
}
2023-02-18 16:32:19 +08:00
func connectToDB ( ) ( * gorm . DB , error ) {
2023-04-04 12:07:58 +08:00
dsn := fmt . Sprintf ( "%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local" , dbUser , dbPassword , dbAddress , dbPort , dbName )
var db * gorm . DB
var err error
attempt := 1
maxAttempts := 5
backoff := time . Second
logger := logger . Default . LogMode ( logger . Error ) //Silent、Error、Warn、Info
for {
db , err = gorm . Open ( mysql . Open ( dsn ) , & gorm . Config { Logger : logger , SkipDefaultTransaction : true } )
if err == nil {
break
}
if attempt >= maxAttempts {
handleError ( err , "数据库连接失败,错误信息" )
return nil , err
}
time . Sleep ( backoff )
backoff *= 2
attempt ++
}
2023-04-10 20:00:26 +08:00
// 获取底层 *sql.DB 实例
sqlDB , err := db . DB ( )
if err != nil {
return nil , err
}
// 设置最大连接数
sqlDB . SetMaxOpenConns ( dbMaxOpenConns )
// 设置最大空闲连接数
sqlDB . SetMaxIdleConns ( dbMaxIdleConns )
// 设置空闲连接的最长生命周期
sqlDB . SetConnMaxLifetime ( time . Duration ( dbConnMaxLifetime ) * time . Minute )
2023-02-18 16:32:19 +08:00
return db , nil
}
2023-02-18 19:31:39 +08:00
2023-04-10 20:00:26 +08:00
func closeDb ( db * gorm . DB ) {
sqlDB , err := db . DB ( )
if err != nil {
handleError ( err , "关闭数据库连接失败" )
return
}
sqlDB . Close ( )
}
2023-03-06 19:38:28 +08:00
func fileExists ( filename string ) bool {
info , err := os . Stat ( filename )
if os . IsNotExist ( err ) {
return false
}
return ! info . IsDir ( )
}
2023-03-05 13:18:40 +08:00
func iniPath ( ) {
// 获取可执行文件所在的路径
executablePath , err := os . Executable ( )
if err != nil {
log . Fatal ( "failed to get executable path: " , err )
}
executableDir = filepath . Dir ( executablePath )
//判断目录是否存在,不存在则创建
err = os . MkdirAll ( filepath . Join ( executableDir , zipPath ) , 0755 )
if err != nil {
log . Fatal ( err )
}
err = os . MkdirAll ( filepath . Join ( executableDir , txtPath ) , 0755 )
if err != nil {
log . Fatal ( err )
}
2023-03-06 19:38:28 +08:00
err = os . MkdirAll ( filepath . Join ( executableDir , lastCallPath ) , 0755 )
if err != nil {
log . Fatal ( err )
}
2023-03-05 13:18:40 +08:00
err = os . MkdirAll ( filepath . Join ( executableDir , logPath , time . Now ( ) . Format ( "2006_01" ) ) , 0755 )
if err != nil {
log . Fatal ( err )
}
}
func monopolize ( ) {
// 打开一个文件作为锁
lockFile , err := os . OpenFile ( filepath . Join ( executableDir , ".lock" ) , os . O_CREATE | os . O_RDWR , 0666 )
if err != nil {
fmt . Println ( "打开锁文件失败:" , err )
os . Exit ( 1 )
}
defer lockFile . Close ( )
// 尝试获取文件的独占锁
err = syscall . Flock ( int ( lockFile . Fd ( ) ) , syscall . LOCK_EX | syscall . LOCK_NB )
if err != nil {
fmt . Println ( "程序已经在运行,本程序无法同时运行多个" )
os . Exit ( 1 )
}
}
func iniConfi ( ) {
flag . StringVar ( & env , "env" , "dev" , "运行模式" )
flag . Parse ( )
switch env {
case "dev" :
//fmt.Print("测试环境配置已生效\n")
redisAddress = "mysql5.weu.me:6379"
redisPassword = ""
redisDB = 1
sftpAddress = "192.168.10.86:49156"
sftpUser = "demo"
sftpPassword = "demo"
sftpDir = "/sftp/test"
2023-04-04 12:07:58 +08:00
dbAddress = "mysql.weu.me"
dbPort = "3306"
dbUser = "root"
dbPassword = "root_123"
2023-03-05 13:18:40 +08:00
dbName = "sephora"
zipPath = "RawData/Zip/"
txtPath = "RawData/Txt/"
logPath = "logs/"
2023-04-04 16:02:28 +08:00
batchSize = 5000 //提交数据
insertSize = 5000 //一次性入库
insertChanSize = 100000 //通道缓冲数
goSize = 10 //协程数
2023-03-05 13:18:40 +08:00
taskTime = 1
2023-03-15 16:59:09 +08:00
batchStatusTaskTime = 1
2023-03-05 13:18:40 +08:00
to = [ ] string { "chejiulong@wemediacn.com" }
token = "7100477930234217"
2023-03-06 19:38:28 +08:00
lastCallPath = "RawData/LastCall"
2023-03-07 18:23:07 +08:00
verifySignatureKey = "qZ6v1&H#Wjx+yRm2D@*sJF$tnfL83Ia"
2023-04-04 18:21:54 +08:00
dataExpirationDays = 14
delDataSize = 60000
2023-04-07 15:11:29 +08:00
rateLimiter = 1
2023-04-10 20:00:26 +08:00
dbMaxOpenConns = 500
dbMaxIdleConns = 5
dbConnMaxLifetime = 10
2023-03-07 18:23:07 +08:00
2023-03-05 13:18:40 +08:00
case "prod" :
//fmt.Print("正式环境配置已生效\n")
redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379"
redisPassword = "3Nsb4Pmsl9bcLs24mL12l"
redisDB = 233
sftpAddress = "esftp.sephora.com.cn:20981"
sftpUser = "CHN-CRMTOWemedia-wemedia"
sftpPassword = "uoWdMHEv39ZFjiOg"
sftpDir = "/CN-CRMTOWemedia/SMS"
2023-04-04 18:21:54 +08:00
dbAddress = "rds0yslqyg1iuze8txux545.mysql.rds.aliyuncs.com"
2023-04-04 16:02:28 +08:00
dbPort = "3306"
dbUser = "sephora"
dbPassword = "YfbGJWsFkH4pXgPY"
dbName = "sephora"
2023-03-05 13:18:40 +08:00
zipPath = "RawData/Zip/"
txtPath = "RawData/Txt/"
logPath = "logs/"
2023-04-04 16:02:28 +08:00
batchSize = 5000 //提交数据
insertSize = 5000 //一次性入库
insertChanSize = 100000 //通道缓冲数
goSize = 50 //协程数
2023-03-05 13:18:40 +08:00
taskTime = 60
2023-03-15 16:59:09 +08:00
batchStatusTaskTime = 5
2023-03-05 13:18:40 +08:00
to = [ ] string { "chejiulong@wemediacn.com" , "xiayujuan@wemediacn.com" , "wangyuanbing@wemediacn.com" , "tangweiqi@wemediacn.com" }
2023-03-15 10:03:55 +08:00
token = "7100178600777091" //7100477930234217
2023-03-06 19:38:28 +08:00
lastCallPath = "RawData/LastCall"
2023-03-07 18:23:07 +08:00
verifySignatureKey = "Lzxf6asgwFRTsANZ8gkXJnP3EmOyNO#&"
2023-04-10 20:00:26 +08:00
dataExpirationDays = 1
2023-04-04 18:21:54 +08:00
delDataSize = 60000
2023-04-10 20:00:26 +08:00
rateLimiter = 15
dbMaxOpenConns = 500
dbMaxIdleConns = 5
dbConnMaxLifetime = 60
2023-03-07 18:23:07 +08:00
2023-03-05 13:18:40 +08:00
default :
panic ( fmt . Errorf ( "无效的运行模式: %s" , env ) )
}
}
func iniLog ( ) {
logPath = filepath . Join ( executableDir , "logs" , time . Now ( ) . Format ( "2006_01" ) )
os . MkdirAll ( logPath , 0755 )
logFileName := "sms_processing_" + time . Now ( ) . Format ( "2006_01_02" ) + ".log"
logFileHook := & lumberjack . Logger {
Filename : filepath . Join ( logPath , logFileName ) ,
}
logOutput := io . MultiWriter ( os . Stdout , logFileHook ) // 将日志同时输出到控制台和文件
applogger . SetOutput ( logOutput )
}
2023-02-18 19:31:39 +08:00
func SendEmail ( subject string , body string ) error {
// 邮箱认证信息
smtpHost := "smtp.exmail.qq.com"
smtpPort := 465
2023-02-21 14:36:25 +08:00
from := "auto_system@wemediacn.com"
password := "yJJYYcy5NKx2y69r"
2023-02-18 19:31:39 +08:00
// 邮件内容
m := gomail . NewMessage ( )
m . SetHeader ( "From" , from )
m . SetHeader ( "To" , to ... )
m . SetHeader ( "Subject" , subject )
m . SetBody ( "text/plain" , body )
// 邮件发送
d := gomail . NewDialer ( smtpHost , smtpPort , from , password )
d . TLSConfig = & tls . Config { InsecureSkipVerify : true }
err := d . DialAndSend ( m )
if err != nil {
2023-02-27 19:09:43 +08:00
applogger . Warn ( fmt . Sprintf ( "邮件发送失败,错误信息%v" , err ) )
2023-02-18 19:31:39 +08:00
}
return nil
}
2023-03-05 13:18:40 +08:00
func ( f FileSorter ) Len ( ) int {
return len ( f )
}
func ( f FileSorter ) Swap ( i , j int ) {
f [ i ] , f [ j ] = f [ j ] , f [ i ]
}
func ( f FileSorter ) Less ( i , j int ) bool {
// 首先检查文件扩展名是否为“.txt”, 如果是, 则将其移到文件列表的开头
if strings . HasSuffix ( f [ i ] . Name ( ) , ".txt" ) && ! strings . HasSuffix ( f [ j ] . Name ( ) , ".txt" ) {
return true
}
// 如果文件扩展名都是“.txt”, 或都不是“.txt”, 则按字母顺序排序
return f [ i ] . Name ( ) < f [ j ] . Name ( )
}
var ( //初始化变量
2023-03-15 16:59:09 +08:00
env string
applogger * logrus . Logger
redisClient * redis . Client
executableDir string
redisAddress string
redisPassword string
redisDB int
sftpAddress string
sftpUser string
sftpPassword string
sftpDir string
dbAddress string
2023-04-04 12:07:58 +08:00
dbPort string
2023-03-15 16:59:09 +08:00
dbUser string
dbPassword string
dbName string
zipPath string
txtPath string
logPath string
batchSize int //提交数据
insertSize int //一次性入库
insertChanSize int //通道缓冲数
goSize int //协程数
taskTime int
batchStatusTaskTime int
to [ ] string
token string
lastCallPath string
verifySignatureKey string
2023-08-07 09:51:31 +08:00
dataExpirationDays int //数据过期天数
delDataSize int //删除数据批次大小
rateLimiter uint //下载限速单位 M
2023-04-10 20:00:26 +08:00
dbMaxOpenConns int
dbMaxIdleConns int
dbConnMaxLifetime int
2023-03-05 13:18:40 +08:00
)
type Batch struct {
ID uint ` gorm:"primary_key" `
2023-03-08 13:51:15 +08:00
CommunicationChannelID string
2023-03-05 13:18:40 +08:00
CommunicationName string ` gorm:"type:varchar(255)" `
TargetsMember uint ` gorm:"type:int" `
TemplateID uint
2023-03-14 11:31:30 +08:00
Content string ` gorm:"type:text" `
2023-04-04 12:07:58 +08:00
CreatedAt time . Time ` gorm:"default:CURRENT_TIMESTAMP" `
UpdatedAt time . Time ` gorm:"default:CURRENT_TIMESTAMP" `
2023-03-15 10:03:55 +08:00
Status int ` gorm:"column:status" `
2023-03-14 11:31:30 +08:00
DataFileName string ` gorm:"type:text" `
Sid int ` gorm:"type:int" `
EndTime * time . Time ` gorm:"column:end_time" `
StartTime * time . Time ` gorm:"column:start_time" `
MC * int ` gorm:"column:mc" `
RC * int ` gorm:"column:rc" `
SC * int ` gorm:"column:sc" `
2023-03-05 13:18:40 +08:00
}
type BatcheData struct {
2023-04-04 16:02:28 +08:00
ID uint ` gorm:"primary_key" `
CommunicationChannelID string ` gorm:"column:communication_channel_id" `
Mobile string ` gorm:"column:mobile" `
ReservedField string ` gorm:"column:reserved_field" `
DataFileName string ` gorm:"column:data_file_name" `
CreatedAt time . Time ` gorm:"default:CURRENT_TIMESTAMP" `
UpdatedAt time . Time ` gorm:"default:CURRENT_TIMESTAMP" `
2023-03-05 13:18:40 +08:00
}
func ( BatcheData ) TableName ( ) string {
return "batche_datas"
}
type BatchProcessingInformation struct {
2023-03-07 18:23:07 +08:00
ID uint ` gorm:"primaryKey;autoIncrement" `
CommunicationChannelID string ` gorm:"column:communication_channel_id" `
RepeatTargetsMember int ` gorm:"column:repeat_targets_member" `
LastCallRepeatTargetsMember int ` gorm:"column:last_call_repeat_targets_member" `
InsertsTargetsMember int ` gorm:"column:inserts_targets_member" `
DataFileName string ` gorm:"column:data_file_name" `
2023-03-05 13:18:40 +08:00
}
func ( BatchProcessingInformation ) TableName ( ) string {
return "batches_processing_informations"
}
type BatchDataDuplicateLog struct {
2023-04-04 16:02:28 +08:00
ID int ` gorm:"primaryKey;autoIncrement" `
CommunicationChannelID string ` gorm:"column:communication_channel_id" `
Mobile string ` gorm:"column:mobile" `
ReservedField string ` gorm:"column:reserved_field" `
DataFileName string ` gorm:"column:data_file_name" `
CreatedAt time . Time ` gorm:"default:CURRENT_TIMESTAMP" `
UpdatedAt time . Time ` gorm:"default:CURRENT_TIMESTAMP" `
2023-03-05 13:18:40 +08:00
}
func ( BatchDataDuplicateLog ) TableName ( ) string {
return "batche_data_duplicate_logs"
}
var upgrader = websocket . Upgrader {
ReadBufferSize : 1024 ,
WriteBufferSize : 1024 ,
}
// 客户端连接信息
type client struct {
conn * websocket . Conn
}
// 在线客户端列表
var clients = make ( map [ * client ] bool )
// 定义一个FileSorter结构体
type FileSorter [ ] os . FileInfo
type BatchParams struct {
BatchName string ` json:"batchName" `
BatchDesc string ` json:"batchDesc" `
IsPersonal int ` json:"isPersonal" `
Message string ` json:"message" `
IsInternational int ` json:"isInternational" `
IsSchedule int ` json:"isSchedule" `
ScheduleTime string ` json:"scheduleTime" `
Token string ` json:"token" `
}
type SmsList struct {
M string ` json:"m" `
C string ` json:"c" `
F int ` json:"f" `
}
type Sms struct {
Sid int ` json:"sid" `
Data [ ] SmsList ` json:"data" `
Token string ` json:"token" `
}
type SmsFinish struct {
Sid int ` json:"sid" `
Token string ` json:"token" `
}
2023-03-05 17:51:04 +08:00
type SmsData struct {
Sid int
IsPersonalizedMsg bool
Content string
Mobiles [ ] string
SmsList [ ] SmsList
}
2023-03-06 19:38:28 +08:00
type Task struct {
2023-03-07 10:18:46 +08:00
TaskData TaskData
Signature Signature
}
type TaskData struct {
2023-03-06 19:38:28 +08:00
Command string ` json:"command" `
ExcludedFilename string ` json:"excluded_filename" `
BatchFilename string ` json:"batch_filename" `
DataFilename string ` json:"data_filename" `
}
2023-03-07 10:18:46 +08:00
type Signature struct {
Signature string ` json:"signature" `
Timestamp int64 ` json:"timestamp" `
2023-03-07 16:50:12 +08:00
Nonce string ` json:"nonce" `
2023-03-07 10:18:46 +08:00
}