1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-259 replace system call to columnstore status with a function call to alleviate a race condition. Also added exception logging in a few places that will be helpful.

This commit is contained in:
David Hall
2016-09-02 09:26:44 -05:00
parent 75ecb950f9
commit 7e0723a8bc
4 changed files with 281 additions and 187 deletions

View File

@ -1368,10 +1368,7 @@ namespace oam
void Oam::getSystemStatus(SystemStatus& systemstatus, bool systemStatusOnly) void Oam::getSystemStatus(SystemStatus& systemstatus, bool systemStatusOnly)
{ {
string cmd = startup::StartUp::installDir() + "/bin/columnstore status > /tmp/status.log"; checkSystemRunning("getSystemStatus");
system(cmd.c_str());
if (!checkLogStatus("/tmp/status.log", "MariaDB Columnstore is running") )
exceptionControl("getSystemStatus", API_FAILURE);
#ifdef _MSC_VER #ifdef _MSC_VER
// TODO: Remove when we create OAM for Windows // TODO: Remove when we create OAM for Windows
@ -1386,81 +1383,67 @@ namespace oam
DbrootStatus dbrootstatus; DbrootStatus dbrootstatus;
systemstatus.systemdbrootstatus.dbrootstatus.clear(); systemstatus.systemdbrootstatus.dbrootstatus.clear();
for ( int i = 0 ; i < 2 ; i ++) for ( int i = 0 ; i < 2 ; i ++)
{
try
{ {
MessageQueueClient processor("ProcStatusControl"); try
// processor.syncProto(false);
ByteStream::byte ModuleNumber;
ByteStream::byte ExtDeviceNumber;
ByteStream::byte dbrootNumber;
ByteStream::byte NICNumber;
ByteStream::byte state;
std::string name;
std::string date;
ByteStream obs, ibs;
obs << (ByteStream::byte) GET_SYSTEM_STATUS;
if ( systemStatusOnly )
obs << (ByteStream::byte) 1;
else
obs << (ByteStream::byte) 2;
try {
struct timespec ts = { 3, 0 };
processor.write(obs, &ts);
}
catch (exception& e)
{
processor.shutdown();
// string error = e.what();
// writeLog("getSystemStatus: write exception: " + error, LOG_TYPE_ERROR);
exceptionControl("getSystemStatus", API_FAILURE);
}
catch(...)
{
processor.shutdown();
// writeLog("getSystemStatus: write exception: unknown", LOG_TYPE_ERROR);
exceptionControl("getSystemStatus", API_FAILURE);
}
// wait 30 seconds for ACK from Process Monitor
try {
struct timespec ts = { 30, 0 };
ibs = processor.read(&ts);
}
catch (exception& e)
{
processor.shutdown();
string error = e.what();
// writeLog("getSystemStatus: read exception: " + error, LOG_TYPE_ERROR);
exceptionControl("getSystemStatus", API_FAILURE);
}
catch(...)
{
processor.shutdown();
// writeLog("getSystemStatus: read exception: unknown", LOG_TYPE_ERROR);
exceptionControl("getSystemStatus", API_FAILURE);
}
if (ibs.length() > 0)
{ {
MessageQueueClient processor("ProcStatusControl");
// processor.syncProto(false);
ByteStream::byte ModuleNumber;
ByteStream::byte ExtDeviceNumber;
ByteStream::byte dbrootNumber;
ByteStream::byte NICNumber;
ByteStream::byte state;
std::string name;
std::string date;
ByteStream obs, ibs;
obs << (ByteStream::byte) GET_SYSTEM_STATUS;
if ( systemStatusOnly ) if ( systemStatusOnly )
{ obs << (ByteStream::byte) 1;
ibs >> name;
ibs >> state;
ibs >> date;
if ( name.find("system") != string::npos ) {
systemstatus.SystemOpState = state;
systemstatus.StateChangeDate = date;
}
}
else else
obs << (ByteStream::byte) 2;
try {
struct timespec ts = { 3, 0 };
processor.write(obs, &ts);
}
catch (exception& e)
{ {
ibs >> ModuleNumber; processor.shutdown();
string error = e.what();
for( int i=0 ; i < ModuleNumber ; ++i) writeLog("getSystemStatus: write exception: " + error, LOG_TYPE_ERROR);
exceptionControl("getSystemStatus write", API_FAILURE);
}
catch(...)
{
processor.shutdown();
writeLog("getSystemStatus: write exception: unknown", LOG_TYPE_ERROR);
exceptionControl("getSystemStatus write", API_FAILURE);
}
// wait 30 seconds for ACK from Process Monitor
try {
struct timespec ts = { 30, 0 };
ibs = processor.read(&ts);
}
catch (exception& e)
{
processor.shutdown();
string error = e.what();
writeLog("getSystemStatus: read exception: " + error, LOG_TYPE_ERROR);
exceptionControl("getSystemStatus read", API_FAILURE);
}
catch(...)
{
processor.shutdown();
writeLog("getSystemStatus: read exception: unknown", LOG_TYPE_ERROR);
exceptionControl("getSystemStatus read", API_FAILURE);
}
if (ibs.length() > 0)
{
if ( systemStatusOnly )
{ {
ibs >> name; ibs >> name;
ibs >> state; ibs >> state;
@ -1469,68 +1452,93 @@ namespace oam
systemstatus.SystemOpState = state; systemstatus.SystemOpState = state;
systemstatus.StateChangeDate = date; systemstatus.StateChangeDate = date;
} }
else }
else
{
ibs >> ModuleNumber;
for( int i=0 ; i < ModuleNumber ; ++i)
{ {
modulestatus.Module = name; ibs >> name;
modulestatus.ModuleOpState = state; ibs >> state;
modulestatus.StateChangeDate = date; ibs >> date;
systemstatus.systemmodulestatus.modulestatus.push_back(modulestatus); if ( name.find("system") != string::npos ) {
systemstatus.SystemOpState = state;
systemstatus.StateChangeDate = date;
}
else
{
modulestatus.Module = name;
modulestatus.ModuleOpState = state;
modulestatus.StateChangeDate = date;
systemstatus.systemmodulestatus.modulestatus.push_back(modulestatus);
}
}
ibs >> ExtDeviceNumber;
for( int i=0 ; i < ExtDeviceNumber ; ++i)
{
ibs >> name;
ibs >> state;
ibs >> date;
extdevicestatus.Name = name;
extdevicestatus.OpState = state;
extdevicestatus.StateChangeDate = date;
systemstatus.systemextdevicestatus.extdevicestatus.push_back(extdevicestatus);
}
ibs >> NICNumber;
for( int i=0 ; i < NICNumber ; ++i)
{
ibs >> name;
ibs >> state;
ibs >> date;
nicstatus.HostName = name;
nicstatus.NICOpState = state;
nicstatus.StateChangeDate = date;
systemstatus.systemnicstatus.nicstatus.push_back(nicstatus);
}
ibs >> dbrootNumber;
for( int i=0 ; i < dbrootNumber ; ++i)
{
ibs >> name;
ibs >> state;
ibs >> date;
dbrootstatus.Name = name;
dbrootstatus.OpState = state;
dbrootstatus.StateChangeDate = date;
systemstatus.systemdbrootstatus.dbrootstatus.push_back(dbrootstatus);
} }
} }
ibs >> ExtDeviceNumber; processor.shutdown();
return;
for( int i=0 ; i < ExtDeviceNumber ; ++i)
{
ibs >> name;
ibs >> state;
ibs >> date;
extdevicestatus.Name = name;
extdevicestatus.OpState = state;
extdevicestatus.StateChangeDate = date;
systemstatus.systemextdevicestatus.extdevicestatus.push_back(extdevicestatus);
}
ibs >> NICNumber;
for( int i=0 ; i < NICNumber ; ++i)
{
ibs >> name;
ibs >> state;
ibs >> date;
nicstatus.HostName = name;
nicstatus.NICOpState = state;
nicstatus.StateChangeDate = date;
systemstatus.systemnicstatus.nicstatus.push_back(nicstatus);
}
ibs >> dbrootNumber;
for( int i=0 ; i < dbrootNumber ; ++i)
{
ibs >> name;
ibs >> state;
ibs >> date;
dbrootstatus.Name = name;
dbrootstatus.OpState = state;
dbrootstatus.StateChangeDate = date;
systemstatus.systemdbrootstatus.dbrootstatus.push_back(dbrootstatus);
}
} }
else
{
writeLog("getSystemStatus: ProcStatusControl returns 0 length", LOG_TYPE_ERROR);
}
// timeout ocurred, shutdown connection
processor.shutdown(); processor.shutdown();
return; writeLog("getSystemStatus: read 0 length", LOG_TYPE_ERROR);
exceptionControl("getSystemStatus read 0", API_FAILURE);
}
catch (exception& e)
{
string error = e.what();
writeLog("getSystemStatus: final exception: " + error, LOG_TYPE_ERROR);
}
catch(...)
{
writeLog("getSystemStatus: final exception: unknown", LOG_TYPE_ERROR);
} }
// timeout ocurred, shutdown connection
processor.shutdown();
// writeLog("getSystemStatus: read 0 length", LOG_TYPE_ERROR);
exceptionControl("getSystemStatus", API_FAILURE);
} }
catch(...)
{}
}
exceptionControl("getSystemStatus:MessageQueueClient-Error", API_FAILURE); exceptionControl("getSystemStatus:MessageQueueClient-Error", API_FAILURE);
} }
/******************************************************************** /********************************************************************
@ -1572,6 +1580,7 @@ namespace oam
ModuleConfig moduleconfig; ModuleConfig moduleconfig;
std::vector<int> NICstates; std::vector<int> NICstates;
degraded = false; degraded = false;
state = oam::UNEQUIP;
try try
{ {
@ -1595,8 +1604,19 @@ namespace oam
getNICStatus((*pt1).HostName, state); getNICStatus((*pt1).HostName, state);
NICstates.push_back(state); NICstates.push_back(state);
} }
catch (...) catch (exception& e)
{} {
Oam oam;
ostringstream os;
os << "Oam::getModuleStatus exception while getNICStatus " << (*pt1).HostName << " " << e.what();
oam.writeLog(os.str(), logging::LOG_TYPE_ERROR);
}
catch (...) {
Oam oam;
ostringstream os;
os << "Oam::getModuleStatus exception while getNICStatus " << (*pt1).HostName;
oam.writeLog(os.str(), logging::LOG_TYPE_ERROR);
}
} }
vector<int>::iterator pt = NICstates.begin(); vector<int>::iterator pt = NICstates.begin();
@ -1609,16 +1629,37 @@ namespace oam
} }
return; return;
} }
catch (...) catch (exception& e)
{} {
Oam oam;
ostringstream os;
os << "Oam::getModuleStatus exception while getSystemConfig " << name << " " << e.what();
oam.writeLog(os.str(), logging::LOG_TYPE_ERROR);
}
catch (...) {
Oam oam;
ostringstream os;
os << "Oam::getModuleStatus exception while getSystemConfig " << name;
oam.writeLog(os.str(), logging::LOG_TYPE_ERROR);
}
} }
} }
} }
catch(...) catch (exception& e)
{} {
Oam oam;
ostringstream os;
os << "Oam::getModuleStatus exception while getSystemStatus " << e.what();
oam.writeLog(os.str(), logging::LOG_TYPE_ERROR);
}
catch (...) {
Oam oam;
ostringstream os;
os << "Oam::getModuleStatus exception while getSystemStatus";
oam.writeLog(os.str(), logging::LOG_TYPE_ERROR);
}
// no match found // no match found
state = oam::UNEQUIP;
exceptionControl("getModuleStatus", API_INVALID_PARAMETER); exceptionControl("getModuleStatus", API_INVALID_PARAMETER);
} }
@ -1794,8 +1835,12 @@ namespace oam
} }
} }
} }
catch (exception&) catch (exception& e)
{ {
Oam oam;
ostringstream os;
os << "Oam::getNICStatus exception while getSystemStatus for " << name << " " << e.what();
oam.writeLog(os.str(), logging::LOG_TYPE_ERROR);
exceptionControl("getNICStatus", API_FAILURE); exceptionControl("getNICStatus", API_FAILURE);
} }
@ -2091,10 +2136,7 @@ namespace oam
void Oam::getProcessStatus(SystemProcessStatus& systemprocessstatus, string port) void Oam::getProcessStatus(SystemProcessStatus& systemprocessstatus, string port)
{ {
string cmd = startup::StartUp::installDir() + "/bin/columnstore status > /tmp/status.log"; checkSystemRunning("getProcessStatus");
system(cmd.c_str());
if (!checkLogStatus("/tmp/status.log", "MariaDB Columnstore is running") )
exceptionControl("getProcessStatus", API_FAILURE);
ProcessStatus processstatus; ProcessStatus processstatus;
systemprocessstatus.processstatus.clear(); systemprocessstatus.processstatus.clear();
@ -2194,10 +2236,7 @@ namespace oam
return; return;
#endif #endif
string cmd = startup::StartUp::installDir() + "/bin/columnstore status > /tmp/status.log"; checkSystemRunning("getProcessStatus");
system(cmd.c_str());
if (!checkLogStatus("/tmp/status.log", "MariaDB Columnstore is running") )
exceptionControl("getProcessStatus", API_FAILURE);
for ( int i = 0 ; i < 5 ; i ++) for ( int i = 0 ; i < 5 ; i ++)
{ {
@ -2292,10 +2331,7 @@ namespace oam
void Oam::setProcessStatus(const std::string process, const std::string module, const int state, pid_t PID) void Oam::setProcessStatus(const std::string process, const std::string module, const int state, pid_t PID)
{ {
string cmd = startup::StartUp::installDir() + "/bin/columnstore status > /tmp/status.log"; checkSystemRunning("setProcessStatus");
system(cmd.c_str());
if (!checkLogStatus("/tmp/status.log", "MariaDB Columnstore is running") )
exceptionControl("setProcessStatus", API_FAILURE);
//send and wait for ack and resend if not received //send and wait for ack and resend if not received
//retry 5 time max //retry 5 time max
@ -2812,10 +2848,7 @@ namespace oam
exceptionControl("getMyProcessStatus", API_FAILURE); exceptionControl("getMyProcessStatus", API_FAILURE);
} }
string cmd = startup::StartUp::installDir() + "/bin/columnstore status > /tmp/status.log"; checkSystemRunning("getMyProcessStatus");
system(cmd.c_str());
if (!checkLogStatus("/tmp/status.log", "MariaDB Columnstore is running") )
exceptionControl("getMyProcessStatus", API_FAILURE);
for ( int i = 0 ; i < 5 ; i ++) for ( int i = 0 ; i < 5 ; i ++)
{ {
@ -4652,20 +4685,31 @@ namespace oam
{ {
ifstream file (fileName.c_str()); ifstream file (fileName.c_str());
char line[400]; if (!file.is_open())
{
ostringstream os;
os << "checkLogStatus error while opening file " << fileName << " " << strerror(errno);
writeLog(os.str(), LOG_TYPE_ERROR );
}
string buf; string buf;
while (file.getline(line, 400)) while (getline(file, buf))
{ {
buf = line;
string::size_type pos = buf.find(phrase,0); string::size_type pos = buf.find(phrase,0);
if (pos != string::npos) if (pos != string::npos)
//found phrase //found phrase
return true; return true;
} }
if (file.bad())
{
ostringstream os;
os << "checkLogStatus error while reading file " << fileName << " " << strerror(errno);
writeLog(os.str(), LOG_TYPE_ERROR );
}
file.close(); file.close();
ostringstream os;
os << "checkLogStatus failed " << fileName << " expected \"" << phrase.c_str() << "\" found \"" << buf.c_str() << "\"";
writeLog(os.str(), LOG_TYPE_ERROR );
return false; return false;
} }
@ -4821,10 +4865,7 @@ namespace oam
********************************************************************/ ********************************************************************/
bool Oam::switchParentOAMModule(std::string moduleName, GRACEFUL_FLAG gracefulflag) bool Oam::switchParentOAMModule(std::string moduleName, GRACEFUL_FLAG gracefulflag)
{ {
string cmd = startup::StartUp::installDir() + "/bin/columnstore status > /tmp/status.log"; checkSystemRunning("switchParentOAMModule");
system(cmd.c_str());
if (!checkLogStatus("/tmp/status.log", "MariaDB Columnstore is running") )
exceptionControl("switchParentOAMModule", API_FAILURE);
int returnStatus; int returnStatus;
// We assume that moduleName is a valid pm // We assume that moduleName is a valid pm
@ -4837,7 +4878,7 @@ namespace oam
string cmdLine = "ping "; string cmdLine = "ping ";
string cmdOption = " -w 1 >> /dev/null"; string cmdOption = " -w 1 >> /dev/null";
cmd = cmdLine + IPAddr + cmdOption; string cmd = cmdLine + IPAddr + cmdOption;
if ( system(cmd.c_str()) != 0 ) { if ( system(cmd.c_str()) != 0 ) {
//ping failure //ping failure
try{ try{
@ -6267,10 +6308,7 @@ namespace oam
exceptionControl("sysConfig->write", API_FAILURE); exceptionControl("sysConfig->write", API_FAILURE);
} }
string cmd = startup::StartUp::installDir() + "/bin/columnstore status > /tmp/status.log"; checkSystemRunning("addDbroot");
system(cmd.c_str());
if (!checkLogStatus("/tmp/status.log", "MariaDB Columnstore is running") )
return;
//get updated Columnstore.xml distributed //get updated Columnstore.xml distributed
distributeConfigFile("system"); distributeConfigFile("system");
@ -8734,10 +8772,8 @@ namespace oam
GRACEFUL_FLAG gracefulflag, ACK_FLAG ackflag, const std::string argument1, GRACEFUL_FLAG gracefulflag, ACK_FLAG ackflag, const std::string argument1,
const std::string argument2, int timeout) const std::string argument2, int timeout)
{ {
string cmd = startup::StartUp::installDir() + "/bin/columnstore status > /tmp/status.log"; if (!checkSystemRunning(""))
system(cmd.c_str()); return API_CONN_REFUSED;
if (!checkLogStatus("/tmp/status.log", "MariaDB Columnstore is running") )
return API_CONN_REFUSED;
int returnStatus = API_SUCCESS; //default int returnStatus = API_SUCCESS; //default
ByteStream msg; ByteStream msg;
@ -8837,10 +8873,8 @@ namespace oam
int Oam::sendMsgToProcMgr2(messageqcpp::ByteStream::byte requestType, DeviceNetworkList devicenetworklist, int Oam::sendMsgToProcMgr2(messageqcpp::ByteStream::byte requestType, DeviceNetworkList devicenetworklist,
GRACEFUL_FLAG gracefulflag, ACK_FLAG ackflag, const std::string password, const std::string mysqlpw) GRACEFUL_FLAG gracefulflag, ACK_FLAG ackflag, const std::string password, const std::string mysqlpw)
{ {
string cmd = startup::StartUp::installDir() + "/bin/columnstore status > /tmp/status.log"; if (!checkSystemRunning(""))
system(cmd.c_str()); return API_CONN_REFUSED;
if (!checkLogStatus("/tmp/status.log", "MariaDB Columnstore is running") )
return API_CONN_REFUSED;
int returnStatus = API_TIMEOUT; //default int returnStatus = API_TIMEOUT; //default
ByteStream msg; ByteStream msg;
@ -8953,10 +8987,8 @@ namespace oam
int Oam::sendMsgToProcMgr3(messageqcpp::ByteStream::byte requestType, AlarmList& alarmlist, const std::string date) int Oam::sendMsgToProcMgr3(messageqcpp::ByteStream::byte requestType, AlarmList& alarmlist, const std::string date)
{ {
string cmd = startup::StartUp::installDir() + "/bin/columnstore status > /tmp/status.log"; if (!checkSystemRunning(""))
system(cmd.c_str()); return API_CONN_REFUSED;
if (!checkLogStatus("/tmp/status.log", "MariaDB Columnstore is running") )
return API_CONN_REFUSED;
int returnStatus = API_SUCCESS; //default int returnStatus = API_SUCCESS; //default
ByteStream msg; ByteStream msg;
@ -9056,9 +9088,7 @@ namespace oam
GRACEFUL_FLAG gracefulflag, ACK_FLAG ackflag, GRACEFUL_FLAG gracefulflag, ACK_FLAG ackflag,
const std::string argument1, const std::string argument2, int timeout) const std::string argument1, const std::string argument2, int timeout)
{ {
string cmd = startup::StartUp::installDir() + "/bin/columnstore status > /tmp/status.log"; if (!checkSystemRunning(""))
system(cmd.c_str());
if (!checkLogStatus("/tmp/status.log", "MariaDB Columnstore is running") )
return API_CONN_REFUSED; return API_CONN_REFUSED;
int returnStatus = API_STILL_WORKING; int returnStatus = API_STILL_WORKING;
@ -9244,9 +9274,7 @@ namespace oam
void Oam::sendStatusUpdate(ByteStream obs, ByteStream::byte returnRequestType) void Oam::sendStatusUpdate(ByteStream obs, ByteStream::byte returnRequestType)
{ {
string cmd = startup::StartUp::installDir() + "/bin/columnstore status > /tmp/status.log"; if (!checkSystemRunning(""))
system(cmd.c_str());
if (!checkLogStatus("/tmp/status.log", "MariaDB Columnstore is running") )
return; return;
for ( int i = 0 ; i < 5 ; i ++) for ( int i = 0 ; i < 5 ; i ++)
@ -9622,6 +9650,33 @@ namespace oam
return returnStatus; return returnStatus;
} }
bool Oam::checkSystemRunning(const char* function)
{
struct stat st;
if (stat("/var/lock/subsys/columnstore", &st) == 0)
{
return true;
}
if (geteuid() != 0)
{
// not root user
// The stat above may fail for non-root because of permissions
// This is a non-optimal solution
string cmd = "pgrep ProcMon";
if (system(cmd.c_str()) != 0)
{
return true;
}
}
ostringstream os;
os << function << " system is not running: " << strerror(errno);
writeLog(os.str(), LOG_TYPE_ERROR );
if (strlen(function))
{
throw runtime_error(os.str());
}
return false;
}
} //namespace oam } //namespace oam

View File

@ -2482,6 +2482,8 @@ namespace oam
*/ */
void sendStatusUpdate(messageqcpp::ByteStream obs, messageqcpp::ByteStream::byte returnRequestType); void sendStatusUpdate(messageqcpp::ByteStream obs, messageqcpp::ByteStream::byte returnRequestType);
bool checkSystemRunning(const char* function);
std::string CalpontConfigFile; std::string CalpontConfigFile;
std::string AlarmConfigFile; std::string AlarmConfigFile;
std::string ProcessConfigFile; std::string ProcessConfigFile;

View File

@ -120,7 +120,19 @@ void OamCache::checkReload()
try { try {
oam.getModuleStatus(string("pm") + num, state, degraded); oam.getModuleStatus(string("pm") + num, state, degraded);
} }
catch (...) {break;} catch (std::exception& e)
{
ostringstream os;
os << "OamCache::checkReload exception while getModuleStatus pm" << num << " " << e.what();
oam.writeLog(os.str(), logging::LOG_TYPE_ERROR);
break;
}
catch (...) {
ostringstream os;
os << "OamCache::checkReload exception while getModuleStatus pm" << num;
oam.writeLog(os.str(), logging::LOG_TYPE_ERROR);
break;
}
if (state == oam::ACTIVE || state == oam::DEGRADED) { if (state == oam::ACTIVE || state == oam::DEGRADED) {
pmToConnectionMap[*it] = i++; pmToConnectionMap[*it] = i++;
@ -134,10 +146,22 @@ void OamCache::checkReload()
{ {
ostringstream os; ostringstream os;
os << "OamCache::checkReload shows state for pm" << num << " as " << oamState[state]; os << "OamCache::checkReload shows state for pm" << num << " as " << oamState[state];
oam.writeLog(os.str(), logging::LOG_TYPE_WARNING); oam.writeLog(os.str(), logging::LOG_TYPE_ERROR);
} }
} }
catch (...) { /* doesn't get added to the connection map */ } catch (std::exception& e)
{
ostringstream os;
os << "OamCache::checkReload final exception while getModuleStatus " << e.what();
oam.writeLog(os.str(), logging::LOG_TYPE_ERROR);
break;
}
catch (...) {
ostringstream os;
os << "OamCache::checkReload final exception while getModuleStatus";
oam.writeLog(os.str(), logging::LOG_TYPE_ERROR);
break;
}
} }
#else #else
moduleIds.push_back(*it); moduleIds.push_back(*it);

View File

@ -437,7 +437,10 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO
size_t mlread = 0; size_t mlread = 0;
if (readToMagic(msecs, isTimeOut, stats) == false) //indicates a timeout or EOF if (readToMagic(msecs, isTimeOut, stats) == false) //indicates a timeout or EOF
{
logIoError("InetStreamSocket::read: timeout during readToMagic", 0);
return SBS(new ByteStream(0)); return SBS(new ByteStream(0));
}
//FIXME: This seems like a lot of work to read 4 bytes... //FIXME: This seems like a lot of work to read 4 bytes...
while (mlread < sizeof(msglen)) while (mlread < sizeof(msglen))
@ -458,6 +461,7 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO
{ {
if (isTimeOut) if (isTimeOut)
*isTimeOut = true; *isTimeOut = true;
logIoError("InetStreamSocket::read: timeout during first poll", 0);
return SBS(new ByteStream(0)); return SBS(new ByteStream(0));
} }
} }
@ -470,7 +474,10 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO
if (t == 0) if (t == 0)
{ {
if (timeout == NULL) if (timeout == NULL)
{
logIoError("InetStreamSocket::read: timeout during first read", 0);
return SBS(new ByteStream(0)); // don't return an incomplete message return SBS(new ByteStream(0)); // don't return an incomplete message
}
else else
throw SocketClosed("InetStreamSocket::read: Remote is closed"); throw SocketClosed("InetStreamSocket::read: Remote is closed");
} }
@ -518,7 +525,10 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO
if (err == 0) // timeout if (err == 0) // timeout
{ {
if (isTimeOut) if (isTimeOut)
{
logIoError("InetStreamSocket::read: timeout during second poll", 0);
*isTimeOut = true; *isTimeOut = true;
}
if (stats) if (stats)
stats->dataRecvd(nread); stats->dataRecvd(nread);
return SBS(new ByteStream(0)); return SBS(new ByteStream(0));
@ -538,7 +548,10 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO
if (timeout == NULL) if (timeout == NULL)
return SBS(new ByteStream(0)); // don't return an incomplete message return SBS(new ByteStream(0)); // don't return an incomplete message
else else
{
logIoError("InetStreamSocket::read: timeout during second read", 0);
throw SocketClosed("InetStreamSocket::read: Remote is closed"); throw SocketClosed("InetStreamSocket::read: Remote is closed");
}
} }
if (t < 0) { if (t < 0) {
ostringstream oss; ostringstream oss;