#!/usr/bin/env python3 """Imports CSV files into an Odoo server. Dependencies: - odoorpc: pip install odoorpc==0.5.1 - requests_toolbelt: pip install requests_toolbelt==0.8.0 """ # Version 2.22 import argparse import hashlib import hmac import logging import os import sys from datetime import datetime from typing import List, Tuple import odoorpc from requests_toolbelt import MultipartEncoder _logger = logging.getLogger(__name__) __version__ = '1.0.1' __date__ = '2019-07-29' __updated__ = '2019-08-01' def import_csv( login: str, password: str, database: str, protocol: str, host: str, port: int, timeout: int, model_filenames: List[Tuple[str, str]], update_parameter: bool = True, ) -> int: _logger.info( f"Connecting to Odoo on <{protocol}://{host}:{port}> ({database} DB)" ) o = odoorpc.ODOO(host, port=port, protocol=protocol, timeout=timeout) _logger.info("Connected") # Do "o.login(ODOO_DB, 'admin', ODOO_ADMIN_PASS)" by hand here so we can # grab the session ID (odoorpc does not store it anywhere). data = o.json( "/web/session/authenticate", {"db": database, "login": login, "password": password}, ) session_id = data["result"]["session_id"] uid = data["result"]["uid"] if uid: context = data["result"]["user_context"] o._env = odoorpc.env.Environment(o, database, uid, context=context) o._login = login o._password = password else: raise odoorpc.error.RPCError("Wrong login ID or password") import_obj = o.env["base_import.import"] param_obj = o.env["ir.config_parameter"] # user_obj = o.env["res.users"] _logger.info("Getting a CSRF token") # Reproduce what odoo.http::csrf_token does to build a CSRF token. max_ts = '' db_secret = param_obj.get_param('database.secret') csrf_token = ( '%so%s' % ( hmac.new( db_secret.encode(), msg=('%s%s' % (session_id, max_ts)).encode(), digestmod=hashlib.sha1, ).hexdigest(), max_ts ) ) _logger.debug(f"CSRF token: {csrf_token}") _logger.info("Importing CSV files") import_options = { "encoding": "utf-8", "quoting": '"', "separator": ",", "headers": True, } _logger.info(f"{len(model_filenames)} files to import") for model, csv_file in model_filenames: _logger.info('Importing - %s in model %s', csv_file, model) import_dlg_id = import_obj.create({'res_model': model}) with open(csv_file, 'rb') as f: m = MultipartEncoder( fields={ 'csrf_token': csrf_token, 'import_id': str(import_dlg_id), 'file': ('filename', f, 'text/csv'), } ) data = m.to_string() response = o.http( 'base_import/set_file', data=data, headers={'Content-Type': m.content_type} ) _logger.debug('response: %s', response) # This returns a lot of stuff we don't need, except for the field list. fields = import_obj.parse_preview(import_dlg_id, import_options)[ "headers"] # Got fields; now run the import. errors = list() results = import_obj.do(import_dlg_id, fields, import_options) for result in results: if result['type'] == 'error': errors.append(result['message']) if errors: _logger.fatal( 'Error while importing - %s in model %s:', csv_file, model) for error in errors: _logger.warning(' - %s', error) return 1 else: _logger.info('Imported - %s in model %s', csv_file, model) if update_parameter: _logger.debug("Setting import_csv parameter") param_obj.set_param('import_csv.write_date', str(datetime.now())) _logger.info("Done.") return 0 def main(argv=None): # IGNORE:C0111 """Parse arguments and launch conversion """ program_version = __version__ program_build_date = str(__updated__) program_version_message = '%%(prog)s %s (%s)' % ( program_version, program_build_date) program_shortdesc = __doc__.split(".")[0] program_license = '''%s Created by Vincent Hatakeyama on %s. Copyright 2019 XCG Consulting. All rights reserved. Licensed under the MIT License Distributed on an "AS IS" basis without warranties or conditions of any kind, either express or implied. USAGE ''' % (program_shortdesc, str(__date__)) # Argument parsing parser = argparse.ArgumentParser( description=program_license, formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument( '-V', '--version', action='version', version=program_version_message) parser.add_argument( '-v', '--verbose', dest='verbose', action='count', help="set verbosity level [default: %(default)s]") parser.add_argument( '--login', help="Odoo user [default: %(default)s]", default='admin', ) parser.add_argument( '--password', help="Odoo user password", required=True, ) parser.add_argument( '-p', '--port', help="Odoo port [default: %(default)s]", default=8069, ) parser.add_argument( '--host', help="Odoo host [default: %(default)s]", default='localhost', ) parser.add_argument( '--protocol', help="Odoo protocol [default: %(default)s]", default='jsonrpc', # TODO point to odoorpc values choices=['jsonrpc', 'jsonrpc+ssl'], ) parser.add_argument( '--timeout', help="Odoo timeout [default: %(default)s]", default=36000, ) parser.add_argument( '-d', '--database', help="Odoo database", required=True, ) # TODO indicate that either a file or a directory is required parser.add_argument( '-f', '--file', help="CSV to import", action='append', ) parser.add_argument( '--directory', help="Directory of CSV files to import", action='append', ) nmspc = parser.parse_args(argv) verbose = nmspc.verbose if not verbose: level = logging.WARN elif verbose == 1: level = logging.INFO else: level = logging.DEBUG logging.basicConfig( level=level, format='%(asctime)s %(levelname)8s [%(lineno)3d]: %(message)s', ) model_filenames = list() # ⚠ assume files are named ".* model_name.csv" or ".* model_name" file_list = list() if nmspc.file: file_list.extend(nmspc.file) if nmspc.directory: for directory in nmspc.directory: for root, subdirs, files in os.walk(directory): file_list.extend(os.path.join(root, f) for f in sorted(files)) for filename in file_list: _logger.debug(f"{filename} to import") model_csv = os.path.basename(filename).split(' ')[-1] if model_csv[-4:] == '.csv': model = model_csv[:-4] else: model = model_csv model_filenames.append((model, filename)) return import_csv( login=nmspc.login, password=nmspc.password, port=nmspc.port, host=nmspc.host, protocol=nmspc.protocol, timeout=nmspc.timeout, database=nmspc.database, model_filenames=model_filenames, ) if __name__ == "__main__": return_code = main(sys.argv[1:]) if return_code: exit(return_code)