From d47980ceabaf46fe5b0322ea5bbefa5a33b7a673 Mon Sep 17 00:00:00 2001 From: Patrick LeBlanc Date: Mon, 9 Sep 2019 09:50:30 -0500 Subject: [PATCH 1/3] MCOL-3488. Chunk shifting failed with SM enabled. The ChunkManager class was getting an IDBFileSystem instance in a different way than seemingly everything else. Added code to allow it to get an SMFileSystem if cloud storage is specified. --- writeengine/shared/we_chunkmanager.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/writeengine/shared/we_chunkmanager.cpp b/writeengine/shared/we_chunkmanager.cpp index c6bf3ef50..75a8feff8 100644 --- a/writeengine/shared/we_chunkmanager.cpp +++ b/writeengine/shared/we_chunkmanager.cpp @@ -47,6 +47,7 @@ using namespace execplan; #include "IDBDataFile.h" #include "IDBPolicy.h" +#include "cloudio/SMFileSystem.h" using namespace idbdatafile; namespace @@ -96,7 +97,9 @@ ChunkManager::ChunkManager() : fMaxActiveChunkNum(100), fLenCompressed(0), fIsBu fLocalModuleId(Config::getLocalModuleID()), fFs(fIsHdfs ? IDBFileSystem::getFs(IDBDataFile::HDFS) : - IDBFileSystem::getFs(IDBDataFile::BUFFERED)) + IDBPolicy::useCloud() ? + IDBFileSystem::getFs(IDBDataFile::CLOUD) : + IDBFileSystem::getFs(IDBDataFile::BUFFERED)) { fUserPaddings = Config::getNumCompressedPadBlks() * BYTE_PER_BLOCK; fCompressor.numUserPaddingBytes(fUserPaddings); @@ -2316,7 +2319,7 @@ int ChunkManager::swapTmpFile(const string& src, const string& dest) { // return value int rc = NO_ERROR; - + // if no change to the cdf, the tmp may not exist, no need to swap. if (!fFs.exists(src.c_str())) return rc; From 9c7e2e923e63637011d246ceeec5f7d966f18552 Mon Sep 17 00:00:00 2001 From: Patrick LeBlanc Date: Mon, 9 Sep 2019 12:27:13 -0500 Subject: [PATCH 2/3] MCOL-3488. Found places with similar code. --- .../redistribute/we_redistributeworkerthread.cpp | 12 ++++++++---- writeengine/server/we_getfilesizes.cpp | 2 ++ writeengine/shared/we_confirmhdfsdbfile.cpp | 8 +++++--- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/writeengine/redistribute/we_redistributeworkerthread.cpp b/writeengine/redistribute/we_redistributeworkerthread.cpp index 267cb2220..c16a0a82e 100644 --- a/writeengine/redistribute/we_redistributeworkerthread.cpp +++ b/writeengine/redistribute/we_redistributeworkerthread.cpp @@ -486,8 +486,9 @@ int RedistributeWorkerThread::sendData() int16_t source = fPlanEntry.source; int16_t dest = fPlanEntry.destination; - IDBDataFile::Types fileType = - (IDBPolicy::useHdfs() ? IDBDataFile::HDFS : IDBDataFile::UNBUFFERED); + IDBDataFile::Types fileType = (IDBPolicy::useHdfs() ? IDBDataFile::HDFS : + IDBPolicy::useCloud() ? IDBDataFile::CLOUD : IDBDataFile::UNBUFFERED); + IDBFileSystem& fs = IDBFileSystem::getFs( fileType ); if ((remotePM) && (fileType != IDBDataFile::HDFS)) @@ -1519,8 +1520,11 @@ void RedistributeWorkerThread::handleDataAbort(SBS& sbs, size_t& size) if (fNewFilePtr != NULL) closeFile(fNewFilePtr); - IDBFileSystem& fs = IDBFileSystem::getFs( - (IDBPolicy::useHdfs() ? IDBDataFile::HDFS : IDBDataFile::UNBUFFERED) ); + IDBFileSystem& fs = (IDBPolicy::useHdfs() ? + IDBFileSystem::getFs(IDBDataFile::HDFS) : + IDBPolicy::useCloud() ? + IDBFileSystem::getFs(IDBDataFile::CLOUD) : + IDBFileSystem::getFs(IDBDataFile::BUFFERED)); // remove local files for (set::iterator i = fNewDirSet.begin(); i != fNewDirSet.end(); i++) diff --git a/writeengine/server/we_getfilesizes.cpp b/writeengine/server/we_getfilesizes.cpp index 5e408d4e5..3a597ff69 100644 --- a/writeengine/server/we_getfilesizes.cpp +++ b/writeengine/server/we_getfilesizes.cpp @@ -183,6 +183,8 @@ struct ColumnThread if (bUsingHdfs) fileType = IDBDataFile::HDFS; + else if (IDBPolicy::useCloud()) + fileType = IDBDataFile::CLOUD; else fileType = IDBDataFile::UNBUFFERED; diff --git a/writeengine/shared/we_confirmhdfsdbfile.cpp b/writeengine/shared/we_confirmhdfsdbfile.cpp index b749c9603..ec1483e12 100644 --- a/writeengine/shared/we_confirmhdfsdbfile.cpp +++ b/writeengine/shared/we_confirmhdfsdbfile.cpp @@ -49,9 +49,11 @@ namespace WriteEngine // on useHdfs() to tell me which FileSystem reference to get. //------------------------------------------------------------------------------ ConfirmHdfsDbFile::ConfirmHdfsDbFile() : - fFs( (idbdatafile::IDBPolicy::useHdfs()) ? - idbdatafile::IDBFileSystem::getFs(idbdatafile::IDBDataFile::HDFS) : - idbdatafile::IDBFileSystem::getFs(idbdatafile::IDBDataFile::BUFFERED)) + fFs(idbdatafile::IDBPolicy::useHdfs() ? + idbdatafile::IDBFileSystem::getFs(idbdatafile::IDBDataFile::HDFS) : + idbdatafile::IDBPolicy::useCloud() ? + idbdatafile::IDBFileSystem::getFs(idbdatafile::IDBDataFile::CLOUD) : + idbdatafile::IDBFileSystem::getFs(idbdatafile::IDBDataFile::BUFFERED)) { } From b0e934dd2d0359c76f43f96a61dcb1a2e57ab2e1 Mon Sep 17 00:00:00 2001 From: Patrick LeBlanc Date: Mon, 9 Sep 2019 12:32:38 -0500 Subject: [PATCH 3/3] MCOL-3488. Found another one. --- writeengine/redistribute/we_redistributeworkerthread.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/writeengine/redistribute/we_redistributeworkerthread.cpp b/writeengine/redistribute/we_redistributeworkerthread.cpp index c16a0a82e..b2ed7ba47 100644 --- a/writeengine/redistribute/we_redistributeworkerthread.cpp +++ b/writeengine/redistribute/we_redistributeworkerthread.cpp @@ -1050,8 +1050,11 @@ void RedistributeWorkerThread::confirmToPeer() } } - IDBFileSystem& fs = IDBFileSystem::getFs( - (IDBPolicy::useHdfs() ? IDBDataFile::HDFS : IDBDataFile::UNBUFFERED) ); + IDBFileSystem& fs = (IDBPolicy::useHdfs() ? + IDBFileSystem::getFs(IDBDataFile::HDFS) : + IDBPolicy::useCloud() ? + IDBFileSystem::getFs(IDBDataFile::CLOUD) : + IDBFileSystem::getFs(IDBDataFile::BUFFERED)); uint32_t confirmCode = RED_DATA_COMMIT;