1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
| #include "sync_manager.h" #include <spdlog/spdlog.h>
namespace config {
/// Constructs the manager and immediately launches the background sync worker.
///
/// @param config_manager Shared configuration manager; its getGlobalVersion()
///                       is consulted by processBatch() and forceFullSync().
SyncManager::SyncManager(std::shared_ptr<ConfigUpdateManager> config_manager)
    : config_manager_(std::move(config_manager)) {  // sink parameter: move, don't copy
    // The flag has to be raised before the thread exists, because syncWorker()
    // leaves its loop as soon as it observes running_ == false.
    running_.store(true);
    worker_thread_ = std::thread(&SyncManager::syncWorker, this);
}
/// Stops the background worker and waits for it to finish.
SyncManager::~SyncManager() {
    {
        // Flip the flag while holding queue_mutex_. The worker evaluates its
        // wait predicate under this mutex; changing running_ without it lets the
        // store + notify slip in between the worker's predicate check and its
        // actual wait, so the wake-up is lost and shutdown stalls until the
        // wait times out.
        std::lock_guard<std::mutex> lock(queue_mutex_);
        running_.store(false);
    }
    queue_cv_.notify_all();

    if (worker_thread_.joinable()) {
        worker_thread_.join();
    }
}
/// Replaces the active sync policy and logs the new settings.
///
/// The assignment and the log statement both run while queue_mutex_ is held.
///
/// @param policy New policy; copied into the manager.
void SyncManager::setPolicy(const SyncPolicy& policy) {
    const std::lock_guard<std::mutex> guard(queue_mutex_);
    policy_ = policy;

    const auto& active = policy_;
    SPDLOG_INFO("Sync policy updated: strategy={}, batch_size={}, max_delay={}ms",
                static_cast<int>(active.strategy),
                active.max_batch_size,
                active.max_delay_ms);
}
/// Installs the callback that processBatch() invokes for every queued module.
///
/// The worker thread is started in the constructor, so this setter always runs
/// concurrently with it. The worker reads sync_callback_ while holding
/// queue_mutex_; taking the same mutex here prevents the callback from being
/// replaced in the middle of a read or an invocation.
///
/// Do not call this from inside the sync callback itself: the worker holds
/// queue_mutex_ while the callback runs, so the call would self-deadlock.
///
/// @param callback Callable to store; an empty callable makes every sync count
///                 as successful without doing any work.
void SyncManager::setSyncCallback(SyncCallback callback) {
    std::lock_guard<std::mutex> lock(queue_mutex_);
    sync_callback_ = std::move(callback);
}
/// Queues a sync for one module, coalescing with any request already pending.
///
/// At most one entry per module is kept in pending_syncs_: a second request for
/// the same module overwrites the queued version instead of adding an entry.
///
/// @param module_name Module to synchronize.
/// @param version     Version to synchronize the module to.
/// @return Always true.
bool SyncManager::requestSync(const std::string& module_name, int64_t version) {
    {
        std::lock_guard<std::mutex> lock(queue_mutex_);

        auto it = std::find_if(pending_syncs_.begin(), pending_syncs_.end(),
                               [&module_name](const auto& p) { return p.first == module_name; });

        if (it != pending_syncs_.end()) {
            it->second = version;
            SPDLOG_DEBUG("Updated pending sync for {}: version={}", module_name, version);
        } else {
            pending_syncs_.emplace_back(module_name, version);
            SPDLOG_DEBUG("Added pending sync for {}: version={}", module_name, version);
        }
    }

    // Wake the worker after releasing the lock. The previous code branched on
    // policy_.strategy here, but both arms were identical and the read of
    // policy_ happened outside queue_mutex_, racing with setPolicy().
    queue_cv_.notify_one();

    return true;
}
void SyncManager::syncWorker() { SPDLOG_INFO("Sync worker started");
while (running_.load()) { std::unique_lock<std::mutex> lock(queue_mutex_);
if (pending_syncs_.empty()) { queue_cv_.wait_for(lock, std::chrono::milliseconds(policy_.max_delay_ms), [this] { return !pending_syncs_.empty() || !running_.load(); }); }
if (!running_.load()) { break; }
if (policy_.strategy == SyncStrategy::BATCHED && pending_syncs_.size() < policy_.max_batch_size) { queue_cv_.wait_for(lock, std::chrono::milliseconds(policy_.max_delay_ms)); }
processBatch(); }
SPDLOG_INFO("Sync worker stopped"); }
bool SyncManager::processBatch() { std::vector<std::pair<std::string, int64_t>> batch; batch.swap(batch, pending_syncs_);
if (batch.empty()) { return true; }
SPDLOG_INFO("Processing sync batch: size={}", batch.size());
for (const auto& [module, version] : batch) { bool success = false; int retry = 0;
while (retry < policy_.retry_count && !success) { if (sync_callback_) { success = sync_callback_(version, module); } else { success = true; }
if (!success) { retry++; SPDLOG_WARN("Sync failed for {}@{}, retry {}/{}", module, version, retry, policy_.retry_count); std::this_thread::sleep_for(std::chrono::milliseconds(policy_.retry_delay_ms)); } }
if (policy_.require_ack) { waitForAck(version, module); } }
last_sync_version_.store(config_manager_->getGlobalVersion()); synced_.store(true);
return true; }
/// Waits until a positive acknowledgement for `module` is present in
/// ack_status_, then consumes it.
///
/// The wait is bounded by policy_.retry_delay_ms.
/// NOTE(review): the retry delay doubles as the ack timeout here — confirm that
/// this is intended rather than a dedicated ack timeout setting.
///
/// @param version Version being acknowledged. Not consulted: ack_status_ is
///                looked up by module name only.
/// @param module  Module whose acknowledgement is awaited.
/// @return true if an acknowledgement was found (already present or arriving
///         before the timeout); false on timeout.
bool SyncManager::waitForAck(int64_t version, const std::string& module) {
    static_cast<void>(version);

    std::unique_lock<std::mutex> lock(ack_mutex_);

    const auto acknowledged = [this, &module] {
        auto it = ack_status_.find(module);
        return it != ack_status_.end() && it->second;
    };

    // wait_for evaluates the predicate before blocking, so an acknowledgement
    // that is already recorded returns immediately without waiting.
    if (!ack_cv_.wait_for(lock, std::chrono::milliseconds(policy_.retry_delay_ms), acknowledged)) {
        return false;
    }

    // Consume the acknowledgement on every successful path. Previously only
    // the already-present case erased it, so an ack that arrived during the
    // wait stayed in the map and satisfied the next wait for this module.
    // The predicate just succeeded under the lock, so the entry exists.
    ack_status_.erase(ack_status_.find(module));
    return true;
}
/// Queues every listed module for a sync to the current global version.
///
/// Entries are coalesced the same way requestSync() does it: a module that is
/// already pending has its version overwritten rather than being queued a
/// second time, and duplicates inside `modules` collapse into one entry.
///
/// @param modules Names of the modules to re-synchronize.
/// @return Always true.
bool SyncManager::forceFullSync(const std::vector<std::string>& modules) {
    SPDLOG_INFO("Force full sync requested for {} modules", modules.size());

    {
        std::lock_guard<std::mutex> lock(queue_mutex_);

        // Read the version under the lock so that no requestSync() can slip in
        // between the read and the enqueue below.
        const int64_t current_version = config_manager_->getGlobalVersion();

        for (const auto& module : modules) {
            auto it = std::find_if(pending_syncs_.begin(), pending_syncs_.end(),
                                   [&module](const auto& p) { return p.first == module; });

            if (it != pending_syncs_.end()) {
                it->second = current_version;
            } else {
                pending_syncs_.emplace_back(module, current_version);
            }
        }
    }

    // Notify after releasing the lock so the woken worker can acquire it at once.
    queue_cv_.notify_one();
    return true;
}
bool SyncManager::isSynced() const { return synced_.load(); }
/// Returns the version most recently stored in last_sync_version_ by
/// processBatch().
///
/// @return The stored version at the time of the call.
int64_t SyncManager::getLastSyncVersion() const {
    const int64_t version = last_sync_version_.load();
    return version;
}
}
|