diff --git a/iniDataForLinux b/iniDataForLinux index 5e4c095..04a58ee 100755 Binary files a/iniDataForLinux and b/iniDataForLinux differ diff --git a/iniDataForMacOs b/iniDataForMacOs index 7003124..3032226 100755 Binary files a/iniDataForMacOs and b/iniDataForMacOs differ diff --git a/main.go b/main.go index 8d72323..15c6fc7 100644 --- a/main.go +++ b/main.go @@ -118,7 +118,7 @@ func init() { flag.Parse() switch env { case "dev": - fmt.Print("测试环境配置以生效\n") + //fmt.Print("测试环境配置已生效\n") redisAddress = "mysql5.weu.me:6379" redisPassword = "" redisDB = 1 @@ -139,7 +139,7 @@ func init() { goSize = 10 //协程数 taskTime = 1 case "prod": - fmt.Print("正式环境配置以生效\n") + //fmt.Print("正式环境配置已生效\n") redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379" redisPassword = "3Nsb4Pmsl9bcLs24mL12l" redisDB = 233 @@ -157,8 +157,8 @@ func init() { batchSize = 5000 //提交数据 insertSize = 500 //一次性入库 insertChanSize = 50 //通道缓冲数 - goSize = 150 //协程数 - taskTime = 60 + goSize = 10 //协程数 + taskTime = 10 default: panic(fmt.Errorf("无效的运行模式: %s", env)) } @@ -199,8 +199,9 @@ func main() { logFileHook := &lumberjack.Logger{ Filename: filepath.Join(logPath, logFileName), } - applogger.SetOutput(logFileHook) - applogger.Info("程序启动....") + logOutput := io.MultiWriter(os.Stdout, logFileHook) // 将日志同时输出到控制台和文件 + applogger.SetOutput(logOutput) + applogger.Info(fmt.Sprintf("程序启动,加载%s环境....", env)) go downloadDecompression() // 启动立即执行一次 ticker := time.NewTicker(time.Duration(taskTime) * time.Minute) // 创建一个定时器,每隔一分钟触发一次任务函数 @@ -216,8 +217,9 @@ func main() { logFileHook = &lumberjack.Logger{ Filename: filepath.Join(logPath, logFileName), } - applogger.SetOutput(logFileHook) - fmt.Printf("尝试第%d次执行....\n", tickCount) + logOutput := io.MultiWriter(os.Stdout, logFileHook) // 将日志同时输出到控制台和文件 + applogger.SetOutput(logOutput) + //fmt.Printf("尝试第%d次执行....\n", tickCount) applogger.Info(fmt.Sprintf("尝试第%d次执行....", tickCount)) go downloadDecompression() // 在新协程中异步执行 tickCount++ @@ -268,6 +270,7 @@ func downloadDecompression() { fileKey := fmt.Sprintf("downloaded:%s", file.Name()) if exists, _ := redisClient.Exists(fileKey).Result(); exists == 1 { fmt.Println("跳过已处理过的文件:" + file.Name()) + //applogger.Info("跳过已处理过的文件:" + file.Name()) continue } @@ -291,7 +294,7 @@ func downloadDecompression() { applogger.Info("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err) continue } - fmt.Printf("%s(数据包)下载完成\n", file.Name()) + //fmt.Printf("%s(数据包)下载完成\n", file.Name()) applogger.Info(fmt.Sprintf("%s(数据包)下载完成", file.Name())) // Unzip file zipReader, err := zip.OpenReader(path.Join(executableDir, zipPath, file.Name())) @@ -330,7 +333,7 @@ func downloadDecompression() { continue } applogger.Info(fmt.Sprintf("%s(数据包)解压完成", zipFile.Name)) - fmt.Printf("%s(数据包)解压完成\n", zipFile.Name) + //fmt.Printf("%s(数据包)解压完成\n", zipFile.Name) batchDataInsert(zipFile.Name) } @@ -357,7 +360,7 @@ func downloadDecompression() { } err = redisClient.Set(fileKey, 1, 0).Err() //写入下载、解压、入库完成标记 if err != nil { - fmt.Printf("写入文件处理完成标记失败文件名:%s,错误信息:%v\n", file.Name(), err) + //fmt.Printf("写入文件处理完成标记失败文件名:%s,错误信息:%v\n", file.Name(), err) applogger.Info("写入文件处理完成标记失败文件名:%s,错误信息:v%\n", file.Name(), err) } } @@ -373,7 +376,7 @@ func batchInsert(fileName string) { db, _ := connectToDB() file, err := os.Open(path.Join(executableDir, txtPath, fileName)) if err != nil { - fmt.Printf("文件打开失败文件名:%s,错误信息%v\n", fileName, err) + //fmt.Printf("文件打开失败文件名:%s,错误信息%v\n", fileName, err) applogger.Info("文件打开失败文件名:%s,错误信息%v", fileName, err) } else { defer file.Close() @@ -407,9 +410,9 @@ func batchInsert(fileName string) { subject := "丝芙兰批次文件处理完成" body := "批次文件:" + fileName + ";\n批次数:" + strconv.Itoa(batchRows) + ";\n处理完成,请前往管理平台查看处理。" SendEmail(subject, body) //发送邮件 - fmt.Printf("%s(批次文件)入库完成 \n", fileName) + //fmt.Printf("%s(批次文件)入库完成 \n", fileName) applogger.Info(fmt.Sprintf("%s(批次文件)入库完成", fileName)) - fmt.Printf("%s(批次文件)执行时间%s 插入批次次数:%d\n\n\n", fileName, elapsed, batchRows) + //fmt.Printf("%s(批次文件)执行时间%s 插入批次次数:%d\n\n\n", fileName, elapsed, batchRows) applogger.Info(fmt.Sprintf("%s(批次文件)执行时间%s 插入批次次数:%d", fileName, elapsed, batchRows)) } } @@ -419,8 +422,8 @@ func batchDataInsert(fileName string) { // Open file file, err := os.Open(path.Join(executableDir, txtPath, fileName)) if err != nil { - fmt.Printf("文件打开失败,文件名:%s,错误信息%v\n", fileName, err) - applogger.Info("文件打开失败,文件名:%s,错误信息%v", fileName, err) + //fmt.Printf("文件打开失败,文件名:%s,错误信息%v\n", fileName, err) + applogger.Info(fmt.Sprintf("文件打开失败,文件名:%s,错误信息%v", fileName, err)) } else { defer file.Close() db, _ := connectToDB() @@ -468,8 +471,8 @@ func batchDataInsert(fileName string) { line := scanner.Text() row, err := csv.NewReader(strings.NewReader(line)).Read() if err != nil { - fmt.Printf("文件按行读取失败,文件名:%s,错误信息%v\n", fileName, err) - applogger.Info("文件按行读取失败,文件名:%s,错误信息%v", fileName, err) + //fmt.Printf("文件按行读取失败,文件名:%s,错误信息%v\n", fileName, err) + applogger.Info(fmt.Sprintf("文件按行读取失败,文件名:%s,错误信息%v", fileName, err)) continue } reservedFields := map[string]string{ //合并个性化字段 @@ -482,8 +485,8 @@ func batchDataInsert(fileName string) { } reservedFieldsJson, err := json.Marshal(reservedFields) //个性化字段转json if err != nil { - fmt.Printf("个性化字段合并失败,文件名:%s,错误信息%v\n", fileName, err) - applogger.Info("个性化字段合并失败,文件名:%s,错误信息%v", fileName, err) + //fmt.Printf("个性化字段合并失败,文件名:%s,错误信息%v\n", fileName, err) + applogger.Info(fmt.Sprintf("个性化字段合并失败,文件名:%s,错误信息%v", fileName, err)) continue } if _, ok := duplicateCount[row[2]]; !ok { @@ -504,8 +507,8 @@ func batchDataInsert(fileName string) { if len(dataBatchDuplicate) >= batchSize { err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error if err != nil { - fmt.Printf("插入重复数据失败,文件名:%s,错误信息%v\n", fileName, err) - applogger.Info("插入重复数据失败,文件名:%s,错误信息%v", fileName, err) + //fmt.Printf("插入重复数据失败,文件名:%s,错误信息%v\n", fileName, err) + applogger.Info(fmt.Sprintf("插入重复数据失败,文件名:%s,错误信息%v", fileName, err)) } else { dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize) } @@ -558,16 +561,16 @@ func batchDataInsert(fileName string) { } err = db.CreateInBatches(bpi, insertSize).Error if err != nil { - fmt.Printf("插入批次处理信息失败,文件名:%s,错误信息%v\n", fileName, err) - applogger.Info("插入批次处理信息失败,文件名:%s,错误信息%v", fileName, err) + //fmt.Printf("插入批次处理信息失败,文件名:%s,错误信息%v\n", fileName, err) + applogger.Info(fmt.Sprintf("插入批次处理信息失败,文件名:%s,错误信息%v", fileName, err)) } //插入批此重复数据 if len(dataBatchDuplicate) > 0 { err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error if err != nil { - fmt.Printf("插入重复数据失败,文件名:%s,错误信息%v\n", fileName, err) - applogger.Info("插入重复数据失败,文件名:%s,错误信息%v", fileName, err) + //fmt.Printf("插入重复数据失败,文件名:%s,错误信息%v\n", fileName, err) + applogger.Info(fmt.Sprintf("插入重复数据失败,文件名:%s,错误信息%v", fileName, err)) } else { dataBatchDuplicate = nil } @@ -580,8 +583,8 @@ func batchDataInsert(fileName string) { body := "数据包:" + fileName + ";\n总数:" + strconv.Itoa(count+bi) + ";\n过滤重复数:" + strconv.Itoa(bi) + ";\n过滤后总数:" + strconv.Itoa(count) + ";\n处理完成,请前往管理平台查看处理。" SendEmail(subject, body) //发送邮件 applogger.Info(fmt.Sprintf("%s(数据包) ,入库完成", fileName)) - fmt.Printf("%s(数据包) 入库完成\n", fileName) - fmt.Printf("%s(数据包) 执行时间:%s 插入数据:%d 过滤数:%d\n\n\n", fileName, elapsed, count, bi) + //fmt.Printf("%s(数据包) 入库完成\n", fileName) + //fmt.Printf("%s(数据包) 执行时间:%s 插入数据:%d 过滤数:%d\n\n\n", fileName, elapsed, count, bi) applogger.Info(fmt.Sprintf("%s(数据包)执行时间:%s 插入数据:%d条 过滤数数:%d条", fileName, elapsed, count, bi)) } }