diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index b2f171a0a6215a5b97fb49708f5688f2ed599812..e88bc1374cc346d40a4bc52701c38207d61587b2 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -22,8 +22,13 @@
 # https://hub.docker.com/r/library/python/tags/
 image: python:latest

-# include:
-#   - template: Security/Dependency-Scanning.gitlab-ci.yml
+include:
+  - template: Security/SAST.gitlab-ci.yml
+  - template: Code-Quality.gitlab-ci.yml
+  # - template: Security/Secret-Detection.gitlab-ci.yml
+  # NOTE: Available with Ultimate subscription
+  # - template: Security/Dependency-Scanning.gitlab-ci.yml
+  # - template: Verify/Load-Performance-Testing.gitlab-ci.yml

 # # DOC: https://docs.gitlab.com/ee/user/application_security/dependency_scanning/
 # gemnasium-dependency_scanning:
@@ -59,10 +64,6 @@ cache: &global_cache

 before_script:
   # - python --version  # For debugging
-  # - pip install virtualenv
-  # - virtualenv venv
-  # - source venv/bin/activate
-  - python --version  # For debugging
   # - pip install --upgrade pip virtualenv
   # - virtualenv venv
   - python -m venv venv
@@ -81,6 +82,9 @@ lint-static:

 dist-test-docs:
   stage: build
+  needs:
+    - job: lint-static
+      artifacts: false
   environment: testing
   resource_group: testing
   retry: 2
@@ -91,17 +95,21 @@ dist-test-docs:
   #   policy: pull
   # dependencies:
   #   - build
+  variables:
+    # Use slow compression for artifacts, resulting in smaller archives
+    ARTIFACT_COMPRESSION_LEVEL: "slowest"
+    TOX_PARALLEL_NO_SPINNER: 1
   before_script:
     - pip install tox
   script:
     # - python setup.py test
-    - TOX_PARALLEL_NO_SPINNER=1 tox --develop --parallel
+    - tox --develop --parallel
   after_script:
     - mv .tox/docs_out/ public/
     - mv cov_html public/coverage
   # TODO: remove
   # coverage: '/Code coverage: \d+\.\d+/'
-  coverage: '/Code coverage: ^TOTAL.+?(\d+\%)$/'
+  # coverage: '/Code coverage: ^TOTAL.+?(\d+\%)$/'
   artifacts:
     when: always
     paths:
@@ -112,19 +120,26 @@ dist-test-docs:
       - public
     reports:
       junit: report.xml
-      cobertura: coverage.xml
+      coverage_report:
+        coverage_format: cobertura
+        path: coverage.xml

 coverage:
   stage: test
+  needs:
+    - job: dist-test-docs
+      artifacts: true
   environment: testing
   resource_group: testing
   retry: 2
-  dependencies:
-    - dist-test-docs
+  variables:
+    CODECOV_TOKEN: $codecov_token
   before_script:
     # - pip install codecov
     # - pip install tox
+    - echo -e "\e[0Ksection_start:`date +%s`:my_first_section[collapsed=true]\r\e[0KCodecov installation"
     - |
       echo Install codecov
       curl https://keybase.io/codecovsecurity/pgp_keys.asc | gpg --no-default-keyring --keyring trustedkeys.gpg --import # One-time step
       curl -Os https://uploader.codecov.io/latest/linux/codecov
       curl -Os https://uploader.codecov.io/latest/linux/codecov.SHA256SUM
@@ -132,12 +147,12 @@ coverage:
       gpgv codecov.SHA256SUM.sig codecov.SHA256SUM
       shasum -a 256 -c codecov.SHA256SUM
       chmod +x ./codecov
+    - echo -e "\e[0Ksection_end:`date +%s`:my_first_section\r\e[0K"
   script:
     # NOTE: https://docs.gitlab.com/ee/ci/yaml/script.html
-    # ./codecov
-    #   --token $codecov_token
+    # ./codecov --token $codecov_token
     - >
-      CODECOV_TOKEN="$codecov_token" ./codecov
+      ./codecov
       --file "coverage.xml"
       --name "codecov-$CI_PROJECT_NAME"
       --branch "$CI_COMMIT_BRANCH"
