#!/usr/bin/env python3
"""Imports CSV files into an Odoo server.

Dependencies:
- odoorpc:
    pip install odoorpc==0.5.1
- requests_toolbelt:
    pip install requests_toolbelt==0.8.0
"""
# Version 2.22
import argparse
import hashlib
import hmac
import logging
import os
import re
import sys
from datetime import datetime
from typing import List, Tuple

import odoorpc
from requests_toolbelt import MultipartEncoder

_logger = logging.getLogger(__name__)



def import_csv(
        login: str, password: str,
        database: str,
        protocol: str, host: str, port: int,
        timeout: int,
        model_filenames: List[Tuple[str, str, str]],
        update_parameter: bool = True,
) -> int:
    """Import CSV files into an Odoo database, one file after the other.

    Each file is uploaded to a freshly created ``base_import.import``
    record through the ``base_import/set_file`` HTTP route, then imported
    using the header list returned by ``parse_preview``. Processing stops
    at the first file whose import reports at least one error.

    :param login: Odoo user login.
    :param password: password of that user.
    :param database: name of the Odoo database.
    :param protocol: protocol name handed to ``odoorpc.ODOO``.
    :param host: Odoo server host.
    :param port: Odoo server port.
    :param timeout: timeout handed to ``odoorpc.ODOO``.
    :param model_filenames: ``(model, csv_file, lang)`` tuples, imported
        in the given order.
    :param update_parameter: when true, write the current date and time to
        the ``import_csv.write_date`` config parameter once every file has
        been imported without error.
    :return: ``0`` when everything was imported, ``1`` as soon as one file
        reports an import error.
    :raises odoorpc.error.RPCError: when authentication yields no user id.
    """
    _logger.info(
        f"Connecting to Odoo on <{protocol}://{host}:{port}> ({database} DB)"
    )
    odoo = odoorpc.ODOO(host, port=port, protocol=protocol, timeout=timeout)
    _logger.info("Connected")

    # Authenticate by hand rather than through odoo.login(): the session ID
    # is needed below and odoorpc does not store it anywhere.
    credentials = {"db": database, "login": login, "password": password}
    auth_result = odoo.json("/web/session/authenticate", credentials)["result"]
    session_id = auth_result["session_id"]
    uid = auth_result["uid"]
    if not uid:
        raise odoorpc.error.RPCError("Wrong login ID or password")
    # Fill in what odoo.login() would have set on the connection object.
    odoo._env = odoorpc.env.Environment(
        odoo, database, uid, context=auth_result["user_context"])
    odoo._login = login
    odoo._password = password

    import_model = odoo.env["base_import.import"]
    parameters = odoo.env["ir.config_parameter"]

    _logger.info("Getting a CSRF token")
    # Same construction as odoo.http::csrf_token, with an empty max
    # timestamp: HMAC-SHA1 of "<session id><max_ts>" keyed by the database
    # secret, followed by "o" and the timestamp.
    max_ts = ''
    db_secret = parameters.get_param('database.secret')
    hmac_key = db_secret.encode()
    signed_message = ('%s%s' % (session_id, max_ts)).encode()
    signature = hmac.new(
        hmac_key, msg=signed_message, digestmod=hashlib.sha1,
    ).hexdigest()
    csrf_token = '%so%s' % (signature, max_ts)
    _logger.debug(f"CSRF token: {csrf_token}")

    _logger.info("Importing CSV files")

    import_options = {
        "encoding": "utf-8",
        "quoting": '"',
        "separator": ",",
        "headers": True,
    }
    _logger.info(f"{len(model_filenames)} files to import")
    for model, csv_file, lang in model_filenames:
        _logger.info(
            'Importing - %s in model %s (lang %s)', csv_file, model, lang)
        lang_context = {
            'lang': lang,
            # TODO add tz maybe
        }
        wizard_id = import_model.create(
            {'res_model': model},
            context=lang_context,
        )

        # Upload the file content to the import record just created.
        with open(csv_file, 'rb') as csv_stream:
            encoder = MultipartEncoder(
                fields={
                    'csrf_token': csrf_token,
                    'import_id': str(wizard_id),
                    'file': ('filename', csv_stream, 'text/csv'),
                }
            )
            payload = encoder.to_string()
            upload_headers = {'Content-Type': encoder.content_type}
            response = odoo.http(
                'base_import/set_file',
                data=payload,
                headers=upload_headers,
            )
            # in Odoo, context contains user lang, but that is ok so far
            _logger.debug('response: %s', response)

        # The preview returns much more than needed; only the field list
        # is used.
        preview = import_model.parse_preview(wizard_id, import_options)
        fields = preview["headers"]

        # this is where setting the context is necessary
        results = import_model.do(
            wizard_id, fields, import_options, context=lang_context
        )
        errors = [
            result['message']
            for result in results
            if result['type'] == 'error'
        ]
        if errors:
            _logger.fatal(
                'Error while importing - %s in model %s:', csv_file, model)
            for error in errors:
                _logger.warning(' - %s', error)
            return 1
        _logger.info('Imported - %s in model %s', csv_file, model)

    if update_parameter:
        _logger.debug("Setting import_csv parameter")
        parameters.set_param('import_csv.write_date', str(datetime.now()))

    _logger.info("Done.")
    return 0


