You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			745 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			745 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* Copyright (C) 2014 InfiniDB, Inc.
 | 
						|
   Copyright (C) 2016 MariaDB Corporation
 | 
						|
 | 
						|
   This program is free software; you can redistribute it and/or
 | 
						|
   modify it under the terms of the GNU General Public License
 | 
						|
   as published by the Free Software Foundation; version 2 of
 | 
						|
   the License.
 | 
						|
 | 
						|
   This program is distributed in the hope that it will be useful,
 | 
						|
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
   GNU General Public License for more details.
 | 
						|
 | 
						|
   You should have received a copy of the GNU General Public License
 | 
						|
   along with this program; if not, write to the Free Software
 | 
						|
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | 
						|
   MA 02110-1301, USA. */
 | 
						|
 | 
						|
/*
 | 
						|
* $Id: we_redistributecontrol.cpp 4450 2013-01-21 14:13:24Z rdempsey $
 | 
						|
*/
 | 
						|
 | 
						|
#include <iostream>
 | 
						|
#include <set>
 | 
						|
#include <vector>
 | 
						|
#include <cassert>
 | 
						|
#include <stdexcept>
 | 
						|
#include <sstream>
 | 
						|
#include <string>
 | 
						|
#include <ctime>
 | 
						|
#include <unistd.h>
 | 
						|
//#include <sys/stat.h>
 | 
						|
using namespace std;
 | 
						|
 | 
						|
#include "boost/scoped_ptr.hpp"
 | 
						|
#include "boost/scoped_array.hpp"
 | 
						|
#include "boost/thread.hpp"
 | 
						|
#include "boost/thread/mutex.hpp"
 | 
						|
#include "boost/filesystem/path.hpp"
 | 
						|
#include "boost/filesystem/operations.hpp"
 | 
						|
using namespace boost;
 | 
						|
 | 
						|
#include "installdir.h"
 | 
						|
 | 
						|
#include "configcpp.h"
 | 
						|
using namespace config;
 | 
						|
 | 
						|
#include "liboamcpp.h"
 | 
						|
using namespace oam;
 | 
						|
 | 
						|
#include "messagequeue.h"
 | 
						|
#include "bytestream.h"
 | 
						|
using namespace messageqcpp;
 | 
						|
 | 
						|
#include "logger.h"
 | 
						|
 | 
						|
#include "calpontsystemcatalog.h"
 | 
						|
using namespace execplan;
 | 
						|
 | 
						|
#include "we_messages.h"
 | 
						|
#include "we_redistributedef.h"
 | 
						|
#include "we_redistributecontrolthread.h"
 | 
						|
#include "we_redistributeworkerthread.h"
 | 
						|
#include "we_redistributecontrol.h"
 | 
						|
 | 
						|
