定義
$externalFunction
ステージは、特定の AWS Lambda リソース内でプロセスをトリガーします。AWS Lambda プロセスへのリクエストは、同期または非同期のいずれかで行うことができます。
AWS Lambda を作成し、統合 AWS アクセスで認証する
Atlas Stream Processing パイプライン内から AWS Lambda リソースを呼び出すには、AWS Lambda が Atlas Stream Processing と同じ AWS リージョンにデプロイされている必要があります。AWS Lambda リソースのデプロイの詳細については、AWS のドキュメントを参照してください。
Amazon Web Services Lambda関数を作成します。
AWS CLI を使用するか AWS UI を通じて、Lambda 関数を作成します。
統合 AWS アクセスを構成します。
注意
ここで説明している手順は、Atlas UI における基本的な設定フローのみを対象としています。詳しくは、「統合 AWS アクセスの設定」ドキュメントを参照してください。
必要なアクセス権
統合 AWS アクセスを設定するには、そのプロジェクトに対して Organization Owner
または Project Owner
のアクセス権が必要です。
前提条件
注意
AWS IAM ポリシーには
lambda:InvokeFunction
アクションを含める必要があります。プレースホルダー
ExternalId
とResource
の値は独自の値(統合 AWS アクセスの構成プロセスを通じて入手可能)に置き換える必要があります。この例のExternalId
には、Lambda 関数のうち名前がfunction-
で始まるものと一致するワイルドカードが含まれます。
Atlas UI で既存のロールに信頼関係を追加する
次に、自己管理の AWS IAM ロールを有効にして、AWS Lambda リソースを実行できるようにする必要があります。
permission-policy.json
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "lambda:InvokeFunction" ], "Resource": "arn:aws:lambda:us-east-1:257394458927:function:<function-name>" } ] }
Atlas プロジェクトの AWS IAM 統合ページに移動し、Authorize an AWS IAM role ボタンをクリックしてください。
モーダルに表示されている
role-trust-policy.json
を使用して、新しいロールを作成するか、既存のロールを修正してください。ロールが作成された(または既存のロールが新しい信頼ポリシーで更新された)後、ロールのARNをモーダルに貼り付けてください。
AWS コンソールで IAM > Roles に移動し、ご自身のロールを選択します。
permissions タブで、このロールが Lambda を呼び出せるようにするための「インライン許可」を新たに追加してください。上記に示した例の
permission-policy.json
は、<function-name>
という名前を持つ任意の Lambda を実行する権限を追加するものです。最後に、Atlas Stream Processing インスタンスに移動し、新しいAWS Lambda connectionを追加し、前の手順で構成したAWS IAM Role ARNを選択してください。
Atlas Stream Processing インスタンスを AWS Lambda 関数に接続する
Atlas Stream Processing パイプライン内から AWS Lambda リソースにリクエストを送信するには、まずその AWS Lambda リソースを 接続として Atlas Stream Processing リソースに追加する必要があります。
次の例に示すように、 Atlas UI、 Atlas CLI、または Atlas APIを介して、 Amazon Web Services Lambdaリソースを接続として追加できます。 例にある roleArn
プレースホルダーは、Amazon Web Services IAM 構成 の arn
で更新できます。
curl --user "username:password" --digest \ --header "Content-Type: application/json" \ --header "Accept: application/vnd.atlas.2023-02-01+json" \ --include \ --data '{"name": "TestAWSLambdaConnection","type": "AWSLambda","aws": {"roleArn": "arn:aws:iam::<aws_account>:role/<role_name>"}}' \ --request POST "https://cloud.mongodb.com/api/atlas/v2/groups/<group_id>/streams/<tenant_name>/connections"
構文
最小限のリクエスト
次の例は、最小限のリクエストに必要なフィールドを示しています。
{ $externalFunction: { connectionName: "myLambdaConnection", functionName: "arn:aws:lambda:region:account-id:function:function-name", as: "response", }}
カスタマイズ リクエスト
次のカスタマイズされた例では、上記に示した必須フィールドに加えて、エラー処理、同期実行、および前処理済みの Atlas Stream Processing ドキュメントをペイロードとして指定します。
{ $externalFunction: { connectionName: "myLambdaConnection", functionName: "arn:aws:lambda:region:account-id:function:function-name", execution: "sync" as: "response", onError: "fail", payload: [{$replaceRoot: { newRoot: "$fullDocument.payloadToSend" } }, { $addFields: { sum: { $sum: "$randomArray" }}}, { $project: { success: 1, sum: 1 }}], }}
注意
onError
フィールドは、AWS Lambda リソースに対する同期および非同期リクエストの両方に対する API レベルのエラーの動作と、同期リクエストに対する AWS Lambda 関数エラーの動作を定義します。
$externalFunction
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 |
---|---|---|---|
| string | 必須 | リクエストの送信先となる接続レジストリ内の接続を識別するラベル。 |
| string | 必須 | トリガーされる AWS Lambda 関数の完全な AWS ARN または関数名。 |
| 列挙 | 任意 | Amazon Web Services Lambda関数を同期的に呼び出すか非同期に呼び出すかを指定するパラメーター。 指定できる値は以下のとおりです。
|
| string | 任意 | REST API 応答のフィールド名。 エンドポイントが 0 バイトを返す場合、演算子は |
| string | 任意 | ChatGPT:オペレーターが
デフォルトは |
| 配列 | 任意 | API エンドポイントに送信するリクエストボディをカスタマイズできるカスタム内部パイプライン。
|
動作
$externalFunction
ステージは、指定されたAmazon Web Services Lambdaリソースにリクエストを送信します。リクエストが同期されている場合、指定されたフィールドに応答が返され、パイプラインは処理を続行します。リクエストが非同期の場合、パイプラインは応答を待たずに処理を続行します。
同期リクエストが失敗した場合、パイプラインのエラー処理動作は onError
フィールドによって決まります。リクエストが非同期の場合、onError
フィールドはAmazon Web Services Lambda関数エラーではなく、 Amazon Web Services Services APIエラーにのみ適用されます。 onError
フィールドが指定されていない場合、デフォルトの動作は影響を受けたドキュメントをデッドレターキュー (DLQ)に送信します。
onError 値 | 動作 |
---|---|
| 影響を受けたドキュメントは、 デッドレターキュー (DLQ)に送信されます。 |
| 影響を受けたドキュメントは無視されます。 |
| ストリーム プロセッサはエラーで終了します。 |
パイプラインがドキュメントを適切なJSONに変換できない場合、または構成されたパイプラインが最終ステージの結果として有効なBSONオブジェクトを作成しない場合は、 の設定に関係なく、ドキュメントはデッドレターキュー onError
(DLQ)に送信されます。
無効な式など、$externalFunction
演算子自体の誤った構成に発生するエラーは、onError
の動作をトリガーしません。 代わりに、ストリーム プロセッサは失敗し、エラー メッセージが表示されます。Atlas Stream Processingパイプラインは、一時的なエラーが原因で失敗したリクエストを再試行します。