Atlas Stream Processing を使用すると、Atlas データベースで使用されるのと同じ 集計操作を使用して、データ ストリームの読み取り、書き込み、変換を行うことができます。Atlas Stream Processing を使用すると、以下を行うことができます。
継続的な検証を実行して、メッセージが正しく作成されているかどうかを確認し、メッセージの破損を検出し、遅延データを検出します。
ドキュメントがパイプラインを通過するときにフィールドを変換し、各ドキュメント内のフィールドまたは式をキーとして使用して、それらのドキュメントを個別のデータベース、 Kafkaトピック、またはその他の外部シンクにルーティングします。
Atlas コレクションまたはApache Kafka クラスターに結果を継続的に公開し、最新のビューとデータ分析を確保します。
Atlas Stream Processing コンポーネントは Atlas プロジェクトに直接属し、Atlas クラスターとは独立して動作します。
データのストリーミング
ストリームは、1 つ以上のソースから発生する不変データの連続したフローです。データストリームの例には、センサーからの温度や負荷の読み取り、金融トランザクションの記録、または変更データをキャプチャするイベントが含まれます。
データ ストリームは、 Apache Kafkaトピック やMongoDB変更ストリーム などのソースに基づきます。その後、 Apache Kafkaトピック 、 Atlas コレクション、外部関数、またはクラウドデータ ストアなどの処理されたデータをシンクに書き込むことができます。
Atlas Stream Processing は、 保存データベースの時間や計算上の制約なしに連続データを操作するネイティブの Stream Processing 機能を提供します。
ストリームプロセッサの構造
ストリーム プロセッサは、概念を 3 つのフェーズに分割できるパイプラインの形式をとります。
ソース
ストリーム プロセッサは、Atlas Stream Processing が接続されているストリーミングデータのソースからドキュメントを取り込むことで始まります。これらは、Apache Kafka などのサーバー システムや、Atlas の読み取り/書き込み操作によって生成されるようなデータベース変更ストリームでもあります。これらの入力は有効な json
または ejson
ドキュメントである必要があります。$source
ステージがドキュメントを取り込むと、必要に応じてそのドキュメントにMongoDB集計を適用して変換できます。
Atlas Stream Processing は、ストリーミングソースからのデータを取り込むだけでなく、接続された Atlas クラスターのデータを結合するために、HTTPS requests と $lookup 操作のデータを使用してドキュメントを強化することもサポートされています。
パイプライン
ストリーム プロセッサは、取り込まれたデータを変換し、価値のあるインサイトを抽出するために、 MongoDBの標準的な 集計演算子とステージ に加えて、 集計パイプラインステージ と 集計演算子 を活用します。Atlas Stream Processing は、処理できないドキュメントを deadline 文字キュー に書き込むことができます。
ドキュメントを再構築したり、フィールドを追加または削除したり、コレクションから情報を検索したりすることで、ドキュメントを増やします。Atlas Stream Processing では、Windowsを使用してイベントを収集し、任意の関数を実行することもできます。
Windows
Windows は、設定された期間内のストリーミングデータを集計するパイプラインステージです。これにより、データをグループ化したり、平均を取得したり、最小値と最大値を見つけたり、ストリーミングデータには適用できないその他のさまざまな操作を実行したりできます。各ストリーム プロセッサには、ウィンドウステージが 1 つだけ含めることができます。
関数
Atlas Stream Processing は、ストリーム プロセスが渡す各ドキュメントに対して実行されるカスタム JavaScript 関数 または Amazon Web Services Lambda 関数 への呼び出しをサポートしています。
Sinks
取り込まれたデータを処理した後、ストリーム プロセッサはそれを Sink に書き込むようにします。Atlas Stream Processing は、さまざまな Sink タイプへの書き込みのための $emit ステージと $merge ステージを提供します。これらのステージは互いに排他的であり、各ストリーム プロセッサが持つことができる Sink ステージは 1 つだけです。パイプラインには、同じ Sink 接続内の別のKafkaトピックまたは Atlas コレクションに処理されたドキュメントを書込むロジックを含めることができます。
Atlas Stream Processing リージョン
Atlas Stream Processing は、Amazon Web Services、Azure、Google Cloud Platformでの Atlas Stream Processing インスタンスの作成をサポートしています。利用可能なリージョンのリストについては、次の ストリーム プロセシング インスタンス セクションを参照してください。
ストリーム プロセッサは、異なるクラウドプロバイダーまたは異なるリージョンでホストされているクラスターから読み取り、書き込みができます。
請求
請求の詳細については、Atlas Stream Processing の請求ページを参照してください。
次のステップ
Atlas Stream Processing を使ったサンプル処理を開始するには、Atlas Stream Processing を使い始める を参照してください。
Atlas Stream Processing の主要概念の詳細については、以下を参照してください。