@@ -147,66 +162,71 @@ coverage:

 .deploy:
   stage: deploy
+  when: manual
   environment: testing
-  cache:
-    # inherit all global cache settings
-    <<: *global_cache
-  resource_group: testing
   retry: 2
   needs:
     - job: dist-test-docs
       artifacts: true
-  when: on_success
+  cache:
+    # inherit all global cache settings
+    <<: *global_cache
+  resource_group: testing
   before_script:
     # - pip install setuptools wheel twine
     - pip install twine
   script:
     # - python setup.py sdist bdist_wheel
     - twine check dist/*
+  after_script:
     # Publish after build
     # - twine upload -u "__token__" -p "pypi-$test_pypi_api_token" --skip-existing $artifact
     # - twine upload -u "__token__" -p "pypi-$test_pypi_api_token" --skip-existing dist/*
+    # - TWINE_USERNAME=gitlab-ci-token TWINE_PASSWORD=${CI_JOB_TOKEN} twine upload --skip-existing --repository gitlab dist/*
+    # - TWINE_USERNAME=gitlab-ci-token TWINE_PASSWORD=${CI_JOB_TOKEN} twine upload --skip-existing --repository-url ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/pypi dist/*
+    - twine upload --skip-existing dist/*

 gitlab-packages:
   # https://docs.gitlab.com/ee/user/packages/pypi_repository/
   extends: .deploy
-  stage: test
+  when: on_success
   environment:
     name: testing
     url: https://gitlab.com/api/v4/projects/target-core/packages/pypi
+  variables:
+    TWINE_USERNAME: gitlab-ci-token
+    TWINE_PASSWORD: ${CI_JOB_TOKEN}
+    TWINE_REPOSITORY_URL: ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/pypi
   script:
     - !reference [.deploy, script]
-    # - TWINE_USERNAME=gitlab-ci-token TWINE_PASSWORD=${CI_JOB_TOKEN} twine upload --skip-existing --repository gitlab dist/*
-    - TWINE_USERNAME=gitlab-ci-token TWINE_PASSWORD=${CI_JOB_TOKEN} twine upload --skip-existing --repository-url ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/pypi dist/*

 test-pypi:
   extends: .deploy
+  rules:
+    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
   environment:
     name: staging
     url: https://test.pypi.org
   resource_group: staging
-  when: manual
-  rules:
-    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
-  script:
-    - !reference [.deploy, script]
-    - TWINE_USERNAME=__token__ TWINE_PASSWORD="$test_pypi_api_token" twine upload --skip-existing --repository testpypi dist/*
+  variables:
+    TWINE_USERNAME: __token__
+    TWINE_PASSWORD: $test_pypi_api_token
+    TWINE_REPOSITORY: testpypi

 pypi:
   extends: .deploy
+  rules:
+    - if: $CI_COMMIT_TAG
   environment:
     name: production
     url: https://pypi.org
   resource_group: production
-  when: manual
-  rules:
-    - if: $CI_COMMIT_TAG
-  script:
-    - !reference [.deploy, script]
-    - TWINE_USERNAME=__token__ TWINE_PASSWORD="$pypi_api_token" twine upload --skip-existing dist/*
+  variables:
+    TWINE_USERNAME: __token__
+    TWINE_PASSWORD: $pypi_api_token
   artifacts:
     name: release
-    expire_in: 3 year
+    expire_in: 2 year
     paths:
       - report.xml
       - coverage.xml
@@ -220,7 +240,8 @@ pypi:
 pages:
   stage: deploy
   rules:
-    - if: $CI_COMMIT_TAG
+    # - if: $CI_COMMIT_TAG
+    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
   environment:
     name: production
     url: https://omegax.gitlab.com/target-core/index.html
@@ -238,9 +259,19 @@ pages:
     CACHE_REQUEST_TIMEOUT: 5
   dependencies:
     - dist-test-docs
-  script: [echo Pages website upload]
+  needs:
+    - job: dist-test-docs
+      artifacts: true
+  script: [echo Pages website Upload]
   artifacts:
     name: pages
     expire_in: 1 year
     paths:
       - public
+
+code_quality:
+  stage: .post
+  rules:
+    - if: $CODE_QUALITY_DISABLED
+      when: never
+    - if: $CI_COMMIT_TAG || $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
diff --git a/README.md b/README.md
index d720f78898a6033b429c45867f8b008b2d2bbcbb..ec7478b90dbd10789b4f83f9ec736a4314ffc5b5 100644
--- a/README.md
+++ b/README.md
@@ -8,8 +8,7 @@
 [![PyPI version](https://badge.fury.io/py/target-core.svg)](https://badge.fury.io/py/target-core)
 [![PyPi project installs](https://img.shields.io/pypi/dm/target-core.svg?maxAge=2592000&label=installs&color=%2327B1FF)](https://pypi.org/project/target-core)

-[Singer](https://www.singer.io/) target that uploads loads data to S3 in JSONL format
-following the [Singer spec](https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md).
+[**Singer**](https://www.singer.io/) **target-core** provides safe tools to easily build new `targets` following the [*Singer spec*](https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md) *convention* and *protocol*.

 ## How to use it

diff --git a/docs/conf.py b/docs/conf.py
index e854b3ffb5e7ced9c8e9138424bd436c9dbb13f0..492c6aed1ef44b459c1dca9498abbdc3bdfcb101 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -10,9 +10,9 @@
 # add these directories to sys.path here. If the directory is relative to the
 # documentation root, use os.path.abspath to make it absolute, like shown here.
 #
-import os
-import sys
-sys.path.insert(0, os.path.abspath('..'))
+from os.path import abspath
+from sys import path
+path.insert(0, abspath('..'))


 # -- Project information -----------------------------------------------------
@@ -22,7 +22,7 @@ copyright = '2022, Eddy ∆'
 author = 'Eddy ∆'

 # The full version, including alpha/beta/rc tags
-release = '0.0.0'
+release = '0.0.1'


 # -- General configuration ---------------------------------------------------
@@ -30,7 +30,13 @@ release = '0.0.0'
 # Add any Sphinx extension module names here, as strings. They can be
 # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
 # ones.
-extensions = ['sphinx.ext.todo', 'sphinx.ext.viewcode', 'sphinx.ext.autodoc']
+extensions = [
+    'sphinx.ext.todo',
+    'sphinx.ext.viewcode',
+    'sphinx.ext.autodoc',
+    'sphinx.ext.napoleon',  # Numpy doc style
+    'sphinx.ext.autosummary',
+]

 # Add any paths that contain templates here, relative to this directory.
 templates_path = ['_templates']
diff --git a/docs/target.rst b/docs/target.rst
index d6d4696e9fd36dbd578f7206df8a98bba3db82be..e7ec03c9934e426a02e10a0efea2449d0925e788 100644
--- a/docs/target.rst
+++ b/docs/target.rst
@@ -1,5 +1,14 @@
-target package
-==============
+Welcome to Target Core package documentation!
+=============================================
+
+`Singer <https://www.singer.io/>`_ **target-core** provides safe tools to easily build new `targets`
+following the `Singer spec <https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md>`_ *convention* and *protocol*.
+
+.. note::
+   This project is under active development.
+
+.. autosummary::
+   :toctree: generated

 Submodules
 ----------
diff --git a/pyproject.toml b/pyproject.toml
index a53d241511d453813e328fcdf32e18e11d6e99aa..3f7c2b3b988c957d8ffc76f25b23133bcf170a63 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -6,9 +6,6 @@ requires = [
 build-backend = "setuptools.build_meta"

 [tool.mypy]
-# TODO: unlock later
-ignore_errors = true
-
 show_error_context = true
 ignore_missing_imports = true

@@ -30,13 +27,13 @@ disallow_untyped_defs = true
 warn_redundant_casts = true
 warn_unused_configs = true
 warn_unused_ignores = true
+disallow_untyped_calls = true
+no_implicit_reexport = true
+strict_equality = true

 # The following need to have changes made to be able to enable them:
 # disallow_any_generics = true
-# disallow_untyped_calls = true
 # no_implicit_optional = true
-# no_implicit_reexport = true
-# strict_equality = true
 # warn_return_any = true

 [[tool.mypy.overrides]]  # Overrides for currently untyped modules
diff --git a/setup.cfg b/setup.cfg
index c9e3df628b827d5c082772a471ed105c9245835e..e7cddae370ff585fd0fe05b1a306659e633eddaf 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -40,7 +40,11 @@ dist =
     wheel # build
 deploy =
     twine
-docs = sphinx-rtd-theme
+docs =
+    sphinx
+    sphinx-rtd-theme
+    sphinx-automodapi
+    numpydoc

 [options.packages.find]
 exclude =
@@ -76,6 +80,10 @@ max-complexity = 10
 builder = html
 warning-is-error = true
 # keep-going = true
+# project = 'Target Core'
+# version = attr: target.__version__
+# release = attr: target.__version__
+# source-dir = 'docs'

 [tox:tox]
 # requires = tox-pipenv
diff --git a/target/__init__.py b/target/__init__.py
index 15400fd77bddc26eefabeeff0b3d7e11fbfbad6a..dabc48da0da186ce9facfc27f1eac62cac10bf8f 100644
--- a/target/__init__.py
+++ b/target/__init__.py
@@ -2,6 +2,7 @@

 __version__ = '0.0.1'

+from typing import Any, Callable, Dict, TextIO, Tuple, Optional
 import argparse
 import json
 from pathlib import Path
@@ -29,7 +30,7 @@ CONFIG_PARAMS = {
 }


-def add_metadata_columns_to_schema(schema_message):
+def add_metadata_columns_to_schema(schema_message: Dict) -> Dict:
     '''Metadata _sdc columns according to the stitch documentation at
     https://www.stitchdata.com/docs/data-structure/integration-schemas#sdc-columns

@@ -47,7 +48,7 @@ def add_metadata_columns_to_schema(schema_message):
     return schema_message


-def add_metadata_values_to_record(record_message, schema_message, timestamp):
+def add_metadata_values_to_record(record_message: Dict, schema_message: Dict, timestamp: datetime.datetime) -> Dict:
     '''Populate metadata _sdc columns from incoming record message
     The location of the required attributes are fixed in the stream
     '''
@@ -64,7 +65,7 @@ def add_metadata_values_to_record(record_message, schema_message, timestamp):
     return record_message['record']


-def remove_metadata_values_from_record(record_message):
+def remove_metadata_values_from_record(record_message: Dict) -> Dict:
     '''Removes every metadata _sdc column from a given record message
     '''
     for key in {
@@ -82,7 +83,7 @@ def remove_metadata_values_from_record(record_message):
     return record_message['record']


-def emit_state(state):
+def emit_state(state: Optional[Any]) -> None:
     if state is not None:
         line = json.dumps(state)
         LOGGER.debug('Emitting state {}'.format(line))
@@ -90,7 +91,7 @@ def emit_state(state):
         sys.stdout.flush()


-def float_to_decimal(value):
+def float_to_decimal(value: Any) -> Any:
     '''Walk the given data structure and
     turn all instances of float into double.'''
     if isinstance(value, float):
@@ -102,22 +103,65 @@ def float_to_decimal(value):
     return value


-def get_target_key(stream, config, date_time=None):
-    '''Creates and returns an S3 key for the stream'''
+def get_target_key(stream: str, config: Dict[str, Any], date_time: datetime.datetime) -> str:
+    '''Creates and returns an S3 key for the stream
+
+    Parameters
+    ----------
+    stream : str
+        incoming stream name that is written in the file
+    config : dict
+        configuration dictionary
+    date_time : datetime
+        Date used in the path template
+
+    Returns
+    -------
+    out : ``str``
+        The formatted path.
+    '''

     # NOTE: Replace dynamic tokens
-    key = config.get('path_template').format(stream=stream, date_time=date_time, uuid=uuid4())
+    key = config['path_template'].format(stream=stream, date_time=date_time, uuid=uuid4())

     prefix = config.get('key_prefix', '')
     return str(Path(key).parent / f'{prefix}{Path(key).name}') if prefix else key


-def persist_lines(messages, config, save_records=save_jsonl_file):
+def persist_lines(messages: TextIO, config: Dict, save_records: Callable = save_jsonl_file) -> Tuple[Optional[Any], Dict[Any, Any]]:
+    '''Process the lines received from the Singer Tap.
+
+    This is the core of the message processing.
+    Each line is processed according to its type, and the *RECORD* lines are saved using the function provided as an argument.
+    By default they are written as *jsonl* files in the *work_dir* working directory provided in the `config` file.
+
+    Parameters
+    ----------
+    config : dict
+        configuration dictionary.
+    save_records : Callable
+        function used to write the buffered *RECORD* lines (``save_jsonl_file`` by default).
+
+    Raises
+    ------
+    json.decoder.JSONDecodeError
+        If the line structure is inconsistent or contains errors.
+
+    Returns
+    -------
+    out : tuple(dict, dict)
+        The final `state` and the per-stream `file_data` information.
+
+    See Also
+    --------
+    `Singer spec <https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md>`_ *convention* and *protocol*.
+    '''
+
     state = None
     schemas = {}
     key_properties = {}
-    validators = {}
-    file_data = {}
+    validators: Dict = {}
+    file_data: Dict = {}

     # NOTE: Use the system specific temp directory if no custom work_dir provided
     work_dir = Path(config.get('work_dir', gettempdir())).expanduser()
@@ -125,7 +169,7 @@ def persist_lines(messages, config, save_records=save_jsonl_file):
     # NOTE: Create work_dir if not exists
     work_dir.mkdir(parents=True, exist_ok=True)

-    timezone = datetime.timezone(datetime.timedelta(hours=config.get('timezone_offset'))) if config.get('timezone_offset') is not None else None
+    timezone = datetime.timezone(datetime.timedelta(hours=config['timezone_offset'])) if config.get('timezone_offset') is not None else None
     now = datetime.datetime.now(timezone)

     for line in messages:
@@ -152,9 +196,7 @@ def persist_lines(messages, config, save_records=save_jsonl_file):
                 record_to_load = add_metadata_values_to_record(m, {}, now) if config.get('add_metadata_columns') else remove_metadata_values_from_record(m)
                 file_data[stream]['file_data'].append(record_to_load)

-                # NOTE: write the lines into the temporary file when received data over 64Mb default memory buffer
-                if sys.getsizeof(file_data[stream]['file_data']) > config.get('memory_buffer'):
-                    save_records(file_data[stream], config)
+                save_records(file_data[stream], config)

                 state = None

@@ -192,20 +234,23 @@ def persist_lines(messages, config, save_records=save_jsonl_file):
             else:
                 LOGGER.warning('Unknown line type "{}" in line "{}"'.format(m['type'], m))

-    for _, file_info in file_data.items():
-        save_records(file_info, config)
-
     return state, file_data


-def main():
+def main() -> None:
     parser = argparse.ArgumentParser()
     parser.add_argument('-c', '--config', help='Config file', required=True)
     args = parser.parse_args()
+    config = config_compression(config_file(args.config))

-    state, _ = persist_lines(
+    state, file_data = persist_lines(
         sys.stdin,
-        config_compression(config_file(args.config)))
+        config,
+        save_jsonl_file)
+
+    config['memory_buffer'] = 0
+    for _, file_info in file_data.items():
+        save_jsonl_file(file_info, config)

     emit_state(state)
     LOGGER.debug('Exiting normally')
diff --git a/target/file.py b/target/file.py
index 27c798820603f4f4628788776f536e66a2b7839a..16f13081b68c48453a9b627a347bdf4562549ea3 100644
--- a/target/file.py
+++ b/target/file.py
@@ -1,3 +1,5 @@
+from typing import Dict, Any
+import sys
 import gzip
 import lzma
 import json
@@ -8,12 +10,12 @@ from target.logger import get_logger
 LOGGER = get_logger()


-def config_file(config_path, datetime_format={
-        'date_time_format': '%FT%T.%f'}):
+def config_file(config_path: str, datetime_format: Dict = {
+        'date_time_format': '%FT%T.%f'}) -> Dict:

-    path_template_default = '{stream}-{date_time:%s}.json' % datetime_format['date_time_format']
+    path_template_default: str = '{stream}-{date_time:%s}.json' % datetime_format['date_time_format']

-    config = {
+    config: Dict[str, Any] = {
         'path_template': path_template_default,
         'memory_buffer': 64e6
     }
@@ -28,8 +30,8 @@ def config_file(config_path, datetime_format={
     return config


-def config_compression(config_default):
-    config = {
+def config_compression(config_default: Dict) -> Dict:
+    config: Dict[str, Any] = {
         'compression': 'none'
     }
     config.update(config_default)
@@ -56,10 +58,11 @@ def config_compression(config_default):
     return config


-def save_jsonl_file(file_data, config):
-    if any(file_data['file_data']):
-        with config.get('open_func')(file_data['file_name'], 'at', encoding='utf-8') as output_file:
+def save_jsonl_file(file_data: Dict, config: Dict[str, Any]) -> None:
+    # NOTE: write the lines into the temporary file when received data over 64Mb default memory buffer
+    if sys.getsizeof(file_data['file_data']) > config.get('memory_buffer', 0) and any(file_data['file_data']):
+        with config['open_func'](file_data['file_name'], 'at', encoding='utf-8') as output_file:
             output_file.writelines((json.dumps(record) + '\n' for record in file_data['file_data']))

             del file_data['file_data'][:]
-            LOGGER.debug("'{}' file saved using open_func '{}'".format(file_data['file_name'], config.get('open_func').__name__))
+            LOGGER.debug("'{}' file saved using open_func '{}'".format(file_data['file_name'], config['open_func'].__name__))
diff --git a/target/logger.py b/target/logger.py
index 30c250d5fba1a39f6530ad177721bf1ed02e0347..7aac87a955c200b242dd3d8ba36d1b23e19d6ac0 100644
--- a/target/logger.py
+++ b/target/logger.py
@@ -1,8 +1,8 @@
 from pathlib import Path
-from logging import config, getLogger
+from logging import config, getLogger, Logger


-def get_logger():
+def get_logger() -> Logger:
     '''Return a Logger instance appropriate for using in a Tap or a Target.'''
     # See
     # https://docs.python.org/3.5/library/logging.config.html#logging.config.fileConfig
diff --git a/target/s3.py b/target/s3.py
index 2a62aa9fa6f8317ab916288f5f18513c9c6cb1c8..05558e3f01d04225195a717b1df4f4c63f9fb81c 100644
--- a/target/s3.py
+++ b/target/s3.py
@@ -2,14 +2,14 @@
 import os
 import backoff
 import boto3
+from typing import Callable, Dict, Any, Optional
 from botocore.exceptions import ClientError
 from target.logger import get_logger
-

 LOGGER = get_logger()


-def retry_pattern():
+def retry_pattern() -> Callable:
     return backoff.on_exception(
         backoff.expo,
         ClientError,
@@ -18,12 +18,12 @@ def retry_pattern():
         factor=10)


-def log_backoff_attempt(details):
+def log_backoff_attempt(details: Dict) -> None:
     LOGGER.info("Error detected communicating with Amazon, triggering backoff: %d try", details.get("tries"))


 @retry_pattern()
-def create_client(config):
+def create_client(config: Dict) -> Any:
     LOGGER.info("Attempting to create AWS session")

     # Get the required parameters from config file and/or environment variables
@@ -51,8 +51,8 @@ def create_client(config):

 # pylint: disable=too-many-arguments
 @retry_pattern()
-def upload_file(s3_client, filename, bucket, s3_key,
-                encryption_type=None, encryption_key=None):
+def upload_file(s3_client: Any, filename: str, bucket: str, s3_key: str,
+                encryption_type: Optional[str] = None, encryption_key: Optional[str] = None) -> None:

     if encryption_type is None or encryption_type.lower() == "none":
         # No encryption config (defaults to settings on the bucket):
diff --git a/tests/test_target.py b/tests/test_target.py
index 7ed49f13f7039738dff988a94f6ccd11aea7d04c..6be0ba2bd24e6685ead2dadb908a1f9506e495cc 100644
--- a/tests/test_target.py
+++ b/tests/test_target.py
@@ -309,6 +309,7 @@ def test_get_target_key(config):

 def test_persist_lines(caplog, config, input_data, input_multi_stream_data, invalid_row_data, invalid_order_data, state, file_metadata):
     '''TEST : simple persist_lines call'''
+    config['memory_buffer'] = 0
     output_state, output_file_metadata = persist_lines(input_multi_stream_data, config)

     file_paths = set(path for path in Path(config['work_dir']).iterdir())