namespace redistribute
 | 
						|
{
 | 
						|
 | 
						|
RedistributeControl* RedistributeControl::fInstance = NULL;
 | 
						|
boost::mutex instanceMutex;
 | 
						|
 | 
						|
const string RedistributeDir("/data1/systemFiles/redistribute");
 | 
						|
const string InfoFileName("/redistribute.info");
 | 
						|
const string PlanFileName("/redistribute.plan");
 | 
						|
 | 
						|
 | 
						|
// Returns the singleton, constructing it on the first call.
RedistributeControl* RedistributeControl::instance()
{
    // Construction happens while instanceMutex is held, so concurrent first
    // callers cannot create two objects.
    mutex::scoped_lock guard(instanceMutex);

    if (fInstance != NULL)
        return fInstance;

    fInstance = new RedistributeControl();
    return fInstance;
}
 | 
						|
 | 
						|
 | 
						|
// Builds the working paths, creates the OAM/DBRM/logger helpers and, when a
// previous info file can be read, restores the saved session information.
RedistributeControl::RedistributeControl() : fInfoFilePtr(NULL), fPlanFilePtr(NULL)
{
    // Working files live under <install dir> + RedistributeDir.
    fRedistributeDir = startup::StartUp::installDir() + RedistributeDir;
    fInfoFilePath = fRedistributeDir + InfoFileName;
    fPlanFilePath = fRedistributeDir + PlanFileName;

    fOam.reset(new oam::Oam);
    fDbrm.reset(new BRM::DBRM);

    // 32 - writeengineserver in SubsystemIDs.txt
    fSysLogger.reset(new logging::Logger(32));
    logging::MsgMap messages;
    messages[logging::M0002] = logging::Message(logging::M0002);
    fSysLogger->msgMap(messages);

    // Nothing to restore when the working directory was never created.
    if (!filesystem::exists(fRedistributeDir))
        return;

    // Open the info file for update; the handle stays open for later use.
    fInfoFilePtr = fopen(fInfoFilePath.c_str(), "r+");

    if (fInfoFilePtr == NULL)
        return;

    RedistributeInfo saved;

    if (fread(&saved, sizeof(saved), 1, fInfoFilePtr) != 1)
        return;

    fRedistributeInfo = saved;

    // Resume is not supported: a session that was active when the info file
    // was last written is recorded as failed.
    if (fRedistributeInfo.state == RED_STATE_ACTIVE)
        updateState(RED_STATE_FAILED);
}
 | 
						|
 | 
						|
 | 
						|
// Releases the helpers and file handles owned by this object.
//
// The destructor must not `delete fInstance`: when the object being destroyed
// is the singleton itself, that would re-enter this destructor on the same
// object.  It only clears the static pointer when it refers to this object.
RedistributeControl::~RedistributeControl()
{
    fOam.reset();
    fDbrm.reset();

    // Close the working files if they are still open.
    if (fInfoFilePtr != NULL)
    {
        fclose(fInfoFilePtr);
        fInfoFilePtr = NULL;
    }

    if (fPlanFilePtr != NULL)
    {
        fclose(fPlanFilePtr);
        fPlanFilePtr = NULL;
    }

    if (fInstance == this)
        fInstance = NULL;
}
 | 
						|
 | 
						|
 | 
						|
// Dispatches one redistribute UI command (start / stop / clear / status) and
// writes the reply back on the socket.
//
// bs  incoming message; its buffer starts with a RedistributeMsgHeader.  The
//     stream is reused to build the reply.
// so  socket the reply is written to.
//
// Returns the state reported by the handler, or RED_STATE_UNDEF when the
// handler threw.  Commands are serialized by fSessionMutex.
int RedistributeControl::handleUIMsg(messageqcpp::ByteStream& bs, messageqcpp::IOSocket& so)
{
    mutex::scoped_lock sessionLock(fSessionMutex);

    uint32_t status = RED_STATE_UNDEF;
    const RedistributeMsgHeader* h = (const RedistributeMsgHeader*) bs.buf();

    // Forget the previous command's reply.  Without this, the empty() checks
    // in the catch blocks below would keep and send a stale message whenever
    // a handler throws before setting its own response.
    fUIResponse.clear();

    try
    {
        switch (h->messageId)
        {
            case RED_CNTL_START:
                status = handleStartMsg(bs, so);
                break;

            case RED_CNTL_STOP:
                status = handleStopMsg(bs, so);
                break;

            case RED_CNTL_CLEAR:
                status = handleClearMsg(bs, so);
                break;

            // Unknown message ids are answered with the current status.
            case RED_CNTL_STATUS:
            default:
                status = handleStatusMsg(bs, so);
                break;
        }
    }
    catch (const std::exception& ex)
    {
        // Keep a message the handler set before throwing, if any.
        if (fUIResponse.empty())
            fUIResponse = ex.what();
    }
    catch (...)
    {
        if (fUIResponse.empty())
            fUIResponse = "Failed to process the redistribute command.";
    }

    // log the response
    logMessage(fUIResponse);

    // Reuse the request stream for the reply.
    bs.restart();
    bs << (ByteStream::byte) WriteEngine::WE_SVR_REDISTRIBUTE;  // dummy, keep for now.
    bs << status;
    bs << fUIResponse;
    so.write(bs);

    return status;
}
 | 
						|
 | 
						|
 | 
						|
// Handles the START command: validates the current state, reads the start
// options from the message, switches to ACTIVE and launches the control
// thread.
//
// bs  incoming message positioned at the RedistributeMsgHeader.
// so  unused here.
//
// Returns the resulting state and leaves the user-visible text in fUIResponse.
int RedistributeControl::handleStartMsg(messageqcpp::ByteStream& bs, messageqcpp::IOSocket& so)
{
    ostringstream oss;
    uint32_t status = getCurrentState();

    if (status != RED_STATE_IDLE)
    {
        if (status == RED_STATE_ACTIVE)
            oss << "Redistribute is already running.  Command is ignored.  You need to stop and clear this active session before starting a new one.";
        else
            oss << "Redistribute is not in IDLE state.  Command is ignored.  Please check the status of last session, then reset the state to IDLE using action CLEAR.";

        fUIResponse = oss.str();

        return status;
    }

    // must be IDLE state

    // A new session starts here; an error text left over from an earlier
    // command must not mask (or be reported as) this session's error.
    fErrorMsg.clear();

    try
    {
        // skip the header part, may need save it.
        bs.advance(sizeof(RedistributeMsgHeader));

        // getStartOptions() reports failure through its return value (with the
        // reason in fErrorMsg); do not go ACTIVE with unusable dbroot lists.
        if (!getStartOptions(bs))
            throw runtime_error(fErrorMsg);

        RedistributeControlThread::setStopAction(false);
        updateState(RED_STATE_ACTIVE);
    }
    catch (const std::exception& ex)
    {
        if (fErrorMsg.empty())
            fErrorMsg = ex.what();

        fRedistributeInfo.state = RED_STATE_FAILED;
    }
    catch (...)
    {
        fRedistributeInfo.state = RED_STATE_FAILED;
    }

    status = fRedistributeInfo.state;

    if (status == RED_STATE_ACTIVE)
    {
        oss << "Redistribute is started.";
        fControlThread.reset(new boost::thread(RedistributeControlThread(RED_CNTL_START)));
        // Let go the new thread unless we want to call interrupt on this thread in future.
        // Not going to join() because the redistribution could take very long.
        fControlThread->detach();
        fControlThread.reset();
    }
    else
    {
        updateState(RED_STATE_FAILED);
        oss << "Starting redistribute failed.";

        if (!fErrorMsg.empty())
            oss << "  " << fErrorMsg;
    }

    fUIResponse = oss.str();

    return status;
}
 | 
						|
 | 
						|
 | 
						|
int RedistributeControl::handleStatusMsg(messageqcpp::ByteStream&, messageqcpp::IOSocket& so)
 | 
						|
{
 | 
						|
    ostringstream oss;
 | 
						|
    uint32_t status = getCurrentState();
 | 
						|
    RedistributeInfo info = fRedistributeInfo;
 | 
						|
 | 
						|
    switch (status)
 | 
						|
    {
 | 
						|
        case RED_STATE_IDLE:
 | 
						|
            oss << "Redistribute is in IDLE state.";
 | 
						|
            break;
 | 
						|
 | 
						|
        case RED_STATE_ACTIVE:
 | 
						|
            oss << "Redistribute is in progress: total " << info.planned;
 | 
						|
 | 
						|
            if (info.planned > 1)
 | 
						|
                oss << " logical partitions are planned to move.\n";
 | 
						|
            else
 | 
						|
                oss << " logical partition is planned to move.\n";
 | 
						|
 | 
						|
            if (info.planned > 0)
 | 
						|
            {
 | 
						|
                if (info.endTime > 0)
 | 
						|
                    oss << "In " << (info.endTime - info.startTime) << " seconds, ";
 | 
						|
 | 
						|
                oss << info.success << " success, "
 | 
						|
                    << info.skipped << " skipped, "
 | 
						|
                    << info.failed << " failed, "
 | 
						|
                    << ((info.success + info.skipped + info.failed) * 100 / info.planned) << "%.";
 | 
						|
            }
 | 
						|
 | 
						|
            break;
 | 
						|
 | 
						|
        case RED_STATE_FINISH:
 | 
						|
            oss << "Redistribute is finished.\n"
 | 
						|
                << info.success << " success, "
 | 
						|
                << info.skipped << " skipped, "
 | 
						|
                << info.failed << " failed.\n";
 | 
						|
 | 
						|
            if (info.endTime > 0)
 | 
						|
                oss << "Total time: " << (info.endTime - info.startTime) << " seconds.\n";
 | 
						|
 | 
						|
            break;
 | 
						|
 | 
						|
        case RED_STATE_FAILED:
 | 
						|
            oss << "Redistribute is failed.\n";
 | 
						|
 | 
						|
            try
 | 
						|
            {
 | 
						|
                size_t l = 0;  // message length
 | 
						|
                size_t n = fread(&l, sizeof(int), 1, fInfoFilePtr);
 | 
						|
 | 
						|
                if (n == 1)
 | 
						|
                {
 | 
						|
                    boost::scoped_array<char> buf(new char[l + 1]);
 | 
						|
                    n = fread(buf.get(), 1, l, fInfoFilePtr);
 | 
						|
 | 
						|
                    if (n == l)
 | 
						|
                    {
 | 
						|
                        buf[l] = '\0';
 | 
						|
                        fErrorMsg += buf.get();
 | 
						|
                        oss << buf.get();
 | 
						|
                    }
 | 
						|
                }
 | 
						|
            }
 | 
						|
            catch (const std::exception&)
 | 
						|
            {
 | 
						|
            }
 | 
						|
            catch (...)
 | 
						|
            {
 | 
						|
            }
 | 
						|
 | 
						|
            break;
 | 
						|
 | 
						|
        case RED_STATE_STOPPED:
 | 
						|
            oss << "Redistribute is stopped by user.\n";
 | 
						|
 | 
						|
            if (info.planned > 0)
 | 
						|
            {
 | 
						|
                if (info.endTime > 0)
 | 
						|
                    oss << "In " << (info.endTime - info.startTime) << " seconds, ";
 | 
						|
 | 
						|
                oss << info.success << " success, "
 | 
						|
                    << info.skipped << " skipped, "
 | 
						|
                    << info.failed << " failed, "
 | 
						|
                    << ((info.success + info.skipped + info.failed) * 100 / info.planned) << "%.";
 | 
						|
            }
 | 
						|
 | 
						|
            break;
 | 
						|
 | 
						|
        default:
 | 
						|
            oss << "Failed to retrieve redistribute information, the file "
 | 
						|
                << fInfoFilePath << " may be corrupted.";
 | 
						|
            break;
 | 
						|
    }
 | 
						|
 | 
						|
    fUIResponse = oss.str();
 | 
						|
 | 
						|
    return status;
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
// Handles the STOP command.  Only an ACTIVE session can be stopped; in every
// other state the command is ignored and the current state is returned.
int RedistributeControl::handleStopMsg(messageqcpp::ByteStream&, messageqcpp::IOSocket& so)
{
    ostringstream reply;
    uint32_t state = getCurrentState();

    if (state == RED_STATE_ACTIVE)
    {
        // Raise the stop flag, record the new state, then run a STOP control
        // thread and wait for it to complete.
        RedistributeControlThread::setStopAction(true);
        updateState(RED_STATE_STOPPED);
        state = RED_STATE_STOPPED;

        boost::thread stopper((RedistributeControlThread(RED_CNTL_STOP)));
        stopper.join();

        reply << "Redistribute is stopped.";
    }
    else
    {
        reply << "Redistribute is not running. Command is ignored.";
    }

    fUIResponse = reply.str();

    return state;
}
 | 
						|
 | 
						|
 | 
						|
// Handles the CLEAR command: resets a non-running session back to IDLE.
// A running session is left untouched and its state is returned.
int RedistributeControl::handleClearMsg(messageqcpp::ByteStream&, messageqcpp::IOSocket& so)
{
    ostringstream reply;
    uint32_t state = getCurrentState();

    if (state != RED_STATE_ACTIVE)
    {
        updateState(RED_STATE_IDLE);
        state = RED_STATE_IDLE;
        reply << "Cleared.";
    }
    else
    {
        reply << "Redistribute is running. Command is ignored. To CLEAR, you have to wait or stop the running session.";
    }

    fUIResponse = reply.str();

    return state;
}
 | 
						|
 | 
						|
 | 
						|
uint32_t RedistributeControl::getCurrentState()
 | 
						|
{
 | 
						|
    uint32_t status = RED_STATE_UNDEF;
 | 
						|
    ostringstream oss;
 | 
						|
    mutex::scoped_lock lock(fInfoFileMutex);
 | 
						|
 | 
						|
    if (!fInfoFilePtr)
 | 
						|
    {
 | 
						|
        status = RED_STATE_IDLE;
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
        rewind(fInfoFilePtr);
 | 
						|
        RedistributeInfo info;
 | 
						|
        size_t n = fread(&info, sizeof(info), 1, fInfoFilePtr);
 | 
						|
 | 
						|
        if (n == 1)
 | 
						|
        {
 | 
						|
            fRedistributeInfo = info;
 | 
						|
            status = info.state;
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    return status;
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
// Reads the START payload from bs: the option word, then the source list and
// the destination list, each encoded as a count followed by that many
// entries.
//
// Returns true on success.  On failure returns false and stores the reason in
// fErrorMsg; the lists may be partially filled in that case.
bool RedistributeControl::getStartOptions(messageqcpp::ByteStream& bs)
{
    try
    {
        uint32_t count = 0;
        uint32_t dbroot = 0;

        bs >> fOptions;

        // source list
        bs >> count;
        fSourceList.clear();
        fSourceList.reserve(count);

        for (uint32_t k = 0; k != count; ++k)
        {
            bs >> dbroot;
            fSourceList.push_back(dbroot);
        }

        // destination list
        bs >> count;
        fDestinationList.clear();
        fDestinationList.reserve(count);

        for (uint32_t k = 0; k != count; ++k)
        {
            bs >> dbroot;
            fDestinationList.push_back(dbroot);
        }

        // Both lists are required.
        if (fSourceList.empty() || fDestinationList.empty())
            throw runtime_error("Failed to get dbroot lists.");
    }
    catch (const std::exception& ex)
    {
        fErrorMsg = ex.what();
        return false;
    }
    catch (...)
    {
        fErrorMsg = "Failed to get dbroot lists.";
        return false;
    }

    return true;
}
 | 
						|
 | 
						|
 | 
						|
void RedistributeControl::updateState(uint32_t s)
 | 
						|
{
 | 
						|
    mutex::scoped_lock lock(fInfoFileMutex);
 | 
						|
 | 
						|
    // allowed state change:
 | 
						|
    //   idle    ->  active
 | 
						|
    //   active  ->  finish
 | 
						|
    //   active  ->  stopped
 | 
						|
    //   active  ->  failed
 | 
						|
    //   finish  ->  idle
 | 
						|
    //   stopped ->  idle
 | 
						|
    //   failed  ->  idle
 | 
						|
 | 
						|
 | 
						|
    if (s == RED_STATE_IDLE)
 | 
						|
    {
 | 
						|
        if (fRedistributeInfo.state == RED_STATE_ACTIVE)
 | 
						|
            return;
 | 
						|
 | 
						|
        // close the files if they are already opened
 | 
						|
        if (fInfoFilePtr != NULL)
 | 
						|
        {
 | 
						|
            fclose(fInfoFilePtr);
 | 
						|
            fInfoFilePtr = NULL;
 | 
						|
        }
 | 
						|
 | 
						|
        if (fPlanFilePtr != NULL)
 | 
						|
        {
 | 
						|
            fclose(fPlanFilePtr);
 | 
						|
            fPlanFilePtr = NULL;
 | 
						|
        }
 | 
						|
 | 
						|
        // move old files to archive
 | 
						|
        // zip or compress if the .plan file gets large
 | 
						|
        time_t t = fRedistributeInfo.startTime;
 | 
						|
 | 
						|
        if (t == 0)
 | 
						|
            t = time(NULL);
 | 
						|
 | 
						|
        ostringstream oss;
 | 
						|
        struct tm m;
 | 
						|
        localtime_r(&t, &m);
 | 
						|
        oss << setfill('0') << setw(4) << (m.tm_year + 1900) << setw(2) << (m.tm_mon + 1)
 | 
						|
            << setw(2) << (m.tm_mday) << setw(2) << (m.tm_hour) << setw(2) << (m.tm_min)
 | 
						|
            << setw(2) << (m.tm_sec);
 | 
						|
 | 
						|
        try
 | 
						|
        {
 | 
						|
            if (filesystem::exists(fInfoFilePath) && filesystem::exists(fPlanFilePath))
 | 
						|
            {
 | 
						|
                bool mergeOk = false;
 | 
						|
                FILE* infoPtr = fopen(fInfoFilePath.c_str(), "r+b");
 | 
						|
                FILE* entryPtr = fopen(fPlanFilePath.c_str(), "rb");
 | 
						|
                int rc = 1;
 | 
						|
 | 
						|
                if (infoPtr != NULL && entryPtr != NULL)
 | 
						|
                {
 | 
						|
                    rc = fseek(infoPtr, sizeof(RedistributeInfo), SEEK_SET);
 | 
						|
                    RedistributePlanEntry entry;
 | 
						|
 | 
						|
                    while (rc == 0)
 | 
						|
                    {
 | 
						|
                        size_t n = fread(&entry, sizeof(entry), 1, entryPtr);
 | 
						|
 | 
						|
                        if (n != 1)
 | 
						|
                            break;
 | 
						|
 | 
						|
                        n = fwrite(&entry, sizeof(entry), 1, infoPtr);
 | 
						|
                        fflush(infoPtr);
 | 
						|
 | 
						|
                        if (n != 1)
 | 
						|
                            rc = -1;
 | 
						|
                    }
 | 
						|
 | 
						|
                }
 | 
						|
 | 
						|
                if (rc == 0 && feof(entryPtr))
 | 
						|
                    mergeOk = true;
 | 
						|
 | 
						|
                if (infoPtr != NULL)
 | 
						|
                    fclose(infoPtr);
 | 
						|
 | 
						|
                if (entryPtr != NULL)
 | 
						|
                    fclose(entryPtr);
 | 
						|
 | 
						|
                if (mergeOk)
 | 
						|
                    filesystem::remove(fPlanFilePath);
 | 
						|
            }
 | 
						|
 | 
						|
            if (filesystem::exists(fInfoFilePath))
 | 
						|
            {
 | 
						|
                string newInfoPath = fRedistributeDir + "/archive" + InfoFileName + "." + oss.str();
 | 
						|
                filesystem::rename(fInfoFilePath, newInfoPath);
 | 
						|
            }
 | 
						|
 | 
						|
            if (filesystem::exists(fPlanFilePath))
 | 
						|
            {
 | 
						|
                string newPlanPath = fRedistributeDir + "/archive" + PlanFileName + "." + oss.str();
 | 
						|
                filesystem::rename(fPlanFilePath, newPlanPath);
 | 
						|
            }
 | 
						|
        }
 | 
						|
        catch (const std::exception&)
 | 
						|
        {
 | 
						|
        }
 | 
						|
        catch (...)
 | 
						|
        {
 | 
						|
        }
 | 
						|
 | 
						|
        fRedistributeInfo = RedistributeInfo();
 | 
						|
        return;
 | 
						|
    }
 | 
						|
 | 
						|
    // safety check
 | 
						|
    if (s != RED_STATE_ACTIVE && fRedistributeInfo.state != RED_STATE_ACTIVE)
 | 
						|
        return;
 | 
						|
 | 
						|
    // in IDLE state there is no redistribute.info file
 | 
						|
    if (s == RED_STATE_ACTIVE)
 | 
						|
    {
 | 
						|
//		filesystem::path dirPath(fRedistributeDir);
 | 
						|
//		if (filesystem::exists(fRedistributeDir) && !filesystem::is_directory(fRedistributeDir))
 | 
						|
//			filesystem::remove(fRedistributeDir);
 | 
						|
        if (!filesystem::exists(fRedistributeDir))
 | 
						|
        {
 | 
						|
            errno = 0;
 | 
						|
            filesystem::create_directory(fRedistributeDir);
 | 
						|
 | 
						|
            if (!filesystem::exists(fRedistributeDir))
 | 
						|
            {
 | 
						|
                int e = errno;
 | 
						|
                ostringstream oss;
 | 
						|
                oss << "Failed to create redistribute directory: ";
 | 
						|
                oss << strerror(e) << " (" << e << ")";
 | 
						|
                throw runtime_error(oss.str());
 | 
						|
            }
 | 
						|
 | 
						|
            errno = 0;
 | 
						|
            filesystem::path archivePath(fRedistributeDir + "/archive");
 | 
						|
            filesystem::create_directory(archivePath);
 | 
						|
 | 
						|
            if (!filesystem::exists(archivePath))
 | 
						|
            {
 | 
						|
                int e = errno;
 | 
						|
                ostringstream oss;
 | 
						|
                oss << "Failed to create redistribute archive directory: ";
 | 
						|
                oss << strerror(e) << " (" << e << ")";
 | 
						|
                throw runtime_error(oss.str());
 | 
						|
            }
 | 
						|
        }
 | 
						|
 | 
						|
        fRedistributeInfo.startTime = time(NULL);
 | 
						|
    }
 | 
						|
 | 
						|
 | 
						|
    // open the info file to write
 | 
						|
    errno = 0;
 | 
						|
 | 
						|
    if (fInfoFilePtr == NULL)
 | 
						|
        fInfoFilePtr = fopen(fInfoFilePath.c_str(), "w+");
 | 
						|
 | 
						|
    if (fInfoFilePtr == NULL)
 | 
						|
    {
 | 
						|
        int e = errno;
 | 
						|
        ostringstream oss;
 | 
						|
        oss << "Failed to open " << fInfoFilePath << ": " << strerror(e) << " (" << e << ")";
 | 
						|
        throw runtime_error(oss.str());
 | 
						|
    }
 | 
						|
 | 
						|
    fRedistributeInfo.state = s;
 | 
						|
 | 
						|
    if (s == RED_STATE_FINISH)
 | 
						|
        fRedistributeInfo.endTime = time(NULL);
 | 
						|
 | 
						|
    rewind(fInfoFilePtr);
 | 
						|
    size_t n = fwrite(&fRedistributeInfo, sizeof(fRedistributeInfo), 1, fInfoFilePtr);
 | 
						|
 | 
						|
    if (n != 1)
 | 
						|
    {
 | 
						|
        fclose(fInfoFilePtr);
 | 
						|
        fInfoFilePtr = NULL;
 | 
						|
 | 
						|
        int e = errno;
 | 
						|
        ostringstream oss;
 | 
						|
        oss << "Failed to write into " << fInfoFilePath << ": " << strerror(e) << " (" << e << ")";
 | 
						|
        throw runtime_error(oss.str());
 | 
						|
    }
 | 
						|
 | 
						|
    fflush(fInfoFilePtr);
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
void RedistributeControl::setEntryCount(uint32_t entryCount)
 | 
						|
{
 | 
						|
    mutex::scoped_lock lock(fInfoFileMutex);
 | 
						|
    fRedistributeInfo.planned = entryCount;
 | 
						|
 | 
						|
    rewind(fInfoFilePtr);
 | 
						|
    fwrite(&fRedistributeInfo, sizeof(fRedistributeInfo), 1, fInfoFilePtr);
 | 
						|
    fflush(fInfoFilePtr);
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
void RedistributeControl::updateProgressInfo(uint32_t s, time_t t)
 | 
						|
{
 | 
						|
    mutex::scoped_lock lock(fInfoFileMutex);
 | 
						|
    fRedistributeInfo.endTime = t;
 | 
						|
 | 
						|
    switch (s)
 | 
						|
    {
 | 
						|
        case RED_TRANS_SUCCESS:
 | 
						|
            fRedistributeInfo.success++;
 | 
						|
            break;
 | 
						|
 | 
						|
        case RED_TRANS_SKIPPED:
 | 
						|
            fRedistributeInfo.skipped++;
 | 
						|
            break;
 | 
						|
 | 
						|
        default:
 | 
						|
            fRedistributeInfo.failed++;
 | 
						|
            break;
 | 
						|
    }
 | 
						|
 | 
						|
    rewind(fInfoFilePtr);
 | 
						|
    fwrite(&fRedistributeInfo, sizeof(fRedistributeInfo), 1, fInfoFilePtr);
 | 
						|
    fflush(fInfoFilePtr);
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
// Runs one redistribute job on a worker thread and waits for it to finish.
//
// Returns RED_TRANS_SUCCESS unless creating or joining the thread throws, in
// which case RED_TRANS_FAILED is returned (std::exception texts are logged).
int RedistributeControl::handleJobMsg(messageqcpp::ByteStream& bs, messageqcpp::IOSocket& so)
{
//	mutex::scoped_lock jobLock(fJobMutex);

    uint32_t result = RED_TRANS_SUCCESS;

    try
    {
        boost::thread* worker = new boost::thread(RedistributeWorkerThread(bs, so));
        fWorkThread.reset(worker);
        fWorkThread->join();
    }
    catch (const std::exception& e)
    {
        result = RED_TRANS_FAILED;
        logMessage(e.what());
    }
    catch (...)
    {
        result = RED_TRANS_FAILED;
    }

    return result;
}
 | 
						|
 | 
						|
 | 
						|
void RedistributeControl::logMessage(const string& msg)
 | 
						|
{
 | 
						|
    logging::Message::Args args;
 | 
						|
    args.add(string("RED:"));
 | 
						|
    args.add(msg);
 | 
						|
 | 
						|
    fSysLogger->logMessage(
 | 
						|
        logging::LOG_TYPE_INFO, logging::M0002, args, logging::LoggingID(32, 0, 0));
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
} // namespace
 | 
						|
 | 
						|
// vim:ts=4 sw=4:
 | 
						|
 |