Something went wrong on our end
-
Vincent Hatakeyama authoredVincent Hatakeyama authored
import_csv.py 7.63 KiB
#!/usr/bin/env python3
"""Imports CSV files into an Odoo server.
Dependencies:
- odoorpc:
pip install odoorpc==0.5.1
- requests_toolbelt:
pip install requests_toolbelt==0.8.0
"""
# Version 2.22
import argparse
import hashlib
import hmac
import logging
import os
import sys
from datetime import datetime
from typing import List, Tuple
import odoorpc
from requests_toolbelt import MultipartEncoder
_logger = logging.getLogger(__name__)
__version__ = '1.0.1'
__date__ = '2019-07-29'
__updated__ = '2019-08-01'
def import_csv(
login: str, password: str,
database: str,
protocol: str, host: str, port: int,
timeout: int,
model_filenames: List[Tuple[str, str]],
update_parameter: bool = True,
) -> int:
_logger.info(
f"Connecting to Odoo on <{protocol}://{host}:{port}> ({database} DB)"
)
o = odoorpc.ODOO(host, port=port, protocol=protocol, timeout=timeout)
_logger.info("Connected")
# Do "o.login(ODOO_DB, 'admin', ODOO_ADMIN_PASS)" by hand here so we can
# grab the session ID (odoorpc does not store it anywhere).
data = o.json(
"/web/session/authenticate",
{"db": database, "login": login, "password": password},
)
session_id = data["result"]["session_id"]
uid = data["result"]["uid"]
if uid:
context = data["result"]["user_context"]
o._env = odoorpc.env.Environment(o, database, uid, context=context)
o._login = login
o._password = password
else:
raise odoorpc.error.RPCError("Wrong login ID or password")
import_obj = o.env["base_import.import"]
param_obj = o.env["ir.config_parameter"]
# user_obj = o.env["res.users"]
_logger.info("Getting a CSRF token")
# Reproduce what odoo.http::csrf_token does to build a CSRF token.
max_ts = ''
db_secret = param_obj.get_param('database.secret')
csrf_token = (
'%so%s' % (
hmac.new(
db_secret.encode(),
msg=('%s%s' % (session_id, max_ts)).encode(),
digestmod=hashlib.sha1,
).hexdigest(),
max_ts
)
)
_logger.debug(f"CSRF token: {csrf_token}")
_logger.info("Importing CSV files")
import_options = {
"encoding": "utf-8",
"quoting": '"',
"separator": ",",
"headers": True,
}
_logger.info(f"{len(model_filenames)} files to import")
for model, csv_file in model_filenames:
_logger.info('Importing - %s in model %s', csv_file, model)
import_dlg_id = import_obj.create({'res_model': model})
with open(csv_file, 'rb') as f:
m = MultipartEncoder(
fields={
'csrf_token': csrf_token,
'import_id': str(import_dlg_id),
'file': ('filename', f, 'text/csv'),
}
)
data = m.to_string()
response = o.http(
'base_import/set_file',
data=data,
headers={'Content-Type': m.content_type}
)
_logger.debug('response: %s', response)
# This returns a lot of stuff we don't need, except for the field list.
fields = import_obj.parse_preview(import_dlg_id, import_options)[
"headers"]
# Got fields; now run the import.
errors = list()
results = import_obj.do(import_dlg_id, fields, import_options)
for result in results:
if result['type'] == 'error':
errors.append(result['message'])
if errors:
_logger.fatal(
'Error while importing - %s in model %s:', csv_file, model)
for error in errors:
_logger.warning(' - %s', error)
return 1
else:
_logger.info('Imported - %s in model %s', csv_file, model)
if update_parameter:
_logger.debug("Setting import_csv parameter")
param_obj.set_param('import_csv.write_date', str(datetime.now()))
_logger.info("Done.")
return 0
def main(argv=None): # IGNORE:C0111
"""Parse arguments and launch conversion
"""
program_version = __version__
program_build_date = str(__updated__)
program_version_message = '%%(prog)s %s (%s)' % (
program_version, program_build_date)
program_shortdesc = __doc__.split(".")[0]
program_license = '''%s
Created by Vincent Hatakeyama on %s.
Copyright 2019 XCG Consulting. All rights reserved.
Licensed under the MIT License
Distributed on an "AS IS" basis without warranties
or conditions of any kind, either express or implied.
USAGE
''' % (program_shortdesc, str(__date__))
# Argument parsing
parser = argparse.ArgumentParser(
description=program_license,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument(
'-V', '--version', action='version',
version=program_version_message)
parser.add_argument(
'-v', '--verbose', dest='verbose', action='count',
help="set verbosity level [default: %(default)s]")
parser.add_argument(
'--login',
help="Odoo user [default: %(default)s]",
default='admin',
)
parser.add_argument(
'--password',
help="Odoo user password",
required=True,
)
parser.add_argument(
'-p',
'--port',
help="Odoo port [default: %(default)s]",
default=8069,
)
parser.add_argument(
'--host',
help="Odoo host [default: %(default)s]",
default='localhost',
)
parser.add_argument(
'--protocol',
help="Odoo protocol [default: %(default)s]",
default='jsonrpc',
# TODO point to odoorpc values
choices=['jsonrpc', 'jsonrpc+ssl'],
)
parser.add_argument(
'--timeout',
help="Odoo timeout [default: %(default)s]",
default=36000,
)
parser.add_argument(
'-d',
'--database',
help="Odoo database",
required=True,
)
# TODO indicate that either a file or a directory is required
parser.add_argument(
'-f',
'--file',
help="CSV to import",
action='append',
)
parser.add_argument(
'--directory',
help="Directory of CSV files to import",
action='append',
)
nmspc = parser.parse_args(argv)
verbose = nmspc.verbose
if not verbose:
level = logging.WARN
elif verbose == 1:
level = logging.INFO
else:
level = logging.DEBUG
logging.basicConfig(
level=level,
format='%(asctime)s %(levelname)8s [%(lineno)3d]: %(message)s',
)
model_filenames = list()
# ⚠ assume files are named ".* model_name.csv" or ".* model_name"
file_list = list()
if nmspc.file:
file_list.extend(nmspc.file)
if nmspc.directory:
for directory in nmspc.directory:
for root, subdirs, files in os.walk(directory):
file_list.extend(os.path.join(root, f) for f in sorted(files))
for filename in file_list:
_logger.debug(f"{filename} to import")
model_csv = os.path.basename(filename).split(' ')[-1]
if model_csv[-4:] == '.csv':
model = model_csv[:-4]
else:
model = model_csv
model_filenames.append((model, filename))
return import_csv(
login=nmspc.login,
password=nmspc.password,
port=nmspc.port,
host=nmspc.host,
protocol=nmspc.protocol,
timeout=nmspc.timeout,
database=nmspc.database,
model_filenames=model_filenames,
)
if __name__ == "__main__":
return_code = main(sys.argv[1:])
if return_code:
exit(return_code)