1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00
Files
mariadb-columnstore-engine/dbcon/ddlpackageproc/createtableprocessor.cpp
Serguey Zefirov 5aa2a824c2 feat(MCOL-6082): Multiple readers of dbroots using OamCache logic
This patch introduces centralized logic of selecting what dbroot is
accessible in PrimProc on what node. The logic is in OamCache for time
being and can be moved later.
2025-07-21 14:32:39 +03:00

858 lines
24 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: createtableprocessor.cpp 9627 2013-06-18 13:59:21Z rdempsey $
#include <unistd.h>
#include <string>
using namespace std;
#include <boost/algorithm/string/case_conv.hpp>
#include "createtableprocessor.h"
#include "ddlpkg.h"
using namespace ddlpackage;
#include "we_messages.h"
using namespace WriteEngine;
#include "oamcache.h"
using namespace oam;
#include "bytestream.h"
using namespace messageqcpp;
#include "calpontsystemcatalog.h"
using namespace execplan;
#include "sqllogger.h"
#include "messagelog.h"
using namespace logging;
namespace ddlpackageprocessor
{
CreateTableProcessor::DDLResult CreateTableProcessor::processPackageInternal(
ddlpackage::SqlStatement* sqlStmt)
{
SUMMARY_INFO("CreateTableProcessor::processPackage");
DDLResult result;
BRM::TxnID txnID;
txnID.id = fTxnid.id;
txnID.valid = fTxnid.valid;
result.result = NO_ERROR;
int rc1 = 0;
rc1 = fDbrm->isReadWrite();
if (rc1 != 0)
{
Message::Args args;
Message message(9);
args.add("Unable to execute the statement due to DBRM is read only");
message.format(args);
result.result = CREATE_ERROR;
result.message = message;
fSessionManager.rolledback(txnID);
return result;
}
ddlpackage::CreateTableStatement* createTableStmt =
dynamic_cast<ddlpackage::CreateTableStatement*>(sqlStmt);
if (!createTableStmt)
{
Message::Args args;
Message message(9);
args.add("CreateTableStatement wrong cast");
message.format(args);
result.result = CREATE_ERROR;
result.message = message;
return result;
}
DETAIL_INFO(createTableStmt);
ddlpackage::TableDef& tableDef = *(createTableStmt->fTableDef);
// If schema = CALPONTSYS, do not create table
if (tableDef.fQualifiedName->fSchema == CALPONT_SCHEMA)
{
// release the transaction
fSessionManager.rolledback(txnID);
return result;
}
// Commit current transaction.
// all DDL statements cause an implicut commit
VERBOSE_INFO("Getting current txnID");
// Check whether the table is existed already
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(createTableStmt->fSessionID);
execplan::CalpontSystemCatalog::TableName tableName;
tableName.schema = tableDef.fQualifiedName->fSchema;
tableName.table = tableDef.fQualifiedName->fName;
execplan::CalpontSystemCatalog::ROPair roPair;
roPair.objnum = 0;
ByteStream::byte rc = 0;
/** @Bug 217 */
/** @Bug 225 */
try
{
roPair = systemCatalogPtr->tableRID(tableName);
}
catch (IDBExcept& ie)
{
if (checkPPLostConnection(ie.what()))
{
result.result = NETWORK_ERROR;
return result;
}
else
{
// TODO: What is and is not an error here?
if (ie.errorCode() == ERR_DATA_OFFLINE)
{
// release transaction
fSessionManager.rolledback(txnID);
// Return the error for display to user
Message::Args args;
Message message(9);
args.add(ie.what());
message.format(args);
result.result = CREATE_ERROR;
result.message = message;
return result;
}
else if (ie.errorCode() == ERR_TABLE_NOT_IN_CATALOG)
{
roPair.objnum = 0;
}
else // error out
{
// release transaction
fSessionManager.rolledback(txnID);
// Return the error for display to user
Message::Args args;
Message message(9);
args.add(ie.what());
message.format(args);
result.result = CREATE_ERROR;
result.message = message;
return result;
}
}
}
catch (std::exception& ex) // error out
{
// release transaction
fSessionManager.rolledback(txnID);
// Return the error for display to user
Message::Args args;
Message message(9);
args.add(ex.what());
message.format(args);
result.result = CREATE_ERROR;
result.message = message;
return result;
}
catch (...) // error out
{
// release transaction
fSessionManager.rolledback(txnID);
// Return the error for display to user
Message::Args args;
Message message(9);
args.add("Unknown exception caught when checking if the table name is already in use.");
message.format(args);
result.result = CREATE_ERROR;
result.message = message;
return result;
}
// This is a current db bug, it should not turn OID is it cannot find
if (roPair.objnum >= 3000)
{
Message::Args args;
Message message(9);
args.add("Internal create table error for");
args.add(tableName.toString());
args.add(": table already exists");
args.add("(your schema is probably out-of-sync)");
message.format(args);
result.result = CREATE_ERROR;
result.message = message;
// release the transaction
fSessionManager.rolledback(txnID);
return result;
}
// Start a new transaction
VERBOSE_INFO("Starting a new transaction");
string stmt = createTableStmt->fSql + "|" + tableDef.fQualifiedName->fSchema + "|";
SQLLogger logger(stmt, fDDLLoggingId, createTableStmt->fSessionID, txnID.id);
std::string err;
execplan::ObjectIDManager fObjectIDManager;
OamCache* oamcache = OamCache::makeOamCache();
string errorMsg;
// get a unique number
uint64_t uniqueId = 0;
// Bug 5070. Added exception handling
try
{
uniqueId = fDbrm->getUnique64();
}
catch (std::exception& ex)
{
Message::Args args;
Message message(9);
args.add(ex.what());
message.format(args);
result.result = CREATE_ERROR;
result.message = message;
fSessionManager.rolledback(txnID);
return result;
}
catch (...)
{
Message::Args args;
Message message(9);
args.add("Unknown error occurred while getting unique number.");
message.format(args);
result.result = CREATE_ERROR;
result.message = message;
fSessionManager.rolledback(txnID);
return result;
}
fWEClient->addQueue(uniqueId);
try
{
// Allocate tableoid table identification
VERBOSE_INFO("Allocating object ID for table");
// Allocate a object ID for each column we are about to create
VERBOSE_INFO("Allocating object IDs for columns");
uint32_t numColumns = tableDef.fColumns.size();
uint32_t numDictCols = 0;
for (unsigned i = 0; i < numColumns; i++)
{
int dataType;
dataType = convertDataType(tableDef.fColumns[i]->fType->fType);
if ((dataType == CalpontSystemCatalog::CHAR && tableDef.fColumns[i]->fType->fLength > 8) ||
(dataType == CalpontSystemCatalog::VARCHAR && tableDef.fColumns[i]->fType->fLength > 7) ||
(dataType == CalpontSystemCatalog::VARBINARY && tableDef.fColumns[i]->fType->fLength > 7) ||
(dataType == CalpontSystemCatalog::BLOB && tableDef.fColumns[i]->fType->fLength > 7) ||
(dataType == CalpontSystemCatalog::TEXT && tableDef.fColumns[i]->fType->fLength > 7))
numDictCols++;
}
// Include column oids, dictionary oids, tableoid, and
// also include AUX oid as of MCOL-5021
fStartingColOID = fObjectIDManager.allocOIDs(numColumns + numDictCols + 2);
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Create table allocOIDs got the starting oid " << fStartingColOID << endl;
#endif
uint32_t numColumnOids = numColumns + numDictCols;
numColumnOids += 1; // MCOL-5021
if (fStartingColOID < 0)
{
result.result = CREATE_ERROR;
errorMsg = "Error in getting objectid from oidmanager.";
Message::Args args;
Message message(9);
args.add("(1)Create table failed due to ");
args.add(errorMsg);
message.format(args);
result.message = message;
fSessionManager.rolledback(txnID);
return result;
}
// Write the table metadata to the systemtable
VERBOSE_INFO("Writing meta data to SYSTABLE");
ByteStream bytestream;
bytestream << (ByteStream::byte)WE_SVR_WRITE_SYSTABLE;
bytestream << uniqueId;
bytestream << (uint32_t)createTableStmt->fSessionID;
bytestream << (uint32_t)txnID.id;
bytestream << (uint32_t)fStartingColOID;
bytestream << (uint32_t)(fStartingColOID + numColumnOids);
bytestream << (uint32_t)createTableStmt->fTableWithAutoi;
uint16_t dbRoot;
BRM::OID_t sysOid = 1001;
// Find out where systable is
rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);
if (rc != 0)
{
result.result = (ResultCode)rc;
Message::Args args;
Message message(9);
args.add("Error while calling getSysCatDBRoot ");
args.add(errorMsg);
message.format(args);
result.message = message;
// release transaction
fSessionManager.rolledback(txnID);
return result;
}
int pmNum = 1;
bytestream << (uint32_t)dbRoot;
tableDef.serialize(bytestream);
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
pmNum = oamcache->getOwnerPM(dbRoot);
// MCOL-66 The DBRM can't handle concurrent DDL
boost::mutex::scoped_lock lk(dbrmMutex);
try
{
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " create table sending We_SVR_WRITE_SYSTABLE to pm " << pmNum << endl;
#endif
fWEClient->write(bytestream, (unsigned)pmNum);
while (1)
{
bsIn.reset(new ByteStream());
fWEClient->read(uniqueId, bsIn);
if (bsIn->length() == 0) // read error
{
rc = NETWORK_ERROR;
errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
break;
}
else
{
*bsIn >> rc;
if (rc != 0)
{
errorMsg.clear();
*bsIn >> errorMsg;
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << "Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
#endif
}
break;
}
}
}
catch (runtime_error& ex) // write error
{
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " create table got exception" << ex.what() << endl;
#endif
rc = NETWORK_ERROR;
errorMsg = ex.what();
}
catch (...)
{
rc = NETWORK_ERROR;
#ifdef IDB_DDL_DEBUG
cout << "create table got unknown exception" << endl;
#endif
}
if (rc != 0)
{
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
#endif
result.result = (ResultCode)rc;
Message::Args args;
Message message(9);
args.add("(2)Create table failed due to ");
args.add(errorMsg);
message.format(args);
result.message = message;
if (rc != NETWORK_ERROR)
{
rollBackTransaction(uniqueId, txnID, createTableStmt->fSessionID); // What to do with the error code
}
// release transaction
fSessionManager.rolledback(txnID);
return result;
}
VERBOSE_INFO("Writing meta data to SYSCOLUMN");
bytestream.restart();
bytestream << (ByteStream::byte)WE_SVR_WRITE_CREATE_SYSCOLUMN;
bytestream << uniqueId;
bytestream << (uint32_t)createTableStmt->fSessionID;
bytestream << (uint32_t)txnID.id;
bytestream << numColumns;
for (unsigned i = 0; i < numColumns; ++i)
{
bytestream << (uint32_t)(fStartingColOID + i + 1);
}
bytestream << numDictCols;
for (unsigned i = 0; i < numDictCols; ++i)
{
bytestream << (uint32_t)(fStartingColOID + numColumns + i + 1);
}
uint8_t alterFlag = 0;
int colPos = 0;
bytestream << (ByteStream::byte)alterFlag;
bytestream << (uint32_t)colPos;
sysOid = 1021;
// Find out where syscolumn is
rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);
if (rc != 0)
{
result.result = (ResultCode)rc;
Message::Args args;
Message message(9);
args.add("Error while calling getSysCatDBRoot ");
args.add(errorMsg);
message.format(args);
result.message = message;
// release transaction
fSessionManager.rolledback(txnID);
return result;
}
bytestream << (uint32_t)dbRoot;
tableDef.serialize(bytestream);
pmNum = oamcache->getOwnerPM(dbRoot);
try
{
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " create table sending WE_SVR_WRITE_CREATE_SYSCOLUMN to pm " << pmNum << endl;
#endif
fWEClient->write(bytestream, (uint32_t)pmNum);
while (1)
{
bsIn.reset(new ByteStream());
fWEClient->read(uniqueId, bsIn);
if (bsIn->length() == 0) // read error
{
rc = NETWORK_ERROR;
errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
break;
}
else
{
*bsIn >> rc;
if (rc != 0)
{
errorMsg.clear();
*bsIn >> errorMsg;
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << "Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
#endif
}
break;
}
}
}
catch (runtime_error& ex) // write error
{
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " create table got exception" << ex.what() << endl;
#endif
rc = NETWORK_ERROR;
errorMsg = ex.what();
}
catch (...)
{
rc = NETWORK_ERROR;
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " create table got unknown exception" << endl;
#endif
}
if (rc != 0)
{
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Create table WE_SVR_WRITE_CREATE_SYSCOLUMN: " << errorMsg << endl;
#endif
if (checkPPLostConnection(errorMsg))
{
result.result = PP_LOST_CONNECTION;
return result;
}
else
{
result.result = (ResultCode)rc;
Message::Args args;
Message message(9);
args.add("(3)Create table failed due to ");
args.add(errorMsg);
message.format(args);
result.message = message;
if (rc != NETWORK_ERROR)
{
rollBackTransaction(uniqueId, txnID,
createTableStmt->fSessionID); // What to do with the error code
}
// release transaction
fSessionManager.rolledback(txnID);
return result;
}
}
// Get the number of tables in the database, the current table is included.
int tableCount = systemCatalogPtr->getTableCount();
// Calculate which dbroot the columns should start
DBRootConfigList dbRootList = oamcache->getDBRootNums();
uint16_t useDBRootIndex = tableCount % dbRootList.size();
// Find out the dbroot# corresponding the useDBRootIndex from oam
uint16_t useDBRoot = dbRootList[useDBRootIndex];
VERBOSE_INFO("Creating column files");
ColumnDef* colDefPtr;
ddlpackage::ColumnDefList tableDefCols = tableDef.fColumns;
ColumnDefList::const_iterator iter = tableDefCols.begin();
bytestream.restart();
bytestream << (ByteStream::byte)WE_SVR_WRITE_CREATETABLEFILES;
bytestream << uniqueId;
bytestream << (uint32_t)txnID.id;
bytestream << numColumnOids;
unsigned colNum = 0;
unsigned dictNum = 0;
while (iter != tableDefCols.end())
{
colDefPtr = *iter;
CalpontSystemCatalog::ColDataType dataType = convertDataType(colDefPtr->fType->fType);
if (dataType == CalpontSystemCatalog::DECIMAL || dataType == CalpontSystemCatalog::UDECIMAL)
{
if (colDefPtr->fType->fPrecision == -1 || colDefPtr->fType->fPrecision == 0)
{
colDefPtr->fType->fLength = 8;
}
else if ((colDefPtr->fType->fPrecision > 0) && (colDefPtr->fType->fPrecision < 3))
{
colDefPtr->fType->fLength = 1;
}
else if (colDefPtr->fType->fPrecision < 5 && (colDefPtr->fType->fPrecision > 2))
{
colDefPtr->fType->fLength = 2;
}
else if (colDefPtr->fType->fPrecision > 4 && colDefPtr->fType->fPrecision < 10)
{
colDefPtr->fType->fLength = 4;
}
else if (colDefPtr->fType->fPrecision > 9 && colDefPtr->fType->fPrecision < 19)
{
colDefPtr->fType->fLength = 8;
}
else if (colDefPtr->fType->fPrecision > 18 && colDefPtr->fType->fPrecision < 39)
{
colDefPtr->fType->fLength = 16;
}
}
bytestream << (fStartingColOID + (colNum++) + 1);
bytestream << (uint8_t)dataType;
bytestream << (uint8_t) false;
bytestream << (uint32_t)colDefPtr->fType->fLength;
bytestream << (uint16_t)useDBRoot;
bytestream << (uint32_t)colDefPtr->fType->fCompressiontype;
if ((dataType == CalpontSystemCatalog::CHAR && colDefPtr->fType->fLength > 8) ||
(dataType == CalpontSystemCatalog::VARCHAR && colDefPtr->fType->fLength > 7) ||
(dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength > 7) ||
(dataType == CalpontSystemCatalog::BLOB && colDefPtr->fType->fLength > 7) ||
(dataType == CalpontSystemCatalog::TEXT && colDefPtr->fType->fLength > 7))
{
bytestream << (uint32_t)(fStartingColOID + numColumns + (dictNum++) + 1);
bytestream << (uint8_t)dataType;
bytestream << (uint8_t) true;
bytestream << (uint32_t)colDefPtr->fType->fLength;
bytestream << (uint16_t)useDBRoot;
bytestream << (uint32_t)colDefPtr->fType->fCompressiontype;
}
++iter;
}
bytestream << (fStartingColOID + numColumnOids);
bytestream << (uint8_t)execplan::AUX_COL_DATATYPE;
bytestream << (uint8_t) false;
bytestream << (uint32_t)execplan::AUX_COL_WIDTH;
bytestream << (uint16_t)useDBRoot;
bytestream << (uint32_t)execplan::AUX_COL_COMPRESSION_TYPE;
//@Bug 4176. save oids to a log file for cleanup after fail over.
std::vector<CalpontSystemCatalog::OID> oidList;
for (unsigned i = 0; i < numColumns; ++i)
{
oidList.push_back(fStartingColOID + i + 1);
}
bytestream << numDictCols;
for (unsigned i = 0; i < numDictCols; ++i)
{
oidList.push_back(fStartingColOID + numColumns + i + 1);
}
// MCOL-5021
oidList.push_back(fStartingColOID + numColumnOids);
try
{
createWriteDropLogFile(fStartingColOID, uniqueId, oidList);
}
catch (std::exception& ex)
{
result.result = (ResultCode)rc;
Message::Args args;
Message message(9);
args.add("(4)Create table failed due to ");
args.add(ex.what());
message.format(args);
result.message = message;
if (rc != NETWORK_ERROR)
{
rollBackTransaction(uniqueId, txnID, createTableStmt->fSessionID); // What to do with the error code
}
// release transaction
fSessionManager.rolledback(txnID);
return result;
}
pmNum = oamcache->getOwnerPM(useDBRoot);
try
{
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " create table sending WE_SVR_WRITE_CREATETABLEFILES to pm " << pmNum << endl;
#endif
fWEClient->write(bytestream, pmNum);
while (1)
{
bsIn.reset(new ByteStream());
fWEClient->read(uniqueId, bsIn);
if (bsIn->length() == 0) // read error
{
rc = NETWORK_ERROR;
errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
break;
}
else
{
*bsIn >> rc;
if (rc != 0)
{
errorMsg.clear();
*bsIn >> errorMsg;
#ifdef IDB_DDL_DEBUG
cout << "Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
#endif
}
break;
}
}
if (rc != 0)
{
// drop the newly created files
bytestream.restart();
bytestream << (ByteStream::byte)WE_SVR_WRITE_DROPFILES;
bytestream << uniqueId;
bytestream << (uint32_t)numColumnOids;
for (unsigned i = 0; i < numColumnOids; i++)
{
bytestream << (uint32_t)(fStartingColOID + i + 1);
}
fWEClient->write(bytestream, pmNum);
while (1)
{
bsIn.reset(new ByteStream());
fWEClient->read(uniqueId, bsIn);
if (bsIn->length() == 0) // read error
{
break;
}
else
{
break;
}
}
//@Bug 5464. Delete from extent map.
fDbrm->deleteOIDs(oidList);
}
}
catch (runtime_error&)
{
errorMsg = "Lost connection to Write Engine Server";
}
if (rc != 0)
{
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
#endif
rollBackTransaction(uniqueId, txnID, createTableStmt->fSessionID); // What to do with the error code
fSessionManager.rolledback(txnID);
}
else
{
commitTransaction(uniqueId, txnID);
fSessionManager.committed(txnID);
fWEClient->removeQueue(uniqueId);
deleteLogFile(DROPTABLE_LOG, fStartingColOID, uniqueId);
}
// Log the DDL statement.
logDDL(createTableStmt->fSessionID, txnID.id, createTableStmt->fSql, createTableStmt->fOwner);
}
catch (std::exception& ex)
{
result.result = CREATE_ERROR;
Message::Args args;
Message message(9);
args.add("(5)Create table failed due to ");
args.add(ex.what());
message.format(args);
result.message = message;
fSessionManager.rolledback(txnID);
fWEClient->removeQueue(uniqueId);
return result;
}
// fWEClient->removeQueue(uniqueId);
if (rc != 0)
{
result.result = CREATE_ERROR;
Message::Args args;
Message message(9);
args.add("(6)Create table failed due to ");
args.add(errorMsg);
message.format(args);
result.message = message;
}
return result;
}
void CreateTableProcessor::rollBackCreateTable(const string& error, BRM::TxnID txnID, int sessionId,
ddlpackage::TableDef& tableDef, DDLResult& result)
{
cerr << "CreatetableProcessor::processPackage: " << error << endl;
Message::Args args;
Message message(1);
args.add("(7)Create table Failed: ");
args.add(error);
args.add("");
args.add("");
message.format(args);
result.result = CREATE_ERROR;
result.message = message;
fWriteEngine.rollbackTran(txnID.id, sessionId);
size_t size = tableDef.fColumns.size();
for (size_t i = 0; i < size; ++i)
{
fWriteEngine.dropColumn(txnID.id, fStartingColOID + i);
}
try
{
execplan::ObjectIDManager fObjectIDManager;
fObjectIDManager.returnOID(fTableOID);
fObjectIDManager.returnOIDs(fStartingColOID, fStartingColOID + tableDef.fColumns.size() - 1);
}
catch (std::exception& ex)
{
Message::Args args;
Message message(6);
args.add(ex.what());
message.format(args);
result.message = message;
result.result = CREATE_ERROR;
}
catch (...)
{
Message::Args args;
Message message(6);
args.add("Unknown exception");
message.format(args);
result.message = message;
result.result = CREATE_ERROR;
// cout << "returnOIDs error" << endl;
}
DictionaryOIDList::const_iterator dictoid_iter = fDictionaryOIDList.begin();
while (dictoid_iter != fDictionaryOIDList.end())
{
DictOID dictOID = *dictoid_iter;
fWriteEngine.dropDctnry(txnID.id, dictOID.dictOID, dictOID.treeOID, dictOID.listOID);
// fObjectIDManager.returnOID(dictOID.dictOID);
++dictoid_iter;
}
fSessionManager.rolledback(txnID);
}
} // namespace ddlpackageprocessor