如何對正在使用中的數據庫進行曆史數據遷移?

飛天心宏 2021-09-19 10:29:04 阅读数:66

正在 使用

作為開發人員,能學會簡單地對曆史數據遷移是日常基本功。在上篇文章中,我們初步地解釋了曆史數據遷移的基本概念,以及如何使用SQL SERVER存儲過程實現對曆史數據遷移。

一般來說直接在數據庫中寫SQL語句(insert into ... select from)的方式進行遷移,僅適用於“停服狀態”下的數據遷移場景,也就是數據庫處於無用戶使用的情况下,而且遷移場景有限。

當數據庫在生產環境中實時在用,而且數據量較大的前提下,很顯然會影響性能,上述方法並不可取。

今天我們來介紹一個新的思路,可將曆史數據遷移對現有實時在用數據庫的性能影響降到較低水平。

1、大體遷移思路

簡單描述下思路:
1、先利用程序根據創建時間昇序排序,直接查詢TopN條數據,記錄到程序緩存中。
【查詢後就和正式數據庫沒關系了,只要TopN不大,這個簡單的查詢幾乎對數據庫沒有任何影響。】
2、然後程序將緩存中的TopN條數據寫入到曆史庫。
【這個階段和正式數據庫沒有半點關系,哪怕寫的再慢,也不影響正式數據庫】
3、核對下曆史數據庫中的數據,確保沒有問題,就可以删除正式環境中的TopN條數據了。
【這裏删除操作稍微比查詢影響大些,但是僅僅是小批量的數據删除同樣影響不大。】
4、如果需要大批量數據遷移怎麼辦? 非常簡單,分批執行,比如循環執行CountX次,那麼遷移的總數據量就為TopN×CountX ,所以根本不怕數據量大,開啟程序自動執行即可,也就是用耗時間來减少對數據庫性能的影響。【執行時間換性能】

這個思路和insert into ... select from的方式,最大的區別就在用寫入曆史數據庫的過程不影響正式在用的數據庫,只剩下TopN條數據的查詢和删除操作,影響很小。

特別說明:TopN不能太大,這個越小越好,根據錶字段的多少,數據庫的性能,以及用戶連接數情况綜合考慮,建議TopN取值在1000到5000條之間,這樣對數據庫影響幾乎可以忽略。

 

2、例子中使用到的主要技術

為了實現這個思路,我這裏也隨便寫個簡單程序來試試效果,做個例子,僅供大家參考。

由於是最基礎的例子,我就不用通用的底層框架了,免得大家看起來吃力。同時為了運行演示方便,直接寫個exe可執行程序好了。

這個講解的例子,用到的技術主要包括:
數據庫:SQL SERVER,
數據庫訪問組件:Entity Framework Core
日志記錄:Serialog.AspNetCore
開發語言:C#
技術框架:.NET 5
項目模版:控制臺應用程序

 

3、創建個Model實體類