def main(argv=None):  # IGNORE:C0111
    """Parse the command line, collect the CSV files and import them.

    Files come from every ``--file`` option and from a recursive walk of
    every ``--directory`` option. The target model and the optional language
    are extracted from each file name; files whose name does not match the
    expected pattern are skipped with a warning.

    :param argv: list of arguments to parse; ``None`` lets argparse read
        ``sys.argv[1:]``.
    :return: the return code of ``import_csv`` (``0`` on success, ``1`` when
        an import reported errors).
    """
    # The version/date dunders are not defined in the visible part of this
    # module: look them up defensively so a missing one cannot crash the CLI.
    module_globals = globals()
    program_version = module_globals.get('__version__', 'unknown')
    program_build_date = str(module_globals.get('__updated__', 'unknown'))
    program_creation_date = str(module_globals.get('__date__', 'unknown'))
    program_version_message = '%%(prog)s %s (%s)' % (
        program_version, program_build_date)
    # __doc__ is None when docstrings are stripped (python -OO).
    program_shortdesc = (__doc__ or '').split(".")[0]
    program_license = '''%s

  Created by Vincent Hatakeyama on %s.
  Copyright 2019 XCG Consulting. All rights reserved.

  Licensed under the MIT License

  Distributed on an "AS IS" basis without warranties
  or conditions of any kind, either express or implied.

USAGE
''' % (program_shortdesc, program_creation_date)
    # Argument parsing
    parser = argparse.ArgumentParser(
        description=program_license,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument(
        '-V', '--version', action='version',
        version=program_version_message)
    parser.add_argument(
        '-v', '--verbose', dest='verbose', action='count',
        help="set verbosity level [default: %(default)s]")
    parser.add_argument(
        '--login',
        help="Odoo user [default: %(default)s]",
        default='admin',
    )
    parser.add_argument(
        '--password',
        help="Odoo user password",
        required=True,
    )
    parser.add_argument(
        '-p',
        '--port',
        help="Odoo port [default: %(default)s]",
        # Without type=int a port given on the command line stays a string.
        type=int,
        default=8069,
    )
    parser.add_argument(
        '--host',
        help="Odoo host [default: %(default)s]",
        default='localhost',
    )
    parser.add_argument(
        '--protocol',
        help="Odoo protocol [default: %(default)s]",
        default='jsonrpc',
        # TODO point to odoorpc values
        choices=['jsonrpc', 'jsonrpc+ssl'],
    )
    parser.add_argument(
        '--timeout',
        help="Odoo timeout [default: %(default)s]",
        type=int,
        default=36000,
    )
    parser.add_argument(
        '-d',
        '--database',
        help="Odoo database",
        required=True,
    )
    # TODO indicate that either a file or a directory is required
    parser.add_argument(
        '-f',
        '--file',
        help="CSV to import",
        action='append',
    )
    parser.add_argument(
        '--directory',
        help="Directory of CSV files to import",
        action='append',
    )

    nmspc = parser.parse_args(argv)
    verbose = nmspc.verbose
    if not verbose:
        level = logging.WARN
    elif verbose == 1:
        level = logging.INFO
    else:
        level = logging.DEBUG
    logging.basicConfig(
        level=level,
        format='%(asctime)s %(levelname)8s [%(lineno)3d]: %(message)s',
    )

    # Files need to be named according to the following regexp:
    # [<path>/][<number> ]<model>[@<lang>][.csv]
    pattern = re.compile(
        # The directory part is optional so that a bare file name given
        # with --file is recognized too.
        r'(?P<path>.*/)?'
        r'(?:\d+\s)?'
        r'(?P<model>[\w.]+?)'
        r'(?:@(?P<lang>\w{2,3}(?:@latin|_\w{2})))?'
        # The dot is escaped: only a literal ".csv" extension is accepted.
        r'(?:\.csv)?'
    )
    file_list = []
    if nmspc.file:
        file_list.extend(nmspc.file)
    if nmspc.directory:
        for directory in nmspc.directory:
            for root, subdirs, files in os.walk(directory):
                # Sorting in place makes os.walk visit sub-directories in a
                # deterministic order, like the files within a directory.
                subdirs.sort()
                file_list.extend(os.path.join(root, f) for f in sorted(files))

    model_filenames = []
    for filename in file_list:
        _logger.debug(f"{filename} to import")
        result = pattern.fullmatch(filename)
        if result:
            model = result.group('model')
            # lang is None when the file name carries no "@<lang>" part.
            lang = result.group('lang')
            model_filenames.append((model, filename, lang))
        else:
            _logger.warning('Unrecognized file name "%s", ignored', filename)

    return import_csv(
        login=nmspc.login,
        password=nmspc.password,
        port=nmspc.port,
        host=nmspc.host,
        protocol=nmspc.protocol,
        timeout=nmspc.timeout,
        database=nmspc.database,
        model_filenames=model_filenames,
    )


if __name__ == "__main__":
    # Use sys.exit: the bare ``exit`` name is injected by the ``site`` module
    # and is not available in every interpreter setup (e.g. ``python -S``).
    # A return code of 0 still results in a successful exit status.
    sys.exit(main(sys.argv[1:]))