Skip to content

Commit c52715e

Browse files
committed
Changes main program to addax.py
1 parent 288fbdf commit c52715e

File tree

1 file changed

+254
-0
lines changed

1 file changed

+254
-0
lines changed

core/src/main/bin/addax.py

Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
#!/usr/bin/env python
2+
from __future__ import print_function
3+
4+
import codecs
5+
import json
6+
import os
7+
import platform
8+
import re
9+
import signal
10+
import subprocess
11+
import sys
12+
import time
13+
from glob import glob
14+
from optparse import OptionGroup
15+
from optparse import OptionParser
16+
from string import Template
17+
18+
19+
def isWindows():
20+
return platform.system() == 'Windows'
21+
22+
23+
def get_version():
24+
"""
25+
extract version from lib/addax-core-<version>.jar package
26+
"""
27+
core_jar = glob(os.path.join(ADDAX_HOME, "lib", "addax-core-*.jar"))
28+
if not core_jar:
29+
return ""
30+
else:
31+
return os.path.basename(core_jar[0])[11:-4]
32+
33+
34+
ADDAX_HOME = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
35+
ADDAX_VERSION = 'Addax ' + get_version()
36+
CODING = '-Dfile.encoding=UTF-8'
37+
if isWindows():
38+
codecs.register(
39+
lambda name: name == 'cp65001' and codecs.lookup('utf-8') or None)
40+
CLASS_PATH = "{1}{0}lib{0}*".format(os.sep, ADDAX_HOME)
41+
CODING = '-Dfile.encoding=cp936'
42+
else:
43+
CLASS_PATH = ".:/etc/hbase/conf:{1}{0}lib{0}*".format(os.sep, ADDAX_HOME)
44+
45+
LOGBACK_FILE = "{1}{0}conf{0}logback.xml".format(os.sep, ADDAX_HOME)
46+
DEFAULT_JVM = "-Xms64m -Xmx2g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath={}".format(
47+
ADDAX_HOME)
48+
DEFAULT_PROPERTY_CONF = "%s -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener \
49+
-Djava.security.egd=file:///dev/urandom -Daddax.home=%s -Dlogback.configurationFile=%s " % \
50+
(CODING, ADDAX_HOME, LOGBACK_FILE)
51+
ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s ${params} com.wgzhao.addax.core.Engine \
52+
-mode ${mode} -jobid ${jobid} -job ${job}" % \
53+
(DEFAULT_PROPERTY_CONF, CLASS_PATH)
54+
REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"
55+
56+
RET_STATE = {
57+
"KILL": 143,
58+
"FAIL": -1,
59+
"OK": 0,
60+
"RUN": 1,
61+
"RETRY": 2
62+
}
63+
64+
65+
def suicide(signum, e):
66+
global child_process
67+
print("[Error] Addax receive unexpected signal {}, starts to suicide.".format(signum))
68+
69+
if child_process:
70+
child_process.send_signal(signal.SIGQUIT)
71+
time.sleep(1)
72+
child_process.kill()
73+
print("Addax Process was killed ! you did ?")
74+
sys.exit(RET_STATE["KILL"])
75+
76+
77+
def register_signal():
78+
if not isWindows():
79+
global child_process
80+
signal.signal(2, suicide)
81+
signal.signal(3, suicide)
82+
signal.signal(15, suicide)
83+
84+
85+
def getOptionParser():
86+
usage = "usage: %prog [options] job-url-or-path"
87+
parser = OptionParser(usage=usage)
88+
parser.add_option("-v", "--version", action="store_true",
89+
help="Print version and exit")
90+
91+
prodEnvOptionGroup = OptionGroup(parser, "Product Env Options",
92+
"Normal user use these options to set jvm parameters, job runtime mode etc. "
93+
"Make sure these options can be used in Product Env.")
94+
prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",
95+
help="Set jvm parameters if necessary.")
96+
prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",
97+
help="Set job unique id when running by Distribute/Local Mode.")
98+
prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",
99+
action="store", default="standalone",
100+
help="Set job runtime mode such as: standalone, local, distribute. "
101+
"Default mode is standalone.")
102+
prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",
103+
action="store", dest="params",
104+
help='Set job parameter, eg: the source tableName you want to set it by command, '
105+
'then you can use like this: -p"-DtableName=your-table-name", '
106+
'if you have mutiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name".'
107+
'Note: you should config in you job tableName with ${tableName}.')
108+
prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",
109+
action="store", dest="reader", type="string",
110+
help='View job config[reader] template, eg: mysqlreader,streamreader')
111+
prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",
112+
action="store", dest="writer", type="string",
113+
help='View job config[writer] template, eg: mysqlwriter,streamwriter')
114+
prodEnvOptionGroup.add_option("-l", "--logdir", metavar="<log directory>",
115+
action="store", dest="logdir", type="string",
116+
help="the directory which log writes to",
117+
default=ADDAX_HOME + os.sep + 'log')
118+
parser.add_option_group(prodEnvOptionGroup)
119+
120+
devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options",
121+
"Developer use these options to trace more details of DataX.")
122+
devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",
123+
help="Set to remote debug mode.")
124+
devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",
125+
default="info", help="Set log level such as: debug, info, all etc.")
126+
parser.add_option_group(devEnvOptionGroup)
127+
return parser
128+
129+
130+
def generateJobConfigTemplate(reader, writer):
131+
readerRef = "Please refer to the document:\n\thttps://addax.readthedocs.io/zh_CN/latest/reader/{}.html\n".format(
132+
reader)
133+
writerRef = "Please refer to the document:\n\thttps://addax.readthedocs.io/zh_CN/latest/writer/{}.html\n".format(
134+
writer)
135+
print(readerRef, writerRef)
136+
jobGuid = 'Please save the following configuration as a json file and use\n python {ADDAX_HOME}/bin/addax.py {JSON_FILE_NAME}.json \nto run the job.\n'
137+
print(jobGuid)
138+
jobTemplate = {
139+
"job": {
140+
"setting": {
141+
"speed": {
142+
"channel": ""
143+
}
144+
},
145+
"content": [
146+
{
147+
"reader": {},
148+
"writer": {}
149+
}
150+
]
151+
}
152+
}
153+
readerTemplatePath = os.path.join(
154+
ADDAX_HOME, "plugin", "reader", reader, "plugin_job_template.json")
155+
writerTemplatePath = os.path.join(
156+
ADDAX_HOME, "plugin", "writer", writer, "plugin_job_template.json")
157+
readerPar = None
158+
writerPar = None
159+
try:
160+
readerPar = readPluginTemplate(readerTemplatePath)
161+
except Exception as e:
162+
print("Read reader[%s] template error: can\'t find file %s" % (
163+
reader, readerTemplatePath))
164+
try:
165+
writerPar = readPluginTemplate(writerTemplatePath)
166+
except Exception as e:
167+
print("Read writer[{}] template error: : can\'t find file {}: {}".format(
168+
writer, writerTemplatePath, e))
169+
jobTemplate['job']['content'][0]['reader'] = readerPar
170+
jobTemplate['job']['content'][0]['writer'] = writerPar
171+
print(json.dumps(jobTemplate, indent=4, sort_keys=True))
172+
173+
174+
def readPluginTemplate(plugin):
175+
with open(plugin, 'r') as f:
176+
return json.load(f)
177+
178+
179+
def isUrl(path):
180+
if not path:
181+
return False
182+
183+
assert (isinstance(path, str))
184+
m = re.match(r"^http[s]?://\S+\w*", path.lower())
185+
if m:
186+
return True
187+
else:
188+
return False
189+
190+
191+
def buildStartCommand(options, args):
192+
commandMap = {}
193+
tempJVMCommand = DEFAULT_JVM
194+
if options.jvmParameters:
195+
tempJVMCommand = tempJVMCommand + " " + options.jvmParameters
196+
197+
if options.remoteDebug:
198+
tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
199+
200+
if options.loglevel:
201+
tempJVMCommand = tempJVMCommand + " " + \
202+
("-Dloglevel=%s" % (options.loglevel))
203+
204+
if options.mode:
205+
commandMap["mode"] = options.mode
206+
207+
# jobResource may be url , or local file(ralative, absolution)
208+
jobResource = args[0]
209+
if not isUrl(jobResource):
210+
jobResource = os.path.abspath(jobResource)
211+
if jobResource.lower().startswith("file://"):
212+
jobResource = jobResource[len("file://"):]
213+
# get job's filename if it's local file
214+
if not jobResource.startswith('http://') and not jobResource.startswith('https://'):
215+
jobFilename = os.path.splitext(os.path.split(jobResource)[-1])[0]
216+
else:
217+
jobFilename = jobResource[-20:].replace('/', '_').replace('.', '_')
218+
curr_time = time.strftime("%Y%m%d_%H%M%S")
219+
jobParams = ("-Daddax.log=%s -Dlog.file.name=addax_%s_%s_%s.log") % (
220+
options.logdir, jobFilename, curr_time, os.getpid())
221+
if options.params:
222+
jobParams = jobParams + " " + options.params
223+
224+
if options.jobid:
225+
commandMap["jobid"] = options.jobid
226+
227+
commandMap["jvm"] = tempJVMCommand
228+
commandMap["params"] = jobParams
229+
commandMap["job"] = jobResource
230+
return Template(ENGINE_COMMAND).substitute(**commandMap)
231+
232+
233+
if __name__ == "__main__":
234+
parser = getOptionParser()
235+
options, args = parser.parse_args(sys.argv[1:])
236+
if options.version:
237+
print(ADDAX_VERSION)
238+
sys.exit(0)
239+
240+
if options.reader is not None and options.writer is not None:
241+
generateJobConfigTemplate(options.reader, options.writer)
242+
sys.exit(RET_STATE['OK'])
243+
if len(args) != 1:
244+
parser.print_help()
245+
sys.exit(RET_STATE['FAIL'])
246+
247+
startCommand = buildStartCommand(options, args)
248+
if options.loglevel.lower() == "debug":
249+
print("start command: {}".format(startCommand))
250+
child_process = subprocess.Popen(startCommand, shell=True)
251+
register_signal()
252+
(stdout, stderr) = child_process.communicate()
253+
254+
sys.exit(child_process.returncode)

0 commit comments

Comments
 (0)