按照EntityFramework Core的思路,先建個Model吧。

 1 using System;
 2 using System.Collections.Generic;
 3 using System.ComponentModel.DataAnnotations;
 4 using System.ComponentModel.DataAnnotations.Schema;
 5 using System.Text;
 6
 7 namespace Tyingtech_glps.Entities.HDM
 8 {
 9 /// <summary>
 10 /// 接口請求記錄
 11 /// </summary>
 12 [Table("GLPS_APIREQUEST")]
 13 public class GLPS_APIREQUEST
 14  {
 15  [Key]
 16 public string FID { get; set; }
 17
 18 /// <summary>
 19 /// 接口編號【固定】
 20 /// </summary>
 21 public string FAPICODE { get; set; }
 22
 23 /// <summary>
 24 /// 請求方身份ID
 25 /// </summary>
 26 public string FAPPID { get; set; }
 27
 28 /// <summary>
 29 /// 接口請求方URL地址
 30 /// </summary>
 31 public string FFORMURL { get; set; }
 32
 33 /// <summary>
 34 /// 接口請求方IP
 35 /// </summary>
 36 public string FIP { get; set; }
 37
 38 /// <summary>
 39 /// 請求參數(JSON字符串)
 40 /// </summary>
 41 public string FREQUESTDATA { get; set; }
 42
 43 /// <summary>
 44 /// 請求時間點
 45 /// </summary>
 46 public DateTime FREQTIME { get; set; }
 47
 48 /// <summary>
 49 ///響應參數(JSON字符串)
 50 /// </summary>
 51 public string FRESPONSE { get; set; }
 52
 53 /// <summary>
 54 /// 響應時間點
 55 /// </summary>
 56 public DateTime FRESTIME { get; set; }
 57
 58 /// <summary>
 59 /// 總毫秒數
 60 /// </summary>
 61 public int FMILLISECOND { get; set; }
 62
 63 /// <summary>
 64 /// 接口請求結果(1:成功;0:失敗;-1:接口內部异常)
 65 /// </summary>
 66 public int FISSUCCESS { get; set; }
 67
 68 /// <summary>
 69 /// 失敗詳情,僅內部使用(如:內部報錯异常信息;AppId錯誤;非法請求;參數不全;...)
 70 /// </summary>
 71 public string FRESULT { get; set; }
 72
 73 /// <summary>
 74 /// 約定格式數據1 (如:車牌號)
 75 /// </summary>
 76 public string FDATA1 { get; set; }
 77
 78 /// <summary>
 79 /// 約定格式數據2 (如:進場、離場)
 80 /// </summary>
 81 public string FDATA2 { get; set; }
 82
 83 /// <summary>
 84 /// 約定格式數據3 (如:...)
 85 /// </summary>
 86 public string FDATA3 { get; set; }
 87
 88 /// <summary>
 89 /// 約定格式數據4(如:...)
 90 /// </summary>
 91 public string FDATA4 { get; set; }
 92
 93 /// <summary>
 94 /// 約定格式數據5(如:...)
 95 /// </summary>
 96 public string FDATA5 { get; set; }
 97
 98 /// <summary>
 99 /// 創建時間
100 /// </summary>
101 public DateTime FCREATETIME { get; set; }
102  }
103 }

 

