Create a dataflow for protocols sources Using the Flow Service API
This tutorial covers the steps for retrieving data from a protocols source and bringing them to Platform using Flow Service API.
Getting started
This tutorial also requires you to have a working understanding of the following components of Adobe Experience Platform:
-
Experience Data Model (XDM) System: The standardized framework by which Experience Platform organizes customer experience data.
- Basics of schema composition: Learn about the basic building blocks of XDM schemas, including key principles and best practices in schema composition.
- Schema Registry developer guide: Includes important information that you need to know in order to successfully perform calls to the Schema Registry API. This includes your
{TENANT_ID}
, the concept of “containers”, and the required headers for making requests (with special attention to the Accept header and its possible values).
-
Catalog Service: Catalog is the system of record for data location and lineage within Experience Platform.
-
Batch ingestion: The Batch Ingestion API allows you to ingest data into Experience Platform as batch files.
-
Sandboxes: Experience Platform provides virtual sandboxes which partition a single Platform instance into separate virtual environments to help develop and evolve digital experience applications.
Using Platform APIs
For information on how to successfully make calls to Platform APIs, see the guide on getting started with Platform APIs.
Create a source connection source
You can create a source connection by making a POST request to the Flow Service API. A source connection consists of a connection ID, a path to the source data file, and a connection spec ID.
To create a source connection, you must also define an enum value for the data format attribute.
Use the following the enum values for file-based connectors:
delimited
json
parquet
For all table-based connectors, set the value to tabular
.
API format
POST /sourceConnections
Request
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Generic OData source connection",
"baseConnectionId": "a5c6b647-e784-4b58-86b6-47e784ab580b",
"description": "Generic OData source connection",
"data": {
"format": "tabular",
},
"params": {
"tableName": "Orders",
"columns": [
{
"name": "OrderID",
"type": "integer",
"xdm": {
"type": "integer",
"minimum": -2147483648,
"maximum": 2147483647
}
},
{
"name": "CustomerID",
"type": "string",
"xdm": {
"type": "string"
}
},
{
"name": "OrderDate",
"type": "string",
"meta:xdmType": "date-time",
"xdm": {
"type": "string",
"format": "date-time"
}
},
{
"name": "ShippedDate",
"type": "string",
"meta:xdmType": "date-time",
"xdm": {
"type": "string",
"format": "date-time"
}
}
]
},
"connectionSpec": {
"id": "8e6b41a8-d998-4545-ad7d-c6a9fff406c3",
"version": "1.0"
}
}'
baseConnectionId
params.path
connectionSpec.id
Response
A successful response returns the unique identifier (id
) of the newly created source connection. This ID is required in later steps to create a target connection.
{
"id": "0a768941-ddfb-499d-b689-41ddfbf99db0",
"etag": "\"8f00753e-0000-0200-0000-5e8a547d0000\""
}
Create a target XDM schema target-schema
In order for the source data to be used in Platform, a target schema must be created to structure the source data according to your needs. The target schema is then used to create a Platform dataset in which the source data is contained.
A target XDM schema can be created by performing a POST request to the Schema Registry API.
For detailed steps on how to create a target XDM schema, see the tutorial on creating a schema using the API.
Create a target dataset target-dataset
A target dataset can be created by performing a POST request to the Catalog Service API, providing the ID of the target schema within the payload.
For detailed steps on how to create a target dataset, see the tutorial on creating a dataset using the API.
Create a target connection target-connection
A target connection represents the connection to the destination where the ingested data lands in. To create a target connection, you must provide the fixed connection spec ID associated to the Data Lake. This connection spec ID is: c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
You now have the unique identifiers a target schema a target dataset and the connection spec ID to data lake. Using the Flow Service API, you can create a target connection by specifying these identifiers along with the dataset that will contain the inbound source data.
API format
POST /targetConnections
Request
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Target Connection for protocols",
"description": "Target Connection for protocols",
"data": {
"format": "parquet_xdm",
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/e669d7aba5a02f294fafb7b269af25f7cd4a66ce59193545",
"version" "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5e8a55ca53662c18af37a83a"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
data.schema.id
$id
of the target XDM schema.params.dataSetId
connectionSpec.id
c604ff05-7f1a-43c0-8e18-33bf874cb11c
.Response
A successful response returns the new target connection’s unique identifier (id
). This value is required in a later step to create a dataflow.
{
"id": "576d5ecf-f114-4587-ad5e-cff1144587f4",
"etag": "\"13013506-0000-0200-0000-5e8a56d80000\""
}
Create a mapping mapping
In order for the source data to be ingested into a target dataset, it must first be mapped to the target schema that the target dataset adheres to.
To create a mapping set, make a POST request to the mappingSets
endpoint of the Data Prep API while providing your target XDM schema $id
and the details of the mapping sets you want to create.
API format
POST /mappingSets
Request
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/e669d7aba5a02f294fafb7b269af25f7cd4a66ce59193545",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "OrderID",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "_id",
"sourceAttribute": "CustomerID",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "_id",
"sourceAttribute": "EmployeeID",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "createdByBatchID",
"sourceAttribute": "OrderDate",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
xdmSchema
$id
of the target XDM schema.Response
A successful response returns details of the newly created mapping including its unique identifier (id
). This ID is required in a later step to create a dataflow.
{
"id": "37409d3017e24a3eb4a2dc21020f7a5b",
"version": 0,
"createdDate": 1586124873209,
"modifiedDate": 1586124873209,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
Look up dataflow specifications specs
A dataflow is responsible for collecting data from sources and bringing them into Platform. In order to create a dataflow, you must first obtain the dataflow specifications that are responsible for collecting data from your protocols application.
API format
GET /flowSpecs?property=name=="CRMToAEP"
Request
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="CRMToAEP"' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
Response
A successful response returns the details of the dataflow specification responsible for bringing data from your source into Platform. The response includes the unique flow spec id
required to create a new dataflow.
code language-json |
---|
|
Create a dataflow
The last step towards collecting data is to create a dataflow. At this point, you should have the following required values prepared:
A dataflow is responsible for scheduling and collecting data from a source. You can create a dataflow by performing a POST request while providing the previously mentioned values within the payload.
To schedule an ingestion, you must first set the start time value to epoch time in seconds. Then, you must set the frequency value to one of the five options: once
, minute
, hour
, day
, or week
. The interval value designates the period between two consecutive ingestions and creating a one-time ingestion does not require an interval to be set. For all other frequencies, the interval value must be set to equal or greater than 15
.
API format
POST /flows
Request
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Creating a dataflow for a protocols source",
"description": "Creating a dataflow for a protocols source",
"flowSpec": {
"id": "14518937-270c-4525-bdec-c2ba7cce3860",
"version": "1.0"
},
"sourceConnectionIds": [
"0a768941-ddfb-499d-b689-41ddfbf99db0"
],
"targetConnectionIds": [
"576d5ecf-f114-4587-ad5e-cff1144587f4"
],
"transformations": [
{
"name": "Copy",
"params": {
"deltaColumn": {
"name": "updatedAt",
"dateFormat": "YYYY-MM-DD",
"timezone": "UTC"
}
}
},
{
"name": "Mapping",
"params": {
"mappingId": "7409d3017e24a3eb4a2dc21020f7a5b",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1567411548",
"frequency":"minute",
"interval":"30"
}
}'
flowSpec.id
sourceConnectionIds
targetConnectionIds
transformations.params.mappingId
transformations.params.deltaColum
deltaColumn
when using Generic OData is yyyy-MM-ddTHH:mm:ssZ
.transformations.params.mappingId
scheduleParams.startTime
scheduleParams.frequency
once
, minute
, hour
, day
, or week
.scheduleParams.interval
once
and should be greater than or equal to 15
for other frequency values.Response
A successful response returns the ID id
of the newly created dataflow.
{
"id": "8256cfb4-17e6-432c-a469-6aedafb16cd5",
"etag": "\"04004fe9-0000-0200-0000-5ebc4c8b0000\""
}
Monitor your dataflow
Once your dataflow has been created, you can monitor the data that is being ingested through it to see information on flow runs, completion status, and errors. For more information on how to monitor dataflows, see the tutorial on monitoring dataflows in the API
Next steps
By following this tutorial, you have created a source connector to collect data from a protocols application on a scheduled basis. Incoming data can now be used by downstream Platform services such as Real-Time Customer Profile and Data Science Workspace. See the following documents for more details: