千家信息网

Apache Atlas是如何构建自己的API

发表于:2024-11-14 作者:千家信息网编辑
千家信息网最后更新 2024年11月14日,本篇文章为大家展示了Apache Atlas是如何构建自己的API,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。Apache Atlas是一个优秀的服务治理组
千家信息网最后更新 2024年11月14日Apache Atlas是如何构建自己的API

本篇文章为大家展示了Apache Atlas是如何构建自己的API,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

Apache Atlas是一个优秀的数据治理组件,是用于企业Hadoop集群上数据治理和元数据管理的工具。接下来我们将讨论如何构建自己的Java API,这些Java API可使用Apache Atlas客户端与Apache Atlas交互,以在其中创建新的实体和类型。

一、Atlas客户端Maven依赖关系

以下依赖项可用于pom.xml文件

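<dependency>
    <groupId>org.apache.atlas</groupId>
    <artifactId>atlas-client</artifactId>
    <version>0.7-incubating</version>
</dependency>
<dependency>
    <groupId>org.apache.atlas</groupId>
    <artifactId>atlas-typesystem</artifactId>
    <version>0.7-incubating</version>
</dependency>
<dependency>
    <groupId>org.apache.atlas</groupId>
    <artifactId>atlas-notification</artifactId>
    <version>0.7-incubating</version>
</dependency>
<dependency>
    <groupId>org.apache.atlas</groupId>
    <artifactId>atlas-repository</artifactId>
    <version>0.7-incubating</version>
</dependency>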

二、设置atlas-application.properties

Apache Atlas客户端使用atlas-application属性在我们的API和Apache Atlas服务器之间建立连接。这些属性应放置在resources/atlas-application.properties中

#########  Security Properties  #########
# SSL config
atlas.enableTLS=false

#########  Server Properties  #########
atlas.rest.address=http://192.168.5.95:21000
atlas.hook.demo.kafka.retries=1
atlas.kafka.zookeeper.connect=192.168.5.93:2181,192.168.5.94:2181,192.168.5.95:2181
atlas.kafka.bootstrap.servers=192.168.5.93:9092,192.168.5.94:9092,192.168.5.95:9092
atlas.kafka.zookeeper.session.timeout.ms=4000
atlas.kafka.zookeeper.connection.timeout.ms=2000
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.auto.commit.interval.ms=1000
atlas.kafka.hook.group.id=atlas

三、创建与Atlas服务器的连接

要与Apache Atlas服务器建立连接,必须在AtlasClient构造函数中传入服务器的baseUrl以及用户名和密码。

// Connect to the Atlas server: the first array holds the server base URL,
// the second holds the user name followed by the password.
final AtlasClient atlasClient = new AtlasClient(
        new String[] {"http://192.168.5.95:21000"},
        new String[] {"admin", "admin"});

四、关于Type相关的测试类

public class AtlasTypesTest {    final AtlasClient atlasClient = new AtlasClient            (new String[]{"http://192.168.5.95:21000"},                    new String[]{"admin",                            "admin"});    static final String DATABASE_TYPE = "DB_Sync";    static final String COLUMN_TYPE = "Column_Sync";    static final String TABLE_TYPE = "Table_Sync";    static final String VIEW_TYPE = "View_Sync";    public static final String DB_ATTRIBUTE = "db";    static final String STORAGE_DESC_TYPE = "StorageDesc";    public static final String COLUMNS_ATTRIBUTE = "columns";    public static final String INPUT_TABLES_ATTRIBUTE = "inputTables";    private static final String[] TYPES =            {DATABASE_TYPE, TABLE_TYPE, STORAGE_DESC_TYPE, COLUMN_TYPE, VIEW_TYPE, "JdbcAccess",                    "ETL", "Metric", "PII", "Fact", "Dimension", "Log Data"};    /**     * 组织定义types     * @return     */    TypesDef createTypeDefinitions() {        HierarchicalTypeDefinition dbClsDef = TypesUtil                .createClassTypeDef(DATABASE_TYPE, DATABASE_TYPE, null,                        TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE),                        attrDef("description", DataTypes.STRING_TYPE.getName()), attrDef("locationUri", DataTypes.STRING_TYPE.getName()),                        attrDef("owner", DataTypes.STRING_TYPE.getName()), attrDef("createTime", DataTypes.LONG_TYPE.getName()));        HierarchicalTypeDefinition columnClsDef = TypesUtil                .createClassTypeDef(COLUMN_TYPE, COLUMN_TYPE, null, attrDef("name", DataTypes.STRING_TYPE.getName()),                        attrDef("dataType", DataTypes.STRING_TYPE.getName()), attrDef("comment", DataTypes.STRING_TYPE.getName()));        HierarchicalTypeDefinition tblClsDef = TypesUtil                .createClassTypeDef(TABLE_TYPE, TABLE_TYPE, ImmutableSet.of("DataSet"),                        new AttributeDefinition(DB_ATTRIBUTE, DATABASE_TYPE, Multiplicity.REQUIRED, false, null),  
                      new AttributeDefinition("sd", STORAGE_DESC_TYPE, Multiplicity.REQUIRED, true, null),                        attrDef("owner", DataTypes.STRING_TYPE.getName()), attrDef("createTime", DataTypes.LONG_TYPE.getName()),                        attrDef("lastAccessTime", DataTypes.LONG_TYPE.getName()), attrDef("retention", DataTypes.LONG_TYPE.getName()),                        attrDef("viewOriginalText", DataTypes.STRING_TYPE.getName()),                        attrDef("viewExpandedText", DataTypes.STRING_TYPE.getName()), attrDef("tableType", DataTypes.STRING_TYPE.getName()),                        attrDef("temporary", DataTypes.BOOLEAN_TYPE.getName()),                        new AttributeDefinition(COLUMNS_ATTRIBUTE, DataTypes.arrayTypeName(COLUMN_TYPE),                                Multiplicity.COLLECTION, true, null));        HierarchicalTypeDefinition viewClsDef = TypesUtil                .createClassTypeDef(VIEW_TYPE, VIEW_TYPE, ImmutableSet.of("DataSet"),                        new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),                        new AttributeDefinition("inputTables", DataTypes.arrayTypeName(TABLE_TYPE),                                Multiplicity.COLLECTION, false, null));        return TypesUtil.getTypesDef(ImmutableList.of(), ImmutableList.of(),                ImmutableList.of(),                ImmutableList.of(dbClsDef, columnClsDef, tblClsDef, viewClsDef));    }    private void createTypes() throws Exception {        TypesDef typesDef = createTypeDefinitions();        String typesAsJSON =  TypesSerialization.toJson(typesDef);        System.out.println("typesAsJSON = " + typesAsJSON);        atlasClient.createType(typesAsJSON);        verifyTypesCreated();    }    private void verifyTypesCreated() throws Exception {        List types = atlasClient.listTypes();        for (String type : TYPES) {            assert types.contains(type);        }    }    AttributeDefinition attrDef(String name, 
String dT) {        return attrDef(name, dT, Multiplicity.OPTIONAL, false, null);    }    AttributeDefinition attrDef(String name, String dT, Multiplicity m, boolean isComposite,                                String reverseAttributeName) {        return new AttributeDefinition(name, dT, m, isComposite, reverseAttributeName);    }    @Test    public void createNewTypes() throws Exception {        createTypes();    }}

五、关于Entities相关的测试类

public class AtlasEntitiesTest {    final AtlasClient atlasClient = new AtlasClient            (new String[]{"http://192.168.5.95:21000"},                    new String[]{"admin",                            "admin"});    /**     * 创建实例并返创建的Id对象     * @param referenceable     * @return     * @throws Exception     */    private Id createInstance(Referenceable referenceable) throws Exception {        String typeName = referenceable.getTypeName();        String entityJSON = InstanceSerialization.toJson(referenceable, true);        System.out.println("Submitting new entity= " + entityJSON);        List guids = atlasClient.createEntity(entityJSON);        System.out.println("created instance for type " + typeName + ", guid: " + guids);        return new Id(guids.get(guids.size() - 1), referenceable.getId().getVersion(),                referenceable.getTypeName());    }    /**     * 创建数据库实例并返创建的数据库Id对象     * @param name     * @param description     * @param owner     * @param locationUri     * @param traitNames     * @return     * @throws Exception     */    Id database(String name, String description, String owner, String locationUri, String... traitNames)            throws Exception {        Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);        referenceable.set("name", name);        referenceable.set("description", description);        referenceable.set("owner", owner);        referenceable.set("locationUri", locationUri);        referenceable.set("createTime", System.currentTimeMillis());        return createInstance(referenceable);    }    /**     * 创建列的实例并返创建的列的实例对象     * @param name     * @param dataType     * @param comment     * @param traitNames     * @return     * @throws Exception     */    Referenceable column(String name, String dataType, String comment, String... 
traitNames) throws Exception {        Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);        referenceable.set("name", name);        referenceable.set("dataType", dataType);        referenceable.set("comment", comment);        return referenceable;    }    /**     * 创建表的实例并返创建的表的Id对象     * @param name     * @param description     * @param dbId     * @param sd     * @param owner     * @param tableType     * @param columns     * @param traitNames     * @return     * @throws Exception     */    Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType,             List columns, String... traitNames) throws Exception {        Referenceable referenceable = new Referenceable(TABLE_TYPE, traitNames);        referenceable.set("name", name);        referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);        referenceable.set("description", description);        referenceable.set("owner", owner);        referenceable.set("tableType", tableType);        referenceable.set("createTime", System.currentTimeMillis());        referenceable.set("lastAccessTime", System.currentTimeMillis());        referenceable.set("retention", System.currentTimeMillis());        referenceable.set("db", dbId);        referenceable.set("sd", sd);        referenceable.set("columns", columns);        return createInstance(referenceable);    }    /**     * 创建视图的实例并返创建的视图的Id对象     * @param name     * @param dbId     * @param inputTables     * @param traitNames     * @return     * @throws Exception     */    Id view(String name, Id dbId, List inputTables, String... 
traitNames) throws Exception {        Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames);        referenceable.set("name", name);        referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);        referenceable.set("db", dbId);        referenceable.set(INPUT_TABLES_ATTRIBUTE, inputTables);        return createInstance(referenceable);    }    /**     * 原始存储描述符     * @param location     * @param inputFormat     * @param outputFormat     * @param compressed     * @return     * @throws Exception     */    Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed)            throws Exception {        Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE);        referenceable.set("location", location);        referenceable.set("inputFormat", inputFormat);        referenceable.set("outputFormat", outputFormat);        referenceable.set("compressed", compressed);        return referenceable;    }    @Test    public void createEntities() throws Exception {        //创建数据库实例        Id syncDB = database("sy_sync", "Sync Database", "root", "");        //存储描述符        Referenceable sd =                storageDescriptor("", "TextInputFormat", "TextOutputFormat",                        true);        //创建列实例        //1、数据源        List databaseColumns = ImmutableList                .of(column("id", "long", "id"),                        column("name", "string", "name"),                        column("type", "string", "type"),                        column("url", "string", "url"),                        column("database_name", "string", "database name"),                        column("username", "string", "username"),                        column("password","string","password"),                        column("description", "string", "description"),                        column("create_time", "string", "create time"),                        column("update_time", "string", "update time"),  
                      column("create_id", "long", "user id"),                        column("update_id", "long", "user id"));        //2、同步文件夹        List syncFolderColumns = ImmutableList                .of(column("id", "long", "id"),                        column("name", "string", "name"),                        column("description", "string", "description"),                        column("create_time", "string", "create time"),                        column("update_time", "string", "update time"),                        column("create_id", "long", "user id"),                        column("update_id", "long", "user id"));        //创建表实例        Id database = table("datasource", "database table", syncDB, sd, "root", "External", databaseColumns);        Id syncFolder = table("folder", "sync folder table", syncDB, sd, "root", "External", syncFolderColumns);        //创建视图实例    }    @Test    public void getEntity() throws AtlasServiceException {        Referenceable referenceable = atlasClient.getEntity("1406ddd0-5d51-41d4-b174-859bd4f34a5b");        System.out.println(InstanceSerialization.toJson(referenceable, true));    }}

上述内容就是Apache Atlas是如何构建自己的API,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注行业资讯频道。

0