Definição
O estágio $cachedLookup realiza uma junção externa esquerda do fluxo de mensagens do seu $source para uma collection do Atlas no seu Registro de Conexão.
Este estágio funciona de forma semelhante ao estágio $lookup, mas armazena em cache os resultados de suas queries de acordo com parâmetros configuráveis.
Importante
$cachedLookup não suporta os campos let ou pipeline.
Para saber mais, consulte Sintaxe $lookup.
O seguinte formulário protótipo ilustra todos os campos disponíveis:
{ "$lookup": { "ttl": { "size": <int>, "unit": "ms" | "second" | "minute" | "hour" | "day" }, "maxMemUsageBytes": <int>, "from": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>", "coll": "<atlas-collection-name>" }, "localField": "<field-in-source-messages>", "foreignField": "<field-in-from-collection>", "as": "<output-array-field>" } }
Sintaxe
O $cachedLookup utiliza alguns dos mesmos campos que a versão geral do $lookup. $cachedLookup inclui campos para configurar o comportamento do cache de query e fornece uma sintaxe modificada para o campo from para query de dados por meio de uma conexão do seu registro de conexão.
Campo | Tipo | necessidade | Descrição |
|---|---|---|---|
TTL | documento | Obrigatório | Documento que especifica o TTL das suas queries em cache. |
ttl.size | int | Obrigatório | Tamanho do TTL das suas queries em cache no |
ttl.unit | string | Obrigatório | Unidade de tempo na qual medir o TTL das suas queries em cache. Deve ser um dos seguintes:
|
maxMemUsageBytes | int | Obrigatório | Memória máxima, em bytes, para alocar para cache de query. Se o tamanho do cache exceder esse valor, o Atlas Stream Processing primeiro removerá os resultados mais antigos para liberar espaço. Se não houver resultados expirados suficientes para ficar abaixo desse limite, o Atlas Stream Processing removerá aleatoriamente as queries em cache até que o tamanho do cache esteja abaixo do limite. O padrão é 10% da RAM disponível em sua área de trabalho de processamento de fluxo. Não é possível definir |
from | documento | Obrigatório | Documento que especifica uma collection em um banco de dados Atlas para unir às mensagens do seu Se você especificar este campo, você deverá especificar valores para todos os campos neste documento. |
from.connectionName | string | Obrigatório | Nome da conexão no registro de conexões. |
from.db | string | Obrigatório | Nome do banco de dados do Atlas que contém a coleção que você deseja unir. |
from.coll | string | Obrigatório | Nome da coleção da qual você deseja participar. |
localField | string | Obrigatório | Campo a partir de suas mensagens |
foreignField | string | Obrigatório | Campo de documentos na coleção |
como | string | Obrigatório | Nome do novo campo de array a ser adicionado aos documentos de entrada. Este novo campo de array contém os documentos correspondentes da coleção |
Comportamento
$cachedLookup realiza uma junção externa esquerda de mensagens de seu $source e dos documentos em uma Atlas collection especificada. Essa versão se comporta de forma semelhante ao estágio $lookup disponível em um banco de dados MongoDB padrão. No entanto, esta versão exige que você especifique uma coleção do Atlas do seu Registro de Conexão como o valor para o campo from.
Além disso, o $cachedLookup armazena em cache os resultados de suas queries por um período de tempo configurável. Use essa funcionalidade para queries em dados alterados com pouca frequência para melhorar a eficiência. Quando o TTL de uma entrada em cache termina, o Atlas Stream Processing despeja essa entrada. Se o tamanho total das entradas em cache for igual a maxMemoryUsageBytes quando você fizer uma nova query, o Atlas Stream Processing removerá as entradas até que haja espaço para armazenar em cache a nova query.
Exemplos
Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. Uma coleção chamada humidity_descriptions contém documentos do formato:
{ 'dew_point': 16.2, 'relative_humidity': 79, 'condition': 'sticky, oppressive' }
Onde o campo relative_humidity descreve a Umidade relativa à temperatura ambiente (20 Celsius) e condition lista descritores verbais apropriados para esse nível de Umidade. Você pode usar o estágio $cachedLookup para enriquecer a transmissão de relatórios meteorológicos com descritores sugeridos para os meteorológicos usarem em gravações meteorológicas.
A seguinte agregação tem quatro fases:
O estágio estabelece
$sourceuma conexão com o broker do Apache Kafka que coleta esses relatórios em um tópico chamadomy_weatherdata, expondo cada registro à medida que ele é ingerido aos estágios de agregação posteriores. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-o comoingestionTime.O estágio
$cachedLookupune os registros do banco de dados dohumidity_descriptionsnos relatórios meteorológicos no campodewPoint. Cada query tem um5 minuteTTL e o Atlas Stream Processing armazena até 200 MB de resultados.A fase
$matchexclui documentos que têm um campohumidity_infovazio e passa documentos com um campohumidity_infopreenchido para a próxima fase.O estágio
$mergegrava a saída na coleção do Atlas chamadaenriched_streamno banco de dadossample_weatherstream. Se não existir tal banco de dados de dados ou coleção, o Atlas os criará.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$cachedLookup': { "ttl": { "size": 5, "unit": "minute" }, "maxMemUsageBytes": 209715200, from: { connectionName: 'weatherStream', db: 'humidity', coll: 'humidity_descriptions' }, 'localField':'dewPoint.value', 'foreignField':'dewPoint', 'as': 'humidity_info' } }, { '$match': { 'humidity_info': { '$ne': [] } } }, { '$merge': { into: { connectionName: 'weatherStream', db: 'sample_weatherstream', coll: 'enriched_stream' } } }
Para visualizar os documentos na coleção sample_weatherstream.enriched_stream resultante, conecte-se ao cluster Atlas e execute o seguinte comando:
db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{ st: 'x+55100+006100', position: { type: 'Point', coordinates: [ 92.7, -53.6 ] }, elevation: 9999, callLetters: 'UECN', qualityControlProcess: 'V020', dataSource: '4', type: 'FM-13', airTemperature: { value: -11, quality: '9' }, dewPoint: { value: 12.5, quality: '1' }, pressure: { value: 1032.7, quality: '9' }, wind: { direction: { angle: 300, quality: '9' }, type: '9', speed: { rate: 23.6, quality: '2' } }, visibility: { distance: { value: 14000, quality: '1' }, variability: { value: 'N', quality: '1' } }, skyCondition: { ceilingHeight: { value: 390, quality: '9', determination: 'C' }, cavok: 'N' }, sections: [ 'SA1', 'AA1', 'OA1', 'AY1', 'AG1' ], precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 21 }, atmosphericPressureChange: { tendency: { code: '1', quality: '1' }, quantity3Hours: { value: 5.5, quality: '1' }, quantity24Hours: { value: 99.9, quality: '9' } }, seaSurfaceTemperature: { value: 1.3, quality: '9' }, waveMeasurement: { method: 'M', waves: { period: 4, height: 2.5, quality: '9' }, seaState: { code: '00', quality: '9' } }, pastWeatherObservationManual: { atmosphericCondition: { value: '4', quality: '1' }, period: { value: 6, quality: '1' } }, skyConditionObservation: { totalCoverage: { value: '07', opaque: '99', quality: '1' }, lowestCloudCoverage: { value: '06', quality: '1' }, lowCloudGenus: { value: '07', quality: '9' }, lowestCloudBaseHeight: { value: 2250, quality: '9' }, midCloudGenus: { value: '07', quality: '9' }, highCloudGenus: { value: '00', quality: '1' } }, presentWeatherObservationManual: { condition: '75', quality: '1' }, atmosphericPressureObservation: { altimeterSetting: { value: 9999.9, quality: '9' }, stationPressure: { value: 1032.6, quality: '1' } }, skyCoverLayer: { coverage: { value: '09', quality: '1' }, baseHeight: { value: 240, quality: '9' }, cloudType: { value: '99', quality: '9' } }, liquidPrecipitation: { period: 6, depth: 3670, condition: '9', quality: '9' }, extremeAirTemperature: { period: 99.9, code: 'N', value: -30.9, quantity: '9' }, ingestionTime: ISODate('2024-09-19T20:04:34.346Z'), humidity_info: [ { _id: ObjectId('66ec805ad3cfbba767ebf7a5'), dewPoint: 12.5, relativeHumidity: 62, condition: 'humid, muggy' } ], }
Observação
O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.