MRRA LogoMRRA
Core Concepts

Activity Chains

Extract, summarize and run activity chains in agent construction

Activity Chains

Activity chains are time-ordered sequences of adjacent activity transitions that record movement patterns at both location and purpose levels. They are essential for:

  • Sequential pattern mining (e.g., "work→dining→work→home")
  • Prediction constraints and explanations (limiting/guiding next-step candidates)
  • User profiling and anomaly detection (irregular transitions/irregular timing)

Extraction Process

Activity chains are extracted through the following steps:

  1. Group by user and sort activities by start time
  2. Generate transition records for adjacent activities (i-1 → i), including from_place/to_place, from_purpose/to_purpose, timestamps, etc.
  3. Aggregate into "purpose transition matrices" or "location transition statistics"

Implementation Example

Here's how to extract and cache activity chains:

from collections import defaultdict
from mrra.persist.cache import CacheManager, compute_tb_hash

cm = CacheManager()
tb_hash = compute_tb_hash(tb)
acts = cm.load_activities(tb_hash, "default") or acts

# Group activities by user
user_groups = defaultdict(list)
for a in sorted(acts, key=lambda r: (r.user_id, r.start)):
    user_groups[a.user_id].append(a)

# Generate chain records
chain_records = []
for uid, seq in user_groups.items():
    for i in range(1, len(seq)):
        prev, cur = seq[i-1], seq[i]
        chain_records.append({
            "user_id": uid,
            "from_place": prev.place_id,
            "to_place": cur.place_id,
            "from_purpose": getattr(prev, "purpose", "Other"),
            "to_purpose": getattr(cur, "purpose", "Other"),
            "at": str(cur.start),
        })

# Cache the results
cm.save_json(
    tb_hash, 
    "chains_default", 
    {"count": len(chain_records), "records": chain_records[:1000]}, 
    kind="chains"
)

Chain Analysis

Once you have activity chains, you can perform various analyses:

Transition Matrix Analysis

from collections import Counter
import pandas as pd

# Purpose transition analysis
purpose_transitions = Counter()
location_transitions = Counter()

for record in chain_records:
    purpose_transitions[(record["from_purpose"], record["to_purpose"])] += 1
    location_transitions[(record["from_place"], record["to_place"])] += 1

# Convert to DataFrame for analysis
purpose_df = pd.DataFrame([
    {"from": k[0], "to": k[1], "count": v} 
    for k, v in purpose_transitions.items()
])

print("Top purpose transitions:")
print(purpose_df.sort_values("count", ascending=False).head(10))

Temporal Pattern Analysis

import pandas as pd

# Convert timestamps for temporal analysis
chain_df = pd.DataFrame(chain_records)
chain_df["timestamp"] = pd.to_datetime(chain_df["at"])
chain_df["hour"] = chain_df["timestamp"].dt.hour
chain_df["dow"] = chain_df["timestamp"].dt.dayofweek

# Analyze temporal patterns
temporal_patterns = chain_df.groupby(["hour", "from_purpose", "to_purpose"]).size()
print("Peak transition hours:")
print(temporal_patterns.sort_values(ascending=False).head(10))

Integration with Agent Construction

Activity chains should be generated and cached during agent construction for optimal performance:

Recommended workflow:

  1. Complete "activity extraction + purpose assignment"
  2. Immediately generate and cache activity chains
  3. Agents can use "recent purpose chains/location chains" as prompts or additional evidence

Usage in Retrieval

Activity chains can be used to enhance retrieval in several ways:

from mrra.retriever.graph_rag import GraphRAGGenerate

# Enhanced retriever with chain context
class ChainEnhancedRetriever(GraphRAGGenerate):
    def __init__(self, *args, chain_records=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.chain_records = chain_records or []
    
    def get_recent_chains(self, user_id, purpose=None, limit=5):
        """Get recent activity chains for context"""
        user_chains = [
            c for c in self.chain_records 
            if c["user_id"] == user_id
        ]
        
        if purpose:
            user_chains = [
                c for c in user_chains 
                if purpose in [c["from_purpose"], c["to_purpose"]]
            ]
        
        return sorted(user_chains, key=lambda x: x["at"])[-limit:]
    
    def get_relevant_documents(self, query):
        """Enhanced retrieval with chain context"""
        docs = super().get_relevant_documents(query)
        
        # Add chain context if user_id provided
        if "user_id" in query:
            recent_chains = self.get_recent_chains(
                query["user_id"], 
                query.get("purpose")
            )
            
            # Add chain context to first document
            if docs and recent_chains:
                chain_text = f"Recent activity transitions: {recent_chains[-3:]}"
                docs[0].page_content = f"{chain_text}\n\n{docs[0].page_content}"
        
        return docs

Chain-based Prediction Constraints

Activity chains can also be used to constrain and validate predictions:

def validate_prediction_with_chains(prediction, user_chains, current_purpose):
    """Validate prediction against historical transition patterns"""
    
    # Get common next purposes from chains
    next_purposes = [
        c["to_purpose"] for c in user_chains 
        if c["from_purpose"] == current_purpose
    ]
    
    if not next_purposes:
        return True  # No historical data, allow prediction
    
    # Check if predicted purpose is reasonable
    purpose_frequency = Counter(next_purposes)
    common_purposes = [p for p, count in purpose_frequency.most_common(3)]
    
    predicted_purpose = getattr(prediction, 'purpose', None)
    
    if predicted_purpose and predicted_purpose not in common_purposes:
        # Add confidence penalty for unusual transitions
        confidence_penalty = 0.3
        prediction.confidence *= (1 - confidence_penalty)
        prediction.rationale += f" (Unusual transition: {current_purpose}{predicted_purpose})"
    
    return prediction

Performance Considerations

Cache Management: Activity chains can become large for long-term users. Consider:

  • Limiting chain records (e.g., last 1000 transitions)
  • Time-based filtering (e.g., last 30 days)
  • Periodic cache cleanup

Efficient Chain Storage

# Efficient chain storage with time windows
def store_chains_with_window(chain_records, days_back=30):
    from datetime import datetime, timedelta
    
    cutoff = datetime.now() - timedelta(days=days_back)
    
    recent_chains = [
        c for c in chain_records 
        if datetime.fromisoformat(c["at"]) > cutoff
    ]
    
    return {
        "count": len(recent_chains),
        "records": recent_chains,
        "window_days": days_back,
        "cutoff": cutoff.isoformat()
    }

Use Cases and Applications

1. Route Recommendation

Use chains to recommend optimal routes based on historical transition patterns.

2. Anomaly Detection

Identify unusual activity transitions that deviate from established patterns.

3. Personalized Predictions

Weight predictions based on individual transition preferences.

4. Temporal Optimization

Optimize timing predictions based on historical transition timing.

Next Steps