name: add-datalake-consumer description: Adds an event consumer that writes to Azure Data Lake (Parquet) following BI_SALES_RISK plan. Creates events/consumers/[Name]DataLakeCollector.ts subscribing to RabbitMQ, building Parquet rows, writing to /path_prefix/year=YYYY/month=MM/day=DD/. Use when adding DataLakeCollector in logging or similar “event to Data Lake” pipelines.
Add Data Lake Consumer
Event consumer that subscribes to RabbitMQ and writes to Azure Data Lake (Parquet). Pattern: logging’s DataLakeCollector for risk.evaluated (BI_SALES_RISK_IMPLEMENTATION_PLAN §3.5, §9.1). BI Sales Risk: Paths and Parquet columns MUST match documentation/requirements/BI_SALES_RISK_DATA_LAKE_LAYOUT.md (§2.1 risk.evaluated, §2.2 ml_outcomes, §4 config).
1. Consumer
Path: src/events/consumers/[Name]DataLakeCollector.ts
EventConsumerwithqueue,exchange: coder_events,bindings: e.g.['risk.evaluated','ml.prediction.completed','opportunity.updated','forecast.generated'].- Handler: map event to row. For risk.evaluated use columns in Data Lake Layout §2.1. Build path:
{path_prefix}/year={YYYY}/month={MM}/day={DD}/...(Layout §1). - Write via
@azure/storage-blob(BlockBlob) or@azure/storage-blob+parquetjs(or Arrow) for Parquet. Buffer/batch by time or count if needed. - Config:
data_lake.connection_string,data_lake.container,data_lake.path_prefix(e.g./risk_evaluations).
2. Config
config/default.yaml:
data_lake:
connection_string: ${DATA_LAKE_CONNECTION_STRING}
container: ${DATA_LAKE_CONTAINER:-risk}
path_prefix: ${DATA_LAKE_PATH_PREFIX:-/risk_evaluations}
rabbitmq:
url: ${RABBITMQ_URL}
exchange: coder_events
queue: [module]_data_lake
bindings:
- risk.evaluated
- ml.prediction.completed
# ...
config/schema.json: add data_lake with connection_string, container, path_prefix.
3. Server
In server.ts: await dataLakeCollector.start() after RabbitMQ connect.
4. Checklist
- Consumer in
events/consumers/, subscribe to RabbitMQ (no Azure Service Bus) - Path:
{path_prefix}/year=.../month=.../day=.../; format Parquet - Config:
data_lake.*and schema;rabbitmqqueue and bindings - Start collector in server