diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index b2f171a0a6215a5b97fb49708f5688f2ed599812..e88bc1374cc346d40a4bc52701c38207d61587b2 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -22,8 +22,13 @@
# https://hub.docker.com/r/library/python/tags/
image: python:latest
-# include:
-# - template: Security/Dependency-Scanning.gitlab-ci.yml
+include:
+ - template: Security/SAST.gitlab-ci.yml
+ - template: Code-Quality.gitlab-ci.yml
+ # - template: Security/Secret-Detection.gitlab-ci.yml
+ # NOTE: Available with Ultimate subscription
+ # - template: Security/Dependency-Scanning.gitlab-ci.yml
+ # - template: Verify/Load-Performance-Testing.gitlab-ci.yml
# # DOC: https://docs.gitlab.com/ee/user/application_security/dependency_scanning/
# gemnasium-dependency_scanning:
@@ -59,10 +64,6 @@ cache: &global_cache
before_script:
# - python --version # For debugging
- # - pip install virtualenv
- # - virtualenv venv
- # - source venv/bin/activate
- - python --version # For debugging
# - pip install --upgrade pip virtualenv
# - virtualenv venv
- python -m venv venv
@@ -81,6 +82,9 @@ lint-static:
dist-test-docs:
stage: build
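+    # NOTE: start as soon as lint-static succeeds instead of waiting for all jobs in earlier stages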
+ needs:
+ - job: lint-static
+ artifacts: false
environment: testing
resource_group: testing
retry: 2
@@ -91,17 +95,21 @@ dist-test-docs:
# policy: pull
# dependencies:
# - build
+ variables:
+ # Use slow compression for artifacts, resulting in smaller archives
+ ARTIFACT_COMPRESSION_LEVEL: "slowest"
+ TOX_PARALLEL_NO_SPINNER: 1
before_script:
- pip install tox
script:
# - python setup.py test
- - TOX_PARALLEL_NO_SPINNER=1 tox --develop --parallel
+ - tox --develop --parallel
after_script:
- mv .tox/docs_out/ public/
- mv cov_html public/coverage
# TODO: remove
# coverage: '/Code coverage: \d+\.\d+/'
- coverage: '/Code coverage: ^TOTAL.+?(\d+\%)$/'
+ # coverage: '/Code coverage: ^TOTAL.+?(\d+\%)$/'
artifacts:
when: always
paths:
@@ -112,19 +120,26 @@ dist-test-docs:
- public
reports:
junit: report.xml
- cobertura: coverage.xml
+ coverage_report:
+ coverage_format: cobertura
+ path: coverage.xml
coverage:
stage: test
+ needs:
+ - job: dist-test-docs
+ artifacts: true
environment: testing
resource_group: testing
retry: 2
- dependencies:
- - dist-test-docs
+ variables:
+ CODECOV_TOKEN: $codecov_token
before_script:
# - pip install codecov
# - pip install tox
+ - echo -e "\e[0Ksection_start:`date +%s`:my_first_section[collapsed=true]\r\e[0KCodecov installation"
- |
+ echo Install codecov
curl https://keybase.io/codecovsecurity/pgp_keys.asc | gpg --no-default-keyring --keyring trustedkeys.gpg --import # One-time step
curl -Os https://uploader.codecov.io/latest/linux/codecov
curl -Os https://uploader.codecov.io/latest/linux/codecov.SHA256SUM
@@ -132,12 +147,12 @@ coverage:
gpgv codecov.SHA256SUM.sig codecov.SHA256SUM
shasum -a 256 -c codecov.SHA256SUM
chmod +x ./codecov
+ - echo -e "\e[0Ksection_end:`date +%s`:my_first_section\r\e[0K"
script:
# NOTE: https://docs.gitlab.com/ee/ci/yaml/script.html
- # ./codecov
- # --token $codecov_token
+ # ./codecov --token $codecov_token
- >
- CODECOV_TOKEN="$codecov_token" ./codecov
+ ./codecov
--file "coverage.xml"
--name "codecov-$CI_PROJECT_NAME"
--branch "$CI_COMMIT_BRANCH"
@@ -147,66 +162,71 @@ coverage:
.deploy:
stage: deploy
+ when: manual
environment: testing
- cache:
- # inherit all global cache settings
- <<: *global_cache
- resource_group: testing
retry: 2
needs:
- job: dist-test-docs
artifacts: true
- when: on_success
+ cache:
+ # inherit all global cache settings
+ <<: *global_cache
+ resource_group: testing
before_script:
# - pip install setuptools wheel twine
- pip install twine
script:
# - python setup.py sdist bdist_wheel
- twine check dist/*
+ after_script:
# Publish after build
# - twine upload -u "__token__" -p "pypi-$test_pypi_api_token" --skip-existing $artifact
# - twine upload -u "__token__" -p "pypi-$test_pypi_api_token" --skip-existing dist/*
+ # - TWINE_USERNAME=gitlab-ci-token TWINE_PASSWORD=${CI_JOB_TOKEN} twine upload --skip-existing --repository gitlab dist/*
+ # - TWINE_USERNAME=gitlab-ci-token TWINE_PASSWORD=${CI_JOB_TOKEN} twine upload --skip-existing --repository-url ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/pypi dist/*
+ - twine upload --skip-existing dist/*
gitlab-packages:
# https://docs.gitlab.com/ee/user/packages/pypi_repository/
extends: .deploy
- stage: test
+ when: on_success
environment:
name: testing
url: https://gitlab.com/api/v4/projects/target-core/packages/pypi
+ variables:
+ TWINE_USERNAME: gitlab-ci-token
+ TWINE_PASSWORD: ${CI_JOB_TOKEN}
+ TWINE_REPOSITORY_URL: ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/pypi
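+        # NOTE: twine reads TWINE_USERNAME/TWINE_PASSWORD/TWINE_REPOSITORY_URL from the environment,
+        # so the shared .deploy upload command needs no per-job flags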
script:
- !reference [.deploy, script]
- # - TWINE_USERNAME=gitlab-ci-token TWINE_PASSWORD=${CI_JOB_TOKEN} twine upload --skip-existing --repository gitlab dist/*
- - TWINE_USERNAME=gitlab-ci-token TWINE_PASSWORD=${CI_JOB_TOKEN} twine upload --skip-existing --repository-url ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/pypi dist/*
test-pypi:
extends: .deploy
+ rules:
+ - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
environment:
name: staging
url: https://test.pypi.org
resource_group: staging
- when: manual
- rules:
- - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
- script:
- - !reference [.deploy, script]
- - TWINE_USERNAME=__token__ TWINE_PASSWORD="$test_pypi_api_token" twine upload --skip-existing --repository testpypi dist/*
+ variables:
+ TWINE_USERNAME: __token__
+ TWINE_PASSWORD: $test_pypi_api_token
+        TWINE_REPOSITORY: testpypi
pypi:
extends: .deploy
+ rules:
+ - if: $CI_COMMIT_TAG
environment:
name: production
url: https://pypi.org
resource_group: production
- when: manual
- rules:
- - if: $CI_COMMIT_TAG
- script:
- - !reference [.deploy, script]
- - TWINE_USERNAME=__token__ TWINE_PASSWORD="$pypi_api_token" twine upload --skip-existing dist/*
+ variables:
+ TWINE_USERNAME: __token__
+ TWINE_PASSWORD: $pypi_api_token
artifacts:
name: release
- expire_in: 3 year
+ expire_in: 2 year
paths:
- report.xml
- coverage.xml
@@ -220,7 +240,8 @@ pypi:
pages:
stage: deploy
rules:
- - if: $CI_COMMIT_TAG
+ # - if: $CI_COMMIT_TAG
+ - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
environment:
name: production
url: https://omegax.gitlab.com/target-core/index.html
@@ -238,9 +259,19 @@ pages:
CACHE_REQUEST_TIMEOUT: 5
dependencies:
- dist-test-docs
- script: [echo Pages website upload]
+ needs:
+ - job: dist-test-docs
+ artifacts: true
+    script: [echo Pages website upload]
artifacts:
name: pages
expire_in: 1 year
paths:
- public
+
+code_quality:
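+    # NOTE: override the Code-Quality template job so it runs only for tags and the default branch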
+ stage: .post
+ rules:
+ - if: $CODE_QUALITY_DISABLED
+ when: never
+ - if: $CI_COMMIT_TAG || $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
diff --git a/README.md b/README.md
index d720f78898a6033b429c45867f8b008b2d2bbcbb..ec7478b90dbd10789b4f83f9ec736a4314ffc5b5 100644
--- a/README.md
+++ b/README.md
@@ -8,8 +8,7 @@
[](https://badge.fury.io/py/target-core)
[](https://pypi.org/project/target-core)
-[Singer](https://www.singer.io/) target that uploads loads data to S3 in JSONL format
-following the [Singer spec](https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md).
+[**Singer**](https://www.singer.io/) **target-core** provides safe tools to easily build new `targets` following the [*Singer spec*](https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md) *convention* and *protocol*.
## How to use it
diff --git a/docs/conf.py b/docs/conf.py
index e854b3ffb5e7ced9c8e9138424bd436c9dbb13f0..492c6aed1ef44b459c1dca9498abbdc3bdfcb101 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -10,9 +10,9 @@
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
-import os
-import sys
-sys.path.insert(0, os.path.abspath('..'))
+from os.path import abspath
+from sys import path
+path.insert(0, abspath('..'))
# -- Project information -----------------------------------------------------
@@ -22,7 +22,7 @@ copyright = '2022, Eddy ∆'
author = 'Eddy ∆'
# The full version, including alpha/beta/rc tags
-release = '0.0.0'
+release = '0.0.1'
# -- General configuration ---------------------------------------------------
@@ -30,7 +30,13 @@ release = '0.0.0'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
-extensions = ['sphinx.ext.todo', 'sphinx.ext.viewcode', 'sphinx.ext.autodoc']
+extensions = [
+ 'sphinx.ext.todo',
+ 'sphinx.ext.viewcode',
+ 'sphinx.ext.autodoc',
+ 'sphinx.ext.napoleon', # Numpy doc style
+ 'sphinx.ext.autosummary',
+]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
diff --git a/docs/target.rst b/docs/target.rst
index d6d4696e9fd36dbd578f7206df8a98bba3db82be..e7ec03c9934e426a02e10a0efea2449d0925e788 100644
--- a/docs/target.rst
+++ b/docs/target.rst
@@ -1,5 +1,14 @@
-target package
-==============
+Welcome to Target Core package documentation!
+=============================================
+
+`Singer <https://www.singer.io/>`_ **target-core** provides safe tools to easily build new `targets`
+following the `Singer spec <https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md>`_ *convention* and *protocol*.
+
+.. note::
+ This project is under active development.
+
+.. autosummary::
+ :toctree: generated
Submodules
----------
diff --git a/pyproject.toml b/pyproject.toml
index a53d241511d453813e328fcdf32e18e11d6e99aa..3f7c2b3b988c957d8ffc76f25b23133bcf170a63 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -6,9 +6,6 @@ requires = [
build-backend = "setuptools.build_meta"
[tool.mypy]
-# TODO: unlock later
-ignore_errors = true
-
show_error_context = true
ignore_missing_imports = true
@@ -30,13 +27,13 @@ disallow_untyped_defs = true
warn_redundant_casts = true
warn_unused_configs = true
warn_unused_ignores = true
+disallow_untyped_calls = true
+no_implicit_reexport = true
+strict_equality = true
# The following need to have changes made to be able to enable them:
# disallow_any_generics = true
-# disallow_untyped_calls = true
# no_implicit_optional = true
-# no_implicit_reexport = true
-# strict_equality = true
# warn_return_any = true
[[tool.mypy.overrides]] # Overrides for currently untyped modules
diff --git a/setup.cfg b/setup.cfg
index c9e3df628b827d5c082772a471ed105c9245835e..e7cddae370ff585fd0fe05b1a306659e633eddaf 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -40,7 +40,11 @@ dist =
wheel
# build
deploy = twine
-docs = sphinx-rtd-theme
+docs =
+ sphinx
+ sphinx-rtd-theme
+ sphinx-automodapi
+ numpydoc
[options.packages.find]
exclude =
@@ -76,6 +80,10 @@ max-complexity = 10
builder = html
warning-is-error = true
# keep-going = true
+# project = 'Target Core'
+# version = attr: target.__version__
+# release = attr: target.__version__
+# source-dir = 'docs'
[tox:tox]
# requires = tox-pipenv
diff --git a/target/__init__.py b/target/__init__.py
index 15400fd77bddc26eefabeeff0b3d7e11fbfbad6a..dabc48da0da186ce9facfc27f1eac62cac10bf8f 100644
--- a/target/__init__.py
+++ b/target/__init__.py
@@ -2,6 +2,7 @@
__version__ = '0.0.1'
+from typing import Any, Callable, Dict, TextIO, Tuple, Optional
import argparse
import json
from pathlib import Path
@@ -29,7 +30,7 @@ CONFIG_PARAMS = {
}
-def add_metadata_columns_to_schema(schema_message):
+def add_metadata_columns_to_schema(schema_message: Dict) -> Dict:
'''Metadata _sdc columns according to the stitch documentation at
https://www.stitchdata.com/docs/data-structure/integration-schemas#sdc-columns
@@ -47,7 +48,7 @@ def add_metadata_columns_to_schema(schema_message):
return schema_message
-def add_metadata_values_to_record(record_message, schema_message, timestamp):
+def add_metadata_values_to_record(record_message: Dict, schema_message: Dict, timestamp: datetime.datetime) -> Dict:
'''Populate metadata _sdc columns from incoming record message
The location of the required attributes are fixed in the stream
'''
@@ -64,7 +65,7 @@ def add_metadata_values_to_record(record_message, schema_message, timestamp):
return record_message['record']
-def remove_metadata_values_from_record(record_message):
+def remove_metadata_values_from_record(record_message: Dict) -> Dict:
'''Removes every metadata _sdc column from a given record message
'''
for key in {
@@ -82,7 +83,7 @@ def remove_metadata_values_from_record(record_message):
return record_message['record']
-def emit_state(state):
+def emit_state(state: Optional[Any]) -> None:
if state is not None:
line = json.dumps(state)
LOGGER.debug('Emitting state {}'.format(line))
@@ -90,7 +91,7 @@ def emit_state(state):
sys.stdout.flush()
-def float_to_decimal(value):
+def float_to_decimal(value: Any) -> Any:
'''Walk the given data structure and turn all instances of float into
double.'''
if isinstance(value, float):
@@ -102,22 +103,65 @@ def float_to_decimal(value):
return value
-def get_target_key(stream, config, date_time=None):
- '''Creates and returns an S3 key for the stream'''
+def get_target_key(stream: str, config: Dict[str, Any], date_time: datetime.datetime) -> str:
+ '''Creates and returns an S3 key for the stream
+
+ Parameters
+ ----------
+ stream : str
+        incoming stream name, interpolated into the path template
+ config : dict
+ configuration dictionary
+ date_time : datetime
+ Date used in the path template
+
+ Returns
+ -------
+ out : ``str``
+ The formatted path.
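+
+    Examples
+    --------
+    A minimal illustrative call, assuming a simple template without the ``uuid`` token:
+
+    >>> import datetime
+    >>> get_target_key(
+    ...     'users',
+    ...     {'path_template': '{stream}-{date_time:%Y%m%d}.jsonl'},
+    ...     datetime.datetime(2022, 1, 1))
+    'users-20220101.jsonl'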
+ '''
# NOTE: Replace dynamic tokens
- key = config.get('path_template').format(stream=stream, date_time=date_time, uuid=uuid4())
+ key = config['path_template'].format(stream=stream, date_time=date_time, uuid=uuid4())
prefix = config.get('key_prefix', '')
return str(Path(key).parent / f'{prefix}{Path(key).name}') if prefix else key
-def persist_lines(messages, config, save_records=save_jsonl_file):
+def persist_lines(messages: TextIO, config: Dict, save_records: Callable = save_jsonl_file) -> Tuple[Optional[Any], Dict[Any, Any]]:
+ '''Process the lines received from the Singer Tap.
+
+    This is the core of the message processing.
+    Each line is processed according to its type, and the *RECORD* lines are saved using the function provided as an argument.
+    By default they are written as *jsonl* files in the *work_dir* working directory provided in the `config` file.
+
+ Parameters
+ ----------
+    messages : TextIO
+        stream of incoming Singer messages, one JSON message per line.
+    config : dict
+        configuration dictionary.
+    save_records : Callable
+        function used to persist the buffered *RECORD* messages, ``save_jsonl_file`` by default.
+
+ Raises
+ ------
+ json.decoder.JSONDecodeError
+        If the line structure is inconsistent or contains errors.
+
+ Returns
+ -------
+    out : tuple
+        The final `state` and the per-stream `file_data` collected while processing the messages.
+
+ See Also
+ --------
+    `Singer spec <https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md>`_ *convention* and *protocol*.
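+
+    Examples
+    --------
+    A minimal sketch of the call made by ``main`` (illustrative only)::
+
+        config = config_compression(config_file('config.json'))
+        state, file_data = persist_lines(sys.stdin, config, save_jsonl_file)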
+ '''
+
state = None
schemas = {}
key_properties = {}
- validators = {}
- file_data = {}
+ validators: Dict = {}
+ file_data: Dict = {}
# NOTE: Use the system specific temp directory if no custom work_dir provided
work_dir = Path(config.get('work_dir', gettempdir())).expanduser()
@@ -125,7 +169,7 @@ def persist_lines(messages, config, save_records=save_jsonl_file):
# NOTE: Create work_dir if not exists
work_dir.mkdir(parents=True, exist_ok=True)
- timezone = datetime.timezone(datetime.timedelta(hours=config.get('timezone_offset'))) if config.get('timezone_offset') is not None else None
+ timezone = datetime.timezone(datetime.timedelta(hours=config['timezone_offset'])) if config.get('timezone_offset') is not None else None
now = datetime.datetime.now(timezone)
for line in messages:
@@ -152,9 +196,7 @@ def persist_lines(messages, config, save_records=save_jsonl_file):
record_to_load = add_metadata_values_to_record(m, {}, now) if config.get('add_metadata_columns') else remove_metadata_values_from_record(m)
file_data[stream]['file_data'].append(record_to_load)
- # NOTE: write the lines into the temporary file when received data over 64Mb default memory buffer
- if sys.getsizeof(file_data[stream]['file_data']) > config.get('memory_buffer'):
- save_records(file_data[stream], config)
+ save_records(file_data[stream], config)
state = None
@@ -192,20 +234,23 @@ def persist_lines(messages, config, save_records=save_jsonl_file):
else:
LOGGER.warning('Unknown line type "{}" in line "{}"'.format(m['type'], m))
- for _, file_info in file_data.items():
- save_records(file_info, config)
-
return state, file_data
-def main():
+def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--config', help='Config file', required=True)
args = parser.parse_args()
+ config = config_compression(config_file(args.config))
- state, _ = persist_lines(
+ state, file_data = persist_lines(
sys.stdin,
- config_compression(config_file(args.config)))
+ config,
+ save_jsonl_file)
+
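+    # NOTE: force a final write of any records still buffered in memory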
+ config['memory_buffer'] = 0
+ for _, file_info in file_data.items():
+ save_jsonl_file(file_info, config)
emit_state(state)
LOGGER.debug('Exiting normally')
diff --git a/target/file.py b/target/file.py
index 27c798820603f4f4628788776f536e66a2b7839a..16f13081b68c48453a9b627a347bdf4562549ea3 100644
--- a/target/file.py
+++ b/target/file.py
@@ -1,3 +1,5 @@
+from typing import Dict, Any
+import sys
import gzip
import lzma
import json
@@ -8,12 +10,12 @@ from target.logger import get_logger
LOGGER = get_logger()
-def config_file(config_path, datetime_format={
- 'date_time_format': '%FT%T.%f'}):
+def config_file(config_path: str, datetime_format: Dict = {
+ 'date_time_format': '%FT%T.%f'}) -> Dict:
- path_template_default = '{stream}-{date_time:%s}.json' % datetime_format['date_time_format']
+ path_template_default: str = '{stream}-{date_time:%s}.json' % datetime_format['date_time_format']
- config = {
+ config: Dict[str, Any] = {
'path_template': path_template_default,
'memory_buffer': 64e6
}
@@ -28,8 +30,8 @@ def config_file(config_path, datetime_format={
return config
-def config_compression(config_default):
- config = {
+def config_compression(config_default: Dict) -> Dict:
+ config: Dict[str, Any] = {
'compression': 'none'
}
config.update(config_default)
@@ -56,10 +58,11 @@ def config_compression(config_default):
return config
-def save_jsonl_file(file_data, config):
- if any(file_data['file_data']):
- with config.get('open_func')(file_data['file_name'], 'at', encoding='utf-8') as output_file:
+def save_jsonl_file(file_data: Dict, config: Dict[str, Any]) -> None:
+ # NOTE: write the lines into the temporary file when received data over 64Mb default memory buffer
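+    # NOTE: a 'memory_buffer' of 0 forces an immediate write; main() relies on this for the final flush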
+ if sys.getsizeof(file_data['file_data']) > config.get('memory_buffer', 0) and any(file_data['file_data']):
+ with config['open_func'](file_data['file_name'], 'at', encoding='utf-8') as output_file:
output_file.writelines((json.dumps(record) + '\n' for record in file_data['file_data']))
del file_data['file_data'][:]
- LOGGER.debug("'{}' file saved using open_func '{}'".format(file_data['file_name'], config.get('open_func').__name__))
+ LOGGER.debug("'{}' file saved using open_func '{}'".format(file_data['file_name'], config['open_func'].__name__))
diff --git a/target/logger.py b/target/logger.py
index 30c250d5fba1a39f6530ad177721bf1ed02e0347..7aac87a955c200b242dd3d8ba36d1b23e19d6ac0 100644
--- a/target/logger.py
+++ b/target/logger.py
@@ -1,8 +1,8 @@
from pathlib import Path
-from logging import config, getLogger
+from logging import config, getLogger, Logger
-def get_logger():
+def get_logger() -> Logger:
'''Return a Logger instance appropriate for using in a Tap or a Target.'''
# See
# https://docs.python.org/3.5/library/logging.config.html#logging.config.fileConfig
diff --git a/target/s3.py b/target/s3.py
index 2a62aa9fa6f8317ab916288f5f18513c9c6cb1c8..05558e3f01d04225195a717b1df4f4c63f9fb81c 100644
--- a/target/s3.py
+++ b/target/s3.py
@@ -2,14 +2,14 @@
import os
import backoff
import boto3
+from typing import Callable, Dict, Any, Optional
from botocore.exceptions import ClientError
from target.logger import get_logger
-
LOGGER = get_logger()
-def retry_pattern():
+def retry_pattern() -> Callable:
return backoff.on_exception(
backoff.expo,
ClientError,
@@ -18,12 +18,12 @@ def retry_pattern():
factor=10)
-def log_backoff_attempt(details):
+def log_backoff_attempt(details: Dict) -> None:
LOGGER.info("Error detected communicating with Amazon, triggering backoff: %d try", details.get("tries"))
@retry_pattern()
-def create_client(config):
+def create_client(config: Dict) -> Any:
LOGGER.info("Attempting to create AWS session")
# Get the required parameters from config file and/or environment variables
@@ -51,8 +51,8 @@ def create_client(config):
# pylint: disable=too-many-arguments
@retry_pattern()
-def upload_file(s3_client, filename, bucket, s3_key,
- encryption_type=None, encryption_key=None):
+def upload_file(s3_client: Any, filename: str, bucket: str, s3_key: str,
+                encryption_type: Optional[str] = None, encryption_key: Optional[str] = None) -> None:
if encryption_type is None or encryption_type.lower() == "none":
# No encryption config (defaults to settings on the bucket):
diff --git a/tests/test_target.py b/tests/test_target.py
index 7ed49f13f7039738dff988a94f6ccd11aea7d04c..6be0ba2bd24e6685ead2dadb908a1f9506e495cc 100644
--- a/tests/test_target.py
+++ b/tests/test_target.py
@@ -309,6 +309,7 @@ def test_get_target_key(config):
def test_persist_lines(caplog, config, input_data, input_multi_stream_data, invalid_row_data, invalid_order_data, state, file_metadata):
'''TEST : simple persist_lines call'''
+ config['memory_buffer'] = 0
output_state, output_file_metadata = persist_lines(input_multi_stream_data, config)
file_paths = set(path for path in Path(config['work_dir']).iterdir())