From 2b27e4b8ca0339f0ada002d42f90ac8ad8784633 Mon Sep 17 00:00:00 2001 From: AprilWind <2100166581@qq.com> Date: Fri, 28 Jun 2024 11:40:22 +0800 Subject: [PATCH] =?UTF-8?q?docs=20=E5=88=A0=E9=99=A4=E5=A4=9A=E4=BD=99dubb?= =?UTF-8?q?o=E6=B3=A8=E9=87=8A=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../support/AbstractMetadataReport.java | 362 ++++-------------- .../store/redis/RedisMetadataReport.java | 352 +++-------------- 2 files changed, 133 insertions(+), 581 deletions(-) diff --git a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/report/support/AbstractMetadataReport.java b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/report/support/AbstractMetadataReport.java index cba4d52c..c1341761 100644 --- a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/report/support/AbstractMetadataReport.java +++ b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/report/support/AbstractMetadataReport.java @@ -49,9 +49,6 @@ import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROXY_FAILED import static org.apache.dubbo.common.utils.StringUtils.replace; import static org.apache.dubbo.metadata.report.support.Constants.*; -/** - * 抽象的元数据上报实现类,实现了元数据上报的基本操作 - */ public abstract class AbstractMetadataReport implements MetadataReport { protected static final String DEFAULT_ROOT = "dubbo"; @@ -61,7 +58,8 @@ public abstract class AbstractMetadataReport implements MetadataReport { // Log output protected final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(getClass()); - // 本地磁盘缓存,特定键值 registries 记录元数据中心列表,其他是通知的服务提供者列表 + // Local disk cache, where the special key value.registries records the list of metadata centers, and the others are + // the list of notified service providers final Properties properties = new Properties(); private final ExecutorService reportCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveMetadataReport", true)); @@ -71,7 +69,7 @@ public abstract class AbstractMetadataReport implements MetadataReport { final Map failedReports = new ConcurrentHashMap<>(4); private URL reportURL; boolean syncReport; - // 本地磁盘缓存文件 + // Local disk cache file File file; private AtomicBoolean initialized = new AtomicBoolean(false); public MetadataReportRetry metadataReportRetry; @@ -81,30 +79,29 @@ public abstract class AbstractMetadataReport implements MetadataReport { private final boolean reportDefinition; protected ApplicationModel applicationModel; - /** - * 构造方法,初始化元数据上报实现类 - * - * @param reportServerURL 元数据上报服务器的URL - */ public AbstractMetadataReport(URL reportServerURL) { setUrl(reportServerURL); applicationModel = reportServerURL.getOrDefaultApplicationModel(); boolean localCacheEnabled = reportServerURL.getParameter(REGISTRY_LOCAL_FILE_CACHE_ENABLED, true); - // 启动文件保存定时器 + // Start file save timer String defaultFilename = System.getProperty("user.home") + DUBBO_METADATA - + reportServerURL.getApplication() + "-" + replace(reportServerURL.getAddress(), ":", "-") + CACHE; + + reportServerURL.getApplication() + + "-" + replace(reportServerURL.getAddress(), ":", "-") + + CACHE; String filename = reportServerURL.getParameter(FILE_KEY, defaultFilename); File file = null; if (localCacheEnabled && ConfigUtils.isNotEmpty(filename)) { file = new File(filename); - if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) { + if (!file.exists() + && file.getParentFile() != null + && !file.getParentFile().exists()) { if (!file.getParentFile().mkdirs()) { throw new IllegalArgumentException("Invalid service store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!"); } } - // 如果文件存在,首先删除它 + // if this file exists, firstly delete it. if (!initialized.getAndSet(true) && file.exists()) { file.delete(); } @@ -115,31 +112,22 @@ public abstract class AbstractMetadataReport implements MetadataReport { metadataReportRetry = new MetadataReportRetry( reportServerURL.getParameter(RETRY_TIMES_KEY, DEFAULT_METADATA_REPORT_RETRY_TIMES), reportServerURL.getParameter(RETRY_PERIOD_KEY, DEFAULT_METADATA_REPORT_RETRY_PERIOD)); - - // 循环上报数据开关 + // cycle report the data switch if (reportServerURL.getParameter(CYCLE_REPORT_KEY, DEFAULT_METADATA_REPORT_CYCLE_REPORT)) { - reportTimerScheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboMetadataReportTimer", true)); - reportTimerScheduler.scheduleAtFixedRate(this::publishAll, calculateStartTime(), ONE_DAY_IN_MILLISECONDS, TimeUnit.MILLISECONDS); + reportTimerScheduler = Executors.newSingleThreadScheduledExecutor( + new NamedThreadFactory("DubboMetadataReportTimer", true)); + reportTimerScheduler.scheduleAtFixedRate( + this::publishAll, calculateStartTime(), ONE_DAY_IN_MILLISECONDS, TimeUnit.MILLISECONDS); } this.reportMetadata = reportServerURL.getParameter(REPORT_METADATA_KEY, false); this.reportDefinition = reportServerURL.getParameter(REPORT_DEFINITION_KEY, true); } - /** - * 获取元数据上报服务器的URL - * - * @return 元数据上报服务器的URL - */ public URL getUrl() { return reportURL; } - /** - * 设置元数据上报服务器的URL - * - * @param url 元数据上报服务器的URL - */ protected void setUrl(URL url) { if (url == null) { throw new IllegalArgumentException("metadataReport url == null"); @@ -147,11 +135,6 @@ public abstract class AbstractMetadataReport implements MetadataReport { this.reportURL = url; } - /** - * 执行保存属性操作,将属性持久化到本地磁盘缓存文件中 - * - * @param version 缓存版本号 - */ private void doSaveProperties(long version) { if (version < lastCacheChanged.get()) { return; @@ -159,7 +142,7 @@ public abstract class AbstractMetadataReport implements MetadataReport { if (file == null) { return; } - // 保存操作 + // Save try { File lockfile = new File(file.getAbsolutePath() + ".lock"); if (!lockfile.exists()) { @@ -173,7 +156,7 @@ public abstract class AbstractMetadataReport implements MetadataReport { "Can not lock the metadataReport cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.metadata.file=xxx.properties"); } - // 保存 + // Save try { if (!file.exists()) { file.createNewFile(); @@ -181,10 +164,12 @@ public abstract class AbstractMetadataReport implements MetadataReport { Properties tmpProperties; if (!syncReport) { - // 当 syncReport = false 时,从同一个线程(reportCacheExecutor)中调用 properties.setProperty 和 properties.store,因此不需要深度复制 + // When syncReport = false, properties.setProperty and properties.store are called from the same + // thread(reportCacheExecutor), so deep copy is not required tmpProperties = properties; } else { - // 使用 store 方法和 this.properties 的 setProperty 方法会在多线程环境下引起锁竞争,因此需要深度复制一个新的容器 + // Using store method and setProperty method of the this.properties will cause lock contention + // under multi-threading, so deep copy a new container tmpProperties = new Properties(); Set> entries = properties.entrySet(); for (Map.Entry entry : entries) { @@ -205,14 +190,15 @@ public abstract class AbstractMetadataReport implements MetadataReport { } else { reportCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet())); } - logger.warn(COMMON_UNEXPECTED_EXCEPTION, "", "", - "Failed to save service store file, cause: " + e.getMessage(), e); + logger.warn( + COMMON_UNEXPECTED_EXCEPTION, + "", + "", + "Failed to save service store file, cause: " + e.getMessage(), + e); } } - /** - * 加载本地磁盘缓存文件中的属性 - */ void loadProperties() { if (file != null && file.exists()) { try (InputStream in = new FileInputStream(file)) { @@ -226,14 +212,6 @@ public abstract class AbstractMetadataReport implements MetadataReport { } } - /** - * 将元数据信息保存到本地文件中 - * - * @param metadataIdentifier 元数据标识符,用于唯一标识元数据信息 - * @param value 要保存的元数据信息的字符串表示 - * @param add 是否添加元数据信息,如果为 true,则添加;否则,移除 - * @param sync 是否同步保存,如果为 true,则同步保存;否则,异步保存 - */ private void saveProperties(MetadataIdentifier metadataIdentifier, String value, boolean add, boolean sync) { if (file == null) { return; @@ -241,19 +219,14 @@ public abstract class AbstractMetadataReport implements MetadataReport { try { if (add) { - // 添加元数据信息到 properties 中 properties.setProperty(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), value); } else { - // 移除指定的元数据信息 properties.remove(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY)); } - // 更新缓存变化版本号 long version = lastCacheChanged.incrementAndGet(); if (sync) { - // 同步保存属性到文件 new SaveProperties(version).run(); } else { - // 异步执行保存属性到文件任务 reportCacheExecutor.execute(new SaveProperties(version)); } @@ -267,9 +240,6 @@ public abstract class AbstractMetadataReport implements MetadataReport { return getUrl().toString(); } - /** - * 内部类,实现了 `Runnable` 接口,用于保存属性到本地文件 - */ private class SaveProperties implements Runnable { private long version; @@ -283,14 +253,9 @@ public abstract class AbstractMetadataReport implements MetadataReport { } } - /** - * 保存属性到本地磁盘缓存文件中 - * - * @param providerMetadataIdentifier 提供者元数据标识符 - * @param serviceDefinition 要存储的服务定义 - */ @Override - public void storeProviderMetadata(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) { + public void storeProviderMetadata( + MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) { if (syncReport) { storeProviderMetadataTask(providerMetadataIdentifier, serviceDefinition); } else { @@ -298,43 +263,33 @@ public abstract class AbstractMetadataReport implements MetadataReport { } } - /** - * 异步任务:存储服务提供者元数据的任务 - * - * @param providerMetadataIdentifier 提供者元数据的标识符 - * @param serviceDefinition 服务定义对象 - */ - private void storeProviderMetadataTask(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) { - // 将元数据事件转换为服务订阅事件 - MetadataEvent metadataEvent = MetadataEvent.toServiceSubscribeEvent(applicationModel, providerMetadataIdentifier.getUniqueServiceName()); + private void storeProviderMetadataTask( + MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) { - // 发布元数据事件到指标事件总线,执行回调任务 + MetadataEvent metadataEvent = MetadataEvent.toServiceSubscribeEvent( + applicationModel, providerMetadataIdentifier.getUniqueServiceName()); MetricsEventBus.post( metadataEvent, () -> { boolean result = true; try { - // 记录日志:存储服务提供者元数据 if (logger.isInfoEnabled()) { logger.info("store provider metadata. Identifier : " + providerMetadataIdentifier + "; definition: " + serviceDefinition); } - - // 将服务定义对象放入所有元数据报告的缓存中,移除失败的报告 allMetadataReports.put(providerMetadataIdentifier, serviceDefinition); failedReports.remove(providerMetadataIdentifier); - - // 将服务定义对象转换为 JSON 字符串并存储到元数据存储中 String data = JsonUtils.toJson(serviceDefinition); doStoreProviderMetadata(providerMetadataIdentifier, data); - - // 保存属性变更到本地属性缓存 saveProperties(providerMetadataIdentifier, data, true, !syncReport); } catch (Exception e) { - // 如果存储失败,记录错误日志,加入失败的报告列表,并启动重试任务 + // retry again. If failed again, throw exception. failedReports.put(providerMetadataIdentifier, serviceDefinition); metadataReportRetry.startRetryTask(); - logger.error(PROXY_FAILED_EXPORT_SERVICE, "", "", + logger.error( + PROXY_FAILED_EXPORT_SERVICE, + "", + "", "Failed to put provider metadata " + providerMetadataIdentifier + " in " + serviceDefinition + ", cause: " + e.getMessage(), e); @@ -345,48 +300,32 @@ public abstract class AbstractMetadataReport implements MetadataReport { aBoolean -> aBoolean); } - /** - * 存储消费者元数据 - * 如果同步报告开关打开,则直接调用同步方法存储;否则,通过线程池异步执行存储任务 - * - * @param consumerMetadataIdentifier 消费者元数据的标识符 - * @param serviceParameterMap 服务参数映射表 - */ @Override - public void storeConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, Map serviceParameterMap) { + public void storeConsumerMetadata( + MetadataIdentifier consumerMetadataIdentifier, Map serviceParameterMap) { if (syncReport) { storeConsumerMetadataTask(consumerMetadataIdentifier, serviceParameterMap); } else { - reportCacheExecutor.execute(() -> storeConsumerMetadataTask(consumerMetadataIdentifier, serviceParameterMap)); + reportCacheExecutor.execute( + () -> storeConsumerMetadataTask(consumerMetadataIdentifier, serviceParameterMap)); } } - /** - * 异步任务:存储消费者元数据的任务 - * - * @param consumerMetadataIdentifier 消费者元数据的标识符 - * @param serviceParameterMap 服务参数映射表 - */ - protected void storeConsumerMetadataTask(MetadataIdentifier consumerMetadataIdentifier, Map serviceParameterMap) { + protected void storeConsumerMetadataTask( + MetadataIdentifier consumerMetadataIdentifier, Map serviceParameterMap) { try { - // 记录日志:存储消费者元数据 if (logger.isInfoEnabled()) { logger.info("store consumer metadata. Identifier : " + consumerMetadataIdentifier + "; definition: " + serviceParameterMap); } - - // 将服务参数映射表放入所有元数据报告的缓存中,移除失败的报告 allMetadataReports.put(consumerMetadataIdentifier, serviceParameterMap); failedReports.remove(consumerMetadataIdentifier); - // 将服务参数映射表转换为 JSON 字符串并存储到元数据存储中 String data = JsonUtils.toJson(serviceParameterMap); doStoreConsumerMetadata(consumerMetadataIdentifier, data); - - // 保存属性变更到本地属性缓存 saveProperties(consumerMetadataIdentifier, data, true, !syncReport); } catch (Exception e) { - // 如果存储失败,记录错误日志,加入失败的报告列表,并启动重试任务 + // retry again. If failed again, throw exception. failedReports.put(consumerMetadataIdentifier, serviceParameterMap); metadataReportRetry.startRetryTask(); logger.error( @@ -399,34 +338,20 @@ public abstract class AbstractMetadataReport implements MetadataReport { } } - /** - * 销毁方法,用于释放资源和关闭相关任务调度器 - */ @Override public void destroy() { - // 关闭报告缓存执行器 if (reportCacheExecutor != null) { reportCacheExecutor.shutdown(); } - - // 关闭报告定时调度器 if (reportTimerScheduler != null) { reportTimerScheduler.shutdown(); } - - // 销毁元数据报告重试管理器,并置空引用 if (metadataReportRetry != null) { metadataReportRetry.destroy(); metadataReportRetry = null; } } - /** - * 保存服务元数据。根据同步设置,同步执行或通过报告缓存执行保存操作 - * - * @param metadataIdentifier 服务元数据标识符 - * @param url 服务的URL - */ @Override public void saveServiceMetadata(ServiceMetadataIdentifier metadataIdentifier, URL url) { if (syncReport) { @@ -436,11 +361,6 @@ public abstract class AbstractMetadataReport implements MetadataReport { } } - /** - * 移除服务元数据。根据同步设置,同步执行或通过报告缓存执行移除操作 - * - * @param metadataIdentifier 服务元数据标识符 - */ @Override public void removeServiceMetadata(ServiceMetadataIdentifier metadataIdentifier) { if (syncReport) { @@ -450,51 +370,28 @@ public abstract class AbstractMetadataReport implements MetadataReport { } } - /** - * 获取导出的URL列表。如果未能获取,则回退到本地缓存 - * - * @param metadataIdentifier 服务元数据标识符 - * @return 导出的URL列表 - */ @Override public List getExportedURLs(ServiceMetadataIdentifier metadataIdentifier) { - // TODO 回退到本地缓存 + // TODO, fallback to local cache return doGetExportedURLs(metadataIdentifier); } - /** - * 存储订阅的数据。如果同步报告开启,则直接存储订阅数据;否则,将异步执行存储操作 - * - * @param subscriberMetadataIdentifier 订阅元数据标识符 - * @param urls 订阅的URL集合 - */ @Override public void saveSubscribedData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, Set urls) { if (syncReport) { doSaveSubscriberData(subscriberMetadataIdentifier, JsonUtils.toJson(urls)); } else { - reportCacheExecutor.execute(() -> doSaveSubscriberData(subscriberMetadataIdentifier, JsonUtils.toJson(urls))); + reportCacheExecutor.execute( + () -> doSaveSubscriberData(subscriberMetadataIdentifier, JsonUtils.toJson(urls))); } } - /** - * 获取订阅的URL列表 - * - * @param subscriberMetadataIdentifier 订阅元数据标识符 - * @return 订阅的URL列表 - */ @Override public List getSubscribedURLs(SubscriberMetadataIdentifier subscriberMetadataIdentifier) { String content = doGetSubscribedURLs(subscriberMetadataIdentifier); return JsonUtils.toJavaList(content, String.class); } - /** - * 获取URL的协议 - * - * @param url URL对象 - * @return URL的协议 - */ String getProtocol(URL url) { String protocol = url.getSide(); protocol = protocol == null ? url.getProtocol() : protocol; @@ -502,52 +399,33 @@ public abstract class AbstractMetadataReport implements MetadataReport { } /** - * 判断是否需要重试处理元数据集合 - * - * @return 如果需要继续重试,则返回true;否则返回false + * @return if need to continue */ public boolean retry() { return doHandleMetadataCollection(failedReports); } - /** - * 指示是否应报告定义 - * - * @return 如果应报告定义,则返回true;否则返回false - */ @Override public boolean shouldReportDefinition() { return reportDefinition; } - /** - * 指示是否应报告元数据 - * - * @return 如果应报告元数据,则返回true;否则返回false - */ @Override public boolean shouldReportMetadata() { return reportMetadata; } - /** - * 处理元数据集合的方法,根据元数据的侧边(提供者或消费者)将其存储到相应的位置 - * - * @param metadataMap 元数据映射,包含要处理的元数据标识符和相应的对象 - * @return 如果处理完毕后需要继续重试,则返回true;否则返回false - */ private boolean doHandleMetadataCollection(Map metadataMap) { if (metadataMap.isEmpty()) { return true; } - Iterator> iterable = metadataMap.entrySet().iterator(); + Iterator> iterable = + metadataMap.entrySet().iterator(); while (iterable.hasNext()) { Map.Entry item = iterable.next(); if (PROVIDER_SIDE.equals(item.getKey().getSide())) { - // 如果是提供者侧的元数据,则存储为完整的服务定义对象 this.storeProviderMetadata(item.getKey(), (FullServiceDefinition) item.getValue()); } else if (CONSUMER_SIDE.equals(item.getKey().getSide())) { - // 如果是消费者侧的元数据,则存储为参数映射 this.storeConsumerMetadata(item.getKey(), (Map) item.getValue()); } } @@ -555,8 +433,7 @@ public abstract class AbstractMetadataReport implements MetadataReport { } /** - * 用于单元测试的方法,不是私有方法 - * 发布所有元数据到相应的处理方法 + * not private. just for unittest. */ void publishAll() { logger.info("start to publish all metadata."); @@ -564,14 +441,9 @@ public abstract class AbstractMetadataReport implements MetadataReport { } /** - * 计算一个起始时间,用于设置定时任务的启动时间 - * 时间计算逻辑包括: - * 1. 获取当前时间的毫秒数 - * 2. 将日历设置为当天的午夜(00:00:00.000) - * 3. 计算当前时间到午夜的毫秒数差 - * 4. 加上一定的偏移量,包括四小时的一半和一个四小时内的随机毫秒数 + * between 2:00 am to 6:00 am, the time is random. * - * @return 计算得到的起始时间 + * @return */ long calculateStartTime() { Calendar calendar = Calendar.getInstance(); @@ -586,77 +458,42 @@ public abstract class AbstractMetadataReport implements MetadataReport { + ThreadLocalRandom.current().nextInt(FOUR_HOURS_IN_MILLISECONDS); } - /** - * MetadataReportRetry 类用于处理元数据报告的重试机制 - */ class MetadataReportRetry { protected final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(getClass()); - /** - * 用于执行定时重试任务的调度执行器服务 - */ - final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(0, new NamedThreadFactory("DubboMetadataReportRetryTimer", true)); - - /** - * 用于取消重试任务的计划执行句柄 - */ + final ScheduledExecutorService retryExecutor = + Executors.newScheduledThreadPool(0, new NamedThreadFactory("DubboMetadataReportRetryTimer", true)); volatile ScheduledFuture retryScheduledFuture; - - /** - * 重试次数计数器,用于记录已经进行的重试次数 - */ final AtomicInteger retryCounter = new AtomicInteger(0); - - /** - * 重试任务的执行周期,以毫秒为单位 - */ + // retry task schedule period long retryPeriod; - - /** - * 当没有失败报告时,等待多少次运行重试任务 - */ + // if no failed report, wait how many times to run retry task. int retryTimesIfNonFail = 600; - /** - * 重试限制次数,达到此次数后不再继续重试 - */ int retryLimit; - /** - * 构造函数,初始化重试次数和重试周期 - * - * @param retryTimes 重试次数限制 - * @param retryPeriod 重试周期(毫秒) - */ public MetadataReportRetry(int retryTimes, int retryPeriod) { this.retryPeriod = retryPeriod; this.retryLimit = retryTimes; } - /** - * 启动重试任务,如果未启动则执行定时重试 - */ void startRetryTask() { if (retryScheduledFuture == null) { synchronized (retryCounter) { if (retryScheduledFuture == null) { retryScheduledFuture = retryExecutor.scheduleWithFixedDelay( () -> { - // 检查并连接到元数据 + // Check and connect to the metadata try { int times = retryCounter.incrementAndGet(); logger.info("start to retry task for metadata report. retry times:" + times); - - // 执行重试操作,如果无失败报告并且超过指定重试次数,则取消重试任务 if (retry() && times > retryTimesIfNonFail) { cancelRetryTask(); } - - // 如果超过重试限制次数,则取消重试任务 if (times > retryLimit) { cancelRetryTask(); } - } catch (Throwable t) { // 防御性容错处理 + } catch (Throwable t) { // Defensive fault tolerance logger.error( COMMON_UNEXPECTED_EXCEPTION, "", @@ -673,9 +510,6 @@ public abstract class AbstractMetadataReport implements MetadataReport { } } - /** - * 取消重试任务。如果存在已计划的任务,则取消并关闭重试执行器 - */ void cancelRetryTask() { if (retryScheduledFuture != null) { retryScheduledFuture.cancel(false); @@ -683,17 +517,12 @@ public abstract class AbstractMetadataReport implements MetadataReport { retryExecutor.shutdown(); } - /** - * 销毁操作。调用取消重试任务方法以确保所有任务被取消 - */ void destroy() { cancelRetryTask(); } /** - * 获取重试执行器实例,仅用于测试目的 - * - * @deprecated 仅用于测试 + * @deprecated only for test */ @Deprecated ScheduledExecutorService getRetryExecutor() { @@ -701,13 +530,6 @@ public abstract class AbstractMetadataReport implements MetadataReport { } } - /** - * 将订阅者数据保存到持久存储中。如果 URL 列表为空,则直接返回 - * 对 URL 列表进行编码后保存。 - * - * @param subscriberMetadataIdentifier 订阅者元数据标识 - * @param urls URL 列表 - */ private void doSaveSubscriberData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, List urls) { if (CollectionUtils.isEmpty(urls)) { return; @@ -719,66 +541,25 @@ public abstract class AbstractMetadataReport implements MetadataReport { doSaveSubscriberData(subscriberMetadataIdentifier, encodedUrlList); } - /** - * 存储提供者元数据信息的抽象方法。由子类实现具体存储逻辑 - * - * @param providerMetadataIdentifier 提供者元数据标识 - * @param serviceDefinitions 服务定义信息字符串 - */ - protected abstract void doStoreProviderMetadata(MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions); + protected abstract void doStoreProviderMetadata( + MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions); - /** - * 存储消费者元数据信息的抽象方法。由子类实现具体存储逻辑 - * - * @param consumerMetadataIdentifier 消费者元数据标识 - * @param serviceParameterString 服务参数字符串 - */ - protected abstract void doStoreConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, String serviceParameterString); + protected abstract void doStoreConsumerMetadata( + MetadataIdentifier consumerMetadataIdentifier, String serviceParameterString); - /** - * 存储服务元数据信息的抽象方法。由子类实现具体存储逻辑 - * - * @param metadataIdentifier 服务元数据标识 - * @param url URL 对象 - */ protected abstract void doSaveMetadata(ServiceMetadataIdentifier metadataIdentifier, URL url); - /** - * 删除服务元数据信息的抽象方法。由子类实现具体删除逻辑 - * - * @param metadataIdentifier 服务元数据标识 - */ protected abstract void doRemoveMetadata(ServiceMetadataIdentifier metadataIdentifier); - /** - * 获取导出的 URL 列表的抽象方法。由子类实现具体获取逻辑 - * - * @param metadataIdentifier 服务元数据标识 - * @return 导出的 URL 列表 - */ protected abstract List doGetExportedURLs(ServiceMetadataIdentifier metadataIdentifier); - /** - * 存储订阅者数据的抽象方法。由子类实现具体存储逻辑 - * - * @param subscriberMetadataIdentifier 订阅者元数据标识 - * @param urlListStr URL 列表的 JSON 字符串形式 - */ - protected abstract void doSaveSubscriberData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, String urlListStr); + protected abstract void doSaveSubscriberData( + SubscriberMetadataIdentifier subscriberMetadataIdentifier, String urlListStr); - /** - * 获取订阅的 URL 列表的抽象方法。由子类实现具体获取逻辑 - * - * @param subscriberMetadataIdentifier 订阅者元数据标识 - * @return 订阅的 URL 列表的 JSON 字符串形式 - */ protected abstract String doGetSubscribedURLs(SubscriberMetadataIdentifier subscriberMetadataIdentifier); /** - * 获取报告缓存执行器的方法。仅供单元测试使用 - * - * @return 报告缓存执行器 - * @deprecated 仅供单元测试使用 + * @deprecated only for unit test */ @Deprecated protected ExecutorService getReportCacheExecutor() { @@ -786,10 +567,7 @@ public abstract class AbstractMetadataReport implements MetadataReport { } /** - * 获取元数据报告重试管理器的方法。仅供单元测试使用 - * - * @return 元数据报告重试管理器 - * @deprecated 仅供单元测试使用 + * @deprecated only for unit test */ @Deprecated protected MetadataReportRetry getMetadataReportRetry() { diff --git a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/store/redis/RedisMetadataReport.java b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/store/redis/RedisMetadataReport.java index 9d1fd0c9..b2bb4186 100644 --- a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/store/redis/RedisMetadataReport.java +++ b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/store/redis/RedisMetadataReport.java @@ -16,7 +16,6 @@ */ package org.apache.dubbo.metadata.store.redis; -import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.config.configcenter.ConfigItem; import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; @@ -29,17 +28,40 @@ import org.apache.dubbo.metadata.MappingChangedEvent; import org.apache.dubbo.metadata.MappingListener; import org.apache.dubbo.metadata.MetadataInfo; import org.apache.dubbo.metadata.ServiceNameMapping; -import org.apache.dubbo.metadata.report.identifier.*; +import org.apache.dubbo.metadata.report.identifier.BaseMetadataIdentifier; +import org.apache.dubbo.metadata.report.identifier.KeyTypeEnum; +import org.apache.dubbo.metadata.report.identifier.MetadataIdentifier; +import org.apache.dubbo.metadata.report.identifier.ServiceMetadataIdentifier; +import org.apache.dubbo.metadata.report.identifier.SubscriberMetadataIdentifier; import org.apache.dubbo.metadata.report.support.AbstractMetadataReport; import org.apache.dubbo.rpc.RpcException; -import redis.clients.jedis.*; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPoolConfig; +import redis.clients.jedis.JedisPubSub; +import redis.clients.jedis.Transaction; import redis.clients.jedis.params.SetParams; import redis.clients.jedis.util.JedisClusterCRC16; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - -import static org.apache.dubbo.common.constants.CommonConstants.*; +import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.CYCLE_REPORT_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT; +import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPARATOR; +import static org.apache.dubbo.common.constants.CommonConstants.QUEUES_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY; import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_RESPONSE; import static org.apache.dubbo.metadata.MetadataConstants.META_DATA_STORE_TAG; import static org.apache.dubbo.metadata.ServiceNameMapping.DEFAULT_MAPPING_GROUP; @@ -47,40 +69,31 @@ import static org.apache.dubbo.metadata.ServiceNameMapping.getAppNames; import static org.apache.dubbo.metadata.report.support.Constants.DEFAULT_METADATA_REPORT_CYCLE_REPORT; /** - * RedisMetadataReport 是基于 Redis 的元数据报告实现类 + * RedisMetadataReport */ public class RedisMetadataReport extends AbstractMetadataReport { + private static final String REDIS_DATABASE_KEY = "database"; private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(RedisMetadataReport.class); - // 受保护的 JedisPool 实例,用于测试 + + // protected , for test protected JedisPool pool; - // Redis 集群节点集合 private Set jedisClusterNodes; private int timeout; private String password; private final String root; - // 映射数据监听器映射表 private final ConcurrentHashMap mappingDataListenerMap = new ConcurrentHashMap<>(); private SetParams jedisParams = SetParams.setParams(); - /** - * 构造方法,根据给定的 URL 初始化 RedisMetadataReport - * - * @param url 元数据中心的 URL - */ public RedisMetadataReport(URL url) { super(url); timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); password = url.getPassword(); this.root = url.getGroup(DEFAULT_ROOT); - - // 设置默认的周期性报告时间 if (url.getParameter(CYCLE_REPORT_KEY, DEFAULT_METADATA_REPORT_CYCLE_REPORT)) { - // TTL 默认是周期报告时间的两倍 + // ttl default is twice the cycle-report time jedisParams.ex(ONE_DAY_IN_MILLISECONDS * 2); } - - // 判断是否为集群模式 if (url.getParameter(CLUSTER_KEY, false)) { jedisClusterNodes = new HashSet<>(); List urls = url.getBackupUrls(); @@ -88,61 +101,31 @@ public class RedisMetadataReport extends AbstractMetadataReport { jedisClusterNodes.add(new HostAndPort(tmpUrl.getHost(), tmpUrl.getPort())); } } else { - // 单机模式下的 Redis 数据库编号,默认为 0 int database = url.getParameter(REDIS_DATABASE_KEY, 0); pool = new JedisPool(new JedisPoolConfig(), url.getHost(), url.getPort(), timeout, password, database); } } - /** - * 存储提供者元数据的具体实现 - * - * @param providerMetadataIdentifier 提供者元数据标识符 - * @param serviceDefinitions 服务定义信息 - */ @Override protected void doStoreProviderMetadata(MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions) { this.storeMetadata(providerMetadataIdentifier, serviceDefinitions); } - /** - * 存储消费者元数据的具体实现 - * - * @param consumerMetadataIdentifier 消费者元数据标识符 - * @param value 元数据值 - */ @Override protected void doStoreConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, String value) { this.storeMetadata(consumerMetadataIdentifier, value); } - /** - * 存储服务元数据的具体实现 - * - * @param serviceMetadataIdentifier 服务元数据标识符 - * @param url 服务URL编码后的完整字符串 - */ @Override protected void doSaveMetadata(ServiceMetadataIdentifier serviceMetadataIdentifier, URL url) { this.storeMetadata(serviceMetadataIdentifier, URL.encode(url.toFullString())); } - /** - * 移除元数据的具体实现 - * - * @param serviceMetadataIdentifier 服务元数据标识符 - */ @Override protected void doRemoveMetadata(ServiceMetadataIdentifier serviceMetadataIdentifier) { this.deleteMetadata(serviceMetadataIdentifier); } - /** - * 获取导出的URL列表 - * - * @param metadataIdentifier 服务元数据标识符 - * @return 导出的URL列表,如果内容为空则返回空列表 - */ @Override protected List doGetExportedURLs(ServiceMetadataIdentifier metadataIdentifier) { String content = getMetadata(metadataIdentifier); @@ -152,45 +135,21 @@ public class RedisMetadataReport extends AbstractMetadataReport { return new ArrayList<>(Arrays.asList(URL.decode(content))); } - /** - * 存储订阅者数据的具体实现 - * - * @param subscriberMetadataIdentifier 订阅者元数据标识符 - * @param urlListStr URL列表字符串 - */ @Override protected void doSaveSubscriberData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, String urlListStr) { this.storeMetadata(subscriberMetadataIdentifier, urlListStr); } - /** - * 获取订阅的URL列表 - * - * @param subscriberMetadataIdentifier 订阅者元数据标识符 - * @return 订阅的URL列表 - */ @Override protected String doGetSubscribedURLs(SubscriberMetadataIdentifier subscriberMetadataIdentifier) { return this.getMetadata(subscriberMetadataIdentifier); } - /** - * 获取服务定义 - * - * @param metadataIdentifier 元数据标识符 - * @return 服务定义内容 - */ @Override public String getServiceDefinition(MetadataIdentifier metadataIdentifier) { return this.getMetadata(metadataIdentifier); } - /** - * 存储元数据的通用方法,根据是否有连接池选择存储方式 - * - * @param metadataIdentifier 元数据标识符 - * @param v 元数据值 - */ private void storeMetadata(BaseMetadataIdentifier metadataIdentifier, String v) { if (pool != null) { storeMetadataStandalone(metadataIdentifier, v); @@ -199,12 +158,6 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } - /** - * 在集群模式下存储元数据 - * - * @param metadataIdentifier 元数据标识符 - * @param v 元数据值 - */ private void storeMetadataInCluster(BaseMetadataIdentifier metadataIdentifier, String v) { try (JedisCluster jedisCluster = new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) { @@ -217,12 +170,6 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } - /** - * 在单机模式下存储元数据 - * - * @param metadataIdentifier 元数据标识符 - * @param v 元数据值 - */ private void storeMetadataStandalone(BaseMetadataIdentifier metadataIdentifier, String v) { try (Jedis jedis = pool.getResource()) { jedis.set(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), v, jedisParams); @@ -233,11 +180,6 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } - /** - * 删除元数据 - * - * @param metadataIdentifier 元数据标识符 - */ private void deleteMetadata(BaseMetadataIdentifier metadataIdentifier) { if (pool != null) { deleteMetadataStandalone(metadataIdentifier); @@ -246,11 +188,6 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } - /** - * 在集群模式下删除元数据 - * - * @param metadataIdentifier 元数据标识符 - */ private void deleteMetadataInCluster(BaseMetadataIdentifier metadataIdentifier) { try (JedisCluster jedisCluster = new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) { @@ -262,11 +199,6 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } - /** - * 在单机模式下删除元数据 - * - * @param metadataIdentifier 元数据标识符 - */ private void deleteMetadataStandalone(BaseMetadataIdentifier metadataIdentifier) { try (Jedis jedis = pool.getResource()) { jedis.del(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY)); @@ -277,12 +209,6 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } - /** - * 获取元数据 - * - * @param metadataIdentifier 元数据标识符 - * @return 元数据值 - */ private String getMetadata(BaseMetadataIdentifier metadataIdentifier) { if (pool != null) { return getMetadataStandalone(metadataIdentifier); @@ -291,12 +217,6 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } - /** - * 在集群模式下获取元数据 - * - * @param metadataIdentifier 元数据标识符 - * @return 元数据值 - */ private String getMetadataInCluster(BaseMetadataIdentifier metadataIdentifier) { try (JedisCluster jedisCluster = new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) { @@ -308,12 +228,6 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } - /** - * 在单机模式下获取元数据 - * - * @param metadataIdentifier 元数据标识符 - * @return 元数据值 - */ private String getMetadataStandalone(BaseMetadataIdentifier metadataIdentifier) { try (Jedis jedis = pool.getResource()) { return jedis.get(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY)); @@ -325,17 +239,15 @@ public class RedisMetadataReport extends AbstractMetadataReport { } /** - * 使用Redis哈希存储类和应用程序名称的映射关系 - *

- * 键:默认为 'dubbo:mapping' - * 字段:类(serviceInterface) - * 值:应用程序名称列表 - * - * @param serviceInterface 类名(作为字段) - * @param defaultMappingGroup 默认映射组 {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP} - * @param newConfigContent 新的应用程序名称列表 - * @param ticket 先前的应用程序名称列表 - * @return 是否成功注册映射关系 + * Store class and application names using Redis hashes + * key: default 'dubbo:mapping' + * field: class (serviceInterface) + * value: application_names + * @param serviceInterface field(class) + * @param defaultMappingGroup {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP} + * @param newConfigContent new application_names + * @param ticket previous application_names + * @return */ @Override public boolean registerServiceAppMapping( @@ -353,17 +265,6 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } - /** - * 根据是否存在 Redis 连接池选择存储映射关系的方法。 - * 如果存在 Redis 连接池,则使用单机存储方式 {@link #storeMappingStandalone(String, String, String, String)}; - * 否则,使用集群存储方式 {@link #storeMappingInCluster(String, String, String, String)}。 - * - * @param key 存储键 - * @param field 存储字段 - * @param value 存储值 - * @param ticket 事务票据,用于 CAS 操作 - * @return 存储是否成功 - */ private boolean storeMapping(String key, String field, String value, String ticket) { if (pool != null) { return storeMappingStandalone(key, field, value, ticket); @@ -373,17 +274,8 @@ public class RedisMetadataReport extends AbstractMetadataReport { } /** - * 在 Redis 集群中存储映射关系 - * 使用 Redis 集群的方式存储给定键和字段的映射关系,并实现乐观锁 CAS 操作 - * 如果旧值为空或与给定的事务票据匹配,则更新字段的值为新值,并发布更新事件 - * 使用 WATCH 和 MULTI 来实现事务操作 - * - * @param key 存储键 - * @param field 存储字段 - * @param value 存储值 - * @param ticket 事务票据,用于 CAS 操作 - * @return 存储是否成功 - * @throws RpcException 存储过程中发生的异常,以及失败的原因 + * use 'watch' to implement cas. + * Find information about slot distribution by key. */ private boolean storeMappingInCluster(String key, String field, String value, String ticket) { try (JedisCluster jedisCluster = @@ -412,14 +304,8 @@ public class RedisMetadataReport extends AbstractMetadataReport { } /** - * 在单机Redis中存储映射关系 - * 使用 'watch' 实现CAS(比较并交换) - * - * @param key Redis键 - * @param field Redis哈希字段(类名) - * @param value 新的应用程序名称列表 - * @param ticket 先前的应用程序名称列表 - * @return 是否成功存储映射关系 + * use 'watch' to implement cas. + * Find information about slot distribution by key. */ private boolean storeMappingStandalone(String key, String field, String value, String ticket) { try (Jedis jedis = pool.getResource()) { @@ -444,48 +330,36 @@ public class RedisMetadataReport extends AbstractMetadataReport { } /** - * 构建映射关键字,用于存储服务类和应用名称的 Redis 哈希表键 - * 结合根路径和默认映射组名构建完整的映射键 - * - * @param defaultMappingGroup 默认映射组名 {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP} - * @return 构建的映射关键字 + * build mapping key + * @param defaultMappingGroup {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP} + * @return */ private String buildMappingKey(String defaultMappingGroup) { return this.root + GROUP_CHAR_SEPARATOR + defaultMappingGroup; } /** - * 构建发布订阅键,用于 Redis 发布-订阅模式中的通道名称 - * 结合默认映射组名和队列键构建完整的发布订阅键 - * - * @return 构建的发布订阅键 + * build pub/sub key */ private String buildPubSubKey() { return buildMappingKey(DEFAULT_MAPPING_GROUP) + GROUP_CHAR_SEPARATOR + QUEUES_KEY; } /** - * 根据服务键和分组获取配置项 - * 使用分组构建映射键,并获取映射数据,然后返回一个配置项对象 - * - * @param serviceKey 服务键,用于标识特定的服务 - * @param group 分组,用于构建映射键 - * @return 配置项对象,包含从映射数据中获取的内容 + * get content and use content to complete cas + * @param serviceKey class + * @param group {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP} */ @Override public ConfigItem getConfigItem(String serviceKey, String group) { String key = buildMappingKey(group); String content = getMappingData(key, serviceKey); + return new ConfigItem(content, content); } /** - * 根据键和字段从 Redis 中获取映射数据 - * 如果连接池不为空,则使用独立模式获取数据;否则使用集群模式 - * - * @param key 键,用于定位数据的存储位置 - * @param field 字段,用于定位具体的数据项 - * @return 获取到的映射数据 + * get current application_names */ private String getMappingData(String key, String field) { if (pool != null) { @@ -495,14 +369,6 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } - /** - * 从 Redis 集群中获取指定键和字段的映射数据 - * - * @param key Redis 哈希表的键 - * @param field 哈希表中的字段 - * @return 返回键和字段对应的值,如果获取失败则抛出异常 - * @throws RpcException 如果从 Redis 集群获取数据失败,抛出该异常 - */ private String getMappingDataInCluster(String key, String field) { try (JedisCluster jedisCluster = new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) { @@ -514,13 +380,6 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } - /** - * 使用集群模式从 Redis 中获取映射数据 - * - * @param key 键,用于定位数据的存储位置 - * @param field 字段,用于定位具体的数据项 - * @return 获取到的映射数据,如果获取失败则抛出 RpcException 异常 - */ private String getMappingDataStandalone(String key, String field) { try (Jedis jedis = pool.getResource()) { return jedis.hget(key, field); @@ -532,10 +391,7 @@ public class RedisMetadataReport extends AbstractMetadataReport { } /** - * 移除服务应用映射的监听器 - * - * @param serviceKey 服务键,用于标识特定的服务 - * @param listener 映射监听器,用于处理映射变更事件 + * remove listener. If have no listener,thread will dead */ @Override public void removeServiceAppMappingListener(String serviceKey, MappingListener listener) { @@ -550,13 +406,8 @@ public class RedisMetadataReport extends AbstractMetadataReport { } /** - * 启动一个线程并订阅 {@link this#buildPubSubKey()} - * 如果 'application_names' 消息发生变化,则通知 {@link MappingListener}。 - * - * @param serviceKey 服务键 - * @param listener 映射监听器 - * @param url URL - * @return 返回服务与应用映射关系的集合 + * Start a thread and subscribe to {@link this#buildPubSubKey()}. + * Notify {@link MappingListener} if there is a change in the 'application_names' message. */ @Override public Set getServiceAppMapping(String serviceKey, MappingListener listener, URL url) { @@ -570,82 +421,45 @@ public class RedisMetadataReport extends AbstractMetadataReport { return this.getServiceAppMapping(serviceKey, url); } - /** - * 获取指定服务键的服务与应用映射关系集合 - * - * @param serviceKey 服务键 - * @param url URL - * @return 返回服务与应用映射关系的集合 - */ @Override public Set getServiceAppMapping(String serviceKey, URL url) { String key = buildMappingKey(DEFAULT_MAPPING_GROUP); return getAppNames(getMappingData(key, serviceKey)); } - /** - * 获取订阅者元数据信息 - * - * @param identifier 订阅者元数据标识符 - * @param instanceMetadata 实例元数据映射 - * @return 返回订阅者的元数据信息对象 - */ @Override public MetadataInfo getAppMetadata(SubscriberMetadataIdentifier identifier, Map instanceMetadata) { String content = this.getMetadata(identifier); return JsonUtils.toJavaObject(content, MetadataInfo.class); } - /** - * 发布应用元数据信息 - * - * @param identifier 订阅者元数据标识符 - * @param metadataInfo 元数据信息对象 - */ @Override public void publishAppMetadata(SubscriberMetadataIdentifier identifier, MetadataInfo metadataInfo) { this.storeMetadata(identifier, metadataInfo.getContent()); } - /** - * 取消发布应用元数据信息 - * - * @param identifier 订阅者元数据标识符 - * @param metadataInfo 元数据信息对象 - */ @Override public void unPublishAppMetadata(SubscriberMetadataIdentifier identifier, MetadataInfo metadataInfo) { this.deleteMetadata(identifier); } - // 用于测试 + // for test public MappingDataListener getMappingDataListener() { return mappingDataListenerMap.get(buildPubSubKey()); } /** - * 监听 'application_names' 消息的变化并通知监听器 + * Listen for changes in the 'application_names' message and notify the listener. */ class NotifySub extends JedisPubSub { + private final Map> listeners = new ConcurrentHashMap<>(); - /** - * 添加监听器 - * - * @param key 监听的键 - * @param listener 监听器对象 - */ public void addListener(String key, MappingListener listener) { Set listenerSet = listeners.computeIfAbsent(key, k -> new ConcurrentHashSet<>()); listenerSet.add(listener); } - /** - * 移除监听器 - * - * @param serviceKey 服务键 - * @param listener 监听器对象 - */ public void removeListener(String serviceKey, MappingListener listener) { Set listenerSet = this.listeners.get(serviceKey); if (listenerSet != null) { @@ -656,21 +470,10 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } - /** - * 检查监听器集合是否为空 - * - * @return 如果监听器集合为空则返回 true,否则返回 false - */ public Boolean isEmpty() { return this.listeners.isEmpty(); } - /** - * 当接收到消息时触发的方法 - * - * @param key 消息的键 - * @param msg 接收到的消息内容 - */ @Override public void onMessage(String key, String msg) { logger.info("sub from redis:" + key + " message:" + msg); @@ -683,24 +486,11 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } - /** - * 当接收到模式消息时触发的方法 - * - * @param pattern 模式 - * @param key 消息的键 - * @param msg 接收到的消息内容 - */ @Override public void onPMessage(String pattern, String key, String msg) { onMessage(key, msg); } - /** - * 当成功订阅模式时触发的方法 - * - * @param pattern 订阅的模式 - * @param subscribedChannels 订阅的频道数量 - */ @Override public void onPSubscribe(String pattern, int subscribedChannels) { super.onPSubscribe(pattern, subscribedChannels); @@ -708,7 +498,7 @@ public class RedisMetadataReport extends AbstractMetadataReport { } /** - * 监听应用名称变化消息的线程类 + * Subscribe application names change message. */ class MappingDataListener extends Thread { @@ -718,27 +508,14 @@ public class RedisMetadataReport extends AbstractMetadataReport { // for test protected volatile boolean running = true; - /** - * 构造方法,指定监听的路径 - * - * @param path 监听的路径 - */ public MappingDataListener(String path) { this.path = path; } - /** - * 获取通知订阅器 - * - * @return 通知订阅器 - */ public NotifySub getNotifySub() { return notifySub; } - /** - * 线程运行方法,持续订阅指定路径的消息 - */ @Override public void run() { while (running) { @@ -763,9 +540,6 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } - /** - * 关闭方法,用于停止线程运行并取消订阅指定路径的消息 - */ public void shutdown() { try { running = false;