Move config files
This patch:
* Moves config files from /usr/local/mariadb/columnstore/etc to ENGINE_SYSCONFDIR/columnstore (ENGINE_SYSCONFDIR is /etc by default)
* Sets a define called MCSSYSCONFDIR which contains the ENGINE_SYSCONFDIR compile time setting
* Modifies scripts and code to use the new paths
* Removes a whole bunch of files we don't use
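As a rough illustration of what the path change means for callers, the sketch below shows how a shell script might locate Columnstore.xml after this patch. It is a minimal sketch, not code from this commit: the COLUMNSTORE_CONFIG_FILE override and the legacy /usr/local path appear in the scripts removed below, while /etc/columnstore/Columnstore.xml is inferred from the commit message, assuming ENGINE_SYSCONFDIR keeps its /etc default.

#!/bin/bash
# Hypothetical config lookup: explicit override first, then the new
# ENGINE_SYSCONFDIR/columnstore location, then the legacy install path.
CONFIG_FILE="${COLUMNSTORE_CONFIG_FILE:-/etc/columnstore/Columnstore.xml}"
if [ ! -r "$CONFIG_FILE" ]; then
    CONFIG_FILE="/usr/local/mariadb/columnstore/etc/Columnstore.xml"  # pre-patch location
fi
echo "Using config file: $CONFIG_FILE"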
@@ -1,299 +0,0 @@
#!/usr/bin/python
##
## Bulkloader script by Martin Thomas
##

import os, sys, glob, shutil, xml.dom.minidom
import getopt
import logging
import time

logger = logging.getLogger()
shdlr = logging.StreamHandler()
fhdlr = logging.FileHandler(filename='bulkload.log')
formatter = logging.Formatter('%(asctime)s:%(levelname)s: %(message)s')
shdlr.setFormatter(formatter)
fhdlr.setFormatter(formatter)
logger.addHandler(shdlr)
logger.addHandler(fhdlr)

## only report INFO or higher - change to WARNING to silence all logging
logger.setLevel(logging.INFO)


def usage():
    print """
Bulkload.py is intended to automate the manual steps required to load the database and build indexes from scratch.

- ipcs-pat will be built if missing
- cpimport will be removed and rebuilt
- PrimProc will be stopped and started
- shared memory segments will be removed using ipcs-pat
- database files will be removed
- dbgen will be run with option 5
- oid files and job files will be copied to correct locations
- column data will be parsed and loaded using Job 299
- index data will be exported, sorted and loaded using Job 300

Options:
-w or --wedir=   : Specify the write engine branch to use instead of the default trunk
-n or --nocache= : Specify either col or idx and the -c flag will NOT be sent to cpimport
-u or --usage    : Usage message

Example:
bulkload.py -w/home/adevelop/genii/we1.1 --nocache=idx
Load the database using the we1.1 branch for writeengine and do not use cache when building indexes

THIS SPACE LEFT INTENTIONALLY BLANK
"""


def find_paths():
    """Find DBRoot and BulkRoot."""
    try:
        config_file = os.environ['COLUMNSTORE_CONFIG_FILE']
    except KeyError:
        try:
            logger.info("Environment variable COLUMNSTORE_CONFIG_FILE not set, looking for system Columnstore.xml")
            config_file = '/usr/local/mariadb/columnstore/etc/Columnstore.xml'
            os.lstat(config_file)
        except:
            logger.error('No config file available')
            sys.exit('No config file available')
    try:
        xmldoc = xml.dom.minidom.parse(config_file)
        bulk_node = xmldoc.getElementsByTagName('BulkRoot')[0]
        db_node = xmldoc.getElementsByTagName('DBRoot1')[0]
        bulk_dir = bulk_node.childNodes[0].nodeValue
        data_dir = db_node.childNodes[0].nodeValue
    except Exception, e:
        logger.error('Error parsing config file')
        logger.error(e)
        sys.exit('Error parsing config file')

    return (bulk_dir, data_dir)


def check_dirs(bulkroot, dbroot):
    problem = 0
    res = 0
    reqd_dirs = {
        os.getenv('HOME')+'/genii': "No genii directory found (contains tools required to continue) (%s)",
        bulkroot: "Bulkroot specified as %s but not found",
        bulkroot+'/job': "No job directory found - needed to store Job xml files (looked in %s)",
        bulkroot+'/data/import': "No data/import directory found - expected %s to hold data to be loaded",
        bulkroot+'/log': "No data/log directory found - expected %s to log into",
        dbroot: "DBroot specified as %s but not found"
    }
    for dir in reqd_dirs.keys():
        try:
            res = os.lstat(dir)
        except:
            problem = 1
            logger.error(reqd_dirs[dir]%dir)

    if problem:
        sys.exit(1)


def fix_hwm(job_file):
    """Find hwm in xml file and change to 0"""
    import re

    src_file = open(job_file, 'r')
    dst_file = open(job_file+'.tmp', 'w')

    rep = re.compile('hwm="1"')

    for line in src_file:
        line = rep.sub('hwm="0"', line)
        dst_file.write(line)
    # use os.rename instead of shutil.move to avoid problems traversing devices
    os.rename(job_file+'.tmp', job_file)


def find_indexes(job_file):
    """Find index definitions in job_file and return list of files to sort"""
    index_files = []
    try:  # try because we may have an old version of python
        xmldoc = xml.dom.minidom.parse(job_file)

        for index_node in xmldoc.getElementsByTagName('Index'):
            index_files.append(index_node.getAttribute('mapName'))
    except:
        import re
        f = open(job_file)
        for line in f:  # iterate over lines, not characters
            b = re.search('mapName="(CPL_[0-9A-Z_]+)"', line)
            try:  # try because not every line will match
                index_files.append(b.group(1))
            except: pass

    return index_files


def exec_cmd(cmd, args):
    """Execute command using subprocess module or if that fails,
    use os.system
    """
    try:
        import subprocess

        try:
            retcode = subprocess.call(cmd + " " + args, shell=True)
            if retcode < 0:
                print >>sys.stderr, "Child was terminated by signal", -retcode
                sys.exit(-1)
            else:
                print >>sys.stderr, "Child returned", retcode
        except OSError, e:
            print >>sys.stderr, "Execution failed:", e
            sys.exit(-1)
    except:
        logger.info('Old version of Python - subprocess not available, falling back to os.system')
        logger.info('Executing: '+cmd+' '+args)
        res = os.system(cmd+' '+args)
        if res:
            logger.error('Bad return code %i from %s'%(res, cmd))
            sys.exit(res)


def build_tool(tool):
    """
    Use the tool dictionary to determine if required tool exists
    and build if not
    """
    if not os.path.exists(tool['path']+tool['tool']):
        logger.warn("Building %s before continuing"%tool['tool'])
        curdir = os.getcwd()
        os.chdir(tool['path'])
        exec_cmd(tool['builder'], tool['args'])
        os.chdir(curdir)


def main():
    """
    Bulk load the database.
    Check that we can write OIDfiles, that all required tools exist,
    clean up old files, sort the index inserts and generally rock and roll
    """
    start_dir = curdir = os.getcwd()  # remember where we started

    if not os.environ.has_key('LD_LIBRARY_PATH'):
        logger.info('No environment variable LD_LIBRARY_PATH')
    else:
        if len(os.getenv('LD_LIBRARY_PATH')) < 5:
            logger.info('Suspicious LD_LIBRARY_PATH: %s'%os.getenv('LD_LIBRARY_PATH'))

    #-- figure out paths
    home = os.getenv('HOME')
    genii = home+'/genii'
    cache = {}
    cache['idx'] = '-c'
    cache['col'] = '-c'

    #-- allow us to specify a write engine branch
    opts, args = getopt.getopt(sys.argv[1:], 'w:n:u', ['wedir=', 'nocache=', 'usage'])
    wedir = genii+'/writeengine'
    for opt, arg in opts:
        if opt == '-w' or opt == '--wedir':
            wedir = arg

        if opt == '-n' or opt == '--nocache':
            if (arg == 'idx' or arg == 'col'):
                cache[arg] = ''
                logger.info("No cache for %s"% arg)

        if opt == '-u' or opt == '--usage':
            usage()
            sys.exit()

    logger.info("Using writeengine at %s"%wedir)

    (bulkroot, dbroot) = find_paths()

    logger.info("Bulkroot: %s \tDBRoot: %s\n"%(bulkroot, dbroot))

    check_dirs(bulkroot, dbroot)

    if len(glob.glob(bulkroot+'/data/import/*tbl')) == 0:
        sys.exit("No files for import found in BulkRoot: %s"%(bulkroot))

    if len(glob.glob(dbroot+'/000.dir')) == 0:
        logger.info("No files found in DBRoot: %s (not fatal)"%dbroot)

    ## force rebuild cpimport and build ipcs-pat if required

    build_tool({'path': genii+'/versioning/BRM/',
                'tool': 'ipcs-pat',
                'builder': 'make', 'args': 'tools'})

    build_tool({'path': wedir+'/bulk/',
                'tool': 'cpimport',
                'builder': 'make', 'args': 'clean'})
    try:
        exec_cmd('rm -f', wedir+'/bulk/cpimport')
    except:
        pass

    try:
        os.lstat(start_dir+'/cpimport')  # look in local directory first
    except:
        build_tool({'path': wedir+'/bulk/',
                    'tool': 'cpimport',
                    'builder': 'make', 'args': 'cpimport'})

    ## clean up before starting
    ## remove old db files, remove old temp files, remove shared memory segments,
    ## kill old PrimProc and start new one

    logger.info("Removing old DB files")
    exec_cmd('rm -fr ', dbroot+'/000.dir')

    logger.info("Removing old temp files")
    exec_cmd('rm -fr ', bulkroot+'/data/import/*.idx.txt')

    logger.info("Removing old process files")
    exec_cmd('rm -fr ', bulkroot+'/process/*.*')

    logger.info("Killing PrimProc")
    os.system('killall -q -u $USER PrimProc')

    logger.info("Killing controllernode and workernode")
    exec_cmd(genii+'/export/bin/dbrm', "stop ")

    time.sleep(2)
    logger.info("Removing shared memory segments")
    exec_cmd(genii+'/versioning/BRM/ipcs-pat', '-d')

    logger.info("Starting controllernode and workernode")
    exec_cmd(genii+'/export/bin/dbrm', "start ")

    logger.info("Starting PrimProc")
    exec_cmd(genii+'/export/bin/PrimProc', "> primproc.log &")

    ## run dbbuilder - add yes command at front to automatically answer questions
    logger.info("Building db and indexes (no data inserted)")
    exec_cmd('yes | '+genii+'/tools/dbbuilder/dbbuilder', ' 5')

    logger.info("Relocating OID files")

    for xmlfile in glob.glob('./Job*xml'):
        logger.info("Copying %s to %s\n"%(xmlfile, bulkroot+'/job'))
        # use os.rename instead of shutil.move to avoid problems traversing devices
        os.rename(xmlfile, bulkroot+'/job/'+xmlfile)

    logger.info("Using cpimport at %s"%(wedir+'/bulk/cpimport'))
    exec_cmd('time '+wedir+'/bulk/cpimport', '-j 299 ')
    exec_cmd(wedir+'/bulk/cpimport', '-c -j 300 ')


## the following line allows either interactive use or module import
if __name__ == "__main__": main()
@@ -1,121 +0,0 @@
#!/bin/bash

#This is the procedure for running bulkload using the cpimport program
#Usage of this program:
#The necessary input parameter is the schema name
#For example: bulkload.sh TPCH (a fuller example invocation is shown after this script)

#A table name and a Job ID can be entered by the user when prompted, or they can be skipped by hitting the enter key
#When the table name is skipped, ALL of the columns and indexes in ALL of the tables in the schema will be loaded

#When a table name is entered, all of the columns and indexes in that table will be loaded
#The Job ID determines the names of the two xml files. For example, job id 100 will generate Job_100.xml for columns and Job_101.xml for the index xml file. The job id for the index xml file is the entered job id + 1
#If the job id is skipped, the default job ids are 299 and 300 for the column and index files
#Two xml files will be generated; they reside in the bulkroot directory under the job subdirectory
#For example, the job directory may look like /usr/local/mariadb/columnstore/test/bulk/job

# Set up a default search path.
PROG_NAME=$(basename $0)
SUFFIX=.tbl
TABLENAME=""
while getopts 't:j:e:s:d:p:n:u:h' OPTION
do
    case ${OPTION} in
        s) Schema=${OPTARG};;
        t) TABLENAME=${OPTARG};;
        j) JOBID=${OPTARG};;
        e) MAXERROR=${OPTARG};;
        p) DESC=${OPTARG};;
        d) DELIMITER=${OPTARG};;
        n) NAME=${OPTARG};;
        u) USER=${OPTARG};;
        h) echo "Options: ${PROG_NAME} -s schema -j jobid [-t TableName -e max_error_row -p description -d delimiter -n name -u user]"
           exit 2;;
        \?) echo "Options: ${PROG_NAME} -s schema -j jobid [-t TableName -e max_error_row -p description -d delimiter -n name -u user]"
           exit 2;;
    esac
done

#generate column xml file
echo "MAXERROR in $PROG_NAME =" $MAXERROR
echo "JOBID in $PROG_NAME =" $JOBID
echo "Schema is " $Schema
echo "DESC is " $DESC
echo "DELIMITER =" $DELIMITER
echo "TABLENAME is " $TABLENAME
echo "NAME is " $NAME

if [ -n "$TABLENAME" ]; then
    ./colxml $Schema -t $TABLENAME -j $JOBID -d $DELIMITER -s "$DESC" -e $MAXERROR -n "$NAME" -u $USER
    if [ "$?" != "0" ]; then
        echo "Error in colxml !" 1>&2
        exit 1
    fi
    command="colxml $Schema -t $TABLENAME -j $JOBID -d $DELIMITER -s \"$DESC\" -e $MAXERROR -n \"$NAME\" -u \"$USER\" "
    echo $command
else
    ./colxml $Schema -j $JOBID -d $DELIMITER -s "$DESC" -e $MAXERROR -n "$NAME" -u $USER
    if [ "$?" != "0" ]; then
        echo "Error in colxml !" 1>&2
        exit 1
    fi
    command="colxml $Schema -j $JOBID -d "$DELIMITER" -s \"$DESC\" -e $MAXERROR -n \"$NAME\" -u \"$USER\" "
    echo $command
fi

#generate index xml file
DESC="table index definition"
NAME="index definitions for tables in $Schema"
let "JOBID2 = JOBID+1"
echo "DEFAULT INDEX JOB ID is " $JOBID2
if [ -n "$TABLENAME" ]; then
    ./indxml $Schema -t $TABLENAME -j $JOBID2 -s "$DESC" -e $MAXERROR -n "$NAME" -u $USER
    if [ "$?" != "0" ]; then
        echo "Error in indxml !" 1>&2
        exit 1
    fi

    command="indxml $Schema -t $TABLENAME -j $JOBID2 -s \"$DESC\" -e $MAXERROR -n \"$NAME\" -u \"$USER\" "
    echo $command

else
    ./indxml $Schema -j $JOBID2 -s "$DESC" -e $MAXERROR -n "$NAME" -u $USER
    if [ "$?" != "0" ]; then
        echo "Error in indxml !" 1>&2
        exit 1
    fi

    command="indxml $Schema -j $JOBID2 -s \"$DESC\" -e $MAXERROR -n \"$NAME\" -u \"$USER\" "
    echo $command
fi
#get bulkroot
if [ -n "$CALPONT_CONFIG_FILE" ]; then
    echo "CALPONT_CONFIG_FILE=" $CALPONT_CONFIG_FILE
elif [ -z "$CALPONT_CONFIG_FILE" ]; then
    CALPONT_CONFIG_FILE="/usr/local/mariadb/columnstore/etc/Columnstore.xml"
    echo "CALPONT_CONFIG_FILE=" $CALPONT_CONFIG_FILE
else
    CALPONT_CONFIG_FILE="/usr/local/mariadb/columnstore/etc/Columnstore.xml"
    echo "CALPONT_CONFIG_FILE=" $CALPONT_CONFIG_FILE
fi

awk '/BulkRoot/ { sub(/<BulkRoot>/,"",$0); sub(/<\/BulkRoot>/,"",$0); sub(/" "/,"",$0);print $0 > "tmp.txt"}' $CALPONT_CONFIG_FILE
sed -e 's/ *//g' tmp.txt > out.txt

BulkRoot=$(cat out.txt)
echo "BulkRoot=" $BulkRoot
rm -rf out.txt tmp.txt

#bulk load column files
./cpimport -j $JOBID
command="cpimport -j $JOBID"
echo $command
#bulk load parallel index files
#./splitidx -j $JOBID2
#IDX_SHELL_SCRIPT="$BulkRoot/process/Job_$JOBID2.sh"
#chmod +x $IDX_SHELL_SCRIPT
#echo " run parallel loading $IDX_SHELL_SCRIPT"
#$IDX_SHELL_SCRIPT
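A hypothetical invocation of the script above, assuming it is installed as bulkload.sh as its header comment suggests (the schema, table, job id and delimiter values are examples only):

# Example only: generate Job_299.xml (columns) and Job_300.xml (indexes)
# under <BulkRoot>/job for the TPCH schema, then bulk load the column files.
./bulkload.sh -s TPCH -t LINEITEM -j 299 -d '|' -e 10 -u "$USER"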
@@ -1,93 +0,0 @@
#!/usr/bin/python

import os, sys, glob, shutil, xml.dom.minidom

def find_paths():
    """Find DBRoot and BulkRoot."""
    try:
        config_file = os.environ['COLUMNSTORE_CONFIG_FILE']
    except KeyError:
        try:
            config_file = '/usr/local/mariadb/columnstore/etc/Columnstore.xml'
            os.lstat(config_file)
        except:
            sys.exit('No config file available')

    xmldoc = xml.dom.minidom.parse(config_file)
    bulk_node = xmldoc.getElementsByTagName('BulkRoot')[0]
    db_node = xmldoc.getElementsByTagName('DBRoot')[0]

    bulk_dir = bulk_node.childNodes[0].nodeValue
    data_dir = db_node.childNodes[0].nodeValue

    return (bulk_dir, data_dir)


def validate_indexes(job_file):
    index_files = []
    xmldoc = xml.dom.minidom.parse(job_file)

    for index_node in xmldoc.getElementsByTagName('Index'):
        curTreeOid = index_node.getAttribute('iTreeOid')
        curListOid = index_node.getAttribute('iListOid')
        curMapOid = index_node.getAttribute('mapOid')
        #curIdxCmdArg = ' -t ' + curTreeOid + ' -l ' + curListOid + ' -v -c ' + curMapOid + ' > idxCol_' + curMapOid+'.out'
        curIdxCmdArg = ' -t %s -l %s -v -c %s > idxCol_%s.out' % (curTreeOid, curListOid, curMapOid, curMapOid)
        index_files.append(curIdxCmdArg)

    return index_files


def exec_cmd(cmd, args):
    """Execute command using subprocess module or if that fails,
    use os.system
    """
    try:
        import subprocess

        try:
            retcode = subprocess.call(cmd + " " + args, shell=True)
            if retcode < 0:
                print >>sys.stderr, "Child was terminated by signal", -retcode
                sys.exit(-1)
            else:
                print >>sys.stderr, "Child returned", retcode
        except OSError, e:
            print >>sys.stderr, "Execution failed:", e
            sys.exit(-1)
    except:
        res = os.system(cmd+' '+args)
        if res:
            sys.exit(res)


def main():
    """
    Validate indexes.
    """
    if len(os.getenv('LD_LIBRARY_PATH')) < 5:
        print 'Suspicious LD_LIBRARY_PATH: %s'%os.getenv('LD_LIBRARY_PATH')

    home = os.getenv('HOME')
    genii = home+'/genii'

    (bulkroot, dbroot) = find_paths()

    if len(glob.glob(bulkroot+'/job/Job_300.xml')) == 0:
        sys.exit("No Job_300.xml exists")

    indexes = validate_indexes(bulkroot+'/job/Job_300.xml')
    for idxCmdArg in indexes:
        print idxCmdArg
        exec_cmd(genii + '/tools/evalidx/evalidx', idxCmdArg)


## the following line allows either interactive use or module import
if __name__ == "__main__": main()
@@ -1,93 +0,0 @@
#!/bin/bash

#This is the procedure for running bulkload using the cpimport program
#Usage of this program :
#The necessary input parameter is the schema name
#For example: bulkload.sh TPCH

#A table name and a Job ID can be entered by the user when prompted, or they can be skipped by hitting the enter key
#When the table name is skipped, ALL of the columns and indexes in ALL of the tables in the schema will be loaded

#When a table name is entered, all of the columns and indexes in that table will be loaded
#The Job ID determines the names of the two xml files. For example, job id 100 will generate Job_100.xml for columns and Job_101.xml for the index xml file. The job id for the index xml file is the entered job id + 1
#If the job id is skipped, the default job ids are 299 and 300 for the column and index files
#Two xml files will be generated; they reside in the bulkroot directory under the job subdirectory
#For example, the job directory may look like /usr/local/mariadb/columnstore/test/bulk/job

# Set up a default search path.

#echo "This is Script name " $0
PROG_NAME=$(basename $0)

USERNAME=`grep "^${USER}:" /etc/passwd | cut -d: -f5`
JOBID=""
TABLENAME=""
Schema=""
DELIMITER="|"
MAXERROR=10
FORMAT=CSV
DESC="table columns definition"
NAME="table columns definition"

while getopts 't:j:e:s:d:p:n:hu' OPTION
do
    case ${OPTION} in
        s) Schema=${OPTARG};;
        t) TABLENAME=${OPTARG};;
        j) JOBID=${OPTARG};;
        e) MAXERROR=${OPTARG};;
        p) DESC=${OPTARG};;
        d) DELIMITER=${OPTARG};;
        n) NAME=${OPTARG};;
        h) echo "Usage: ${PROG_NAME} -s schema -j jobid [-t TableName -e max_error_row -p description -d delimiter -n name ]"
           exit 2;;
        u) echo "Usage: ${PROG_NAME} -s schema -j jobid [-t TableName -e max_error_row -p description -d delimiter -n name ]"
           exit 2;;
        \?) echo "Usage: ${PROG_NAME} -s schema -j jobid [-t TableName -e max_error_row -p description -d delimiter -n name ]"
           exit 2;;
    esac
done

if [ -n "$Schema" ]; then
    echo "Schema is " $Schema
else
    echo "Error using the script, a schema is needed! "
    echo "usage as follows: "
    echo "Usage: ${PROG_NAME} -s schema -j jobid [-t TableName -p description -d delimiter -e max_error_rows -n name ]"
    echo "PLEASE ONLY INPUT SCHEMA NAME:"
    read Schema
    if [ -n "$Schema" ]; then
        echo "Schema is " $Schema
    else
        echo "Error using the script, a schema is needed! "
        echo "Usage: ${PROG_NAME} -s schema -j jobid [-t TableName -p description -d delimiter -e max_error_rows -n name ]"
        echo "Try again! Goodbye!"
        exit 2;
    fi
fi
NAME="column definitions for tables in $Schema"

if [ -n "$JOBID" ]; then
    echo "INPUT JOB ID is " $JOBID
else
    echo "Error using the script, a jobid is needed! "
    echo "PLEASE INPUT jobid:"
    read JOBID
    if [ -n "$JOBID" ]; then
        echo "JOBID is " $JOBID
    else
        echo "Error using the script, a jobid is needed! "
        echo "Usage: ${PROG_NAME} -s schema -j jobid [-t TableName -p description -d delimiter -e max_error_rows -n name ]"
        echo "Try again! Goodbye!"
        exit 2;
    fi
fi
################################################################################

if [ -n "$TABLENAME" ]; then
    ./bulkloadp.sh -e $MAXERROR -s $Schema -t "$TABLENAME" -j $JOBID -p "$DESC" -d "$DELIMITER" -n "$NAME" -u $USER
else
    ./bulkloadp.sh -e $MAXERROR -s $Schema -j $JOBID -d "$DELIMITER" -p "$DESC" -n "$NAME" -u $USER
fi
@@ -1,95 +0,0 @@
#!/bin/bash

#This is the procedure for running bulkload using the cpimport program
#Usage of this program :
#The necessary input parameter is the schema name
#For example: bulkload.sh TPCH

#A table name and a Job ID can be entered by the user when prompted, or they can be skipped by hitting the enter key
#When the table name is skipped, ALL of the columns and indexes in ALL of the tables in the schema will be loaded

#When a table name is entered, all of the columns and indexes in that table will be loaded
#The Job ID determines the names of the two xml files. For example, job id 100 will generate Job_100.xml for columns and Job_101.xml for the index xml file. The job id for the index xml file is the entered job id + 1
#If the job id is skipped, the default job ids are 299 and 300 for the column and index files
#Two xml files will be generated; they reside in the bulkroot directory under the job subdirectory
#For example, the job directory may look like /usr/local/mariadb/columnstore/test/bulk/job

# Set up a default search path.
PATH="$HOME/genii/export/bin:.:/sbin:/usr/sbin:/bin:/usr/bin:/usr/X11R6/bin"
export PATH

#echo "This is Script name " $0
PROG_NAME=$(basename $0)

USERNAME=`grep "^${USER}:" /etc/passwd | cut -d: -f5`
JOBID=""
TABLENAME=""
Schema=""
DELIMITER="|"
MAXERROR=10
FORMAT=CSV
DESC="table columns definition"
NAME="table columns definition"

while getopts 't:j:e:s:d:p:n:hu' OPTION
do
    case ${OPTION} in
        s) Schema=${OPTARG};;
        t) TABLENAME=${OPTARG};;
        j) JOBID=${OPTARG};;
        e) MAXERROR=${OPTARG};;
        p) DESC=${OPTARG};;
        d) DELIMITER=${OPTARG};;
        n) NAME=${OPTARG};;
        h) echo "Usage: ${PROG_NAME} -s schema -j jobid [-t TableName -e max_error_row -p description -d delimiter -n name ]"
           exit 2;;
        u) echo "Usage: ${PROG_NAME} -s schema -j jobid [-t TableName -e max_error_row -p description -d delimiter -n name ]"
           exit 2;;
        \?) echo "Usage: ${PROG_NAME} -s schema -j jobid [-t TableName -e max_error_row -p description -d delimiter -n name ]"
           exit 2;;
    esac
done

if [ -n "$Schema" ]; then
    echo "Schema is " $Schema
else
    echo "Error using the script, a schema is needed! "
    echo "usage as follows: "
    echo "Usage: ${PROG_NAME} -s schema -j jobid [-t TableName -p description -d delimiter -e max_error_rows -n name ]"
    echo "PLEASE ONLY INPUT SCHEMA NAME:"
    read Schema
    if [ -n "$Schema" ]; then
        echo "Schema is " $Schema
    else
        echo "Error using the script, a schema is needed! "
        echo "Usage: ${PROG_NAME} -s schema -j jobid [-t TableName -p description -d delimiter -e max_error_rows -n name ]"
        echo "Try again! Goodbye!"
        exit 2;
    fi
fi
NAME="column definitions for tables in $Schema"

if [ -n "$JOBID" ]; then
    echo "INPUT JOB ID is " $JOBID
else
    echo "Error using the script, a jobid is needed! "
    echo "PLEASE INPUT jobid:"
    read JOBID
    if [ -n "$JOBID" ]; then
        echo "JOBID is " $JOBID
    else
        echo "Error using the script, a jobid is needed! "
        echo "Usage: ${PROG_NAME} -s schema -j jobid [-t TableName -p description -d delimiter -e max_error_rows -n name ]"
        echo "Try again! Goodbye!"
        exit 2;
    fi
fi
################################################################################

if [ -n "$TABLENAME" ]; then
    bulkloadp.sh -e $MAXERROR -s $Schema -t "$TABLENAME" -j $JOBID -p "$DESC" -d "$DELIMITER" -n "$NAME" -u $USER
else
    bulkloadp.sh -e $MAXERROR -s $Schema -j $JOBID -d "$DELIMITER" -p "$DESC" -n "$NAME" -u $USER
fi
@@ -1,3 +0,0 @@
cleanup.sh
dbbuilder.sh
bulkloadp.sh
@@ -1,299 +0,0 @@
#!/usr/bin/python
##
## Bulkloader script by Martin Thomas
##

import os, sys, glob, shutil, xml.dom.minidom
import getopt
import logging

logger = logging.getLogger()
shdlr = logging.StreamHandler()
fhdlr = logging.FileHandler(filename='bulkload.log')
formatter = logging.Formatter('%(asctime)s:%(levelname)s: %(message)s')
shdlr.setFormatter(formatter)
fhdlr.setFormatter(formatter)
logger.addHandler(shdlr)
logger.addHandler(fhdlr)

## only report INFO or higher - change to WARNING to silence all logging
logger.setLevel(logging.INFO)


def usage():
    print """
qa-bulkload.py is intended to automate the manual steps required to load the
database and build indexes from scratch.

- PrimProc will be stopped and started
- shared memory segments will be removed using ipcs-pat
- database files will be removed
- dbgen will be run with option 5
- oid files and job files will be copied to correct locations
- column data will be parsed and loaded using Job 299
- index data will be exported, sorted and loaded using Job 300

Options:
-n or --nocache= : Specify either col or idx and the -c flag will NOT be sent to cpimport
-u or --usage    : Usage message

Example:
bulkload.py --nocache=idx
Load the database, do not use cache when building indexes

THIS SPACE LEFT INTENTIONALLY BLANK
"""


def find_paths():
    """Find DBRoot and BulkRoot."""
    try:
        config_file = os.environ['COLUMNSTORE_CONFIG_FILE']
    except KeyError:
        try:
            logger.info("Environment variable COLUMNSTORE_CONFIG_FILE not set, looking for system Columnstore.xml")
            config_file = '/usr/local/mariadb/columnstore/etc/Columnstore.xml'
            os.lstat(config_file)
        except:
            logger.error('No config file available')
            sys.exit('No config file available')
    try:
        xmldoc = xml.dom.minidom.parse(config_file)
        bulk_node = xmldoc.getElementsByTagName('BulkRoot')[0]
        db_node = xmldoc.getElementsByTagName('DBRoot')[0]
        bulk_dir = bulk_node.childNodes[0].nodeValue
        data_dir = db_node.childNodes[0].nodeValue
    except Exception, e:
        logger.error('Error parsing config file')
        logger.error(e)
        sys.exit('Error parsing config file')

    return (bulk_dir, data_dir)


def check_dirs(bulkroot, dbroot):
    problem = 0
    res = 0
    reqd_dirs = {
        os.getenv('HOME')+'/genii': "No genii directory found (contains tools required to continue) (%s)",
        bulkroot: "Bulkroot specified as %s but not found",
        bulkroot+'/job': "No job directory found - needed to store Job xml files (looked in %s)",
        bulkroot+'/data/import': "No data/import directory found - expected %s to hold data to be loaded",
        bulkroot+'/log': "No data/log directory found - expected %s to log into",
        dbroot: "DBroot specified as %s but not found"
    }
    for dir in reqd_dirs.keys():
        try:
            res = os.lstat(dir)
        except:
            problem = 1
            logger.error(reqd_dirs[dir]%dir)

    if problem:
        sys.exit(1)


def fix_hwm(job_file):
    """Find hwm in xml file and change to 0"""
    import re

    src_file = open(job_file, 'r')
    dst_file = open(job_file+'.tmp', 'w')

    rep = re.compile('hwm="1"')

    for line in src_file:
        line = rep.sub('hwm="0"', line)
        dst_file.write(line)
    # use os.rename instead of shutil.move to avoid problems traversing devices
    os.rename(job_file+'.tmp', job_file)


def find_indexes(job_file):
    """Find index definitions in job_file and return list of files to sort"""
    index_files = []
    try:  # try because we may have an old version of python
        xmldoc = xml.dom.minidom.parse(job_file)

        for index_node in xmldoc.getElementsByTagName('Index'):
            index_files.append(index_node.getAttribute('mapName'))
    except:
        import re
        f = open(job_file)
        for line in f:  # iterate over lines, not characters
            b = re.search('mapName="(CPL_[0-9A-Z_]+)"', line)
            try:  # try because not every line will match
                index_files.append(b.group(1))
            except: pass

    return index_files


def exec_cmd(cmd, args):
    """Execute command using subprocess module or if that fails,
    use os.system
    """
    try:
        import subprocess

        try:
            retcode = subprocess.call(cmd + " " + args, shell=True)
            if retcode < 0:
                print >>sys.stderr, "Child was terminated by signal", -retcode
                sys.exit(-1)
            else:
                print >>sys.stderr, "Child returned", retcode
        except OSError, e:
            print >>sys.stderr, "Execution failed:", e
            sys.exit(-1)
    except:
        logger.info('Old version of Python - subprocess not available, falling back to os.system')
        logger.info('Executing: '+cmd+' '+args)
        res = os.system(cmd+' '+args)
        if res:
            logger.error('Bad return code %i from %s'%(res, cmd))
            sys.exit(res)


def build_tool(tool):
    """
    Use the tool dictionary to determine if required tool exists
    and build if not
    """
    if not os.path.exists(tool['path']+tool['tool']):
        logger.warn("Building %s before continuing"%tool['tool'])
        curdir = os.getcwd()
        os.chdir(tool['path'])
        exec_cmd(tool['builder'], tool['args'])
        os.chdir(curdir)


def main():
    """
    Bulk load the database.
    Check that we can write OIDfiles, that all required tools exist,
    clean up old files, sort the index inserts and generally rock and roll
    """
    start_dir = curdir = os.getcwd()  # remember where we started

    if not os.environ.has_key('LD_LIBRARY_PATH'):
        logger.info('No environment variable LD_LIBRARY_PATH')
    else:
        if len(os.getenv('LD_LIBRARY_PATH')) < 5:
            logger.info('Suspicious LD_LIBRARY_PATH: %s'%os.getenv('LD_LIBRARY_PATH'))

    #-- figure out paths
    home = os.getenv('HOME')
    cache = {}
    cache['idx'] = '-c'
    cache['col'] = '-c'

    #-- parse command line options
    opts, args = getopt.getopt(sys.argv[1:], 'n:u', ['nocache=', 'usage'])
    for opt, arg in opts:

        if opt == '-n' or opt == '--nocache':
            if (arg == 'idx' or arg == 'col'):
                cache[arg] = ''
                logger.info("No cache for %s"% arg)

        if opt == '-u' or opt == '--usage':
            usage()
            sys.exit()

    (bulkroot, dbroot) = find_paths()

    logger.info("Bulkroot: %s \tDBRoot: %s\n"%(bulkroot, dbroot))

    check_dirs(bulkroot, dbroot)

    if len(glob.glob(bulkroot+'/data/import/*tbl')) == 0:
        sys.exit("No files for import found in BulkRoot: %s"%(bulkroot))

    if len(glob.glob(dbroot+'/000.dir')) == 0:
        logger.info("No files found in DBRoot: %s (not fatal)"%dbroot)

    ## qa version does not build any tools. Cease and desist if any tools missing

    toolset = ['dbbuilder', 'cpimport', 'ipcs-pat', 'PrimProc']
    for tool in toolset:
        try:
            res = os.system('which %s'%tool)
        finally:
            if res:
                logger.error("Fatal error: %s not found"%tool)
                sys.exit(-1)

    ## clean up before starting
    ## remove old db files, remove old temp files, remove shared memory segments,
    ## kill old PrimProc and start new one

    logger.info("Removing old DB files")
    exec_cmd('rm -fr ', dbroot+'/000.dir')

    logger.info("Removing old temp files")
    exec_cmd('rm -fr ', bulkroot+'/data/import/*.idx.txt')

    logger.info("Removing shared memory segments")
    exec_cmd('ipcs-pat', '-d')

    logger.info("Killing PrimProc")
    os.system('killall -q -u $USER PrimProc')

    logger.info("Starting PrimProc")
    exec_cmd('PrimProc', "> primproc.log &")

    ## run dbbuilder
    logger.info("Building db and indexes (no data inserted)")
    exec_cmd('yes | dbbuilder', ' 5')

    logger.info("Relocating OID files")
    for file in ['colOIDFile.dat', 'dicOIDFile.dat', 'indexOIDFile.dat']:
        # use os.rename instead of shutil.move to avoid problems traversing devices
        os.rename(file, dbroot+'/'+file)

    for xmlfile in glob.glob('./Job*xml'):
        logger.info("Copying %s to %s\n"%(xmlfile, bulkroot+'/job'))
        # use os.rename instead of shutil.move to avoid problems traversing devices
        os.rename(xmlfile, bulkroot+'/job/'+xmlfile)

    exec_cmd('time cpimport', '-j 299 -b %s'%cache['col'])
    exec_cmd('time cpimport', '-j 299 -l %s'%cache['col'])

    exec_cmd('time cpimport', '-j 300 -i -o %s'%cache['idx'])

    logger.info("Overriding HWM in job file - setting to 0")
    fix_hwm(bulkroot+'/job/Job_300.xml')

    ## sort the files after scanning index job file for mapName(s)
    logger.info("Sorting indexes before insertion")
    indexes = find_indexes(bulkroot+'/job/Job_300.xml')
    for index in indexes:
        data_file = '%s/data/import/%s.dat.idx.txt'%(bulkroot, index)
        sort_file = '%s/data/import/%s.dat.idx.sort'%(bulkroot, index)
        exec_cmd('time sort', ' -k1 -n %s > %s'%(data_file, sort_file))
        # use os.rename instead of shutil.move to avoid problems traversing devices
        os.rename(sort_file, data_file)

    logger.info("Inserting indexes")
    try:
        logger.info("Trying with -m option")
        exec_cmd('cpimport', '-j 300 -m -i -s %s'%cache['idx'])
    except:
        try:
            logger.warn("cpimport with -m option failed, fall back to regular options")
            exec_cmd('cpimport', '-j 300 -i -s %s'%cache['idx'])
        except:
            logger.error("Index load failed")


## the following line allows either interactive use or module import
if __name__ == "__main__": main()