/* Copyright (C) 2014 InfiniDB, Inc.
 * Copyright (C) 2016 MariaDB Corporation.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

// $Id: weclients.h 525 2010-01-19 23:18:05Z xlou $
//
/** @file */

#include <sstream>
#include <stdexcept>
#include <cassert>
#include <ctime>
#include <algorithm>
#include <unistd.h>
#include <arpa/inet.h>
#if __FreeBSD__
#include <sys/socket.h>
#endif
using namespace std;

#include <boost/thread/mutex.hpp>
using namespace boost;

#include "messagequeue.h"
#include "bytestream.h"
using namespace messageqcpp;

#include "configcpp.h"
using namespace config;

#include "errorids.h"
#include "exceptclasses.h"
#include "messagelog.h"
#include "messageobj.h"
#include "loggingid.h"
using namespace logging;

#include "liboamcpp.h"
using namespace oam;

#include "we_clients.h"
#include "we_messages.h"
using namespace WriteEngine;

#include "atomicops.h"

namespace
{
void writeToLog(const char* file, int line, const string& msg, LOG_TYPE logto = LOG_TYPE_INFO)
{
  LoggingID lid(05);
  MessageLog ml(lid);
  Message::Args args;
  Message m(0);
  args.add(file);
  args.add("@");
  args.add(line);
  args.add(msg);
  m.format(args);

  switch (logto)
  {
    case LOG_TYPE_DEBUG: ml.logDebugMessage(m); break;

    case LOG_TYPE_INFO: ml.logInfoMessage(m); break;

    case LOG_TYPE_WARNING: ml.logWarningMessage(m); break;

    case LOG_TYPE_ERROR: ml.logErrorMessage(m); break;

    case LOG_TYPE_CRITICAL: ml.logCriticalMessage(m); break;
  }
}

string getModuleNameByIPAddr(oam::ModuleTypeConfig moduletypeconfig, string ipAddress)
{
  string modulename = "pm1";
  DeviceNetworkList::iterator pt = moduletypeconfig.ModuleNetworkList.begin();

  for (; pt != moduletypeconfig.ModuleNetworkList.end(); pt++)
  {
    modulename = (*pt).DeviceName;
    HostConfigList::iterator pt1 = (*pt).hostConfigList.begin();

    for (; pt1 != (*pt).hostConfigList.end(); pt1++)
    {
      if (ipAddress == (*pt1).IPAddr)
        return modulename;
    }
  }

  return modulename;
}

struct WEClientRunner
{
  WEClientRunner(WriteEngine::WEClients* jl, boost::shared_ptr<MessageQueueClient> cl,
                 uint32_t connectionIndex)
   : jbl(jl), client(cl), connIndex(connectionIndex)
  {
  }
  WriteEngine::WEClients* jbl;
  boost::shared_ptr<MessageQueueClient> client;
  uint32_t connIndex;
  void operator()()
  {
    // cout << "Listening on client at 0x" << hex << (ptrdiff_t)client << dec << endl;
    try
    {
      jbl->Listen(client, connIndex);
      // cout << "Listening connIndex " << connIndex << endl;
    }
    catch (std::exception& ex)
    {
      string what(ex.what());
      cerr << "exception caught in WEClient: " << what << endl;

      if (what.find("St9bad_alloc") != string::npos)
      {
        writeToLog(__FILE__, __LINE__, what, LOG_TYPE_CRITICAL);
        //           abort();
      }
      else
        writeToLog(__FILE__, __LINE__, what);
    }
    catch (...)
    {
      string msg("exception caught in WEClientRunner.");
      writeToLog(__FILE__, __LINE__, msg);
      cerr << msg << endl;
    }
  }
};

template <typename T>
struct QueueShutdown
{
  void operator()(T& x)
  {
    x.shutdown();
  }
};
}  // namespace

