diff --git a/writeengine/redistribute/we_redistributecontrol.cpp b/writeengine/redistribute/we_redistributecontrol.cpp index 1b029763a..573a7c360 100644 --- a/writeengine/redistribute/we_redistributecontrol.cpp +++ b/writeengine/redistribute/we_redistributecontrol.cpp @@ -413,6 +413,7 @@ bool RedistributeControl::getStartOptions(messageqcpp::ByteStream& bs) bs >> fOptions; bs >> n; + fSourceList.clear(); fSourceList.reserve(n); for (uint32_t i = 0; i < n; i++) { @@ -420,6 +421,7 @@ bool RedistributeControl::getStartOptions(messageqcpp::ByteStream& bs) fSourceList.push_back(d); } bs >> n; + fDestinationList.clear(); fDestinationList.reserve(n); for (uint32_t i = 0; i < n; i++) { diff --git a/writeengine/redistribute/we_redistributecontrolthread.cpp b/writeengine/redistribute/we_redistributecontrolthread.cpp index 94b21bf29..dd498dd2a 100644 --- a/writeengine/redistribute/we_redistributecontrolthread.cpp +++ b/writeengine/redistribute/we_redistributecontrolthread.cpp @@ -173,20 +173,23 @@ int RedistributeControlThread::setup() vector::iterator i = fControl->fSourceList.begin(); for (; i != fControl->fSourceList.end(); i++) { -// fSourceSet.insert(*i); + fSourceSet.insert(*i); fDbrootSet.insert(*i); if (*i > fMaxDbroot) fMaxDbroot = *i; } -// vector::iterator j = fControl->fDestinationList.begin(); -// for (; j != fControl->fDestinationList.end(); j++) -// { -// fTargetSet.insert(*j); -// fDbrootSet.insert(*j); + vector::iterator j = fControl->fDestinationList.begin(); + for (; j != fControl->fDestinationList.end(); j++) + { + fTargetSet.insert(*j); + if (fDbrootSet.find(*j) == fDbrootSet.end()) + { + fDbrootSet.insert(*j); + } // if (*j > fMaxDbroot) // fMaxDbroot = *j; -// } + } } catch (const std::exception& ex) { @@ -262,9 +265,9 @@ int RedistributeControlThread::makeRedistributePlan() j != partitionMap.end(); j++) { int dbroot = j->first.dbroot; - if (fDbrootSet.find(dbroot) != fDbrootSet.end()) + if (fSourceSet.find(dbroot) != fSourceSet.end()) { - // only dbroot in source and target list needs attention + // only dbroot in source list needs attention dbPartVec[dbroot].push_back(j->first.partition); if (j->first.partition > maxPartitionId) @@ -274,98 +277,167 @@ int RedistributeControlThread::makeRedistributePlan() } } - // sort the partition + // sort the partitions for (vector >::iterator k = dbPartVec.begin(); k != dbPartVec.end(); k++) sort(k->begin(), k->end()); // divide the dbroots into the source and target sets - uint64_t average = totalPartitionCount / fDbrootSet.size(); - uint64_t remainder = totalPartitionCount % fDbrootSet.size(); + uint64_t average = totalPartitionCount / fTargetSet.size(); + // Remainder is the number of partitions that must be spread across some + // of the dbroots such that no dbroot has more than average+1 partitions. + uint64_t remainder = totalPartitionCount % fTargetSet.size(); set sourceDbroots; set targetDbroots; // list > targetList; // to be ordered by partition size + int64_t extra = remainder; for (set::iterator j = fDbrootSet.begin(); j != fDbrootSet.end(); ++j) { + if (fTargetSet.find(*j) == fTargetSet.end()) + { + // Not a target (removed on command line). Always a source. + sourceDbroots.insert(*j); + continue; + } + // If a dbroot has exactly average+1 partitions and there's extras to be had, + // then it is neither a source nor a target. + if ((dbPartVec[*j].size() == average+1) && extra) + { + --extra; + continue; + } if (dbPartVec[*j].size() > average) { - // the last partition is not a candidate for redistribute. - dbPartVec[*j].pop_back(); + // Sources are those dbroots with more than average partitions sourceDbroots.insert(*j); } - else if (dbPartVec[*j].size() <= average) + else { + // Targets are those with room ( <= average ) targetDbroots.insert(*j); } } + // At this point, there are two concepts of target. (1)Those fTargetSet, which is the + // set of dbroots the user wants partitions on and (2) those in targetDbroots, a subset of + // fTargetSet, which is those that actually have room, based on average, for more data. - // After redistribution, partition # is in [average, average+1]. - // When remainder > # of source, some target will have (average+1) partitions. - int64_t extra = ((int64_t) remainder) - ((int64_t) sourceDbroots.size()); + // After redistribution, partition count for each dbroot is average or average+1. + // When remainder > 0, some targets will have (average+1) partitions. - // loop through target dbroots - set::iterator k = sourceDbroots.begin(); + // loop through target dbroots and find partitions from sources to move to each. + set::iterator sourceDbroot = sourceDbroots.begin(); int sourceCnt = sourceDbroots.size(); - for (set::iterator j = targetDbroots.begin(); j != targetDbroots.end(); j++) + for (set::iterator targetDbroot = targetDbroots.begin(); + targetDbroot != targetDbroots.end(); + ++targetDbroot) { // check if this target will have average + 1 partitions. uint64_t e = 0; if (extra-- > 0) e = 1; - // the partitions already on the target dbroot - set parts(dbPartVec[*j].begin(), dbPartVec[*j].end()); - if (parts.size() >= (average+e)) - continue; // no need to move any partition to this target + // A set of the partitions already on the target. We try not to move the same partition here. + set targetParts(dbPartVec[*targetDbroot].begin(), dbPartVec[*targetDbroot].end()); + if (targetParts.size() >= (average+e)) + continue; // Don't move any partitions to this target // partitions to be moved to this target vector planVec; - // looking for source partitions start from partition1 - bool done = false; // if target got enough partitions - int loop = 0; // avoid infinity loop, if possible - while (!done && loop < maxPartitionId) + // looking for source partitions start from partition 0 + bool done = false; // if target got enough partitions, set to true. + int loop = 0; // avoid infinite loop. maxPartitionId is the last partition of one of the dbroots. It's a place to stop if all else fails. + while (!done && loop <= maxPartitionId) { - // maxPartitionId is the last partition of one of the dbroots, not a candidate. - for (int p = loop++; p < maxPartitionId && !done; p++) + for (int p = loop++; p <= maxPartitionId && !done; ++p) { bool found = false; - if (parts.find(p) == parts.end()) + if (targetParts.find(p) == targetParts.end()) // True if the partition is not on the target already { - // try to find p in one of the source + // try to find partition p in one of the source dbroots for (int x = 0; x < sourceCnt && !found; ++x) { - vector& v = dbPartVec[*k]; - if (v.size() >= average) + vector& sourceParts = dbPartVec[*sourceDbroot]; + // This partition needs to move if: + // 1) source still has more than average partitions, or + // 2) source is not a listed target (we want to empty source). + bool bNotTarget = fTargetSet.find(*sourceDbroot) == fTargetSet.end(); // true if source not in target list + if (sourceParts.size() >= average || bNotTarget) { - vector::iterator y = find(v.begin(), v.end(), p); - if ((y != v.end()) && - (v.size() > parts.size())) // @bug4840, tie-break. + vector::iterator y = find(sourceParts.begin(), sourceParts.end(), p); + if ((y != sourceParts.end()) && + (sourceParts.size() > targetParts.size() || bNotTarget)) { - parts.insert(p); - planVec.push_back(PartitionInfo(*k, p)); + targetParts.insert(p); + planVec.push_back(PartitionInfo(*sourceDbroot, p)); found = true; // update the source - v.erase(y); + sourceParts.erase(y); } } - if (++k == sourceDbroots.end()) - k = sourceDbroots.begin(); + if (++sourceDbroot == sourceDbroots.end()) + sourceDbroot = sourceDbroots.begin(); } // for source - if (parts.size() >= (average+e)) + if (targetParts.size() == (average+e)) done = true; } // !find p } // for p } // while loop // dump the plan for the target to file - dumpPlanToFile(i->first, planVec, *j); - + dumpPlanToFile(i->first, planVec, *targetDbroot); } // for target - } // for tables + // It's possible that a source that is "removed" on the command line is not empty. + // This can happen if a partition exists on all dbroots. + + // Loop through the sources, looking for dbroots that are not targets that also still contain partitions + for (set::iterator sourceDbroot = sourceDbroots.begin(); + sourceDbroot != sourceDbroots.end(); + ++sourceDbroot) + { + // Is this source in target list? If so, do nothing. + if (fTargetSet.find(*sourceDbroot) != fTargetSet.end()) + continue; + vector& sourceParts = dbPartVec[*sourceDbroot]; // Partitions still on source + + // We can't erase from a vector we're iterating, so we need a kludge: + for (int p = 0; p <= maxPartitionId; ++p) + { + vector::iterator sourcePart = find(sourceParts.begin(), sourceParts.end(), p); + if (sourcePart == sourceParts.end()) + { + continue; + } + // Look through targets to see which can accept this partition. Find the one with the least + // number of partitions. Someday we want to put with the dbroot having the fewest segments of + // the partition. + uint64_t partCount = std::numeric_limits::max(); + int tdbroot = 0; + for (set::iterator targetDbroot = targetDbroots.begin(); + targetDbroot != targetDbroots.end(); + ++targetDbroot) + { + if (dbPartVec[*targetDbroot].size() < partCount) + { + tdbroot = *targetDbroot; + } + } + if (tdbroot == 0) + { + continue; + } + set targetParts(dbPartVec[tdbroot].begin(), dbPartVec[tdbroot].end()); + vector planVec; // partitions to be moved to this target + targetParts.insert(p); + planVec.push_back(PartitionInfo(*sourceDbroot, p)); + sourceParts.erase(sourcePart); + dumpPlanToFile(i->first, planVec, tdbroot); + } // for sourceParts + } // for source + } // for tables } catch (const std::exception& ex) { @@ -376,11 +448,10 @@ int RedistributeControlThread::makeRedistributePlan() { ret = 2; } - + displayPlan(); return ret; } - void RedistributeControlThread::dumpPlanToFile(uint64_t oid, vector& vec, int target) { // open the plan file, if not already opened, to write. @@ -421,6 +492,46 @@ void RedistributeControlThread::dumpPlanToFile(uint64_t oid, vectorfPlanFilePtr); + + ByteStream bs; + uint32_t entryId = 0; + long entrySize = sizeof(RedistributePlanEntry); + fControl->logMessage(string("Redistribution Plan:")); + while (entryId++ < fEntryCount) + { + try + { + RedistributePlanEntry entry; + errno = 0; + size_t n = fread(&entry, entrySize, 1, fControl->fPlanFilePtr); + if (n != 1) + { + int e = errno; + ostringstream oss; + oss << "Failed to read from redistribute.plan: " << strerror(e) << " (" << e << ")"; + throw runtime_error(oss.str()); + } + + // Print this plan entry + ostringstream oss; + oss << "table oid " << entry.table << " partition " << entry.partition + << " moves from dbroot " << entry.source << " to " << entry.destination << endl; + fControl->logMessage(oss.str()); + } + catch (std::exception &e) + { + cout << "exception during display of plan: " << e.what() << endl; + } + catch (...) + { + cout << "exception during display of plan" << endl; + } + } +} int RedistributeControlThread::executeRedistributePlan() { diff --git a/writeengine/redistribute/we_redistributecontrolthread.h b/writeengine/redistribute/we_redistributecontrolthread.h index 42c5e8589..d16817650 100644 --- a/writeengine/redistribute/we_redistributecontrolthread.h +++ b/writeengine/redistribute/we_redistributecontrolthread.h @@ -99,7 +99,7 @@ class RedistributeControlThread int connectToWes(int); void dumpPlanToFile(uint64_t, vector&, int); - + void displayPlan(); uint32_t fAction; oam::OamCache* fOamCache; @@ -108,7 +108,7 @@ class RedistributeControlThread std::set fSourceSet; std::set fTargetSet; - std::set fDbrootSet; + std::set fDbrootSet; // Union of fSourceSet and fTargetSet int fMaxDbroot; uint32_t fEntryCount; std::string fErrorMsg; diff --git a/writeengine/redistribute/we_redistributeworkerthread.cpp b/writeengine/redistribute/we_redistributeworkerthread.cpp index 2d0488840..067442156 100644 --- a/writeengine/redistribute/we_redistributeworkerthread.cpp +++ b/writeengine/redistribute/we_redistributeworkerthread.cpp @@ -356,6 +356,7 @@ int RedistributeWorkerThread::buildEntryList() if (firstOid) fSegments.insert(j->segmentNum); } +#if 0 else if (j->dbRoot == target && j->partitionNum == partition) { // the partition already exists on the target dbroot @@ -367,7 +368,7 @@ int RedistributeWorkerThread::buildEntryList() logMessage(fErrorMsg, __LINE__); return fErrorCode; } - +#endif // workaround for HWM_0 of highest extents of the oid on target dbroot. if (j->dbRoot == target) { @@ -1235,7 +1236,7 @@ void RedistributeWorkerThread::handleDataStart(SBS& sbs, size_t& size) ostringstream oss; oss << "=>redistributing: " << fileName << ", oid=" << dc.oid << ", db=" << dc.dbroot << ", part=" << dc.partition << ", seg=" << dc.segment << " from db=" - << fMsgHeader.destination; // fMsgHeader has swapped source and destination. + << fMsgHeader.source; logMessage(oss.str(), __LINE__); } else diff --git a/writeengine/server/WriteEngineServer.vpj b/writeengine/server/WriteEngineServer.vpj index 405a193f7..62f83bd1b 100644 --- a/writeengine/server/WriteEngineServer.vpj +++ b/writeengine/server/WriteEngineServer.vpj @@ -1,241 +1,250 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + Version="10.0" + VendorName="SlickEdit" + TemplateName="GNU C/C++" + WorkingDir="."> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +