Skip to content
Snippets Groups Projects
import_csv.py 8.6 KiB
Newer Older
#!/usr/bin/env python3
"""Imports CSV files into an Odoo server.

Dependencies as indicated in setup.py:
- odoorpc==0.7.0
- requests_toolbelt==0.8.0
"""
# Version 2.22
import argparse
import hashlib
import hmac
import logging
import os
import re
import sys
from datetime import datetime
from typing import List, Tuple

import odoorpc
from requests_toolbelt import MultipartEncoder

_logger = logging.getLogger(__name__)



def import_csv(
        login: str, password: str,
        database: str,
        protocol: str, host: str, port: int,
        timeout: int,
        model_filenames: List[Tuple[str, str, str]],
        update_parameter: bool = True,
) -> int:
    """Import CSV files into an Odoo database through ``base_import.import``.

    A session is opened with ``odoo_login``; every entry of
    *model_filenames* is then uploaded and imported, in order.

    :param login: Odoo user name.
    :param password: password of that user.
    :param database: name of the Odoo database.
    :param protocol: protocol handed to odoorpc.
    :param host: Odoo server host.
    :param port: Odoo server port.
    :param timeout: timeout handed to odoorpc.
    :param model_filenames: ``(model, csv_file, lang)`` tuples.
    :param update_parameter: when true, store the current date and time in
        the ``import_csv.write_date`` configuration parameter once every
        file has been imported.
    :return: 0 when every file was imported, 1 as soon as one import reports
        an error (the remaining files are then not processed).
    """
    odoo, session_id = odoo_login(
        database, host, login, password, port, protocol, timeout)

    importer = odoo.env["base_import.import"]
    config_parameters = odoo.env["ir.config_parameter"]

    _logger.info("Getting a CSRF token")
    # Same construction as odoo.http::csrf_token: a SHA-1 HMAC of the session
    # ID and the timestamp, keyed with the database secret, followed by "o"
    # and the timestamp. The timestamp is left empty here.
    max_ts = ''
    db_secret = config_parameters.get_param('database.secret')
    signature = hmac.new(
        db_secret.encode(),
        msg=('%s%s' % (session_id, max_ts)).encode(),
        digestmod=hashlib.sha1,
    ).hexdigest()
    csrf_token = '%so%s' % (signature, max_ts)
    _logger.debug(f"CSRF token: {csrf_token}")

    _logger.info("Importing CSV files")

    import_options = {
        "encoding": "utf-8",
        "quoting": '"',
        "separator": ",",
        "headers": True,
    }
    _logger.info(f"{len(model_filenames)} files to import")
    for model, csv_file, lang in model_filenames:
        _logger.info(
            'Importing - %s in model %s (lang %s)', csv_file, model, lang)
        context = {
            'lang': lang,
            # TODO add tz maybe
        }
        import_dlg_id = importer.create({'res_model': model}, context=context)

        # Upload the file to the import record through the HTTP controller.
        with open(csv_file, 'rb') as csv_handle:
            encoder = MultipartEncoder(
                fields={
                    'csrf_token': csrf_token,
                    'import_id': str(import_dlg_id),
                    'file': ('filename', csv_handle, 'text/csv'),
                }
            )
            response = odoo.http(
                'base_import/set_file',
                data=encoder.to_string(),
                headers={'Content-Type': encoder.content_type},
            )
            # in Odoo, context contains user lang, but that is ok so far
            _logger.debug('response: %s', response)

        # Only the field list of the preview is of interest.
        preview = importer.parse_preview(import_dlg_id, import_options)
        fields = preview["headers"]
        # this is where setting the context is necessary
        results = importer.do(
            import_dlg_id, fields, import_options, context=context
        )
        errors = [
            result['message']
            for result in results
            if result['type'] == 'error'
        ]
        if errors:
            _logger.fatal(
                'Error while importing - %s in model %s:', csv_file, model)
            for error in errors:
                _logger.warning(' - %s', error)
            return 1
        _logger.info('Imported - %s in model %s', csv_file, model)

    if update_parameter:
        _logger.debug("Setting import_csv parameter")
        config_parameters.set_param(
            'import_csv.write_date', str(datetime.now()))

    _logger.info("Done.")
    return 0


def odoo_login(database, host, login, password, port, protocol, timeout):
    """Connect to Odoo with odoorpc and authenticate, keeping the session ID.

    Authentication is done by calling ``/web/session/authenticate`` directly
    rather than through ``ODOO.login`` so that the session ID found in the
    response can be handed back to the caller.

    :return: ``(odoo, session_id)`` tuple.
    :raises odoorpc.error.RPCError: when the response carries no user ID.
    """
    _logger.info(
        f"Connecting to Odoo on <{protocol}://{host}:{port}> ({database} DB)"
    )
    odoo = odoorpc.ODOO(host, port=port, protocol=protocol, timeout=timeout)
    _logger.info("Connected")

    credentials = {"db": database, "login": login, "password": password}
    answer = odoo.json("/web/session/authenticate", credentials)["result"]
    session_id = answer["session_id"]
    uid = answer["uid"]
    if not uid:
        raise odoorpc.error.RPCError("Wrong login ID or password")

    # Set by hand what "o.login(...)" would have set on the connection.
    odoo._env = odoorpc.env.Environment(
        odoo, database, uid, context=answer["user_context"])
    odoo._login = login
    odoo._password = password
    return odoo, session_id


def main(argv=None):  # IGNORE:C0111
    """Parse arguments and launch the import of the selected CSV files.

    Files come from the ``-f/--file`` options and from walking every
    ``--directory`` option. A file is imported only when its name looks like
    ``[<dir>/][<prefix> ]<model>[@<lang>][.csv]``; other files are skipped
    with a warning.

    :param argv: argument list handed to argparse; ``None`` lets argparse
        read ``sys.argv``.
    :return: the return code of ``import_csv``.
    """
    # NOTE(review): __version__, __updated__ and __date__ are read here but
    # are not defined in the visible part of the file -- confirm they exist
    # at module level.
    program_version = __version__
    program_build_date = str(__updated__)
    program_version_message = '%%(prog)s %s (%s)' % (
        program_version, program_build_date)
    program_shortdesc = __doc__.split(".")[0]
    program_license = '''%s

  Created by Vincent Hatakeyama on %s.
  Copyright 2019 XCG Consulting. All rights reserved.

  Licensed under the MIT License

  Distributed on an "AS IS" basis without warranties
  or conditions of any kind, either express or implied.

USAGE
''' % (program_shortdesc, str(__date__))
    # Argument parsing
    parser = odoo_connect_parser(program_license, program_version_message)
    # TODO indicate that either a file or a directory is required
    parser.add_argument(
        '-f',
        '--file',
        help="CSV to import",
        action='append',
    )
    parser.add_argument(
        '--directory',
        help="Directory of CSV files to import",
        action='append',
    )

    nmspc = parser.parse_args(argv)
    logging_from_verbose(nmspc)

    model_filenames = list()
    # files need to be named according to the following regexp
    pattern = re.compile(
        # The directory part is optional so that a bare file name given with
        # -f (a file of the current directory) is accepted too.
        r'(?P<path>.*/)?'
        r'(?:[\d\s\w_-]*\s)?'
        r'(?P<model>[\w.]+?)'
        r'(?:@(?P<lang>\w{2,3}(?:@latin|_\w{2})))?'
        # Escaped dot: only a literal ".csv" extension is stripped.
        r'(?:\.csv)?'
    )
    file_list = list()
    if nmspc.file:
        file_list.extend(nmspc.file)
    if nmspc.directory:
        for directory in nmspc.directory:
            for root, subdirs, files in os.walk(directory):
                # Sorting in place makes os.walk visit the sub-directories
                # in a deterministic order, like the files below.
                subdirs.sort()
                file_list.extend(os.path.join(root, f) for f in sorted(files))
    for filename in file_list:
        _logger.debug(f"{filename} to import")
        result = pattern.fullmatch(filename)
        if result:
            model = result.group('model')
            lang = result.group('lang')
            model_filenames.append((model, filename, lang))
        else:
            _logger.warning('Unrecognized file name "%s", ignored', filename)

    return import_csv(
        login=nmspc.login,
        password=nmspc.password,
        port=nmspc.port,
        host=nmspc.host,
        protocol=nmspc.protocol,
        timeout=nmspc.timeout,
        database=nmspc.database,
        model_filenames=model_filenames,
    )


def logging_from_verbose(namespace: argparse.Namespace):
    """Configure logging according to the ``verbose`` count of *namespace*.

    No verbosity (``None`` or 0) selects WARN, a count of 1 selects INFO and
    any other value selects DEBUG.

    :param namespace: parsed arguments; must have a ``verbose`` attribute.
    """
    verbosity = namespace.verbose
    if verbosity == 1:
        chosen_level = logging.INFO
    elif verbosity:
        chosen_level = logging.DEBUG
    else:
        chosen_level = logging.WARN
    logging.basicConfig(
        format='%(asctime)s %(levelname)8s [%(lineno)3d]: %(message)s',
        level=chosen_level,
    )


def odoo_connect_parser(description, version) -> argparse.ArgumentParser:
    """Build the argument parser holding the Odoo connection options.

    :param description: text shown before the argument help; it is displayed
        as written (raw description formatter).
    :param version: message printed by ``-V/--version``.
    :return: the parser, to which callers can add their own arguments.
    """
    parser = argparse.ArgumentParser(
        description=description,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument(
        '-V', '--version', action='version',
        version=version)
    parser.add_argument(
        '-v', '--verbose', dest='verbose', action='count',
        help="set verbosity level [default: %(default)s]")
    parser.add_argument(
        '--login',
        help="Odoo user [default: %(default)s]",
        default='admin',
    )
    parser.add_argument(
        '--password',
        help="Odoo user password",
        required=True,
    )
    parser.add_argument(
        '-p',
        '--port',
        help="Odoo port [default: %(default)s]",
        # Convert user input so the value is an int like the default.
        type=int,
        default=8069,
    )
    parser.add_argument(
        '--host',
        help="Odoo host [default: %(default)s]",
        default='localhost',
    )
    parser.add_argument(
        '--protocol',
        help="Odoo protocol [default: %(default)s]",
        default='jsonrpc',
        # TODO point to odoorpc values
        choices=['jsonrpc', 'jsonrpc+ssl'],
    )
    parser.add_argument(
        '--timeout',
        help="Odoo timeout [default: %(default)s]",
        # Convert user input so the value is an int like the default.
        type=int,
        default=36000,
    )
    parser.add_argument(
        '-d',
        '--database',
        help="Odoo database",
        required=True,
    )
    return parser


if __name__ == "__main__":
    # sys.exit is always available, unlike the ``exit`` helper that the
    # ``site`` module installs for interactive use. A return code of 0 still
    # ends the process successfully.
    sys.exit(main(sys.argv[1:]))