namespace WriteEngine
{
WEClients::WEClients(int PrgmID) : fPrgmID(PrgmID), pmCount(0)
{
  closingConnection = 0;
  Setup();
}

WEClients::~WEClients()
{
  Close();
}

void WEClients::Setup()
{
  makeBusy(true);
  joblist::ResourceManager* rm = joblist::ResourceManager::instance();
  oam::Oam oam;
  string ipAddress;
  ModuleTypeConfig moduletypeconfig;

  try
  {
    oam.getSystemConfig("pm", moduletypeconfig);
  }
  catch (...)
  {
    writeToLog(__FILE__, __LINE__, "oam.getSystemConfig error, unknown exception", LOG_TYPE_ERROR);
    throw runtime_error("Setup failed");
  }

  uint32_t pmCountConfig = moduletypeconfig.ModuleCount;
  pmCount = 0;
  int moduleID = 1;

  char buff[32];
  ByteStream bs, bsRead;

  if (fPrgmID == DDLPROC)
  {
    bs << (ByteStream::byte)WE_SVR_DDL_KEEPALIVE;
    bs << (ByteStream::octbyte)moduleID;
  }
  else if (fPrgmID == DMLPROC)
  {
    bs << (ByteStream::byte)WE_SVR_DML_KEEPALIVE;
    bs << (ByteStream::octbyte)moduleID;
  }
  else if (fPrgmID == SPLITTER)
  {
    bs << (ByteStream::byte)WE_CLT_SRV_KEEPALIVE;
  }
  else if (fPrgmID == BATCHINSERTPROC)
  {
    bs << (ByteStream::byte)WE_SVR_BATCH_KEEPALIVE;
    bs << (ByteStream::octbyte)moduleID;
  }

  for (unsigned i = 0; i < pmCountConfig; i++)
  {
    // Find the module id
    moduleID = atoi((moduletypeconfig.ModuleNetworkList[i])
                        .DeviceName.substr(MAX_MODULE_TYPE_SIZE, MAX_MODULE_ID_SIZE)
                        .c_str());
    // cout << "setting connection to moduleid " << moduleID << endl;
    snprintf(buff, sizeof(buff), "pm%u_WriteEngineServer", moduleID);
    string fServer(buff);

    boost::shared_ptr<MessageQueueClient> cl(new MessageQueueClient(fServer, rm->getConfig()));
    boost::shared_ptr<boost::mutex> nl(new boost::mutex());

    // Bug 5224. Take out the retrys. If connection fails, we assume the server is down.
    try
    {
      if (cl->connect())
      {
        try
        {
          cl->write(bs);
        }
        catch (std::exception& ex1)
        {
          ostringstream oss;
          oss << "Write to WES during connect failed due to " << ex1.what();
          throw runtime_error(oss.str());
        }

        try
        {
          bsRead = cl->read();

          if (bsRead.length() == 0)
            throw runtime_error("Got byte 0 during reading ");
        }
        catch (std::exception& ex2)
        {
          ostringstream oss;
          oss << "Read from WES during connect failed due to " << ex2.what() << " and this = " << this;
          throw runtime_error(oss.str());
        }

        fPmConnections[moduleID] = cl;
        // cout << "connection is open. this = " << this << endl;
        // cout << "set up connection to mudule " << moduleID << endl;
        // assign the module name
        // ipAddress = sin_addr2String(cl->serv_addr().sin_addr);
        ipAddress = cl->addr2String();
        cl->moduleName(getModuleNameByIPAddr(moduletypeconfig, ipAddress));
        StartClientListener(cl, i);
        pmCount++;
        // ostringstream oss;
        // oss << "WECLIENT: connected to " << fServer + " and this = " << this << " and pmcount is now " <<
        // pmCount; writeToLog(__FILE__, __LINE__, oss.str() , LOG_TYPE_DEBUG);
      }
      else
      {
        throw runtime_error("Connection refused");
      }
    }
    catch (std::exception& ex)
    {
      writeToLog(__FILE__, __LINE__, "Could not connect to " + fServer + ": " + ex.what(), LOG_TYPE_ERROR);
      cerr << "Could not connect to " << fServer << ": " << ex.what() << endl;
    }
    catch (...)
    {
      writeToLog(__FILE__, __LINE__, "Could not connect to " + fServer, LOG_TYPE_ERROR);
    }
  }
}

int WEClients::Close()
{
  makeBusy(false);
  closingConnection = 1;
  ByteStream bs;
  bs << (ByteStream::byte)WE_SVR_CLOSE_CONNECTION;
  write_to_all(bs);

  // cout << "connection is closed. this = " << this << " and closingConnection = " << closingConnection <<
  // endl;
  for (uint32_t i = 0; i < fWESReader.size(); i++)
  {
    fWESReader[i]->join();
  }

  fWESReader.clear();
  fPmConnections.clear();
  pmCount = 0;
  // ostringstream oss;
  // oss << "WECLIENT: closed connection to wes and this = " << this << " and pmcount is now " << pmCount;
  // writeToLog(__FILE__, __LINE__, oss.str() , LOG_TYPE_DEBUG);
  return 0;
}

void WEClients::Listen(boost::shared_ptr<MessageQueueClient> client, uint32_t connIndex)
{
  SBS sbs;

  try
  {
    while (Busy())
    {
      // TODO: This call blocks so setting Busy() in another thread doesn't work here...
      sbs = client->read();

      if (sbs->length() != 0)
      {
        // cout << "adding data to connIndex " << endl;
        addDataToOutput(sbs, connIndex);
      }
      else  // got zero bytes on read, nothing more will come
      {
        if (closingConnection > 0)
        {
          return;
        }

        cerr << "WEC got 0 byte message for object " << this << endl;
        goto Error;
      }
    }

    return;
  }
  catch (std::exception& e)
  {
    cerr << "WEC Caught EXCEPTION: " << e.what() << endl;
    goto Error;
  }
  catch (...)
  {
    cerr << "WEC Caught UNKNOWN EXCEPT" << endl;
    goto Error;
  }

Error:
  // error condition! push 0 length bs to messagequeuemap and
  // eventually let jobstep error out.
  boost::mutex::scoped_lock lk(fMlock);

  MessageQueueMap::iterator map_tok;
  sbs.reset(new ByteStream(0U));

  for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
  {
    map_tok->second->queue.clear();
    (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
    map_tok->second->queue.push(sbs);
  }

  lk.unlock();

  // reset the pmconnection map
  {
    boost::mutex::scoped_lock onErrLock(fOnErrMutex);
    string moduleName = client->moduleName();
    ClientList::iterator itor = fPmConnections.begin();

    while (itor != fPmConnections.end())
    {
      if (moduleName == (itor->second)->moduleName())
      {
        (fPmConnections[itor->first]).reset();
        pmCount--;
        ostringstream oss;
        // oss << "WECLIENT: connection to is reset and this = " << this << " and pmcount is decremented.";
        // writeToLog(__FILE__, __LINE__, oss.str() , LOG_TYPE_DEBUG);
      }

      itor++;
    }
  }
  return;
}

void WEClients::addQueue(uint32_t key)
{
  bool b;

  boost::mutex* lock = new boost::mutex();
  condition* cond = new condition();
  boost::shared_ptr<MQE> mqe(new MQE(pmCount));

  mqe->queue = WESMsgQueue(lock, cond);

  boost::mutex::scoped_lock lk(fMlock);
  b = fSessionMessages.insert(pair<uint32_t, boost::shared_ptr<MQE> >(key, mqe)).second;

  if (!b)
  {
    ostringstream os;
    os << "WEClient: attempt to add a queue with a duplicate ID " << key << endl;
    throw runtime_error(os.str());
  }
}

void WEClients::removeQueue(uint32_t key)
{
  boost::mutex::scoped_lock lk(fMlock);
  MessageQueueMap::iterator map_tok = fSessionMessages.find(key);

  if (map_tok == fSessionMessages.end())
    return;

  map_tok->second->queue.shutdown();
  map_tok->second->queue.clear();
  fSessionMessages.erase(map_tok);
}

void WEClients::shutdownQueue(uint32_t key)
{
  boost::mutex::scoped_lock lk(fMlock);
  MessageQueueMap::iterator map_tok = fSessionMessages.find(key);

  if (map_tok == fSessionMessages.end())
    return;

  map_tok->second->queue.shutdown();
  map_tok->second->queue.clear();
}

void WEClients::read(uint32_t key, SBS& bs)
{
  boost::shared_ptr<MQE> mqe;

  // Find the StepMsgQueueList for this session
  boost::mutex::scoped_lock lk(fMlock);
  MessageQueueMap::iterator map_tok = fSessionMessages.find(key);

  if (map_tok == fSessionMessages.end())
  {
    ostringstream os;
    // cout << " reading for key " << key << " not found" << endl;
    os << "WEClient: attempt to read(bs) from a nonexistent queue\n";
    throw runtime_error(os.str());
  }

  mqe = map_tok->second;
  lk.unlock();

  // this method can block: you can't hold any locks here...
  (void)mqe->queue.pop(&bs);

  if (!bs)
    bs.reset(new ByteStream());
}

void WEClients::write(const messageqcpp::ByteStream& msg, uint32_t connection)
{
  if (pmCount == 0)
  {
    ostringstream oss;
    oss << "WECLIENT: There is no connection to WES and this = " << this;
    writeToLog(__FILE__, __LINE__, oss.str(), LOG_TYPE_DEBUG);
    throw runtime_error("There is no WriteEngineServer to send message to.");
  }

  if (fPmConnections[connection] != 0)
    fPmConnections[connection]->write(msg);
  else
  {
    ostringstream os;
    os << "Lost connection to WriteEngineServer on pm" << connection;
    throw runtime_error(os.str());
  }
}

void WEClients::write_to_all(const messageqcpp::ByteStream& msg)
{
  if (pmCount == 0)
  {
    ostringstream oss;
    oss << "WECLIENT:  There is no connection to WES and this = " << this;
    writeToLog(__FILE__, __LINE__, oss.str(), LOG_TYPE_DEBUG);
    throw runtime_error("There is no WriteEngineServer to send message to.");
  }

  ClientList::iterator itor = fPmConnections.begin();
  while (itor != fPmConnections.end())
  {
    if (itor->second != NULL)
    {
      itor->second->write(msg);
    }

    itor++;
  }
}

void WEClients::StartClientListener(boost::shared_ptr<MessageQueueClient> cl, uint32_t connIndex)
{
  boost::thread* thrd = new boost::thread(WEClientRunner(this, cl, connIndex));
  fWESReader.push_back(thrd);
}

void WEClients::addDataToOutput(SBS sbs, uint32_t connIndex)
{
  // ISMPacketHeader *hdr = (ISMPacketHeader*)(sbs->buf());
  // PrimitiveHeader *p = (PrimitiveHeader *)(hdr+1);
  // uint32_t uniqueId = p->UniqueID;
  uint64_t uniqueId = 0;
  *sbs >> uniqueId;
  boost::shared_ptr<MQE> mqe;

  boost::mutex::scoped_lock lk(fMlock);
  MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId);

  if (map_tok == fSessionMessages.end())
  {
    return;
  }

  mqe = map_tok->second;
  lk.unlock();

  if (pmCount > 0)
  {
    atomicops::atomicInc(&mqe->unackedWork[connIndex % pmCount]);
  }

  (void)mqe->queue.push(sbs);
}

}  // namespace WriteEngine