大概是我這邊在處理隊(duì)列數(shù)據(jù)時(shí)需要判斷表中是否存在,不存在就向表中插入數(shù)據(jù),存在的話就更新這條數(shù)據(jù),這個(gè)邏輯在單進(jìn)程下正常,但是多進(jìn)程下,會(huì)出現(xiàn)重復(fù)入庫(kù)的問(wèn)題。
下面代碼:以day為條件查詢(xún)是否存在了當(dāng)天的統(tǒng)計(jì)記錄,存在就更新統(tǒng)計(jì),不存在就新增一條當(dāng)天的統(tǒng)計(jì)記錄,但是多個(gè)進(jìn)程下,好多個(gè)進(jìn)程取到的都是當(dāng)天不同時(shí)刻的數(shù)據(jù),我這邊判斷只能以日期判斷,造成數(shù)據(jù)重復(fù)入庫(kù)問(wèn)題。想過(guò)在表中加入唯一索引,這樣寫(xiě)入時(shí)會(huì)拋出異常,當(dāng)前數(shù)據(jù)重回隊(duì)列等待下次消費(fèi),但是x次后數(shù)據(jù)就被丟棄了,對(duì)這個(gè)當(dāng)日統(tǒng)計(jì)記錄來(lái)說(shuō)會(huì)存在丟數(shù)據(jù)的風(fēng)險(xiǎn),求大佬給指點(diǎn)下。
$row = Db::table('statistic')->where('day', $day)->first();
if ($row) {
$data = [
'count' => $row->count + 1,
'cost' => $row->cost + $costTime,
'success_count' => $row->success_count + ($success ? 1 : 0),
'error_count' => $row->error_count + ($success ? 0 : 1),
];
Db::table('statistic')->where('day', $day)->update($data);
} else {
$data = [
'day' => $day,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
Db::table('statistic')->insert($data);
}
本問(wèn)題不在討論,感覺(jué)是彎路,但是下面大佬關(guān)于鎖的指點(diǎn)確實(shí)很有啟發(fā),大家看到了只學(xué)習(xí)大佬的思路,不要看我那個(gè)代碼邏輯了,誤人子弟
不知道咋搞了,我看了下,大概有13個(gè)統(tǒng)計(jì)表,要是都提前建好,我就不太需要隊(duì)列處理了,被關(guān)鎖的話幾乎就把多進(jìn)程強(qiáng)制單進(jìn)程了,那個(gè)加唯一索引 異常處理貌似可以,我測(cè)試下
$transferData = self::$transferData;
$project = $transferData['project'];
$time = date('Y-m-d H:i:s');
try{
Db::table('project')->insert([
'project' => $transferData['project'],
'created_at' => $time,
'updated_at' => $time
]);
}catch (\Throwable $th){
Db::table('project')->where('project', $project)->update([
'updated_at' => $time
]);
}
表中加入了唯一索引,代碼調(diào)成這種后會(huì)出現(xiàn)mysql死鎖的情況
SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction (SQL: update project
set updated_at
= 2023-08-10 12:10:37 where project
= 營(yíng)銷(xiāo)系統(tǒng))
安裝插件:http://www.wtbis.cn/plugin/55
創(chuàng)建統(tǒng)計(jì)鎖
/**
* 統(tǒng)計(jì)鎖
* @method static LockInterface lock(string $day, ?float $ttl = null, ?bool $autoRelease = null) 數(shù)據(jù)庫(kù)操作鎖
*/
class StatisticLocker extends Locker
{
}
使用鎖
//取鎖:有效期3秒,自釋放
$lock = \app\services\lock\StatisticLocker::lock($day, 3, true);
//多進(jìn)程阻塞等待
if ($lock->acquire(true)) {
$row = Db::table('statistic')->where('day', $day)->first();
if ($row) {
$data = [
'count' => $row->count + 1,
'cost' => $row->cost + $costTime,
'success_count' => $row->success_count + ($success ? 1 : 0),
'error_count' => $row->error_count + ($success ? 0 : 1),
];
Db::table('statistic')->where('day', $day)->update($data);
} else {
$data = [
'day' => $day,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
Db::table('statistic')->insert($data);
}
}
//代碼執(zhí)行到這里,會(huì)立刻自動(dòng)釋放鎖
class SyncStatisticsToMysql implements Consumer
{
// 要消費(fèi)的隊(duì)列名
public string $queue = 'sync-statistics-to-mysql';
// 連接名,對(duì)應(yīng) plugin/webman/redis-queue/redis.php 里的連接`
public string $connection = 'default';
// 消費(fèi)的數(shù)據(jù)
public static array $transferData = [];
// 消費(fèi)
public function consume($data)
{
self::$transferData = $data;
Db::beginTransaction();
try {
// 組裝數(shù)據(jù)
self::assembleData();
// 應(yīng)用
self::project();
// 調(diào)用記錄
self::tracing();
// 總統(tǒng)計(jì)
self::statistic();
// 應(yīng)用總統(tǒng)計(jì)
self::statisticProject();
// 應(yīng)用每分鐘統(tǒng)計(jì)
self::statisticProjectInterval();
// 應(yīng)用調(diào)用統(tǒng)計(jì)
self::statisticProjectTransfer();
// 應(yīng)用調(diào)用每分鐘統(tǒng)計(jì)
self::statisticProjectTransferInterval();
// 應(yīng)用狀態(tài)碼統(tǒng)計(jì)
self::statisticProjectCode();
// 應(yīng)用狀態(tài)碼每分鐘統(tǒng)計(jì)
self::statisticProjectCodeInterval();
// 應(yīng)用IP統(tǒng)計(jì)
self::statisticProjectIp();
// 應(yīng)用IP每分鐘統(tǒng)計(jì)
self::statisticProjectIpInterval();
// 應(yīng)用IP狀態(tài)碼統(tǒng)計(jì)
self::statisticProjectIpCode();
// 應(yīng)用IP狀態(tài)碼每分鐘統(tǒng)計(jì)
self::statisticProjectIpCodeInterval();
// 應(yīng)用IP調(diào)用統(tǒng)計(jì)
self::statisticProjectIpTransfer();
// 應(yīng)用IP調(diào)用每分鐘統(tǒng)計(jì)
self::statisticProjectIpTransferInterval();
Db::commit();
} catch (\Throwable $th) {
Db::rollBack();
var_dump($th->getMessage());
throw new ServerErrorHttpException($th->getMessage());
}
}
/**
* 組裝數(shù)據(jù)
*/
protected static function assembleData()
{
$transferData = self::$transferData;
$names = explode('-', $transferData['project']);
$transferData['project'] = $names[0];
$transferData['type'] = $names[1] ?? 'All';
$timer = strtotime($transferData['time']);
$transferData['day'] = date('Ymd', $timer);
$transferData['minute'] = date('YmdHi', $timer);
$transferData['trace'] = uniqid();
self::$transferData = $transferData;
}
/**
* 應(yīng)用
*/
protected static function project()
{
$transferData = self::$transferData;
$project = $transferData['project'];
$time = date('Y-m-d H:i:s');
try {
Db::table('project')->insert([
'project' => $transferData['project'],
'created_at' => $time,
'updated_at' => $time
]);
} catch (\PDOException | \Exception $e) {
Db::table('project')->where('project', $project)->update([
'updated_at' => $time
]);
}
}
/**
* 調(diào)用記錄
*/
protected static function tracing()
{
$transferData = self::$transferData;
$insert = [
'day' => $transferData['day'],
'time' => $transferData['time'],
'trace' => $transferData['trace'],
'project' => $transferData['project'],
'type' => $transferData['type'],
'ip' => $transferData['ip'],
'transfer' => $transferData['transfer'],
'cost_time' => $transferData['costTime'],
'success' => $transferData['success'],
'code' => $transferData['code'],
'details' => $transferData['details'],
];
Db::table('tracing')->insert($insert);
}
/**
* 總統(tǒng)計(jì)
*/
public static function statistic()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$success = 1 === $transferData['success'];
$costTime = $transferData['costTime'];
try {
$data = [
'day' => $day,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
Db::table('statistic')->insert($data);
} catch (\PDOException | \Exception $e) {
$data = [
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
];
if($success){
$data['success_count'] = Db::raw('success_count + 1');
}else{
$data['error_count'] = Db::raw('error_count + 1');
}
Db::table('statistic')->where('day', $day)
->update($data);
}
}
/**
* 應(yīng)用總統(tǒng)計(jì)
*/
public static function statisticProject()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$project = $transferData['project'];
$type = $transferData['type'];
$success = 1 === $transferData['success'];
$costTime = $transferData['costTime'];
try {
$data = [
'day' => $day,
'project' => $project,
'type' => $type,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
Db::table('statistic_project')->insert($data);
} catch (\PDOException | \Exception $e) {
$where = [
['day', '=', $day],
['project', '=', $project],
['type', '=', $type],
];
$data = [
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
];
if($success){
$data['success_count'] = Db::raw('success_count + 1');
}else{
$data['error_count'] = Db::raw('error_count + 1');
}
Db::table('statistic_project')
->where($where)
->update($data);
}
}
/**
* 應(yīng)用每分鐘統(tǒng)計(jì)
*/
public static function statisticProjectInterval()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$minute = $transferData['minute'];
$project = $transferData['project'];
$type = $transferData['type'];
$success = 1 === $transferData['success'];
$costTime = $transferData['costTime'];
try {
$data = [
'day' => $day,
'time' => $minute,
'project' => $project,
'type' => $type,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
Db::table('statistic_project_interval')->insert($data);
} catch (\PDOException | \Exception $e) {
$where = [
['day', '=', $day],
['time', '=', $minute],
['project', '=', $project],
['type', '=', $type],
];
$data = [
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
];
if($success){
$data['success_count'] = Db::raw('success_count + 1');
}else{
$data['error_count'] = Db::raw('error_count + 1');
}
Db::table('statistic_project_interval')
->where($where)
->update($data);
}
}
/**
* 應(yīng)用調(diào)用統(tǒng)計(jì)
*/
public static function statisticProjectTransfer()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$project = $transferData['project'];
$type = $transferData['type'];
$costTime = $transferData['costTime'];
$transfer = $transferData['transfer'];
$success = 1 === $transferData['success'];
try {
$data = [
'day' => $day,
'project' => $project,
'type' => $type,
'transfer' => $transfer,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
Db::table('statistic_project_transfer')->insert($data);
} catch (\PDOException | \Exception $e) {
$where = [
['day', '=', $day],
['project', '=', $project],
['type', '=', $type],
['transfer', '=', $transfer]
];
$data = [
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
];
if($success){
$data['success_count'] = Db::raw('success_count + 1');
}else{
$data['error_count'] = Db::raw('error_count + 1');
}
Db::table('statistic_project_transfer')
->where($where)
->update($data);
}
}
/**
* 應(yīng)用調(diào)用每分鐘統(tǒng)計(jì)
*/
public static function statisticProjectTransferInterval()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$minute = $transferData['minute'];
$project = $transferData['project'];
$type = $transferData['type'];
$costTime = $transferData['costTime'];
$transfer = $transferData['transfer'];
$success = 1 === $transferData['success'];
try {
$data = [
'day' => $day,
'time' => $minute,
'project' => $project,
'type' => $type,
'transfer' => $transfer,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
Db::table('statistic_project_transfer_interval')->insert($data);
} catch (\PDOException | \Exception $e) {
$where = [
['day', '=', $day],
['time', '=', $minute],
['project', '=', $project],
['type', '=', $type],
['transfer', '=', $transfer]
];
$data = [
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
];
if($success){
$data['success_count'] = Db::raw('success_count + 1');
}else{
$data['error_count'] = Db::raw('error_count + 1');
}
Db::table('statistic_project_ip_transfer_interval')
->where($where)
->update($data);
}
}
/**
* 應(yīng)用狀態(tài)碼統(tǒng)計(jì)
*/
public static function statisticProjectCode()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$project = $transferData['project'];
$type = $transferData['type'];
$costTime = $transferData['costTime'];
$code = $transferData['code'];
try {
$data = [
'day' => $day,
'project' => $project,
'type' => $type,
'code' => $code,
'count' => 1,
'cost' => $costTime,
];
Db::table('statistic_project_code')->insert($data);
} catch (\PDOException | \Exception $e) {
$where = [
['day', '=', $day],
['code', '=', $code],
['project', '=', $project],
['type', '=', $type]
];
Db::table('statistic_project_code')
->where($where)
->update([
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
]);
}
}
/**
* 應(yīng)用狀態(tài)碼每分鐘統(tǒng)計(jì)
*/
public static function statisticProjectCodeInterval()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$minute = $transferData['minute'];
$project = $transferData['project'];
$type = $transferData['type'];
$costTime = $transferData['costTime'];
$code = $transferData['code'];
try {
$data = [
'day' => $day,
'time' => $minute,
'project' => $project,
'type' => $type,
'code' => $code,
'count' => 1,
'cost' => $costTime,
];
Db::table('statistic_project_code_interval')->insert($data);
} catch (\PDOException | \Exception $e) {
$where = [
['day', '=', $day],
['time', '=', $minute],
['code', '=', $code],
['project', '=', $project],
['type', '=', $type]
];
Db::table('statistic_project_code_interval')
->where($where)
->update([
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
]);
}
}
/**
* 應(yīng)用IP統(tǒng)計(jì)
*/
public static function statisticProjectIp()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$project = $transferData['project'];
$type = $transferData['type'];
$costTime = $transferData['costTime'];
$success = 1 === $transferData['success'];
$ip = $transferData['ip'];
try {
$data = [
'day' => $day,
'project' => $project,
'type' => $type,
'ip' => $ip,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
Db::table('statistic_project_ip')->insert($data);
} catch (\PDOException | \Exception $e) {
$where = [
['day', '=', $day],
['project', '=', $project],
['type', '=', $type],
['ip', '=', $ip],
];
$data = [
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
];
if($success){
$data['success_count'] = Db::raw('success_count + 1');
}else{
$data['error_count'] = Db::raw('error_count + 1');
}
Db::table('statistic_project_ip')
->where($where)
->update($data);
}
}
/**
* 應(yīng)用IP每分鐘統(tǒng)計(jì)
*/
public static function statisticProjectIpInterval()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$minute = $transferData['minute'];
$project = $transferData['project'];
$type = $transferData['type'];
$costTime = $transferData['costTime'];
$success = 1 === $transferData['success'];
$ip = $transferData['ip'];
try {
$data = [
'day' => $day,
'time' => $minute,
'project' => $project,
'type' => $type,
'ip' => $ip,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
Db::table('statistic_project_ip_interval')->insert($data);
} catch (\PDOException | \Exception $e) {
$where = [
['day', '=', $day],
['time', '=', $minute],
['project', '=', $project],
['type', '=', $type],
['ip', '=', $ip],
];
$data = [
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
];
if($success){
$data['success_count'] = Db::raw('success_count + 1');
}else{
$data['error_count'] = Db::raw('error_count + 1');
}
Db::table('statistic_project_ip_interval')
->where($where)
->update($data);
}
}
/**
* 應(yīng)用IP狀態(tài)碼統(tǒng)計(jì)
*/
public static function statisticProjectIpCode()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$project = $transferData['project'];
$type = $transferData['type'];
$costTime = $transferData['costTime'];
$ip = $transferData['ip'];
$code = $transferData['code'];
try {
$data = [
'day' => $day,
'project' => $project,
'type' => $type,
'ip' => $ip,
'code' => $code,
'count' => 1,
'cost' => $costTime,
];
Db::table('statistic_project_ip_code')->insert($data);
} catch (\PDOException | \Exception $e) {
$where = [
['day', '=', $day],
['code', '=', $code],
['project', '=', $project],
['type', '=', $type],
['ip', '=', $ip]
];
Db::table('statistic_project_ip_code')
->where($where)
->update([
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
]);
}
}
/**
* 應(yīng)用IP狀態(tài)碼每分鐘統(tǒng)計(jì)
*/
public static function statisticProjectIpCodeInterval()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$minute = $transferData['minute'];
$project = $transferData['project'];
$type = $transferData['type'];
$costTime = $transferData['costTime'];
$ip = $transferData['ip'];
$code = $transferData['code'];
try {
$data = [
'day' => $day,
'time' => $minute,
'project' => $project,
'type' => $type,
'ip' => $ip,
'code' => $code,
'count' => 1,
'cost' => $costTime,
];
Db::table('statistic_project_ip_code_interval')->insert($data);
} catch (\PDOException | \Exception $e) {
$where = [
['day', '=', $day],
['time', '=', $minute],
['project', '=', $project],
['type', '=', $type],
['ip', '=', $ip],
['code', '=', $code],
];
Db::table('statistic_project_ip_code_interval')
->where($where)
->update([
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
]);
}
}
/**
* 應(yīng)用IP調(diào)用統(tǒng)計(jì)
*/
public static function statisticProjectIpTransfer()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$project = $transferData['project'];
$type = $transferData['type'];
$costTime = $transferData['costTime'];
$ip = $transferData['ip'];
$transfer = $transferData['transfer'];
$success = 1 === $transferData['success'];
try {
$data = [
'day' => $day,
'project' => $project,
'type' => $type,
'ip' => $ip,
'transfer' => $transfer,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
Db::table('statistic_project_ip_transfer')->insert($data);
} catch (\PDOException | \Exception $e) {
$where = [
['day', '=', $day],
['project', '=', $project],
['type', '=', $type],
['ip', '=', $ip],
['transfer', '=', $transfer]
];
$data = [
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
];
if($success){
$data['success_count'] = Db::raw('success_count + 1');
}else{
$data['error_count'] = Db::raw('error_count + 1');
}
Db::table('statistic_project_ip_transfer')
->where($where)
->update($data);
}
}
/**
* 應(yīng)用IP調(diào)用每分鐘統(tǒng)計(jì)
*/
public static function statisticProjectIpTransferInterval()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$minute = $transferData['minute'];
$project = $transferData['project'];
$type = $transferData['type'];
$costTime = $transferData['costTime'];
$ip = $transferData['ip'];
$transfer = $transferData['transfer'];
$success = 1 === $transferData['success'];
try {
$data = [
'day' => $day,
'time' => $minute,
'project' => $project,
'type' => $type,
'ip' => $ip,
'transfer' => $transfer,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
Db::table('statistic_project_ip_transfer_interval')->insert($data);
} catch (\PDOException | \Exception $e) {
$where = [
['day', '=', $day],
['time', '=', $minute],
['project', '=', $project],
['type', '=', $type],
['ip', '=', $ip],
['transfer', '=', $transfer]
];
$data = [
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
];
if($success){
$data['success_count'] = Db::raw('success_count + 1');
}else{
$data['error_count'] = Db::raw('error_count + 1');
}
Db::table('statistic_project_ip_transfer_interval')
->where($where)
->update($data);
}
}
}
有個(gè)statistics的統(tǒng)計(jì)系統(tǒng),我抄的這個(gè),但是他的貌似是定時(shí)器加redis實(shí)現(xiàn)的,我想用隊(duì)列的多進(jìn)程
現(xiàn)在是按第一個(gè)大佬加表唯一索引寫(xiě)的代碼,但是多進(jìn)程會(huì)出現(xiàn)mysql死鎖問(wèn)題,單進(jìn)程完全沒(méi)問(wèn)題
你這種寫(xiě)法數(shù)據(jù)庫(kù)的壓力有點(diǎn)大;
我簡(jiǎn)單壓測(cè)了下,執(zhí)行時(shí)間大部分在0.5s以下,個(gè)別在1秒多,電腦4h8g的,確實(shí)很耗時(shí),但是這程序只要能正常運(yùn)行都沒(méi)問(wèn)題,前期都不考慮數(shù)據(jù)庫(kù)壓力了,我擔(dān)心多進(jìn)程如果加入了redis鎖 會(huì)不會(huì)更加耗時(shí)的問(wèn)題
/**
* 統(tǒng)計(jì)鎖
* @method static LockInterface lock(string $day, ?float $ttl = null, ?bool $autoRelease = null) 數(shù)據(jù)庫(kù)操作鎖
* @method static LockInterface project(string $project, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用鎖
* @method static LockInterface statistic(string $day, ?float $ttl = null, ?bool $autoRelease = null) 總統(tǒng)計(jì)鎖
* @method static LockInterface statisticProject(string $day_project_type, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用總統(tǒng)計(jì)
* @method static LockInterface statisticProjectInterval(string $day_minute_project_type, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用每分鐘統(tǒng)計(jì)
* @method static LockInterface statisticProjectTransfer(string $key, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用調(diào)用統(tǒng)計(jì)
* @method static LockInterface statisticProjectTransferInterval(string $key, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用調(diào)用每分鐘統(tǒng)計(jì)
* @method static LockInterface statisticProjectCode(string $key, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用狀態(tài)碼統(tǒng)計(jì)
* @method static LockInterface statisticProjectCodeInterval(string $key, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用狀態(tài)碼每分鐘統(tǒng)計(jì)
* @method static LockInterface statisticProjectIp(string $key, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用IP統(tǒng)計(jì)
* @method static LockInterface statisticProjectIpInterval(string $key, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用IP每分鐘統(tǒng)計(jì)
* @method static LockInterface statisticProjectIpCode(string $key, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用IP狀態(tài)碼統(tǒng)計(jì)
* @method static LockInterface statisticProjectIpCodeInterval(string $key, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用IP狀態(tài)碼每分鐘統(tǒng)計(jì)
* @method static LockInterface statisticProjectIpTransfer(string $key, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用IP調(diào)用統(tǒng)計(jì)
* @method static LockInterface statisticProjectIpTransferInterval(string $key, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用IP調(diào)用每分鐘統(tǒng)計(jì)
*/
class StatisticLocker extends Locker
{
}
// 提高性能
if (\support\Redis::exists($key)) {
goto update_db;
}
//取鎖:有效期3秒,自釋放
$lock = \app\services\lock\StatisticLocker::lock($day, 3, true);
//多進(jìn)程阻塞等待
if ($lock->acquire(true)) {
$row = Db::table('statistic')->where('day', $day)->first();
if ($row) {
update_db:
//這里不要用查詢(xún)出的遞增字段的值,因?yàn)槎噙M(jìn)程下存在并發(fā),數(shù)據(jù)會(huì)不準(zhǔn)?。? //最佳實(shí)踐,根據(jù)查詢(xún)出的數(shù)據(jù),拼接$where,而$data使用數(shù)據(jù)庫(kù)原生語(yǔ)法
Db::table('statistic')->where($where)->update($data);
} else {
$data = [
'day' => $day,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
Db::table('statistic')->insert($data);
//設(shè)置標(biāo)志
\support\Redis::setEx($key, 300, time());
}
}
//代碼執(zhí)行到這里,會(huì)立刻自動(dòng)釋放鎖
執(zhí)行流程:
1、標(biāo)志位的key,自己根據(jù)業(yè)務(wù)組裝;
2、不存在標(biāo)志位時(shí),多進(jìn)程阻塞等待 獲取鎖;
3、插入成功后,設(shè)置標(biāo)志位;
4、隊(duì)列再有數(shù)據(jù)進(jìn)來(lái),存在標(biāo)志位時(shí),多進(jìn)程同步更新;
<?php
namespace app\queue\redis;
use app\services\lock\StatisticLocker;
use support\Db;
use support\Redis;
use Tinywan\ExceptionHandler\Exception\ServerErrorHttpException;
use Webman\RedisQueue\Consumer;
class SyncStatisticsToMysql implements Consumer
{
// 要消費(fèi)的隊(duì)列名
public string $queue = 'sync-statistics-to-mysql';
// 連接名,對(duì)應(yīng) plugin/webman/redis-queue/redis.php 里的連接`
public string $connection = 'default';
// 消費(fèi)的數(shù)據(jù)
public static array $transferData = [];
public static $num = 0;
// 消費(fèi)
public function consume($data)
{
$startTime = microtime(true);
self::$transferData = $data;
try {
// 組裝數(shù)據(jù)
self::assembleData();
// 調(diào)用記錄
self::tracing();
// 應(yīng)用
self::project();
// 總統(tǒng)計(jì)
self::statistic();
// 應(yīng)用總統(tǒng)計(jì)
self::statisticProject();
// 應(yīng)用每分鐘統(tǒng)計(jì)
self::statisticProjectInterval();
// 應(yīng)用調(diào)用統(tǒng)計(jì)
self::statisticProjectTransfer();
// 應(yīng)用調(diào)用每分鐘統(tǒng)計(jì)
self::statisticProjectTransferInterval();
// 應(yīng)用狀態(tài)碼統(tǒng)計(jì)
self::statisticProjectCode();
// 應(yīng)用狀態(tài)碼每分鐘統(tǒng)計(jì)
self::statisticProjectCodeInterval();
// 應(yīng)用IP統(tǒng)計(jì)
self::statisticProjectIp();
// 應(yīng)用IP每分鐘統(tǒng)計(jì)
self::statisticProjectIpInterval();
// 應(yīng)用IP狀態(tài)碼統(tǒng)計(jì)
self::statisticProjectIpCode();
// 應(yīng)用IP狀態(tài)碼每分鐘統(tǒng)計(jì)
self::statisticProjectIpCodeInterval();
// 應(yīng)用IP調(diào)用統(tǒng)計(jì)
self::statisticProjectIpTransfer();
//應(yīng)用IP調(diào)用每分鐘統(tǒng)計(jì)
self::statisticProjectIpTransferInterval();
} catch (\Throwable $th) {
var_dump($th->getMessage());
throw new ServerErrorHttpException($th->getMessage());
}
$endTime = microtime(true);
self::$num += $endTime - $startTime;
var_dump(self::$num);
}
/**
* 組裝數(shù)據(jù)
*/
protected static function assembleData()
{
$transferData = self::$transferData;
$names = explode('-', $transferData['project']);
$transferData['project'] = $names[0];
$transferData['type'] = $names[1] ?? 'All';
$timer = strtotime($transferData['time']);
$transferData['day'] = date('Ymd', $timer);
$transferData['minute'] = date('YmdHi', $timer);
$transferData['trace'] = uniqid();
$transferData['project_key'] = md5($transferData['project']);
$transferData['statistic_key'] = md5($transferData['day']);
$transferData['statistic_project_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type']);
$transferData['statistic_project_interval_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['minute']);
$transferData['statistic_project_transfer_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['transfer']);
$transferData['statistic_project_transfer_interval_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['transfer'] . $transferData['minute']);
$transferData['statistic_project_code_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['code']);
$transferData['statistic_project_code_interval_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['code'] . $transferData['minute']);
$transferData['statistic_project_ip_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['ip']);
$transferData['statistic_project_ip_interval_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['ip'] . $transferData['minute']);
$transferData['statistic_project_ip_code_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['ip'] . $transferData['code']);
$transferData['statistic_project_ip_code_interval_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['ip'] . $transferData['code'] . $transferData['minute']);
$transferData['statistic_project_ip_transfer_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['ip'] . $transferData['transfer']);
$transferData['statistic_project_ip_transfer_interval_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['ip'] . $transferData['transfer'] . $transferData['minute']);
self::$transferData = $transferData;
}
/**
* 調(diào)用記錄
*/
protected static function tracing()
{
$transferData = self::$transferData;
$insert = [
'day' => $transferData['day'],
'time' => $transferData['time'],
'trace' => $transferData['trace'],
'project' => $transferData['project'],
'type' => $transferData['type'],
'ip' => $transferData['ip'],
'transfer' => $transferData['transfer'],
'cost_time' => $transferData['costTime'],
'success' => $transferData['success'],
'code' => $transferData['code'],
'details' => $transferData['details'],
];
Db::table('tracing')->insert($insert);
}
/**
* 應(yīng)用
*/
protected static function project()
{
$transferData = self::$transferData;
$project = $transferData['project'];
$time = date('Y-m-d H:i:s');
$key = $transferData['project_key'];
if (Redis::exists('key:' . $key)) {
goto update_db;
}
//取鎖:有效期3秒,自釋放
$lock = StatisticLocker::lock($key, 3, true);
//多進(jìn)程阻塞等待
if ($lock->acquire(true)) {
$row = Db::table('project')->where('key', $key)->first();
if ($row) {
update_db:
//這里不要用查詢(xún)出的遞增字段的值,因?yàn)槎噙M(jìn)程下存在并發(fā),數(shù)據(jù)會(huì)不準(zhǔn)!!
//最佳實(shí)踐,根據(jù)查詢(xún)出的數(shù)據(jù),拼接$where,而$data使用數(shù)據(jù)庫(kù)原生語(yǔ)法
Db::table('project')->where('key', $key)->update([
'updated_at' => $time
]);
} else {
$data = [
'key' => $key,
'project' => $project,
'created_at' => $time,
'updated_at' => $time
];
Db::table('project')->insert($data);
//設(shè)置標(biāo)志
Redis::setEx('key:' . $key, 300, time());
}
}
//代碼執(zhí)行到這里,會(huì)立刻自動(dòng)釋放鎖
}
/**
* 總統(tǒng)計(jì)
*/
public static function statistic()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$success = 1 === $transferData['success'];
$costTime = $transferData['costTime'];
$key = $transferData['statistic_key'];
if (Redis::exists('key:' . $key)) {
goto update_db;
}
//取鎖:有效期3秒,自釋放
$lock = StatisticLocker::lock($key, 3, true);
//多進(jìn)程阻塞等待
if ($lock->acquire(true)) {
$row = Db::table('statistic')->where('key', $key)->first();
if ($row) {
update_db:
//這里不要用查詢(xún)出的遞增字段的值,因?yàn)槎噙M(jìn)程下存在并發(fā),數(shù)據(jù)會(huì)不準(zhǔn)??!
//最佳實(shí)踐,根據(jù)查詢(xún)出的數(shù)據(jù),拼接$where,而$data使用數(shù)據(jù)庫(kù)原生語(yǔ)法
$data = [
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
];
if ($success) {
$data['success_count'] = Db::raw('success_count + 1');
} else {
$data['error_count'] = Db::raw('error_count + 1');
}
Db::table('statistic')->where('key', $key)
->update($data);
} else {
$data = [
'key' => $key,
'day' => $day,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
Db::table('statistic')->insert($data);
//設(shè)置標(biāo)志
Redis::setEx('key:' . $key, 300, time());
}
}
}
/**
* 應(yīng)用總統(tǒng)計(jì)
*/
public static function statisticProject()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$project = $transferData['project'];
$type = $transferData['type'];
$success = 1 === $transferData['success'];
$costTime = $transferData['costTime'];
$key = $transferData['statistic_project_key'];
if (Redis::exists('key:' . $key)) {
goto update_db;
}
//取鎖:有效期3秒,自釋放
$lock = StatisticLocker::lock($key, 3, true);
//多進(jìn)程阻塞等待
if ($lock->acquire(true)) {
$row = Db::table('statistic_project')->where('key', $key)->first();
if ($row) {
update_db:
//這里不要用查詢(xún)出的遞增字段的值,因?yàn)槎噙M(jìn)程下存在并發(fā),數(shù)據(jù)會(huì)不準(zhǔn)??!
//最佳實(shí)踐,根據(jù)查詢(xún)出的數(shù)據(jù),拼接$where,而$data使用數(shù)據(jù)庫(kù)原生語(yǔ)法
$data = [
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
];
if ($success) {
$data['success_count'] = Db::raw('success_count + 1');
} else {
$data['error_count'] = Db::raw('error_count + 1');
}
Db::table('statistic_project')
->where('key', $key)
->update($data);
} else {
$data = [
'key' => $key,
'day' => $day,
'project' => $project,
'type' => $type,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
Db::table('statistic_project')->insert($data);
//設(shè)置標(biāo)志
Redis::setEx('key:' . $key, 300, time());
}
}
}
/**
* 應(yīng)用每分鐘統(tǒng)計(jì)
*/
public static function statisticProjectInterval()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$minute = $transferData['minute'];
$project = $transferData['project'];
$type = $transferData['type'];
$success = 1 === $transferData['success'];
$costTime = $transferData['costTime'];
$key = $transferData['statistic_project_interval_key'];
if (Redis::exists('key:' . $key)) {
goto update_db;
}
//取鎖:有效期3秒,自釋放
$lock = StatisticLocker::lock($key, 3, true);
//多進(jìn)程阻塞等待
if ($lock->acquire(true)) {
$row = Db::table('statistic_project_interval')->where('key', $key)->first();
if ($row) {
update_db:
//這里不要用查詢(xún)出的遞增字段的值,因?yàn)槎噙M(jìn)程下存在并發(fā),數(shù)據(jù)會(huì)不準(zhǔn)??!
//最佳實(shí)踐,根據(jù)查詢(xún)出的數(shù)據(jù),拼接$where,而$data使用數(shù)據(jù)庫(kù)原生語(yǔ)法
$data = [
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
];
if ($success) {
$data['success_count'] = Db::raw('success_count + 1');
} else {
$data['error_count'] = Db::raw('error_count + 1');
}
Db::table('statistic_project_interval')
->where('key', $key)
->update($data);
} else {
$data = [
'key' => $key,
'day' => $day,
'time' => $minute,
'project' => $project,
'type' => $type,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
Db::table('statistic_project_interval')->insert($data);
//設(shè)置標(biāo)志
Redis::setEx('key:' . $key, 300, time());
}
}
}
/**
* 應(yīng)用調(diào)用統(tǒng)計(jì)
*/
public static function statisticProjectTransfer()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$project = $transferData['project'];
$type = $transferData['type'];
$costTime = $transferData['costTime'];
$transfer = $transferData['transfer'];
$success = 1 === $transferData['success'];
$key = $transferData['statistic_project_transfer_key'];
if (Redis::exists('key:' . $key)) {
goto update_db;
}
//取鎖:有效期3秒,自釋放
$lock = StatisticLocker::lock($key, 3, true);
//多進(jìn)程阻塞等待
if ($lock->acquire(true)) {
$row = Db::table('statistic_project_transfer')->where('key', $key)->first();
if ($row) {
update_db:
//這里不要用查詢(xún)出的遞增字段的值,因?yàn)槎噙M(jìn)程下存在并發(fā),數(shù)據(jù)會(huì)不準(zhǔn)!!
//最佳實(shí)踐,根據(jù)查詢(xún)出的數(shù)據(jù),拼接$where,而$data使用數(shù)據(jù)庫(kù)原生語(yǔ)法
$data = [
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
];
if ($success) {
$data['success_count'] = Db::raw('success_count + 1');
} else {
$data['error_count'] = Db::raw('error_count + 1');
}
Db::table('statistic_project_transfer')
->where('key', $key)
->update($data);
} else {
$data = [
'key' => $key,
'day' => $day,
'project' => $project,
'type' => $type,
'transfer' => $transfer,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
Db::table('statistic_project_transfer')->insert($data);
//設(shè)置標(biāo)志
Redis::setEx('key:' . $key, 300, time());
}
}
}
/**
* 應(yīng)用調(diào)用每分鐘統(tǒng)計(jì)
*/
public static function statisticProjectTransferInterval()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$minute = $transferData['minute'];
$project = $transferData['project'];
$type = $transferData['type'];
$costTime = $transferData['costTime'];
$transfer = $transferData['transfer'];
$success = 1 === $transferData['success'];
$key = $transferData['statistic_project_transfer_interval_key'];
if (Redis::exists('key:' . $key)) {
goto update_db;
}
//取鎖:有效期3秒,自釋放
$lock = StatisticLocker::lock($key, 3, true);
//多進(jìn)程阻塞等待
if ($lock->acquire(true)) {
$row = Db::table('statistic_project_transfer_interval')->where('key', $key)->first();
if ($row) {
update_db:
//這里不要用查詢(xún)出的遞增字段的值,因?yàn)槎噙M(jìn)程下存在并發(fā),數(shù)據(jù)會(huì)不準(zhǔn)!!
//最佳實(shí)踐,根據(jù)查詢(xún)出的數(shù)據(jù),拼接$where,而$data使用數(shù)據(jù)庫(kù)原生語(yǔ)法
$data = [
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
];
if ($success) {
$data['success_count'] = Db::raw('success_count + 1');
} else {
$data['error_count'] = Db::raw('error_count + 1');
}
Db::table('statistic_project_transfer_interval')
->where('key', $key)
->update($data);
} else {
$data = [
'key' => $key,
'day' => $day,
'time' => $minute,
'project' => $project,
'type' => $type,
'transfer' => $transfer,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
Db::table('statistic_project_transfer_interval')->insert($data);
//設(shè)置標(biāo)志
Redis::setEx('key:' . $key, 300, time());
}
}
}
/**
* 應(yīng)用狀態(tài)碼統(tǒng)計(jì)
*/
public static function statisticProjectCode()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$project = $transferData['project'];
$type = $transferData['type'];
$costTime = $transferData['costTime'];
$code = $transferData['code'];
$key = $transferData['statistic_project_code_key'];
if (Redis::exists('key:' . $key)) {
goto update_db;
}
//取鎖:有效期3秒,自釋放
$lock = StatisticLocker::lock($key, 3, true);
//多進(jìn)程阻塞等待
if ($lock->acquire(true)) {
$row = Db::table('statistic_project_code')->where('key', $key)->first();
if ($row) {
update_db:
//這里不要用查詢(xún)出的遞增字段的值,因?yàn)槎噙M(jìn)程下存在并發(fā),數(shù)據(jù)會(huì)不準(zhǔn)?。? //最佳實(shí)踐,根據(jù)查詢(xún)出的數(shù)據(jù),拼接$where,而$data使用數(shù)據(jù)庫(kù)原生語(yǔ)法
Db::table('statistic_project_code')
->where('key', $key)
->update([
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
]);
} else {
$data = [
'key' => $key,
'day' => $day,
'project' => $project,
'type' => $type,
'code' => $code,
'count' => 1,
'cost' => $costTime,
];
Db::table('statistic_project_code')->insert($data);
//設(shè)置標(biāo)志
Redis::setEx('key:' . $key, 300, time());
}
}
}
/**
* 應(yīng)用狀態(tài)碼每分鐘統(tǒng)計(jì)
*/
public static function statisticProjectCodeInterval()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$minute = $transferData['minute'];
$project = $transferData['project'];
$type = $transferData['type'];
$costTime = $transferData['costTime'];
$code = $transferData['code'];
$key = $transferData['statistic_project_code_interval_key'];
if (Redis::exists('key:' . $key)) {
goto update_db;
}
//取鎖:有效期3秒,自釋放
$lock = StatisticLocker::lock($key, 3, true);
//多進(jìn)程阻塞等待
if ($lock->acquire(true)) {
$row = Db::table('statistic_project_code_interval')->where('key', $key)->first();
if ($row) {
update_db:
//這里不要用查詢(xún)出的遞增字段的值,因?yàn)槎噙M(jìn)程下存在并發(fā),數(shù)據(jù)會(huì)不準(zhǔn)!!
//最佳實(shí)踐,根據(jù)查詢(xún)出的數(shù)據(jù),拼接$where,而$data使用數(shù)據(jù)庫(kù)原生語(yǔ)法
Db::table('statistic_project_code_interval')
->where('key', $key)
->update([
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
]);
} else {
$data = [
'key' => $key,
'day' => $day,
'time' => $minute,
'project' => $project,
'type' => $type,
'code' => $code,
'count' => 1,
'cost' => $costTime,
];
Db::table('statistic_project_code_interval')->insert($data);
//設(shè)置標(biāo)志
Redis::setEx('key:' . $key, 300, time());
}
}
}
/**
* 應(yīng)用IP統(tǒng)計(jì)
*/
public static function statisticProjectIp()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$project = $transferData['project'];
$type = $transferData['type'];
$costTime = $transferData['costTime'];
$success = 1 === $transferData['success'];
$ip = $transferData['ip'];
$key = $transferData['statistic_project_ip_key'];
if (Redis::exists('key:' . $key)) {
goto update_db;
}
//取鎖:有效期3秒,自釋放
$lock = StatisticLocker::lock($key, 3, true);
//多進(jìn)程阻塞等待
if ($lock->acquire(true)) {
$row = Db::table('statistic_project_ip')->where('key', $key)->first();
if ($row) {
update_db:
//這里不要用查詢(xún)出的遞增字段的值,因?yàn)槎噙M(jìn)程下存在并發(fā),數(shù)據(jù)會(huì)不準(zhǔn)??!
//最佳實(shí)踐,根據(jù)查詢(xún)出的數(shù)據(jù),拼接$where,而$data使用數(shù)據(jù)庫(kù)原生語(yǔ)法
$data = [
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
];
if ($success) {
$data['success_count'] = Db::raw('success_count + 1');
} else {
$data['error_count'] = Db::raw('error_count + 1');
}
Db::table('statistic_project_ip')
->where('key', $key)
->update($data);
} else {
$data = [
'key' => $key,
'day' => $day,
'project' => $project,
'type' => $type,
'ip' => $ip,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
Db::table('statistic_project_ip')->insert($data);
//設(shè)置標(biāo)志
Redis::setEx('key:' . $key, 300, time());
}
}
}
/**
* 應(yīng)用IP每分鐘統(tǒng)計(jì)
*/
public static function statisticProjectIpInterval()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$minute = $transferData['minute'];
$project = $transferData['project'];
$type = $transferData['type'];
$costTime = $transferData['costTime'];
$success = 1 === $transferData['success'];
$ip = $transferData['ip'];
$key = $transferData['statistic_project_ip_interval_key'];
if (Redis::exists('key:' . $key)) {
goto update_db;
}
//取鎖:有效期3秒,自釋放
$lock = StatisticLocker::lock($key, 3, true);
//多進(jìn)程阻塞等待
if ($lock->acquire(true)) {
$row = Db::table('statistic_project_ip_interval')->where('key', $key)->first();
if ($row) {
update_db:
//這里不要用查詢(xún)出的遞增字段的值,因?yàn)槎噙M(jìn)程下存在并發(fā),數(shù)據(jù)會(huì)不準(zhǔn)?。? //最佳實(shí)踐,根據(jù)查詢(xún)出的數(shù)據(jù),拼接$where,而$data使用數(shù)據(jù)庫(kù)原生語(yǔ)法
$data = [
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
];
if ($success) {
$data['success_count'] = Db::raw('success_count + 1');
} else {
$data['error_count'] = Db::raw('error_count + 1');
}
Db::table('statistic_project_ip_interval')
->where('key', $key)
->update($data);
} else {
$data = [
'key' => $key,
'day' => $day,
'time' => $minute,
'project' => $project,
'type' => $type,
'ip' => $ip,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
Db::table('statistic_project_ip_interval')->insert($data);
//設(shè)置標(biāo)志
Redis::setEx('key:' . $key, 300, time());
}
}
}
/**
* 應(yīng)用IP狀態(tài)碼統(tǒng)計(jì)
*/
public static function statisticProjectIpCode()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$project = $transferData['project'];
$type = $transferData['type'];
$costTime = $transferData['costTime'];
$ip = $transferData['ip'];
$code = $transferData['code'];
$key = $transferData['statistic_project_ip_code_key'];
if (Redis::exists('key:' . $key)) {
goto update_db;
}
//取鎖:有效期3秒,自釋放
$lock = StatisticLocker::lock($key, 3, true);
//多進(jìn)程阻塞等待
if ($lock->acquire(true)) {
$row = Db::table('statistic_project_ip_code')->where('key', $key)->first();
if ($row) {
update_db:
//這里不要用查詢(xún)出的遞增字段的值,因?yàn)槎噙M(jìn)程下存在并發(fā),數(shù)據(jù)會(huì)不準(zhǔn)!!
//最佳實(shí)踐,根據(jù)查詢(xún)出的數(shù)據(jù),拼接$where,而$data使用數(shù)據(jù)庫(kù)原生語(yǔ)法
Db::table('statistic_project_ip_code')
->where('key', $key)
->update([
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
]);
} else {
$data = [
'key' => $key,
'day' => $day,
'project' => $project,
'type' => $type,
'ip' => $ip,
'code' => $code,
'count' => 1,
'cost' => $costTime,
];
Db::table('statistic_project_ip_code')->insert($data);
//設(shè)置標(biāo)志
Redis::setEx('key:' . $key, 300, time());
}
}
}
/**
* 應(yīng)用IP狀態(tài)碼每分鐘統(tǒng)計(jì)
*/
public static function statisticProjectIpCodeInterval()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$minute = $transferData['minute'];
$project = $transferData['project'];
$type = $transferData['type'];
$costTime = $transferData['costTime'];
$ip = $transferData['ip'];
$code = $transferData['code'];
$key = $transferData['statistic_project_ip_code_interval_key'];
if (Redis::exists('key:' . $key)) {
goto update_db;
}
//取鎖:有效期3秒,自釋放
$lock = StatisticLocker::lock($key, 3, true);
//多進(jìn)程阻塞等待
if ($lock->acquire(true)) {
$row = Db::table('statistic_project_ip_code_interval')->where('key', $key)->first();
if ($row) {
update_db:
//這里不要用查詢(xún)出的遞增字段的值,因?yàn)槎噙M(jìn)程下存在并發(fā),數(shù)據(jù)會(huì)不準(zhǔn)!!
//最佳實(shí)踐,根據(jù)查詢(xún)出的數(shù)據(jù),拼接$where,而$data使用數(shù)據(jù)庫(kù)原生語(yǔ)法
Db::table('statistic_project_ip_code_interval')
->where('key', $key)
->update([
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
]);
} else {
$data = [
'key' => $key,
'day' => $day,
'time' => $minute,
'project' => $project,
'type' => $type,
'ip' => $ip,
'code' => $code,
'count' => 1,
'cost' => $costTime,
];
Db::table('statistic_project_ip_code_interval')->insert($data);
//設(shè)置標(biāo)志
Redis::setEx('key:' . $key, 300, time());
}
}
}
/**
* 應(yīng)用IP調(diào)用統(tǒng)計(jì)
*/
public static function statisticProjectIpTransfer()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$project = $transferData['project'];
$type = $transferData['type'];
$costTime = $transferData['costTime'];
$ip = $transferData['ip'];
$transfer = $transferData['transfer'];
$success = 1 === $transferData['success'];
$key = $transferData['statistic_project_ip_transfer_key'];
if (Redis::exists('key:' . $key)) {
goto update_db;
}
//取鎖:有效期3秒,自釋放
$lock = StatisticLocker::lock($key, 3, true);
//多進(jìn)程阻塞等待
if ($lock->acquire(true)) {
$row = Db::table('statistic_project_ip_transfer')->where('key', $key)->first();
if ($row) {
update_db:
//這里不要用查詢(xún)出的遞增字段的值,因?yàn)槎噙M(jìn)程下存在并發(fā),數(shù)據(jù)會(huì)不準(zhǔn)??!
//最佳實(shí)踐,根據(jù)查詢(xún)出的數(shù)據(jù),拼接$where,而$data使用數(shù)據(jù)庫(kù)原生語(yǔ)法
$data = [
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
];
if ($success) {
$data['success_count'] = Db::raw('success_count + 1');
} else {
$data['error_count'] = Db::raw('error_count + 1');
}
Db::table('statistic_project_ip_transfer')
->where('key', $key)
->update($data);
} else {
$data = [
'key' => $key,
'day' => $day,
'project' => $project,
'type' => $type,
'ip' => $ip,
'transfer' => $transfer,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
Db::table('statistic_project_ip_transfer')->insert($data);
//設(shè)置標(biāo)志
Redis::setEx('key:' . $key, 300, time());
}
}
}
/**
* 應(yīng)用IP調(diào)用每分鐘統(tǒng)計(jì)
*/
public static function statisticProjectIpTransferInterval()
{
$transferData = self::$transferData;
$day = $transferData['day'];
$minute = $transferData['minute'];
$project = $transferData['project'];
$type = $transferData['type'];
$costTime = $transferData['costTime'];
$ip = $transferData['ip'];
$transfer = $transferData['transfer'];
$success = 1 === $transferData['success'];
$key = $transferData['statistic_project_ip_transfer_interval_key'];
if (Redis::exists('key:' . $key)) {
goto update_db;
}
//取鎖:有效期3秒,自釋放
$lock = StatisticLocker::lock($key, 3, true);
//多進(jìn)程阻塞等待
if ($lock->acquire(true)) {
$row = Db::table('statistic_project_ip_transfer_interval')->where('key', $key)->first();
if ($row) {
update_db:
//這里不要用查詢(xún)出的遞增字段的值,因?yàn)槎噙M(jìn)程下存在并發(fā),數(shù)據(jù)會(huì)不準(zhǔn)?。? //最佳實(shí)踐,根據(jù)查詢(xún)出的數(shù)據(jù),拼接$where,而$data使用數(shù)據(jù)庫(kù)原生語(yǔ)法
$data = [
'count' => Db::raw('count + 1'),
'cost' => Db::raw("cost + {$costTime}"),
];
if ($success) {
$data['success_count'] = Db::raw('success_count + 1');
} else {
$data['error_count'] = Db::raw('error_count + 1');
}
Db::table('statistic_project_ip_transfer_interval')
->where('key', $key)
->update($data);
} else {
$data = [
'key' => $key,
'day' => $day,
'time' => $minute,
'project' => $project,
'type' => $type,
'ip' => $ip,
'transfer' => $transfer,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
Db::table('statistic_project_ip_transfer_interval')->insert($data);
//設(shè)置標(biāo)志
Redis::setEx('key:' . $key, 300, time());
}
}
}
}
上述代碼經(jīng)過(guò)測(cè)試,300條數(shù)據(jù),單進(jìn)程耗時(shí)20.487082958221436s,cpu數(shù)進(jìn)程耗時(shí)21.118263483047485s,cpu*4進(jìn)程數(shù)耗時(shí)22.74993324279785s,對(duì)進(jìn)程處理并沒(méi)有加快反而由于鎖的存在多耗時(shí)了;大佬幫看看我上面的代碼,我準(zhǔn)備放棄了,改用redis+定時(shí)器掃描批量入庫(kù)了
//取鎖:有效期3秒,自釋放
$lock = StatisticLocker::lock($key, 3, true);
數(shù)據(jù)庫(kù)頻繁的插入更新可能導(dǎo)致每次鎖的釋放時(shí)間增加;
根據(jù)業(yè)務(wù)邏輯,insert只有一次,update可能存在很多次,這一塊是造成數(shù)據(jù)庫(kù)壓力的原因,所以建議把update部分的內(nèi)容放到redis隊(duì)列,其他邏輯保持不變。
那個(gè)鎖插件的基類(lèi)代碼:
class Locker
{
public static function __callStatic($name, $arguments)
{
$key = $arguments[0] ?? '';
unset($arguments[0]);
return static::createLock($name . $key, ...$arguments);
}
而你用的都是這一個(gè)方法,key的取值是否有重復(fù),不得而知。
//取鎖:有效期3秒,自釋放
$lock = StatisticLocker::lock($key, 3, true);
每一個(gè)表的操作是不能共享鎖的。也要根據(jù)業(yè)務(wù)的特性,ttl值也是有講究的。
比如每天、每分鐘、每小時(shí)才創(chuàng)建一次的,要根據(jù)時(shí)間合理調(diào)整ttl值。
總體原則是:創(chuàng)建時(shí)候才加鎖,多進(jìn)程同步更新。
我記得我給你寫(xiě)了很多個(gè)方法的???為的就是避免鎖重復(fù)
當(dāng)時(shí),很多方法的時(shí)候出現(xiàn)了重復(fù)入庫(kù)的問(wèn)題,我沒(méi)研究那個(gè)鎖的原理 想著是不是方法不一造成的 就改成一個(gè)了
算了 我這代碼太垃圾了 不值得再探討了,簡(jiǎn)單測(cè)試數(shù)據(jù)承載量太低了,這段代碼只是實(shí)現(xiàn)了業(yè)務(wù)上的邏輯,實(shí)際中根本無(wú)法承載業(yè)務(wù)需求
重復(fù)入庫(kù)是因?yàn)閿?shù)據(jù)庫(kù)壓力大,3秒后鎖自動(dòng)釋放了。因?yàn)槭亲葬尫沛i,所以可適當(dāng)延長(zhǎng)鎖時(shí)間比如30秒。
測(cè)試300數(shù)據(jù)處理 8進(jìn)程 時(shí)間float(20.861277103424072)根之前差不多 這個(gè)處理速度太慢了,而且造成數(shù)據(jù)庫(kù)壓力很大,目前還是沒(méi)數(shù)據(jù)的狀態(tài),當(dāng)數(shù)據(jù)庫(kù)數(shù)據(jù)量大的時(shí)候,這個(gè)處理時(shí)間會(huì)根漫長(zhǎng),還有key字段加了索引
每一個(gè)方法名,表示一個(gè)鎖的前綴,前綴也是鎖key的一部分(可看源碼);
意思就是前綴不同、key值相同,也是不同的鎖。
/**
* 統(tǒng)計(jì)鎖
* @method static LockInterface lock(string $day, ?float $ttl = null, ?bool $autoRelease = null) 數(shù)據(jù)庫(kù)操作鎖
* @method static LockInterface project(string $project, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用鎖
* @method static LockInterface statistic(string $day, ?float $ttl = null, ?bool $autoRelease = null) 總統(tǒng)計(jì)鎖
* @method static LockInterface statisticProject(string $day_project_type, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用總統(tǒng)計(jì)
* @method static LockInterface statisticProjectInterval(string $day_minute_project_type, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用每分鐘統(tǒng)計(jì)
* @method static LockInterface statisticProjectTransfer(string $key, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用調(diào)用統(tǒng)計(jì)
* @method static LockInterface statisticProjectTransferInterval(string $key, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用調(diào)用每分鐘統(tǒng)計(jì)
* @method static LockInterface statisticProjectCode(string $key, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用狀態(tài)碼統(tǒng)計(jì)
* @method static LockInterface statisticProjectCodeInterval(string $key, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用狀態(tài)碼每分鐘統(tǒng)計(jì)
* @method static LockInterface statisticProjectIp(string $key, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用IP統(tǒng)計(jì)
* @method static LockInterface statisticProjectIpInterval(string $key, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用IP每分鐘統(tǒng)計(jì)
* @method static LockInterface statisticProjectIpCode(string $key, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用IP狀態(tài)碼統(tǒng)計(jì)
* @method static LockInterface statisticProjectIpCodeInterval(string $key, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用IP狀態(tài)碼每分鐘統(tǒng)計(jì)
* @method static LockInterface statisticProjectIpTransfer(string $key, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用IP調(diào)用統(tǒng)計(jì)
* @method static LockInterface statisticProjectIpTransferInterval(string $key, ?float $ttl = null, ?bool $autoRelease = null) 應(yīng)用IP調(diào)用每分鐘統(tǒng)計(jì)
*/
class StatisticLocker extends Locker
{
}
我們之前訪問(wèn)量不大,直接是使用事務(wù),保證數(shù)據(jù)的一致性,我問(wèn)了ai,它讓我使用行鎖
use Illuminate\Support\Facades\DB;
// 開(kāi)啟事務(wù)
DB::beginTransaction();
try {
// 使用行級(jí)鎖查詢(xún)數(shù)據(jù)
$row = DB::table('statistic')->where('day', $day)->lockForUpdate()->first();
if ($row) {
$data = [
'count' => $row->count + 1,
'cost' => $row->cost + $costTime,
'success_count' => $row->success_count + ($success ? 1 : 0),
'error_count' => $row->error_count + ($success ? 0 : 1),
];
DB::table('statistic')->where('day', $day)->update($data);
} else {
$data = [
'day' => $day,
'count' => 1,
'cost' => $costTime,
'success_count' => $success ? 1 : 0,
'error_count' => $success ? 0 : 1,
];
DB::table('statistic')->insert($data);
}
// 提交事務(wù)
DB::commit();
} catch (\Exception $e) {
// 回滾事務(wù)
DB::rollback();
// 處理異常
}
隊(duì)列消費(fèi)不是應(yīng)該消出庫(kù)就沒(méi)了,不存在二次消費(fèi)吧
不存在,只是我隊(duì)列里數(shù)據(jù)有的是重復(fù)的 設(shè)計(jì)更新問(wèn)題,只是插入的話用隊(duì)列完全沒(méi)問(wèn)題,但是正如上面大佬說(shuō)的,最高內(nèi)存計(jì)算 批量入庫(kù)減少數(shù)據(jù)庫(kù)壓力,也就是僅僅入庫(kù)的話使用隊(duì)列并不是完美,隊(duì)列還是根一些發(fā)郵件啊 之類(lèi)的業(yè)務(wù)更匹配
你這種情況是多進(jìn)程并發(fā)所致,你的SQL語(yǔ)句并非原子操作。
想要解決這個(gè)問(wèn)題,你只能使用MySQL 自帶SQL語(yǔ)句來(lái)實(shí)現(xiàn)原子操作。
這篇文章能解決這些問(wèn)題,https://www.ngui.cc/el/1890018.html?action=onClick
建議采用這條SQL寫(xiě)法:ON DUPLICATE KEY UPDATE