4、執行TopN條數據的遷移 

 1 /// <summary>
 2 /// 曆史數據遷移【一次性遷移】
 3 /// </summary>
 4 /// <param name="n">數據遷移量(條數) top N</param>
 5 /// <returns>實際遷移成功數量</returns>
 6 public static int DataToHisExecOne(int n)
 7 {
 8 n = n > 10000 ? 10000 : n; //一次最多1萬(再多可能會對性能有影響【遷移第一條:盡量不能影響現有在用數據庫的業務】)
 9
 10 //測試環境
 11 EnumWhichDB whichDB = EnumWhichDB.DevGlps;
 12 EnumWhichDB whichDBHis = EnumWhichDB.DevGlpsHis;
 13
 14 string connStr = DBConnectionString.GetConnStr(whichDB); //當前需要遷移的數據庫
 15 string connStrHis = DBConnectionString.GetConnStr(whichDBHis); //曆史數據庫
 16
 17 //Serialog 記錄日志
 18 var logFileName = string.Format("ToHisOne_ApiRequest_{0}.txt", DateTime.Now.ToString("yyyyMMddHH"));
 19 var log = new LoggerConfiguration()
 20  .WriteTo.Console()
 21  .WriteTo.File(logFileName)
 22  .CreateLogger();
 23
 24 log.Information($"開始遷移數據,計劃遷移條數為:{n}");
 25
 26 int beginCount = GetRowCount(whichDB); //當前數據庫數據遷移前的記錄行數
 27 int beginCountHis = GetRowCount(whichDBHis); //曆史數據庫數據遷移前的記錄行數
 28
 29 Stopwatch sw = new Stopwatch(); //檢測運行時間(對每個階段)
 30 Stopwatch swAll = new Stopwatch(); //總耗時
 31  swAll.Start();
 32
 33  sw.Start();
 34 DateTime maxCreateTime = GetAscMaxCreateTimeTopN(whichDB,n);
 35  sw.Stop();
 36 log.Information("對應遷移數據FCREATETIME為:" + maxCreateTime.ToString("yyyy-MM-dd HH:mm:ss.fffff") + $",查詢maxCreateTime耗時:{sw.ElapsedMilliseconds} 毫秒。");
 37
 38 sw.Restart(); //重新開始計時
 39 List<GLPS_APIREQUEST> records = new List<GLPS_APIREQUEST>();
 40 //從數據庫查最早的數據 
 41 using (var db = new GlpsDbContext(connStr))
 42  {
 43 records = db.DS_ApiRequest
 44 .Where(t => t.FCREATETIME <= maxCreateTime)
 45 .OrderBy(t => t.FCREATETIME)
 46 .AsNoTracking() //非跟踪查詢(只讀,提昇效率)
 47  .ToListAsync().Result;
 48  }
 49 sw.Stop(); //計時結束
 50 log.Information($"按時間點實際查詢到 {records.Count} 條數據,耗時:{sw.ElapsedMilliseconds} 毫秒。");
 51
 52 sw.Restart(); //重新開始計時
 53 //寫數據到曆史數據庫
 54 int newCount = 0;
 55 using (var db = new GlpsDbContext(connStrHis))
 56  {
 57  db.DS_ApiRequest.AddRange(records);
 58 newCount = db.SaveChanges(); //最後保存數據
 59 log.Information($"實際成功寫入到曆史數據庫條數: {newCount}");
 60  }
 61 sw.Stop(); //計時結束
 62 log.Information($"實際成功寫入到曆史數據庫條數:{newCount} , 寫入耗時:{sw.ElapsedMilliseconds} 毫秒。");
 63
 64
 65 //最後删除數據
 66 DateTime maxCreateTimeHis = GetMaxCreateTime(whichDBHis); //曆史數據庫最大的FCreateTime
 67
 68 //兩個時間相同,則可以删除數據,否則不删除,直接預警(中間數據可能出錯,需要人工幹預)
 69 if (maxCreateTime == maxCreateTimeHis)
 70  {
 71  sw.Restart();
 72 var rowCount = DeleteByFCreateTime(whichDB, maxCreateTime);
 73  sw.Stop();
 74 log.Information($"遷移後删除數據條數:{rowCount} , 删除耗時:{sw.ElapsedMilliseconds} 毫秒。");
 75  }
 76 else if (newCount < n && newCount == records.Count) //就是實際小於n,那麼是:C#的datetime和數據庫的datetime精度不同
 77  {
 78  sw.Restart();
 79 var rowCount = DeleteByFCreateTime(whichDB,maxCreateTimeHis); //需按曆史數據庫的日期來删除
 80  sw.Stop();
 81 log.Information($"遷移後删除數據條數:{rowCount} , 删除耗時:{sw.ElapsedMilliseconds} 毫秒。");
 82  }
 83 else
 84  {
 85 log.Error("數據對比出錯:maxCreateTime != maxCreateTimeHis。 未執行最後的删除數據!!!請開發人員核對數據。");
 86 log.Information("maxCreateTime = " + maxCreateTime.ToString("yyyy-MM-dd HH:mm:ss.fff"));
 87 log.Information("maxCreateTimeHis = " + maxCreateTimeHis.ToString("yyyy-MM-dd HH:mm:ss.fff"));
 88  }
 89
 90 int endCount = GetRowCount(whichDB); //當前數據庫數據遷移後的記錄行數
 91 int endCountHis = GetRowCount(whichDBHis); //曆史數據庫數據遷移後的記錄行數
 92 log.Information($"遷移前GLPS_APIREQUEST的數據條數:{beginCount} , 遷移後數據條數:{endCount}");
 93 log.Information($"遷移前GLPS_APIREQUESTHis的數據條數:{beginCountHis} , 遷移後數據條數:{endCountHis}");
 94
 95  swAll.Stop();
 96 log.Information($"swAll:遷移總耗時:{swAll.ElapsedMilliseconds} 毫秒。");
 97 log.Information("------------------------------------------------");
 98
 99 return newCount;
100 }

 

中間用到的幾個單獨邏輯的方法 

 1 /// <summary>
 2 /// 獲取總記錄行數
 3 /// </summary>
 4 /// <param name="whichDB"></param>
 5 /// <returns></returns>
 6 private static int GetRowCount(EnumWhichDB whichDB)
 7 {
 8 string connStr = DBConnectionString.GetConnStr(whichDB);
 9 int totalCount = 0;
10 using (var db = new GlpsDbContext(connStr))
11  {
12 totalCount = db.DS_ApiRequest.Count();
13  }
14 return totalCount;
15 }
16
17
18 /// <summary>
19 /// 查詢數據庫中最大的FCreateTime
20 /// </summary>
21 /// <param name="whichDB"></param>
22 /// <returns></returns>
23 private static DateTime GetMaxCreateTime(EnumWhichDB whichDB)
24 {
25 string connStr = DBConnectionString.GetConnStr(whichDB);
26 DateTime maxCreateTime = DateTime.MinValue;
27 using (var db = new GlpsDbContext(connStr))
28  {
29 //查詢曆史數據庫,最大的FCREATETIME
30 var record = db.DS_ApiRequest.OrderByDescending(t => t.FCREATETIME).Take(1).SingleOrDefault();
31 if (record != null)
32  {
33 maxCreateTime = record.FCREATETIME;
34  }
35  }
36 return maxCreateTime;
37 }
38
39 /// <summary>
40 /// 按創建時間從小到大排序(FCreateTime Asc),取前N條數據的最大FCreateTime
41 /// 【即:取最早N條數據中,最大的創建時間】
42 /// </summary>
43 /// <param name="whichDB"></param>
44 /// <param name="topN"></param>
45 /// <returns></returns>
46 private static DateTime GetAscMaxCreateTimeTopN(EnumWhichDB whichDB,int topN)
47 {
48 string connStr = DBConnectionString.GetConnStr(whichDB);
49 DateTime maxCreateTime = DateTime.MinValue;
50 using (var db = new GlpsDbContext(connStr))
51  {
52 //查詢totalCount對應最大的FCREATETIME
53 var record = db.DS_ApiRequest.OrderBy(t => t.FCREATETIME)
54 .Skip(topN - 1).Take(1).SingleOrDefault();
55 if (record != null)
56  {
57 maxCreateTime = record.FCREATETIME;
58  }
59  }
60 return maxCreateTime;
61 }
62
63 /// <summary>
64 /// 删除小於等於【某個創建時間】的數據
65 /// </summary>
66 /// <param name="whichDB"></param>
67 /// <param name="maxCreateTime"></param>
68 /// <returns>删除記錄數</returns>
69 private static int DeleteByFCreateTime(EnumWhichDB whichDB, DateTime maxCreateTime)
70 {
71 int rowCount = 0;
72 string connStr = DBConnectionString.GetConnStr(whichDB);
73 using (var db = new GlpsDbContext(connStr))
74  {
75 List<SqlParameter> listParams = new List<SqlParameter>{
76 new SqlParameter("FCREATETIME", maxCreateTime)
77  };
78 rowCount = db.Database.ExecuteSqlRaw(@"delete from GLPS_APIREQUEST where FCREATETIME<[email protected]", listParams);
79  }
80 return rowCount;
81 }

 

5、大數據量循環執行的邏輯 

 1 /// <summary>
 2 /// 曆史數據遷移【分批遷移】
 3 /// </summary>
 4 /// <param name="totalCount">任務總遷移條數</param>
 5 /// <param name="prePageCount">分批遷移,單次查詢數量</param>
 6 public static void DataToHis(int totalCount=100,int prePageCount=10)
 7 {
 8 //臨界值設定
 9 totalCount = totalCount > 1000000 ? 1000000 : totalCount; //單次任務100萬
10 prePageCount = prePageCount > 10000 ? 10000 : prePageCount; //每次1萬
11
12 //Serialog 記錄日志
13 var logFileName = string.Format("ToHis_ApiRequest_{0}.txt", DateTime.Now.ToString("yyyyMMddHH"));
14 var log = new LoggerConfiguration()
15  .WriteTo.Console()
16  .WriteTo.File(logFileName)
17  .CreateLogger();
18
19 Stopwatch swAll = new Stopwatch(); //總耗時
20  swAll.Start();
21
22 log.Information($"本次計劃總遷移數據條數:{totalCount},分批單次執行條數:{prePageCount}");
23 int okCount = 0; //遷移成功的條數
24 int runTimes = 0;
25 while (okCount < totalCount)
26  {
27 runTimes++;
28 if (runTimes % 20 == 0)
29  {
30 Console.Clear(); //每執行20次的時候,清除控制臺
31 Console.WriteLine("控制臺已被清理。");
32  }
33 if (totalCount - okCount < prePageCount)
34  {
35 prePageCount = totalCount - okCount; //最後一次如果沒有一頁數據,只遷移部分
36 if (prePageCount == 1) break; //如果是1條的話,日期精度容易出現問題,特意控制不執行
37  }
38 okCount += DataToHisExecOne(prePageCount);
39 log.Information(@"已執行累計條數{0},累計耗時:{1}分{2}秒{3},累計執行次數{4}", okCount, swAll.Elapsed.TotalMinutes, swAll.Elapsed.Seconds, swAll.Elapsed.Milliseconds, runTimes);
40  }
41
42  swAll.Stop();
43 log.Information($"本次實際總遷移數據條數:{okCount},共分批執行次數:{runTimes}");
44 log.Information("================================================");
45
46 }

 

