Technical Articles
SAP Data Intelligence – SAPシステムとnon-SAPシステムのデータ連携
はじめに
SAP Data Intelligence(以降、DI)は、「データ管理」と「AIの開発・運用」を一つに統合したソリューションです。詳細については、SAP Data Intelligenceで実現するエンタープライズAIをご参照ください。
本ブログの目的
DIのパイプラインを使うことで、SAPやnon-SAPにまたがってデータが散らばる複雑なランドスケープにおいて、データを効率よく連携・管理できることを理解。
前提知識
- ABAP:CDS Viewを書いたことがあるレベル
説明の流れ
- DI-S/4 HANAとDI-Kafkaの接続設定
- パイプライン作成
- パイプライン実行
本内容は、IT管理者やデータエンジニアに実施して頂く想定になります。
作成するパイプライン
CDS Viewの内容を読み取り、JSON形式に変換した上でnon-SAPシステムに格納するパイプラインを作成します。
non-SAPシステムとしてApache Kafka(以降、Kafka)を使用しますが、Kafkaの知識は必要ありません。
補足)Kafkaとは?
高スループット、かつ低レイテンシなオープンソースの分散ストリーミングプラットフォームです。
1. DI-S/4 HANAとDI-Kafkaの接続設定
ここでは、読み取り対象のCDS Viewを準備し、DIのConnection ManagementからS/4 HANAとKafkaへの接続設定を行います。
補足)Connection Managementは、SAPやnon-SAPの様々なデータソースへの接続を定義し、管理する機能です。ここで定義した接続設定を用いることで、データカタログ機能によりDBのテーブルを確認したり、パイプライン機能によりデータ連携先として簡単にデータソースにアクセスできるようになります。
CDS View準備
まずは、ABAP Development Tools(ADT)でDIの読み取り対象になるCDS Viewを準備します。
補足)CDS Viewの作成方法がわからない方は、こちらをご参照ください。
DIから読み取りが出来るようにするために、CDS Viewに以下のAnnotationを付与します。
@Analytics: {
dataExtraction: {
enabled: true,
delta.changeDataCapture.automatic: true
}
}
今回は以下のようなCDS Viewを使用します。
@AbapCatalog.sqlViewName: 'ZCDSMARA_0'
@AbapCatalog.compiler.compareFilter: true
@AbapCatalog.preserveKey: true
@AccessControl.authorizationCheck: #CHECK
@EndUserText.label: 'ZCDS MARA test'
@Analytics: {
dataExtraction: {
enabled: true,
delta.changeDataCapture.automatic: true
}
}
define view ZCDS_MARA_0 as select from mara {
key matnr,
ersda,
created_at_time,
scm_matid_guid16
}
次に、ADTで読み取り対象のCDS Viewに以下のようにデータが入っていることを確認します。
DIとS/4 HANAの接続設定
DIのLaunchpadで、Connection Managementを選択します。
“Create Connection”ボタン(+)を押します。
“Connection Type”で”ABAP”を選択します。
接続対象となるS/4 HANAのホスト名、認証情報等を設定し、”Create”ボタンを押します。
Connectionの一覧画面で、”Action”ボタンから”Check Status”を実行します。
以下のようにOKと表示されたら、対象のS/4 HANAとの接続に成功しています。
DIとKafkaの接続設定
Connection一覧画面に戻り、“Create Connection”ボタン(+)を押します。
“Connection Type”で”Kafka”を選択します。
接続対象となるKafkaのホスト名、ポート等を設定し、”Create”ボタンを押します。
これでDIとKafkaの接続設定は完了です。
2. パイプライン作成
ここでは、CDS Viewを読み取り、JSONに変換してKafkaに格納するまでを行うDIのパイプラインを作成します。
新規パイプライン作成
DIのLaunchpadで、”Modeler”を選択します。
“Graphs”を選択してから、”Create Graph”ボタンをクリックします。
補足)Graphというのは、パイプラインのフローのことを指します。
“Save”ボタンを押し、作成したGraphに名前を付けます。
今回は”CDS to Kafka”という名前で保存します。
フロー作成:CDS View読み取り
ここからは、Operatorという小さな機能の単位を組み合わせてデータの流れを制御するフローを作成します。
まずはCDS Viewを読み取るためのOperatorである”ABAP CDS Reader”をダブルクリックし、フロー内に配置します。
読み取るCDS Viewを設定するために、フロー内の”ABAP CDS Reader”を選択し、”Open Configuration”をクリックします。
指定した”ABAP CDS Reader”オペレーターの設定画面が右側に開きます。
“ABAP Connection”のドロップダウンボタンをクリックするとConnection Managementで設定したABAP接続の一覧が表示されるため、接続対象のS/4 HANAを選択します。
“Version”の選択ボックスをクリックすると、”Select Version”画面が表示されます。
ABAP CDS Readerに複数のバージョンがありますが、最新版のV2を選択します。
補足)新規にパイプラインを作成する際は、古いバージョンで検証を行っていた等の特別な事情がない限り、新しいバージョンを利用してください。
“ABAP CDS Name”に対象の読み取り対象のCDS View名を記載します。
“Transfer Mode”は今回はInitial Loadを指定します。
ここまででCDS Viewの読み取り設定は完了です。
フロー作成:CSV ⇒ JSON変換
次に、CDS Viewから読み取ったCSVをJSONに変換するフローを作成します。
CSVからJSONに変更する機能を持つ”Format Converter”オペレーターをダブルクリックし、フロー内に配置します。
“ABAP CDS Reader V2″オペレーターと”Format Converter”オペレーターを接続するために、各々のOutputポートとInputポートのデータ形式を確認します。
まずは、”ABAP CDS Reader V2″オペレーターのOutputポートにマウスカーソルを合わせると、以下のように”Data Type: message”と表示されます。
次に、”Format Converter”オペレーターのInputポートにマウスカーソルを合わせると、以下のように”Data Type: blob”と表示されます。
接続したいオペレーター同士のデータ形式が違うため、これを統一する必要があります。
message形式のデータをblob形式に変換するために、”blob”で検索すると、”ToBlob Converter”オペレーターというものが見つかるので、ダブルクリックしてフロー内に配置します。
先ほどと同じように”ToBlob Converter”オペレーターのInputポート、Outputポートを確認すると、Inputが”Data Type: any”、Outputが”Data Type: blob”となり、”ABAP CDS Reader V2″オペレーターと”Format Converter”オペレーターを接続する際のデータ形式の仲介役に利用できることがわかります。
接続したいOperatorをそれぞれ、以下のようにマウスでドラッグして接続します。
次に、”Format Converter”を選択し、”Open Configuration”をクリックします。
“Target Format”に返還後のデータ形式である”JSON”を指定します。
“Fields”にJSON変換時のヘッダー情報を記入します。
冒頭のCDS View確認時のヘッダーを参考に、”Fields”は以下のように設定します。
これでCSV ⇒ JSONのフローが完成です。
フロー作成:Kafkaへの書き込み
最後に、Kafkaへのデータ書き込みのフローを作成します。
“Kafka”で検索すると、以下のように2つのオペレーターが表示されます。
ConsumerはKafkaからデータを取得するオペレーターで、ProducerはKafkaにデータを書き込むオペレーターです。
今回はKafkaに対してデータを書き込みたいので、”Kafka Producer”オペレーターをダブルクリックしてフロー内に配置します。
“Format Converter”オペレーターのOutputポートと、”Kafka Producer”オペレーターのInputポートを接続します。
Kafkaへのデータ書き込み設定をするために、フロー内の”Kafka Producer”を選択し、”Open Configuration”をクリックします。
”Kafka Producer”オペレーターの設定画面が右側に開きます。
“Connection Type”のドロップダウンボタンをクリックし、”connection management”を選択します。
補足)”manual”を選択した場合は、Kafkaのホスト名等、接続情報を記入します。
今回は事前にConnection ManagementでKafkaの接続情報を設定済みのため、既存の接続情報からKafkaの宛先を選ぶことができる”connection management”を選択しました。
次に、”Kafka Producer Connection”の”Editproperty”ボタンをクリックします。
“Edit property ‘Kafka Producer Connection'”画面が表示されるので、Connection IDのドロップダウンのリストから接続対象のKafkaを選択し、”save”ボタンをクリックします。
“Topic”にデータ書き込み先のKafkaの対象トピック名を記入します。
補足)トピックというのは、Kafkaにおけるデータストリーミングの1つの箱のようなものだと考えてください。
ここまででフローの作成は完了です。”Save”ボタンをクリックしてフローを保存します(Ctrl+Sでも可)。
3. パイプライン実行
パイプライン実行前の環境確認
以下のようにKafkaに格納されたデータを確認すると、パイプライン実行前は空であることがわかります。
# ./bin/kafka-console-consumer.sh --bootstrap-server <Kafka Host>:<Kafka Port> --topic cdsTestTopic --from-beginning
パイプライン実行
Modeler画面の“Run”ボタンで作成したパイプラインを実行します。
“Status”欄に実行状態が表示されます。”Running”と表示されたら、パイプラインによるデータ転送処理を実行中です。
パイプライン実行後の環境確認
再度Kafkaに格納されたデータを確認すると、以下のようにデータが格納されていることがわかります。
# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cdsTestTopic --from-beginning
[
{"created_at":"11:49:49.000","esrda":"2020-09-12","guid":"42010AEE-E443-1EDA-BD9D-C1FE403C4C0B","matnr":"TG22"},
{"created_at":"11:49:49.000","esrda":"2020-09-12","guid":"42010AEE-E443-1EDA-BD9D-C1FE403C2C0B","matnr":"TG21"},
{"created_at":"11:49:49.000","esrda":"2020-09-12","guid":"42010AEE-E443-1EDA-BD9D-C1FE403C0C0B","matnr":"TG20"},
……
{"created_at":"03:44:10.000","esrda":"2021-01-06","guid":"42010AEE-E443-1EDB-93FA-2DECC2C485FD","matnr":"000000000000000015"}
]
パイプライン停止
最後に、”Stop Process”ボタンを押し、パイプラインを停止します。
まとめ
以上のように、DIを活用することにより、様々な場所に散らばったデータを効率よく連携・変換することができます。
データは重要な資産であり、インサイトの発見に欠かせません。
しかし、今日ではデータがオンプレのSQL-DB、NoSQL-DB、クラウド上のデータレイク等あちこちに散らばり、複雑なランドスケープを形成しています。
そのため、データサイエンティストがデータ分析をする際、必要なデータを集める前準備だけで多くの時間を消費してしまいます。
DIを使うことで、複雑なランドスケープにおけるデータ管理を効率化したり、可視化することでデータフローを直感的に掴めるようになります。
これにより、インサイトの発見やイノベーション創出に役立てることが出来ます。
データ管理基盤として是非、DIをご活用頂けたら幸いです!