分布式锁
分布式锁
什么是分布式锁
在计算机科学中,锁是在并发场景下用于强行限制资源访问的一种同步机制,即用于在并发控制中通过互斥手段来保证数据同步安全。
分布式锁,顾名思义,应用于分布式场景下,它和单进程中的锁并没有本质上的不同,只是控制对象由一个进程中的多个线程变成了多个进程中的多个线程;此外,临界区的资源也由进程内共享资源变成了分布式系统内部共享资源。
如何实现分布式锁
JDK 虽然提供了大量锁工具,但是只能作用于单一 Java 进程,无法应用于分布式系统。为了解决这个问题,需要使用分布式锁。
分布式锁的解决方案大致有以下几种:
- 基于数据库实现
- 基于缓存(redis,memcached 等)实现
- 基于 Zookeeper 实现 ✅
注:推荐基于 ZooKeeper 实现分布式锁,具体原因看完本文即可明了。
分布式锁的实现要点大同小异,仅在实现细节上有所不同。
分布式锁的实现要点如下:
- 互斥 - 分布式锁必须是独一无二的,表现形式为:向数据存储插入一个唯一的 key,一旦有一个线程插入这个 key,其他线程就不能再插入了。
- 保证 key 唯一性的最简单的方式是使用 UUID。
- 此外,可以参考 Snowflake ID(雪花算法),将机器地址(IP 地址、机器 ID、MAC 地址)、Jvm 进程 ID(应用 ID、服务 ID)、时间戳等关键信息拼接起来作为唯一标识。
- 避免死锁 - 数据库分布式锁和缓存分布式锁(Redis)的思路都是引入超时机制,即成功申请锁后,超过一定时间,锁失效(删除 key),原因在于它们无法感知申请锁的客户端节点状态。而 ZooKeeper 由于其 znode 以目录、文件形式组织,天然就存在物理空间隔离,只要 znode 存在,即表示客户端节点还在工作,所以不存在这种问题。
- 可重入 - 可重入指的是:同一个线程在没有释放锁之前,能否再次获得该锁。其实现方案是:只需在加锁的时候,记录好当前获取锁的节点 + 线程组合的唯一标识,然后在后续的加锁请求时,如果当前请求的节点 + 线程的唯一标识和当前持有锁的相同,那么就直接返回加锁成功;如果不相同,则按正常加锁流程处理。
- 公平性 - 当多个线程请求同一锁时,它们必须按照请求的顺序来获取锁,即先来先得的原则。锁的公平性的实现也非常简单,对于被阻塞的加锁请求,我们只要先记录好它们的顺序,在锁被释放后,按顺序颁发就可以了。
- 自旋重试 - 获取不到锁时,不要直接返回失败,而是支持一定的周期自旋重试,设置一个总的超时时间,当过了超时时间以后还没有获取到锁则返回失败。
- 容错 - 在分布式锁的场景中,部分失败和异步网络这两个问题是同时存在的。如果一个进程获得了锁,但是这个进程与锁服务之间的网络出现了问题,导致无法通信,那么这个情况下,如果锁服务让它一直持有锁,就会导致死锁的发生。
数据库分布式锁
数据库分布式锁原理
(1)创建锁表
CREATE TABLE `distributed_lock` (
`id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
`resource` VARCHAR(64) NOT NULL DEFAULT '' COMMENT '资源',
`count` INT(10) UNSIGNED NOT NULL DEFAULT '0' COMMENT '锁次数,统计可重入锁',
`desc` TEXT DEFAULT NULL COMMENT '备注',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_resource`(`resource`)
)
ENGINE = InnoDB DEFAULT CHARSET = `utf8mb4`;
(2)获取锁
想要锁住某个方法时,执行以下 SQL:
insert into methodLock(method_name,desc) values (‘method_name’,‘desc’)
因为我们对 method_name
做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。
成功插入则获取锁。
(3)释放锁
当方法执行完毕之后,想要释放锁的话,需要执行以下 Sql:
delete from methodLock where method_name ='method_name'
数据库分布式锁问题
- 强依赖数据库的可用性:如果数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
- 可能会出现死锁:没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
- 非阻塞:因为数据的
insert
操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。 - 非重入:同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。
解决办法:
- 单点问题可以用多数据库实例,同时写入 N 个节点,N/2+1 个成功就任务锁定成功。
- 写一个定时任务,隔一段时间清除一次过期的数据。
- 写一个 while 循环,不断的重试插入,直到成功。
- 在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。
数据库分布式锁小结
- 优点: 直接借助数据库,容易理解。
- 缺点: 会有各种各样的问题,在解决问题的过程中会使整个方案变得越来越复杂。操作数据库需要一定的开销,性能问题需要考虑。
Redis 分布式锁
相比于用数据库来实现分布式锁,基于缓存实现的分布式锁的性能会更好。目前有很多成熟的分布式产品,包括 Redis、memcache、Tair 等。这里以 Redis 举例。
Redis 分布式锁原理
这个分布式锁有 3 个重要的考量点:
- 互斥(只能有一个客户端获取锁)
- 不能死锁
- 容错(只要大部分 redis 节点创建了这把锁就可以)
对应的 Redis 指令如下:
setnx
-setnx key val
:当且仅当 key 不存在时,set 一个 key 为 val 的字符串,返回 1;若 key 存在,则什么都不做,返回 0。expire
-expire key timeout
:为 key 设置一个超时时间,单位为 second,超过这个时间锁会自动释放,避免死锁。delete
-delete key
:删除 key
注意:
不要将
setnx
和expire
作为两个命令组合实现加锁,这样就无法保证原子性。如果客户端在setnx
之后崩溃,那么将导致锁无法释放。正确的做法应是在setnx
命令中指定expire
时间。
Redis 分布式锁实现
(1)申请锁
SET resource_name my_random_value NX PX 30000
执行这个命令就 ok。
NX
:表示只有key
不存在的时候才会设置成功。(如果此时 redis 中存在这个 key,那么设置失败,返回nil
)PX 30000
:意思是 30s 后锁自动释放。别人创建的时候如果发现已经有了就不能加锁了。
(2)释放锁
释放锁就是删除 key ,但是一般可以用 lua
脚本删除,判断 value 一样才删除:
-- 删除锁的时候,找到 key 对应的 value,跟自己传过去的 value 做比较,如果是一样的才删除。
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
Redis 分布式锁小结
为啥要用 random_value
随机值呢?因为如果某个客户端获取到了锁,但是阻塞了很长时间才执行完,比如说超过了 30s,此时可能已经自动释放锁了,此时可能别的客户端已经获取到了这个锁,要是你这个时候直接删除 key 的话会有问题,所以得用随机值加上面的 lua
脚本来释放锁。
但是这样是肯定不行的。因为如果是普通的 redis 单实例,那就是单点故障。或者是 redis 普通主从,那 redis 主从异步复制,如果主节点挂了(key 就没有了),key 还没同步到从节点,此时从节点切换为主节点,别人就可以 set key,从而拿到锁。
基于 setnx
实现的分布式锁存在的问题:
- 不可重入 - 同一个线程无法多次获取同一把锁(eg:方法 A 调用方法 B,在方法 A 中先获取锁,然后去调用方法 B,方法 B 也需要获取同一把锁,这种情况下如果锁不可重入,方法 B 显然获取不到锁,会出现死锁的情况)
- 不可重试 - 获取锁只尝试一次就返回 false,没有重试机制
- 超时释放 - 超时释放虽然能够避免死锁,但如果业务执行执行时间较长导致锁释放,会存在安全隐患
- 主从一致性 - 主从数据同步存在延迟,比如:线程在主节点获取了锁,尚未同步给从节点时主节点宕机,此时会选一个从节点作为新的主节点,这个从节点由于没有完成同步不存在锁的标识,此时其他线程可以趁虚而入拿到锁,这就造成多个线程同时拿到锁,就会出现安全问题)
ZooKeeper 分布式锁
ZooKeeper 分布式锁原理
ZooKeeper 实现分布式锁基于 ZooKeeper 的两个特性:
- 顺序临时节点:ZooKeeper 的存储类似于 DNS 那样的具有层级的命名空间。ZooKeeper 节点类型可以分为持久节点(PERSISTENT )、临时节点(EPHEMERAL),每个节点还能被标记为有序性(SEQUENTIAL),一旦节点被标记为有序性,那么整个节点就具有顺序自增的特点。
- Watch 机制:ZooKeeper 允许用户在指定节点上注册一些 Watcher,并且在特定事件触发的时候,ZooKeeper 服务端会将事件通知给用户。
这也是 ZooKeeper 客户端 curator 的分布式锁实现。
- 创建一个目录 mylock;
- 线程 A 想获取锁就在 mylock 目录下创建临时顺序节点;
- 获取 mylock 目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁;
- 线程 B 获取所有节点,判断自己不是最小节点,设置监听比自己次小的节点;
- 线程 A 处理完,删除自己的节点,线程 B 监听到变更事件,判断自己是不是最小的节点,如果是则获得锁。
ZooKeeper 分布式锁实现
/**
* ZooKeeperSession
*
* @author bingo
* @since 2018/11/29
*
*/
public class ZooKeeperSession {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private ZooKeeper zookeeper;
private CountDownLatch latch;
public ZooKeeperSession() {
try {
this.zookeeper = new ZooKeeper("192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181", 50000, new ZooKeeperWatcher());
try {
connectedSemaphore.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("ZooKeeper session established......");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 获取分布式锁
*
* @param productId
*/
public Boolean acquireDistributedLock(Long productId) {
String path = "/product-lock-" + productId;
try {
zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
return true;
} catch (Exception e) {
while (true) {
try {
// 相当于是给node注册一个监听器,去看看这个监听器是否存在
Stat stat = zk.exists(path, true);
if (stat != null) {
this.latch = new CountDownLatch(1);
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = null;
}
zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
return true;
} catch (Exception ee) {
continue;
}
}
}
return true;
}
/**
* 释放掉一个分布式锁
*
* @param productId
*/
public void releaseDistributedLock(Long productId) {
String path = "/product-lock-" + productId;
try {
zookeeper.delete(path, -1);
System.out.println("release the lock for product[id=" + productId + "]......");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 建立zk session的watcher
*
* @author bingo
* @since 2018/11/29
*
*/
private class ZooKeeperWatcher implements Watcher {
public void process(WatchedEvent event) {
System.out.println("Receive watched event: " + event.getState());
if (KeeperState.SyncConnected == event.getState()) {
connectedSemaphore.countDown();
}
if (this.latch != null) {
this.latch.countDown();
}
}
}
/**
* 封装单例的静态内部类
*
* @author bingo
* @since 2018/11/29
*
*/
private static class Singleton {
private static ZooKeeperSession instance;
static {
instance = new ZooKeeperSession();
}
public static ZooKeeperSession getInstance() {
return instance;
}
}
/**
* 获取单例
*
* @return
*/
public static ZooKeeperSession getInstance() {
return Singleton.getInstance();
}
/**
* 初始化单例的便捷方法
*/
public static void init() {
getInstance();
}
}
也可以采用另一种方式,创建临时顺序节点:
如果有一把锁,被多个人给竞争,此时多个人会排队,第一个拿到锁的人会执行,然后释放锁;后面的每个人都会去监听排在自己前面的那个人创建的 node 上,一旦某个人释放了锁,排在自己后面的人就会被 zookeeper 给通知,一旦被通知了之后,就 ok 了,自己就获取到了锁,就可以执行代码了。
public class ZooKeeperDistributedLock implements Watcher {
private ZooKeeper zk;
private String locksRoot = "/locks";
private String productId;
private String waitNode;
private String lockNode;
private CountDownLatch latch;
private CountDownLatch connectedLatch = new CountDownLatch(1);
private int sessionTimeout = 30000;
public ZooKeeperDistributedLock(String productId) {
this.productId = productId;
try {
String address = "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181";
zk = new ZooKeeper(address, sessionTimeout, this);
connectedLatch.await();
} catch (IOException e) {
throw new LockException(e);
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
}
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
connectedLatch.countDown();
return;
}
if (this.latch != null) {
this.latch.countDown();
}
}
public void acquireDistributedLock() {
try {
if (this.tryLock()) {
return;
} else {
waitForLock(waitNode, sessionTimeout);
}
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
}
public boolean tryLock() {
try {
// 传入进去的locksRoot + “/” + productId
// 假设productId代表了一个商品id,比如说1
// locksRoot = locks
// /locks/10000000000,/locks/10000000001,/locks/10000000002
lockNode = zk.create(locksRoot + "/" + productId, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 看看刚创建的节点是不是最小的节点
// locks:10000000000,10000000001,10000000002
List<String> locks = zk.getChildren(locksRoot, false);
Collections.sort(locks);
if(lockNode.equals(locksRoot+"/"+ locks.get(0))){
//如果是最小的节点,则表示取得锁
return true;
}
//如果不是最小的节点,找到比自己小1的节点
int previousLockIndex = -1;
for(int i = 0; i < locks.size(); i++) {
if(lockNode.equals(locksRoot + “/” + locks.get(i))) {
previousLockIndex = i - 1;
break;
}
}
this.waitNode = locks.get(previousLockIndex);
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
return false;
}
private boolean waitForLock(String waitNode, long waitTime) throws InterruptedException, KeeperException {
Stat stat = zk.exists(locksRoot + "/" + waitNode, true);
if (stat != null) {
this.latch = new CountDownLatch(1);
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = null;
}
return true;
}
public void unlock() {
try {
// 删除/locks/10000000000节点
// 删除/locks/10000000001节点
System.out.println("unlock " + lockNode);
zk.delete(lockNode, -1);
lockNode = null;
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
public class LockException extends RuntimeException {
private static final long serialVersionUID = 1L;
public LockException(String e) {
super(e);
}
public LockException(Exception e) {
super(e);
}
}
}
ZooKeeper 分布式锁小结
ZooKeeper 版本的分布式锁问题相对比较来说少。
- 锁的占用时间限制:redis 就有占用时间限制,而 ZooKeeper 则没有,最主要的原因是 redis 目前没有办法知道已经获取锁的客户端的状态,是已经挂了呢还是正在执行耗时较长的业务逻辑。而 ZooKeeper 通过临时节点就能清晰知道,如果临时节点存在说明还在执行业务逻辑,如果临时节点不存在说明已经执行完毕释放锁或者是挂了。由此看来 redis 如果能像 ZooKeeper 一样添加一些与客户端绑定的临时键,也是一大好事。
- 是否单点故障:redis 本身有很多中玩法,如客户端一致性 hash,服务器端 sentinel 方案或者 cluster 方案,很难做到一种分布式锁方式能应对所有这些方案。而 ZooKeeper 只有一种玩法,多台机器的节点数据是一致的,没有 redis 的那么多的麻烦因素要考虑。
总体上来说 ZooKeeper 实现分布式锁更加的简单,可靠性更高。但 ZooKeeper 因为需要频繁的创建和删除节点,性能上不如 Redis 方式。
Redisson 分布式锁
Redisson 分布式锁使用
(1)引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
(2)配置
Spring Boot 配置方式
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
return Redisson.create(config);
}
}
xml 配置方式
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:redisson="http://redisson.org/schema/redisson"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://redisson.org/schema/redisson
http://redisson.org/schema/redisson/redisson.xsd">
<bean id="stringCodec" class="org.redisson.client.codec.StringCodec"/>
<redisson:client id="standalone"
name="aliasName1,aliasName2"
codec-ref="stringCodec">
<redisson:single-server address="redis://127.0.0.1:6379"
connection-pool-size="500"
idle-connection-timeout="10000"
connect-timeout="10000"
timeout="3000"
database="0"/>
</redisson:client>
</beans>
(3)使用分布式锁
import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.redisson.api.RBucket;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j
public class RedissonStandaloneTest {
private static RedissonClient redissonClient;
static {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:redisson-standalone.xml");
redissonClient = (RedissonClient) applicationContext.getBean("standalone");
}
@Test
@DisplayName("分布式锁测试")
public void testLock() {
// 两个线程任务都是不断再尝试获取或,直到成功获取锁后才推出任务
// 第一个线程获取到锁后,第二个线程需要等待 5 秒超时后才能获取到锁
CountDownLatch latch = new CountDownLatch(2);
ExecutorService executorService = ThreadUtil.newFixedExecutor(2, "获取锁", true);
executorService.submit(new Task(latch));
executorService.submit(new Task(latch));
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
static class Task implements Runnable {
private CountDownLatch latch;
public Task(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
while (true) {
RLock lock = redissonClient.getLock("test_lock");
try {
boolean isLock = lock.tryLock(1, 5, TimeUnit.SECONDS);
if (isLock) {
log.info("获取分布式锁成功");
break;
} else {
log.warn("获取分布式锁失败");
}
} catch (Exception e) {
log.error("获取分布式锁异常", e);
}
}
latch.countDown();
}
}
}
// 输出:
// 17:59:25.896 [获取锁1] [INFO ] i.g.d.j.redis.RedissonStandaloneTest.run -
// 获取分布式锁成功
// 17:59:26.888 [获取锁0] [WARN ] i.g.d.j.redis.RedissonStandaloneTest.run -
// 获取分布式锁失败
// 17:59:27.889 [获取锁0] [WARN ] i.g.d.j.redis.RedissonStandaloneTest.run -
// 获取分布式锁失败
// 17:59:28.891 [获取锁0] [WARN ] i.g.d.j.redis.RedissonStandaloneTest.run -
// 获取分布式锁失败
// 17:59:29.892 [获取锁0] [WARN ] i.g.d.j.redis.RedissonStandaloneTest.run -
// 获取分布式锁失败
// 17:59:30.895 [获取锁0] [WARN ] i.g.d.j.redis.RedissonStandaloneTest.run -
// 获取分布式锁失败
// 17:59:30.896 [获取锁0] [INFO ] i.g.d.j.redis.RedissonStandaloneTest.run -
// 获取分布式锁成功
Redisson 分布式锁原理
- 使用 Hash 结构,在 key 的位置记录锁的名称,在 field 的位置记录线程标识,在 value 位置记录锁的重入次数(重复获取锁时只需要判断获取锁的线程是否是自己即可)
- 释放锁时,重入次数减 1,并判断是否为 0,若为 0,直接将锁删除即可
- 基于 Redisson 可重入锁原理编写 Lua 脚本
lua复制代码local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1] -- 线程的唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断是否存在
if(redis.call('exists', key) == 0) then
-- 不存在,获取锁
redis.call('hset', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return 1; -- 返回结果
end;
-- 锁已经存在,判断threadId是否是自己
if(redis.call('hexists', key, threadId) == 1) then
-- 存在,获取锁,重入次数+1
redis.call('hincrby', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return 1; -- 返回结果
end;
return 0; -- 获取锁的不是自己,获取锁失败
lua复制代码local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1] -- 线程的唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断当前锁是否被自己持有
if(redis.call('hexists', key, threadId) == 0) then
return nil; -- 不是自己,直接返回
end;
-- 是自己的锁,重入次数-1
local count = redis.call('hincrby', key, threadId, -1);
-- 判断重入次数是否为0
if (count > 0) then
-- 大于0说明不能直接释放锁,重置锁有效期即可
redis.call('expire', key, releaseTime);
return nil;
else -- 等于0可以直接释放锁
redis.call('del', key);
return nil;
end;
分布式锁方案对比
数据库分布式锁,问题比较多,解决起来比较麻烦,不推荐。
性能:
- Redis 分布式锁,其实需要自己不断自旋去尝试获取锁,比较消耗性能。
- ZooKeeper 分布式锁,获取不到锁,注册个监听器即可,不需要不断主动尝试获取锁,性能开销较小。
可靠性:
- 如果是 redis 获取锁的那个客户端出现 bug 挂了,那么只能等待超时时间之后才能释放锁;
- 而 zk 的话,因为创建的是临时 znode,只要客户端挂了,znode 就没了,此时就自动释放锁。
综上分析,ZooKeeper 实现分布式锁更加的简单,可靠性更高。✅