6、在appsettings.json 做一些基礎配置

配置的目的是為了方便執行,免得改程序。

1 "TyUseEnv": "0", //使用環境(0:測試環境;1:正式環境(沙箱))
2 "TyTopN": "10", //每次遷移的數據條數
3 "TyTotalCount": "25", //分批總遷移數據量
4 "TyForTable": "GLPS_APIREQUEST", //需要遷移數據的錶名(測試已支持:GLPS_APIREQUEST、GLPS_GATEENTRYREC)

 

其他的錶以此類推,可以進行多錶切換。

在控制臺Main方法中通過依賴注入的方式,動態實例化要遷移的錶結構。

曆史數據遷移,只要支持此接口(IExecDataToHis)即可,說白就2個遷移方法而已:單次執行和分批循環執行。

 

其他的錶以此類推,可以進行多錶切換。

在控制臺Main方法中通過依賴注入的方式,動態實例化要遷移的錶結構。

曆史數據遷移,只要支持此接口(IExecDataToHis)即可,說白就2個遷移方法而已:單次執行和分批循環執行。

 1 using Twi.NET5.Core;
 2
 3 namespace Tyingtech_glps.Interface.HDM
 4 {
 5 /// <summary>
 6 /// 可執行的數據遷移接口(單個和分批)
 7 /// </summary>
 8 public interface IExecDataToHis : IWhoAmI
 9  {
10 /// <summary>
11 /// 曆史數據遷移【分批遷移】
12 /// </summary>
13 /// <param name="totalCount">任務總遷移條數</param>
14 /// <param name="perPageCount">分批遷移,單次查詢每頁數量</param>
15 public TwiReturnBase DataToHis(int totalCount = 10000, int perPageCount = 1000);
16
17 /// <summary>
18 /// 曆史數據遷移【一次性遷移】
19 /// </summary>
20 /// <param name="n">數據遷移量(條數) top N</param>
21 public TwiReturnBase DataToHisExecOne(int n);
22  }
23 }

TwiReturnBase 就是一個統一封裝的返回類型,不用管它。

 

7、控制臺運行界面參考

最後exe程序的界面效果如下。

 然後就是執行命令了,單次遷移輸入1,分批執行遷移輸入2。

遷移哪張錶,每次單次遷移多少條,總共遷移多少數據量,都可在appsettings.json中配置。

 

8、單次執行效果

直接輸入命令1,開始執行。

 

(哈哈,看來我直接本地電腦還是非常卡的,不過不影響思路的效果,等最後我們換臺測試服務器看看1000萬數據的效果。)

 

9、分配執行效果

自動執行多次的效果,只要配置好,就會自動執行,分配執行不影響性能。

如果中間報錯,會自動停止執行。

 

10、大數據量的測試效果

我們換臺測試服務器,用此方法測試下有數據在跑的沙箱環境一千萬數據量的效果,每天源源不斷執行,測試遷移了快939萬的數據問題都不大。

 

由於沙箱環境中也有數據在跑,所以業務數據庫是實時增加數據量的。

 

11、此方法的優缺點

這個思路能解决哪怕是生產環境都可以用,有事沒事遷移點數據量,對數據庫性能沒有多大影響,而且支持不同數據庫的遷移,比如SQL SERVER 到MySQL等等,但是缺點就是遷移起來比較慢些。適用於小企業小項目的常見場景,這個思路基本就够了。

【以後有空我還會再開篇文章講別的大數據遷移思路】

當然最後要說的是,這個例子不是很靠譜,為什麼這麼說呢?首先這個是硬編碼方式,局限性太大,如果要通用的話,程序應該再抽下,直接配置錶名相關的即可。

實現通用工具模式,任何時候無需修改代碼,直接簡單改配置即可。這個我們下期教程繼續改進思路。

 

版权声明:本文为[飛天心宏]所创,转载请带上原文链接,感谢。 https://gsmany.com/2021/09/20210919102903480z.html