This repository has been archived by the owner on May 4, 2021. It is now read-only.

Commit

Merge pull request #4 from SvenskaSpel/feature/hdfs-support
Keep HDFS and Hive in sync
mrunesson authored Sep 26, 2018
2 parents 83430d6 + fe202c7 commit 03740af
Showing 17 changed files with 1,297 additions and 97 deletions.
12 changes: 9 additions & 3 deletions README.md
@@ -16,8 +16,13 @@ Cobra-policytool can manage row level filtering policies for
[Apache Hive](https://hive.apache.org/) based on tags. Ranger requires one
row level policy per table, but with cobra-policytool one can
have one rule per tag. This rule is then expanded by the cobra-policytool
to one rule for each table having the tag. This eases the maintenance
and reduces the risk of errors.
to one rule for each table having the tag.

Most often one wants to have the same access rights for Hive tables and the corresponding
files and directories on HDFS. Cobra-policytool can automatically convert a policy
for a Hive table to a policy for HDFS.

This eases the maintenance and reduces the risk of errors.


To be able to use the tool you need to have the right permissions in the
@@ -68,6 +73,7 @@ out in the [LICENSE](LICENSE.txt).

### Tagging of resources
* Sync of table and column tags from metadata files to Atlas.
* Keep tags on Hive tables and their corresponding HDFS directories in sync (use option --hdfs)
* Audit to show differences between metadata and Atlas.
* New tag definitions are automatically added to Atlas on sync.
* Verbose output showing the changes made.
@@ -148,7 +154,7 @@ We recommend reading the [convention document](docs/Conventions.md) and


---
Copyright 2015 AB SvenskaSpel
Copyright 2018 AB SvenskaSpel

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
35 changes: 30 additions & 5 deletions docs/Configfile.md
@@ -1,7 +1,8 @@
# Config file

Before using cobra-policytool for the first time you need to configure it for your
environment.
environment, e.g. point out the Atlas and Ranger servers. Cobra-policytool expects that
everything authenticates using Kerberos.

## Location of config file
You can either provide the config file with the argument `-c` which is the
@@ -21,17 +22,35 @@ option `--environment`, see also [conventions](Conventions.md). Example:
{
"name": "prod",
"atlas_api_url": "http://atlas.prod.myorg.com:21000/api/atlas",
"ranger_api_url": "http://ranger.prod.myorg.com:6080"
"ranger_api_url": "http://ranger.prod.myorg.com:6080",
"hive_server": "hiveserver2.prod.myorg.com",
"hive_port": "10000"
},{
"name": "test",
"atlas_api_url": "http://atlas.test.myorg.com:21000/api/atlas",
"ranger_api_url": "http://ranger.test.myorg.com:6080"
"ranger_api_url": "http://ranger.test.myorg.com:6080",
"hive_server": "hiveserver2.test.myorg.com",
"hive_port": "10000"
}]
}
```
The two environments in the config file above show the minimum config you need
for each environment.

If you want cobra-policytool to set policies or tags on HDFS paths corresponding
to the Hive tables you manage, you must also point out your Hive server:
```
{"environments": [
{
"name": "prod",
"atlas_api_url": "http://atlas.prod.myorg.com:21000/api/atlas",
"ranger_api_url": "http://ranger.prod.myorg.com:6080",
"hive_server": "hiveserver2.prod.myorg.com",
"hive_port": "10000"
}]
}
```
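
For illustration only, here is a minimal sketch of how such a config file can be read and one environment section picked out. Cobra-policytool does this through its own `JSONPropertiesFile` helper (see `cli.py`), so treat the function below as an assumption about the behaviour rather than the actual implementation:

```python
import json

def load_environment(config_path, environment):
    """Return the config block for the named environment, e.g. "prod"."""
    with open(config_path) as f:
        config = json.load(f)
    for env in config["environments"]:
        if env["name"] == environment:
            return env
    raise ValueError("No environment named {} in {}".format(environment, config_path))

conf = load_environment("config.json", "prod")
hive_endpoint = (conf["hive_server"], int(conf["hive_port"]))
```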

In ranger_policies.json you can refer to variables. These can be defined in a variables section
for each environment. This makes our policy definitions very powerful and makes it easy to use the same
file for different setups. In the following example we have defined three environments: "prod", "autotest" and
@@ -42,6 +61,8 @@
"name": "prod",
"atlas_api_url": "http://atlas.prod.host:21000/api/atlas",
"ranger_api_url": "http://ranger.prod.host:6080",
"hive_server": "hiveserver2.prod.myorg.com",
"hive_port": "10000"
"variables": [
{ "name": "installation",
"value": "prod"}
@@ -50,6 +71,8 @@
"name": "autotest",
"atlas_api_url": "http://atlas.test.host:21000/api/atlas",
"ranger_api_url": "http://ranger.test.host:6080",
"hive_server": "hiveserver2.test.myorg.com",
"hive_port": "10000"
"variables": [
{ "name": "installation",
"value": "test"}
@@ -58,6 +81,8 @@
"name": "misctest",
"atlas_api_url": "http://atlas.test.host:21000/api/atlas",
"ranger_api_url": "http://ranger.test.host:6080",
"hive_server": "hiveserver2.test.myorg.com",
"hive_port": "10000"
"variables": [
{ "name": "installation",
"value": "test"}
@@ -68,7 +93,7 @@
```

Introducing the variable `installation` gives us the possibility to have one variable that defines meaning
per cluster. This is useful for instance for services in the policy file, both "autotest" and "misctest" will
have the same services. If we prefix or suffix our service name with prod and test respectively we can use
per cluster. One useful example of this is if you have one Hadoop cluster that you run in multi-tenancy mode and
have several different environments. If we prefix or suffix our service name with prod and test respectively we can use
the installation variable in our policy file. You can see how this is done in our
[example file](../example/ranger_policies.json).
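
To make the variable mechanism concrete, here is a minimal sketch of `${name}`-style substitution, assuming plain textual replacement. The real expansion is done by cobra-policytool's `Context` and `rangersync.apply_commands` (see `cli.py`), which may differ in details:

```python
import re

def expand_variables(text, variables):
    """Replace ${name} placeholders with values defined for the chosen environment."""
    def replace(match):
        name = match.group(1)
        if name not in variables:
            raise KeyError("Undefined variable: " + name)
        return variables[name]
    return re.sub(r"\$\{(\w+)\}", replace, text)

# Variables as defined per environment in the config file above.
variables = {"installation": "prod"}
print(expand_variables("${installation}_hive", variables))  # -> prod_hive
```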
9 changes: 9 additions & 0 deletions docs/indata.md
@@ -176,6 +176,10 @@ access must be left out.
```
{
"command": "apply_rule",
"options": {
"expandHiveResourceToHdfs": true,
"hdfsService": "svs${installation}_hadoop"
},
"policy": {
"service": "${installation}_hive",
"name": "${project_name}_${environment}_vanilla",
@@ -254,6 +258,11 @@ right to delegate its rights to other users
You may have many access rules for one resource but they must all be in the
same policy object and listed as different policy items.

The `options` part in the example tells cobra-policytool to also create a corresponding
rule for HDFS. The option `hdfsService` is used to point out the name of the HDFS service
in Ranger. Note that this can be used both when you point out tables explicitly, as in the example,
and when using tags.
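
To make the expansion concrete: the generated HDFS rule is created in the Ranger service named by `hdfsService` and targets the directories backing the Hive tables. The sketch below assumes the default Hive warehouse layout purely for illustration; cobra-policytool consults the configured Hive server for the actual table locations (see Configfile.md), so the layout here is only an assumption.

```python
def hive_table_to_hdfs_path(database, table, warehouse_root="/apps/hive/warehouse"):
    """Guess the HDFS directory backing a Hive table, assuming the default warehouse layout."""
    return "{}/{}.db/{}".format(warehouse_root, database, table)

# A rule on the Hive table hadoop_out_prod.country_d would get a companion
# HDFS rule, in the service named by hdfsService, on:
print(hive_table_to_hdfs_path("hadoop_out_prod", "country_d"))
# -> /apps/hive/warehouse/hadoop_out_prod.db/country_d
```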


#### Masking policy

2 changes: 2 additions & 0 deletions example/config.json
@@ -2,6 +2,8 @@
"name": "prod",
"atlas_api_url": "http://atlas.host:21000/api/atlas",
"ranger_api_url": "http://ranger.host:6080",
"hive_server": "hive.server.2",
"hive_port": "10000",
"variables": [
{ "name": "user_suffix",
"value": ""},
80 changes: 70 additions & 10 deletions policytool/atlas.py
@@ -1,5 +1,7 @@
import requests

import urlutil


class Client:

@@ -14,28 +16,37 @@ def __init__(self, url_prefix, auth=None):
def _search(self, query):
return requests.post(self.url_prefix + "/v2/search/basic", json=query, auth=self.auth)

def _post_entity(self, entity):
return requests.post(self.url_prefix + "/v2/entity", json=entity, auth=self.auth)

def _create_qualifiedname_query(self, type_name, *values):
"""
See _get_qualified_name for reason why implemented like this.
:param type: type of expected entities
:param values: Provide as many as you know of schema, table, column in that order.
:return: Query to be sent to Atlas API.
"""
query = {}
query['typeName'] = type_name
query['excludeDeletedEntities'] = True
query['limit'] = 10000
entity_filter = {}
entity_filter['condition']='AND'
query = {
'typeName': type_name,
'excludeDeletedEntities': True,
'limit': 10000
}
entity_filter = {
'condition': 'AND'
}
criterion = []
n = 0
for v in values:
criteria = {}
criteria['attributeName'] = 'qualifiedName'
criteria['operator'] = 'STARTSWITH' if n==0 else 'CONTAINS'
criteria['attributeValue'] = v
criteria = {
'attributeName': 'qualifiedName',
'operator': 'STARTSWITH' if n == 0 else 'CONTAINS',
'attributeValue': v
}
n += 1
criterion.append(criteria)
if type_name == 'hive_table':
# Ignore temporary tables.
criterion.append({'operator': '=', 'attributeName': 'temporary', 'attributeValue': False})
entity_filter['criterion'] = criterion
query['entityFilters'] = entity_filter
return query
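
For illustration only (not part of the diff): with hypothetical schema and table names, `_create_qualifiedname_query('hive_table', 'myschema', 'mytable')` builds an Atlas basic-search query equivalent to the literal below.

```python
# Query produced by _create_qualifiedname_query('hive_table', 'myschema', 'mytable'):
query = {
    'typeName': 'hive_table',
    'excludeDeletedEntities': True,
    'limit': 10000,
    'entityFilters': {
        'condition': 'AND',
        'criterion': [
            # The first value matches the start of qualifiedName, later values must be contained.
            {'attributeName': 'qualifiedName', 'operator': 'STARTSWITH', 'attributeValue': 'myschema'},
            {'attributeName': 'qualifiedName', 'operator': 'CONTAINS', 'attributeValue': 'mytable'},
            # Added automatically for hive_table to ignore temporary tables.
            {'operator': '=', 'attributeName': 'temporary', 'attributeValue': False}
        ]
    }
}
```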
@@ -170,6 +181,55 @@ def add_tag_definitions(self, tags):
if response.status_code != 200:
raise AtlasError(response.content, response.status_code)

def get_tags_on_guid(self, guid):
"""
Return tags on the entity guid in Atlas.
:param guid: Guid to find tags for
:return: List of tags.
"""
response = requests.get(self.url_prefix + "/entities/" + guid, auth=self.auth)
if response.status_code == 200:
json_response = response.json()
if json_response['definition'].has_key('traitNames'):
return set(json_response['definition']['traitNames'])
else:
return set()
else:
raise AtlasError("Cannot look up guid {}.".format(guid, response.status_code))

def add_hdfs_path(self, hdfs_path):
"""
Post to http://atlas.hadoop.svenskaspel.se/api/atlas/v2/entity
Post data {"entity":{"typeName":"hdfs_path","attributes":{"description":null,"name":"hdfs://svsprod/apps/hive/warehouse/hadoop_out_prod.db/country_d","owner":null,"qualifiedName":"hdfs://svsprod/apps/hive/warehouse/hadoop_out_prod.db/country_d","createTime":1536098400000,"fileSize":null,"group":null,"isFile":null,"isSymlink":null,"modifiedTime":1536098400000,"path":"hdfs://svsprod/apps/hive/warehouse/hadoop_out_prod.db/country_d","clusterName":null,"numberOfReplicas":null},"guid":-1},"referredEntities":{}}
Response: {"mutatedEntities":{"CREATE":[{"typeName":"hdfs_path","attributes":{"qualifiedName":"hdfs://svsprod/apps/hive/warehouse/hadoop_out_prod.db/country_d"},"guid":"e20823a6-5521-4dc9-b2a7-b5a1d9babecd","status":"ACTIVE"}]},"guidAssignments":{"-1":"e20823a6-5521-4dc9-b2a7-b5a1d9babecd"}}
:param hdfs_path: Full url to the file or directory hdfs://environment/my/path/
:return: guid assigned
"""
cluster_name = urlutil.get_host(hdfs_path)
name = urlutil.get_path(hdfs_path)
entity = {
"entity": {
"typeName": "hdfs_path",
"attributes": {
"description": "Created/Updated by cobra-policytool.",
"name": name,
"qualifiedName": hdfs_path,
"path": hdfs_path,
"clusterName": cluster_name,
},
"guid": -1
},
}
response = self._post_entity(entity)
if response.status_code == 200:
json_response = response.json()
if json_response.has_key('guidAssignments'):
return json_response['guidAssignments']['-1']
else:
AtlasError("Failed to add hdfs path {} content mismatch {}".format(hdfs_path, response.content))
else:
raise AtlasError(response.content, response.status_code)
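
For illustration only (not part of the diff): a minimal usage sketch of the client methods above, following the Kerberos-authenticated pattern used in cli.py; the Atlas URL and HDFS path are made-up examples.

```python
from requests_kerberos import HTTPKerberosAuth

import atlas

# Authenticate against the Atlas REST API with Kerberos, as cli.py does.
client = atlas.Client("http://atlas.prod.myorg.com:21000/api/atlas", auth=HTTPKerberosAuth())

# Register (or update) an hdfs_path entity and list the tags currently set on it.
guid = client.add_hdfs_path("hdfs://prodcluster/apps/hive/warehouse/example.db/example_table")
print(client.get_tags_on_guid(guid))
```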


class AtlasError(Exception):
def __init__(self, message, http_code=None):
30 changes: 22 additions & 8 deletions policytool/cli.py
@@ -4,6 +4,7 @@
from click import ClickException
from requests_kerberos import HTTPKerberosAuth
import atlas
import hive
import tagsync
import ranger
import rangersync
@@ -31,19 +32,22 @@ def cli():
pass


def _tags_to_atlas(srcdir, environment, retry, verbose, config):
def _tags_to_atlas(srcdir, environment, hdfs, retry, verbose, config):
conf = JSONPropertiesFile(config).get(environment)
table_file = os.path.join(srcdir, 'table_tags.csv')
column_file = os.path.join(srcdir, 'column_tags.csv')
missing_files = _missing_files([table_file, column_file])
if len(missing_files) != 0:
print("Following files are missing: " ", ".join(missing_files))
print("Following files are missing: " + ", ".join(missing_files))
print("Will not run, exiting!")
return 0

auth = HTTPKerberosAuth()
atlas_client = atlas.Client(conf['atlas_api_url'], auth=auth)
sync_client = tagsync.Sync(atlas_client, retry*conf.get('retries', 1), SLEEP_ON_RETRY_SECONDS)
hive_client = None
if hdfs:
hive_client = hive.Client(conf['hive_server'], conf['hive_port'])
sync_client = tagsync.Sync(atlas_client, retry*conf.get('retries', 1), SLEEP_ON_RETRY_SECONDS, hive_client)

try:
if verbose > 0:
@@ -57,18 +61,25 @@ def _tags_to_atlas(srcdir, environment, retry, verbose, config):
log = sync_client.sync_column_tags(tagsync.add_environment(src_data_column, environment))
if verbose > 0:
tagsync.print_sync_worklog(log)
if hdfs:
if verbose > 0:
print("Syncing tags for table storage.")
log = sync_client.sync_table_storage_tags(src_data_table)
if verbose > 0:
tagsync.print_sync_worklog(log)
except (tagsync.SyncError, IOError) as e:
raise ClickException(e.message + "\nTag sync not complete, fix errors and re-run.")


@cli.command("tags_to_atlas", help="sync tags from source files to Atlas.")
@click.option('-s', '--srcdir', help='The schema for the generated table', default='src/main/tags')
@click.option('-e', '--environment', help='Destination environment', required=True)
@click.option('--hdfs/--no-hdfs', help='Set tags on hive tables corresponding hdfs directory.', default=False)
@click.option('-r', '--retry', help='Retry on fail. Number of retries is controlled by \'retries\' in config.', count=True)
@click.option('-v', '--verbose', help='Provide verbose output', count=True)
@click.option('-c', '--config', help='Config file', type=click.Path(exists=True))
def tags_to_atlas(srcdir, environment, retry, verbose, config):
_tags_to_atlas(srcdir, environment, retry, verbose, config)
def tags_to_atlas(srcdir, environment, hdfs, retry, verbose, config):
_tags_to_atlas(srcdir, environment, hdfs, retry, verbose, config)


def _rules_to_ranger_cmd(srcdir, project_name, environment, config, verbose, dryrun):
@@ -102,13 +113,16 @@ def _rules_to_ranger_cmd(srcdir, project_name, environment, config, verbose, dry
"table_columns": table_columns,
}

if conf.has_key('hive_server'):
context_dict['hive_client'] = hive.Client(conf['hive_server'], conf['hive_port'])

# Add variables from config to context_dict.
for var in conf.get('variables', []):
context_dict[var['name']] = var['value']

context = Context(context_dict)

with open(policy_file) as f:
with open(policy_file, 'rU') as f:
policy_commands = json.load(f)

policies = rangersync.apply_commands(policy_commands, context)
@@ -153,9 +167,9 @@ def _audit(srcdir, environment, config):
print("Tag(s) missing in Atlas: " + ", ".join(diff_tags).decode("utf-8"))

schemas = tagsync.schemas_from_src(src_data_table)
full_tables_atlas = sync_client.tables_from_atlas(schemas)
full_tables_atlas = sync_client.get_tables_for_schema_from_atlas(schemas)
tables = tagsync.tables_from_src(src_data_column)
full_columns_atlas = sync_client.columns_from_atlas(tables)
full_columns_atlas = sync_client.get_columns_for_tables_from_atlas(tables)

# Tables only in Atlas
tables_atlas = set(full_tables_atlas.keys())

