Artificial Intelligence with PHP
  • Getting Started
    • Introduction
    • Audience
    • How to Read This Book
    • Glossary
    • Contributors
    • Resources
    • Changelog
  • Artificial Intelligence
    • Introduction
    • Overview of AI
      • History of AI
      • How Does AI Work?
      • Structure of AI
      • Will AI Take Over the World?
      • Types of AI
        • Limited Memory AI
        • Reactive AI
        • Theory of Mind AI
        • Self-Aware AI
    • AI Capabilities in PHP
      • Introduction to LLM Agents PHP SDK
      • Overview of AI Libraries in PHP
    • AI Agents
      • Introduction to AI Agents
      • Structure of AI Agent
      • Components of AI Agents
      • Types of AI Agents
      • AI Agent Architecture
      • AI Agent Environment
      • Application of Agents in AI
      • Challenges in AI Agent Development
      • Future of AI Agents
      • Turing Test in AI
      • LLM AI Agents
        • Introduction to LLM AI Agents
        • Implementation in PHP
          • Sales Analyst Agent
          • Site Status Checker Agent
    • Theoretical Foundations of AI
      • Introduction to Theoretical Foundations of AI
      • Problem Solving in AI
        • Introduction
        • Types of Search Algorithms
          • Comparison of Search Algorithms
          • Informed (Heuristic) Search
            • Global Search
              • Beam Search
              • Greedy Search
              • Iterative Deepening A* Search
              • A* Search
                • A* Graph Search
                • A* Graph vs A* Tree Search
                • A* Tree Search
            • Local Search
              • Hill Climbing Algorithm
                • Introduction
                • Best Practices and Optimization
                • Practical Applications
                • Implementation in PHP
              • Simulated Annealing Search
              • Local Beam Search
              • Genetic Algorithms
              • Tabu Search
          • Uninformed (Blind) Search
            • Global Search
              • Bidirectional Search (BDS)
              • Breadth-First Search (BFS)
              • Depth-First Search (DFS)
              • Iterative Deepening Depth-First Search (IDDFS)
              • Uniform Cost Search (UCS)
            • Local Search
              • Depth-Limited Search (DLS)
              • Random Walk Search (RWS)
          • Adversarial Search
          • Means-Ends Analysis
      • Knowledge & Uncertainty in AI
        • Knowledge-Based Agents
        • Knowledge Representation
          • Introduction
          • Approaches to KR in AI
          • The KR Cycle in AI
          • Types of Knowledge in AI
          • KR Techniques
            • Logical Representation
            • Semantic Network Representation
            • Frame Representation
            • Production Rules
        • Reasoning in AI
        • Uncertain Knowledge Representation
        • The Wumpus World
        • Applications and Challenges
      • Cybernetics and AI
      • Philosophical and Ethical Foundations of AI
    • Mathematics for AI
      • Computational Theory in AI
      • Logic and Reasoning
        • Classification of Logics
        • Formal Logic
          • Propositional Logic
            • Basics of Propositional Logic
            • Implementation in PHP
          • Predicate Logic
            • Basics of Predicate Logic
            • Implementation in PHP
          • Second-order and Higher-order Logic
          • Modal Logic
          • Temporal Logic
        • Informal Logic
        • Semi-formal Logic
      • Set Theory and Discrete Mathematics
      • Decision Making in AI
    • Key Application of AI
      • AI in Astronomy
      • AI in Agriculture
      • AI in Automotive Industry
      • AI in Data Security
      • AI in Dating
      • AI in E-commerce
      • AI in Education
      • AI in Entertainment
      • AI in Finance
      • AI in Gaming
      • AI in Healthcare
      • AI in Robotics
      • AI in Social Media
      • AI in Software Development
      • AI in Adult Entertainment
      • AI in Criminal Justice
      • AI in Criminal World
      • AI in Military Domain
      • AI in Terrorist Activities
      • AI in Transforming Our World
      • AI in Travel and Transport
    • Practice
  • Machine Learning
    • Introduction
    • Overview of ML
      • History of ML
        • Origins and Early Concepts
        • 19th Century
        • 20th Century
        • 21st Century
        • Coming Years
      • Key Terms and Principles
      • Machine Learning Life Cycle
      • Problems and Challenges
    • ML Capabilities in PHP
      • Overview of ML Libraries in PHP
      • Configuring an Environment for PHP
        • Direct Installation
        • Using Docker
        • Additional Notes
      • Introduction to PHP-ML
      • Introduction to Rubix ML
    • Mathematics for ML
      • Linear Algebra
        • Scalars
          • Definition and Operations
          • Scalars with PHP
        • Vectors
          • Definition and Operations
          • Vectors in Machine Learning
          • Vectors with PHP
        • Matrices
          • Definition and Types
          • Matrix Operations
          • Determinant of a Matrix
          • Inverse Matrices
          • Cofactor Matrices
          • Adjugate Matrices
          • Matrices in Machine Learning
          • Matrices with PHP
        • Tensors
          • Definition of Tensors
          • Tensor Properties
            • Tensor Types
            • Tensor Dimension
            • Tensor Rank
            • Tensor Shape
          • Tensor Operations
          • Practical Applications
          • Tensors in Machine Learning
          • Tensors with PHP
        • Linear Transformations
          • Introduction
          • LT with PHP
          • LT Role in Neural Networks
        • Eigenvalues and Eigenvectors
        • Norms and Distances
        • Linear Algebra in Optimization
      • Calculus
      • Probability and Statistics
      • Information Theory
      • Optimization Techniques
      • Graph Theory and Networks
      • Discrete Mathematics and Combinatorics
      • Advanced Topics
    • Data Fundamentals
      • Data Types and Formats
        • Data Types
        • Structured Data Formats
        • Unstructured Data Formats
        • Implementation with PHP
      • General Data Processing
        • Introduction
        • Storage and Management
          • Data Security and Privacy
          • Data Serialization and Deserialization in PHP
          • Data Versioning and Management
          • Database Systems for AI
          • Efficient Data Storage Techniques
          • Optimizing Data Retrieval for AI Algorithms
          • Big Data Considerations
            • Introduction
            • Big Data Techniques in PHP
      • ML Data Processing
        • Introduction
        • Types of Data in ML
        • Stages of Data Processing
          • Data Acquisition
            • Data Collection
            • Ethical Considerations in Data Preparation
          • Data Cleaning
            • Data Cleaning Examples
            • Data Cleaning Types
            • Implementation with PHP
          • Data Transformation
            • Data Transformation Examples
            • Data Transformation Types
            • Implementation with PHP ?..
          • Data Integration
          • Data Reduction
          • Data Validation and Testing
            • Data Splitting and Sampling
          • Data Representation
            • Data Structures in PHP
            • Data Visualization Techniques
          • Typical Problems with Data
    • ML Algorithms
      • Classification of ML Algorithms
        • By Methods Used
        • By Learning Types
        • By Tasks Resolved
        • By Feature Types
        • By Model Depth
      • Supervised Learning
        • Regression
          • Linear Regression
            • Types of Linear Regression
            • Finding Best Fit Line
            • Gradient Descent
            • Assumptions of Linear Regression
            • Evaluation Metrics for Linear Regression
            • How It Works by Math
            • Implementation in PHP
              • Multiple Linear Regression
              • Simple Linear Regression
          • Polynomial Regression
            • Introduction
            • Implementation in PHP
          • Support Vector Regression
        • Classification
        • Recommendation Systems
          • Matrix Factorization
          • User-Based Collaborative Filtering
      • Unsupervised Learning
        • Clustering
        • Dimension Reduction
        • Search and Optimization
        • Recommendation Systems
          • Item-Based Collaborative Filtering
          • Popularity-Based Recommendations
      • Semi-Supervised Learning
        • Regression
        • Classification
        • Clustering
      • Reinforcement Learning
      • Distributed Learning
    • Integrating ML into Web
      • Open-Source Projects
      • Introduction to EasyAI-PHP
    • Key Applications of ML
    • Practice
  • Neural Networks
    • Introduction
    • Overview of NN
      • History of NN
      • Basic Components of NN
        • Activation Functions
        • Connections and Weights
        • Inputs
        • Layers
        • Neurons
      • Problems and Challenges
      • How NN Works
    • NN Capabilities in PHP
    • Mathematics for NN
    • Types of NN
      • Classification of NN Types
      • Linear vs Non-Linear Problems in NN
      • Basic NN
        • Simple Perceptron
        • Implementation in PHP
          • Simple Perceptron with Libraries
          • Simple Perceptron with Pure PHP
      • NN with Hidden Layers
      • Deep Learning
      • Bayesian Neural Networks
      • Convolutional Neural Networks (CNN)
      • Recurrent Neural Networks (RNN)
    • Integrating NN into Web
    • Key Applications of NN
    • Practice
  • Natural Language Processing
    • Introduction
    • Overview of NLP
      • History of NLP
        • Ancient Times
        • Medieval Period
        • 15th-16th Century
        • 17th-18th Century
        • 19th Century
        • 20th Century
        • 21st Century
        • Coming Years
      • NLP and Text
      • Key Concepts in NLP
      • Common Challenges in NLP
      • Machine Learning Role in NLP
    • NLP Capabilities in PHP
      • Overview of NLP Libraries in PHP
      • Challenges in NLP with PHP
    • Mathematics for NLP
    • NLP Techniques
      • Basic Text Processing with PHP
      • NLP Workflow
      • Popular Tools and Frameworks for NLP
      • Techniques and Algorithms in NLP
        • Basic NLP Techniques
        • Advanced NLP Techniques
      • Advanced NLP Topics
    • Integrating NLP into Web
    • Key Applications of NLP
    • Practice
  • Computer Vision
    • Introduction
  • Overview of CV
    • History of CV
    • Common Use Cases
  • CV Capabilities in PHP
  • Mathematics for CV
  • CV Techniques
  • Integrating CV into Web
  • Key Applications of CV
  • Practice
  • Robotics
    • Introduction
  • Overview of Robotics
    • History and Evolution of Robotics
    • Core Components
      • Sensors (Perception)
      • Actuators (Action)
      • Controllers (Processing and Logic)
    • The Role of AI in Robotics
      • Object Detection and Recognition
      • Path Planning and Navigation
      • Decision Making and Learning
  • Robotics Capabilities in PHP
  • Mathematics for Robotics
  • Building Robotics
  • Integration Robotics into Web
  • Key Applications of Robotics
  • Practice
  • Expert Systems
    • Introduction
    • Overview of ES
      • History of ES
        • Origins and Early ES
        • Milestones in the Evolution of ES
        • Expert Systems in Modern AI
      • Core Components and Architecture
      • Challenges and Limitations
      • Future Trends
    • ES Capabilities in PHP
    • Mathematics for ES
    • Building ES
      • Knowledge Representation Approaches
      • Inference Mechanisms
      • Best Practices for Knowledge Base Design and Inference
    • Integration ES into Web
    • Key Applications of ES
    • Practice
  • Cognitive Computing
    • Introduction
    • Overview of CC
      • History of CC
      • Differences Between CC and AI
    • CC Compatibilities in PHP
    • Mathematics for CC
    • Building CC
      • Practical Implementation
    • Integration CC into Web
    • Key Applications of CC
    • Practice
  • AI Ethics and Safety
    • Introduction
    • Overview of AI Ethics
      • Core Principles of AI Ethics
      • Responsible AI Development
      • Looking Ahead: Ethical AI Governance
    • Building Ethics & Safety AI
      • Fairness, Bias, and Transparency
        • Bias in AI Models
        • Model Transparency and Explainability
        • Auditing, Testing, and Continuous Monitoring
      • Privacy and Security in AI
        • Data Privacy and Consent
        • Safety Mechanisms in AI Integration
        • Preventing and Handling AI Misuse
      • Ensuring AI Accountability
        • Ethical AI in Decision Making
        • Regulations & Compliance
        • AI Risk Assessment
    • Key Applications of AI Ethics
    • Practice
  • Epilog
    • Summing-up
Powered by GitBook
On this page
  • Comprehensive Guide to Big Data Techniques in PHP
  • 1. Handling Large-scale Datasets
  • 2. Distributed Storage Systems
  • 3. Sharding Strategies
  • 4. Data Processing Patterns
  1. Machine Learning
  2. Data Fundamentals
  3. General Data Processing
  4. Storage and Management
  5. Big Data Considerations

Big Data Techniques in PHP

Comprehensive Guide to Big Data Techniques in PHP

1. Handling Large-scale Datasets

A. Chunked Processing

Chunked processing is crucial when dealing with datasets that are too large to fit in memory. This technique involves processing data in smaller, manageable pieces.

Detailed Implementation:

Example of Chunked Processor
class ChunkedProcessor {
    private $chunkSize;
    private $maxMemoryUsage;
    private $logHandler;

    // Adjusted the constructor to avoid type errors
    public function __construct($chunkSize = 1000, $maxMemoryUsage = '256M', LogHandler $logHandler = null) {
        $this->chunkSize = $chunkSize;
        $this->maxMemoryUsage = $maxMemoryUsage;
        $this->logHandler = $logHandler ?? new LogHandler(); // Default if not provided
    }

    public function processLargeDataset($filename, callable $processor) {
        try {
            // Validate and set memory limit
            if (!ini_set('memory_limit', $this->maxMemoryUsage)) {
                throw new Exception("Failed to set memory limit.");
            }

            if (!file_exists($filename)) {
                throw new Exception("File not found: $filename");
            }

            $handle = fopen($filename, 'r');
            if ($handle === false) {
                throw new Exception("Failed to open file: $filename");
            }

            $stats = [
                'processed_rows' => 0,
                'failed_rows' => 0,
                'start_time' => microtime(true),
                'memory_peak' => 0
            ];

            // Process file in chunks
            while (!feof($handle)) {
                try {
                    $chunk = [];
                    $count = 0;

                    // Build chunk
                    while ($count < $this->chunkSize && ($line = fgets($handle)) !== false) {
                        $decodedLine = json_decode($line, true);
                        if (json_last_error() === JSON_ERROR_NONE) {
                            $chunk[] = $decodedLine;
                        } else {
                            $this->logHandler->warning("Failed to decode line: $line");
                            $stats['failed_rows']++;
                        }
                        $count++;
                    }

                    // Process current chunk
                    $processor($chunk);

                    // Update statistics
                    $stats['processed_rows'] += count($chunk);
                    $stats['memory_peak'] = max($stats['memory_peak'], memory_get_peak_usage(true));

                    // Log progress
                    $this->logProgress($stats);

                    // Clean up
                    unset($chunk);
                    if ($stats['processed_rows'] % ($this->chunkSize * 10) === 0) {
                        gc_collect_cycles();
                    }
                } catch (Exception $e) {
                    $stats['failed_rows'] += count($chunk);
                    $this->logHandler->error("Chunk processing failed: " . $e->getMessage());
                    continue;
                }
            }

            fclose($handle);
            return $this->generateReport($stats);
        } catch (Exception $e) {
            throw new Exception("Dataset processing failed: " . $e->getMessage());
        }
    }

    private function logProgress(array $stats) {
        $memoryUsage = memory_get_usage(true) / 1024 / 1024;
        $timeElapsed = microtime(true) - $stats['start_time'];
        $rowsPerSecond = $stats['processed_rows'] / $timeElapsed;

        $this->logHandler->info(sprintf(
            "Processed %d rows | Memory: %.2f MB | Speed: %.2f rows/sec",
            $stats['processed_rows'],
            $memoryUsage,
            $rowsPerSecond
        ));
    }

    private function generateReport(array $stats) {
        return [
            'total_processed' => $stats['processed_rows'],
            'total_failed' => $stats['failed_rows'],
            'memory_peak_mb' => $stats['memory_peak'] / 1024 / 1024,
            'time_taken_sec' => microtime(true) - $stats['start_time']
        ];
    }
}

// Example of usage
class LogHandler {
    public function info($message) {
        echo "INFO: $message\n";
    }
    public function error($message) {
        echo "ERROR: $message\n";
    }
    public function warning($message) {
        echo "WARNING: $message\n";
    }
}

Example of Use:

$logHandler = new LogHandler();
$processor = new ChunkedProcessor(1000, '512M', $logHandler);

$result = $processor->processLargeDataset(dirname(__FILE__) . '/large_data.json', function($chunk) {
    // Custom processing logic for each chunk
    foreach ($chunk as $data) {
        // Simulate processing (e.g., database insert, API call, etc.)
        echo "Processing: " . json_encode($data) . "\n";
    }
});

echo "\n\n";
print_r($result);

Key Benefits:

  • Memory efficient: Only loads a small portion of data at a time

  • Fault tolerant: Errors in one chunk don't affect others

  • Progress tracking: Enables monitoring of long-running processes

  • Resource management: Regular garbage collection prevents memory leaks

B. Generator Functions

Generators provide a memory-efficient way to iterate over large datasets by yielding values one at a time.

Example of Dataset Generator
class DatasetGenerator {
    private $bufferSize;
    
    public function __construct($bufferSize = 8192) {
        $this->bufferSize = $bufferSize;
    }
    
    public function readLargeFile($filename) {
        $handle = fopen($filename, 'r');
        
        while (!feof($handle)) {
            $buffer = fread($handle, $this->bufferSize);
            $lines = explode("\n", $buffer);
            
            foreach ($lines as $line) {
                if (trim($line) !== '') {
                    yield json_decode($line, true);
                }
            }
        }
        
        fclose($handle);
    }
    
    public function processBatchedData($filename, $batchSize = 100) {
        $batch = [];
        
        foreach ($this->readLargeFile($filename) as $item) {
            $batch[] = $item;
            
            if (count($batch) >= $batchSize) {
                yield $batch;
                $batch = [];
            }
        }
        
        if (!empty($batch)) {
            yield $batch;
        }
    }
}

Example of Use:

// Instantiate the DatasetGenerator with an optional buffer size
// You can set the buffer size here or use the default
$generator = new DatasetGenerator(4096);

// Specify the path to the large file
$filename = dirname(__FILE__) . '/large_data.json';

// Process the file in batches of 100 items
foreach ($generator->processBatchedData($filename, 100) as $batch) {
    // Perform processing on each batch (e.g., save to a database or further transform data)
    foreach ($batch as $item) {
        // Example of handling each item in the batch
        echo 'Processing item: ' . json_encode($item) . PHP_EOL;

        // Add your specific logic here, such as inserting data into a database or validating items
    }
}

2. Distributed Storage Systems

A. HDFS Integration

Hadoop Distributed File System (HDFS) is a core component of the Apache Hadoop ecosystem, designed to handle the storage and distribution of massive datasets across a network of commodity hardware. It is optimized for storing large files and is engineered to deliver high throughput access to application data, making it well-suited for batch processing and big data applications.

Key characteristics of HDFS include:

  1. Scalability: HDFS can handle petabytes of data by spreading it across many machines, making it highly scalable. It supports clusters with thousands of nodes.

  2. Fault Tolerance: HDFS replicates data blocks across multiple nodes to ensure data reliability and resiliency against hardware failures.

  3. High Throughput: Optimized for high data throughput, HDFS enables quick data transfer for tasks that require streaming access patterns, where data is read in large sequential chunks.

  4. Data Locality: HDFS is designed to move computations to where data is stored, reducing network congestion and improving processing speeds.

Typical use cases for HDFS include data warehousing, machine learning, log processing, and any application that requires storage of large-scale data. HDFS forms the foundation for other Hadoop ecosystem tools like MapReduce, YARN, and Apache Hive, enabling a complete solution for distributed data processing and analysis.

Example of HDFS Manager
class HDFSManager {
    private $webhdfsClient;
    private $configuration;
    
    public function __construct(array $config) {
        $this->configuration = $config;
        $this->webhdfsClient = new WebHDFSClient($config['namenode_host'], 
                                                $config['namenode_port']);
    }
    
    public function writeFile($localPath, $hdfsPath, $replication = 3) {
        try {
            // Create file with specified replication
            $response = $this->webhdfsClient->create($hdfsPath, [
                'overwrite' => 'true',
                'replication' => $replication,
                'blocksize' => $this->configuration['block_size']
            ]);
            
            // Stream file content
            $handle = fopen($localPath, 'r');
            while (!feof($handle)) {
                $chunk = fread($handle, 8192);
                $this->webhdfsClient->append($hdfsPath, $chunk);
            }
            fclose($handle);
            
            return [
                'status' => 'success',
                'path' => $hdfsPath,
                'size' => filesize($localPath)
            ];
            
        } catch (Exception $e) {
            throw new HDFSException("Failed to write file: " . $e->getMessage());
        }
    }
    
    public function readFile($hdfsPath, $processor) {
        try {
            $metadata = $this->webhdfsClient->getFileStatus($hdfsPath);
            $blockSize = $metadata['blockSize'];
            $fileSize = $metadata['length'];
            
            // Read file in blocks
            for ($offset = 0; $offset < $fileSize; $offset += $blockSize) {
                $data = $this->webhdfsClient->read($hdfsPath, $offset, $blockSize);
                $processor($data);
            }
            
        } catch (Exception $e) {
            throw new HDFSException("Failed to read file: " . $e->getMessage());
        }
    }
}

B. Object Storage Implementation

Object storage systems are designed to store and manage large amounts of unstructured data, such as images, videos, and backups, by storing data as discrete units (objects) instead of traditional file hierarchies. Implementing an object storage solution with features like multipart uploads and retry logic enhances the system’s reliability, efficiency, and fault tolerance. Here’s how these features work in practice:

Key Features of Object Storage Implementation

  1. Multipart Uploads:

    • Multipart uploads enable the storage system to break large files into smaller parts and upload them independently. Each part is uploaded individually, and once all parts are uploaded, they are combined into a single object.

    • This approach reduces the risk of upload failures on large files, as only the individual part needs re-uploading if a failure occurs, rather than the entire file.

    • Multipart uploads are especially beneficial for uploading large files over unstable networks, where uploading in chunks ensures that network interruptions do not require restarting the entire process.

    • Examples of object storage systems that support multipart uploads include Amazon S3, Azure Blob Storage, and Google Cloud Storage.

  2. Retry Logic:

    • Retry logic is a mechanism to automatically attempt the upload or download of an object again if an error or interruption occurs. This is essential for maintaining data integrity and ensuring that operations complete successfully even in cases of temporary issues, like network instability or server errors.

    • A typical implementation will have a set number of retries and a delay between attempts, which might increase gradually (exponential backoff) to avoid server overload.

    • Retry logic is crucial for operations that need to ensure data availability and consistency, and it can be configured to handle various error types, such as connection timeouts or service unavailability.

Example of a robust object storage system integration with features like multipart uploads and retry logic.

Example of Object Storage Manager
class ObjectStorageManager {
    private $client;
    private $retryAttempts;
    private $retryDelay;
    
    public function __construct(S3Client $client, $retryAttempts = 3, $retryDelay = 1) {
        $this->client = $client;
        $this->retryAttempts = $retryAttempts;
        $this->retryDelay = $retryDelay;
    }
    
    public function uploadLargeFile($bucket, $key, $filepath) {
        try {
            // Initialize multipart upload
            $upload = $this->client->createMultipartUpload([
                'Bucket' => $bucket,
                'Key'    => $key,
                'ServerSideEncryption' => 'AES256'
            ]);
            
            $uploadId = $upload['UploadId'];
            $parts = [];
            
            // Calculate optimal part size
            $filesize = filesize($filepath);
            $partSize = max(
                ceil($filesize / 10000), // Ensure we don't exceed max parts
                5 * 1024 * 1024         // Minimum 5MB part size
            );
            
            // Upload parts with retry logic
            $handle = fopen($filepath, 'r');
            $partNumber = 1;
            
            while (!feof($handle)) {
                $partData = fread($handle, $partSize);
                
                $result = $this->retryOperation(function() use ($bucket, $key, $uploadId, $partNumber, $partData) {
                    return $this->client->uploadPart([
                        'Bucket'     => $bucket,
                        'Key'        => $key,
                        'UploadId'   => $uploadId,
                        'PartNumber' => $partNumber,
                        'Body'       => $partData
                    ]);
                });
                
                $parts[] = [
                    'PartNumber' => $partNumber,
                    'ETag'       => $result['ETag']
                ];
                
                $partNumber++;
            }
            
            fclose($handle);
            
            // Complete upload
            $result = $this->client->completeMultipartUpload([
                'Bucket'          => $bucket,
                'Key'             => $key,
                'UploadId'        => $uploadId,
                'MultipartUpload' => ['Parts' => $parts]
            ]);
            
            return $result;
            
        } catch (Exception $e) {
            // Abort upload on failure
            if (isset($uploadId)) {
                $this->client->abortMultipartUpload([
                    'Bucket'   => $bucket,
                    'Key'      => $key,
                    'UploadId' => $uploadId
                ]);
            }
            
            throw new ObjectStorageException("Upload failed: " . $e->getMessage());
        }
    }
    
    private function retryOperation(callable $operation) {
        $lastException = null;
        
        for ($attempt = 1; $attempt <= $this->retryAttempts; $attempt++) {
            try {
                return $operation();
            } catch (Exception $e) {
                $lastException = $e;
                if ($attempt < $this->retryAttempts) {
                    sleep($this->retryDelay * $attempt);
                }
            }
        }
        
        throw $lastException;
    }
}

3. Sharding Strategies

A. Range-Based Sharding

This strategy distributes data based on ranges of a key value.

Example of Range Shard Manager
class RangeShardManager {
    private $shardMap;
    private $ranges;
    
    public function __construct(array $shardConfig) {
        $this->initializeShards($shardConfig);
    }
    
    private function initializeShards(array $config) {
        // Example config:
        // [
        //     ['range' => [0, 1000000], 'connection' => 'shard1'],
        //     ['range' => [1000001, 2000000], 'connection' => 'shard2'],
        // ]
        
        foreach ($config as $shard) {
            $this->ranges[] = $shard['range'];
            $this->shardMap[] = $this->getConnection($shard['connection']);
        }
    }
    
    public function getShardForKey($key) {
        foreach ($this->ranges as $index => $range) {
            if ($key >= $range[0] && $key <= $range[1]) {
                return $this->shardMap[$index];
            }
        }
        
        throw new ShardingException("No shard found for key: $key");
    }
    
    public function executeAcrossShards(callable $operation) {
        $results = [];
        
        foreach ($this->shardMap as $shard) {
            try {
                $results[] = $operation($shard);
            } catch (Exception $e) {
                // Log error but continue with other shards
                error_log("Shard operation failed: " . $e->getMessage());
            }
        }
        
        return $results;
    }
}

B. Consistent Hashing

Implements consistent hashing for better distribution and rebalancing.

Example of Consistent Hash Ring
class ConsistentHashRing {
    private $nodes = [];
    private $positions = [];
    private $replicas = 64;
    
    public function addNode($node) {
        for ($i = 0; $i < $this->replicas; $i++) {
            $hash = $this->hash($node . $i);
            $this->positions[$hash] = $node;
        }
        
        ksort($this->positions);
        $this->nodes[] = $node;
    }
    
    public function removeNode($node) {
        for ($i = 0; $i < $this->replicas; $i++) {
            $hash = $this->hash($node . $i);
            unset($this->positions[$hash]);
        }
        
        $index = array_search($node, $this->nodes);
        unset($this->nodes[$index]);
    }
    
    public function getNode($key) {
        if (empty($this->positions)) {
            return null;
        }
        
        $hash = $this->hash($key);
        
        foreach ($this->positions as $position => $node) {
            if ($hash <= $position) {
                return $node;
            }
        }
        
        // Wrap around to first node
        return reset($this->positions);
    }
    
    private function hash($key) {
        return crc32($key);
    }
}

4. Data Processing Patterns

A. Map-Reduce Implementation

Custom implementation of map-reduce pattern for PHP.

Example of Map Reduce Framework
<?php

/**
 * A comprehensive MapReduce Framework implementation for PHP
 * Handles large-scale data processing with parallel execution support
 */
class MapReduceFramework {
    private $mappers = [];
    private $reducer;
    private $combiner;
    private $partitioner;
    private $maxWorkers;
    private $logger;
    private $errorHandler;
    
    /**
     * MapReduceFramework constructor.
     * @param int $maxWorkers Maximum number of parallel workers
     * @param LoggerInterface $logger Logger implementation
     */
    public function __construct(
        int $maxWorkers = 4,
        LoggerInterface $logger = null
    ) {
        $this->maxWorkers = $maxWorkers;
        $this->logger = $logger ?? new NullLogger();
        $this->errorHandler = function($error) {
            $this->logger->error("MapReduce error: " . $error->getMessage());
            throw $error;
        };
    }

    /**
     * Add a mapper function to the processing pipeline
     * @param callable $mapper Function that takes a single input and returns key-value pairs
     * @return self
     */
    public function addMapper(callable $mapper): self {
        $this->mappers[] = $mapper;
        return $this;
    }

    /**
     * Set the reducer function
     * @param callable $reducer Function that processes grouped values for a key
     * @return self
     */
    public function setReducer(callable $reducer): self {
        $this->reducer = $reducer;
        return $this;
    }

    /**
     * Set a combiner for local aggregation before shuffle
     * @param callable $combiner Function to combine mapper outputs locally
     * @return self
     */
    public function setCombiner(callable $combiner): self {
        $this->combiner = $combiner;
        return $this;
    }

    /**
     * Set a custom partitioner for data distribution
     * @param callable $partitioner Function to determine partition for a key
     * @return self
     */
    public function setPartitioner(callable $partitioner): self {
        $this->partitioner = $partitioner;
        return $this;
    }

    /**
     * Process the input data through the MapReduce pipeline
     * @param Iterator|array $data Input data
     * @return array Processed results
     * @throws MapReduceException
     */
    public function process($data): array {
        try {
            $this->validateConfiguration();
            
            // Map Phase
            $this->logger->info("Starting Map phase");
            $mappedData = $this->executeMapPhase($data);
            
            // Combine Phase (optional)
            if ($this->combiner) {
                $this->logger->info("Starting Combine phase");
                $mappedData = $this->executeCombinePhase($mappedData);
            }
            
            // Shuffle Phase
            $this->logger->info("Starting Shuffle phase");
            $shuffledData = $this->executeShufflePhase($mappedData);
            
            // Reduce Phase
            $this->logger->info("Starting Reduce phase");
            $result = $this->executeReducePhase($shuffledData);
            
            $this->logger->info("MapReduce job completed successfully");
            return $result;
            
        } catch (Exception $e) {
            ($this->errorHandler)($e);
        }
    }

    /**
     * Execute the map phase, potentially in parallel
     * @param Iterator|array $data Input data
     * @return array Mapped data
     */
    private function executeMapPhase($data): array {
        $mappedData = [];
        $chunks = $this->chunkData($data, $this->maxWorkers);
        
        if ($this->maxWorkers > 1 && function_exists('pcntl_fork')) {
            $mappedData = $this->parallelMap($chunks);
        } else {
            $mappedData = $this->serialMap($chunks);
        }
        
        return $mappedData;
    }

    /**
     * Execute mapping in parallel using process forking
     * @param array $chunks Chunked input data
     * @return array Mapped results
     */
    private function parallelMap(array $chunks): array {
        $sharedMemKey = ftok(__FILE__, 'm');
        $sharedMem = shmop_open($sharedMemKey, "c", 0644, 1024 * 1024);
        
        $pids = [];
        $chunkResults = [];
        
        foreach ($chunks as $i => $chunk) {
            $pid = pcntl_fork();
            
            if ($pid == -1) {
                throw new MapReduceException("Failed to fork process");
            } elseif ($pid) {
                // Parent process
                $pids[] = $pid;
            } else {
                // Child process
                try {
                    $result = $this->serialMap([$chunk]);
                    shmop_write($sharedMem, serialize($result), $i * 1024);
                    exit(0);
                } catch (Exception $e) {
                    exit(1);
                }
            }
        }
        
        // Wait for all child processes
        foreach ($pids as $pid) {
            pcntl_waitpid($pid, $status);
            if (!pcntl_wifexited($status) || pcntl_wexitstatus($status) != 0) {
                throw new MapReduceException("Child process failed");
            }
        }
        
        // Collect results
        $mappedData = [];
        for ($i = 0; $i < count($chunks); $i++) {
            $data = unserialize(shmop_read($sharedMem, $i * 1024, 1024));
            $mappedData = array_merge($mappedData, $data);
        }
        
        shmop_delete($sharedMem);
        shmop_close($sharedMem);
        
        return $mappedData;
    }

    /**
     * Execute mapping serially
     * @param array $chunks Chunked input data
     * @return array Mapped results
     */
    private function serialMap(array $chunks): array {
        $mappedData = [];
        
        foreach ($chunks as $chunk) {
            foreach ($chunk as $item) {
                foreach ($this->mappers as $mapper) {
                    $result = $mapper($item);
                    if (!empty($result)) {
                        $mappedData = array_merge($mappedData, $result);
                    }
                }
            }
        }
        
        return $mappedData;
    }

    /**
     * Execute the combine phase for local aggregation
     * @param array $mappedData Mapped data
     * @return array Combined data
     */
    private function executeCombinePhase(array $mappedData): array {
        $combinedData = [];
        
        foreach ($mappedData as $item) {
            foreach ($item as $key => $value) {
                if (!isset($combinedData[$key])) {
                    $combinedData[$key] = [];
                }
                $combinedData[$key][] = $value;
            }
        }
        
        $result = [];
        foreach ($combinedData as $key => $values) {
            $result[$key] = ($this->combiner)($key, $values);
        }
        
        return $result;
    }

    /**
     * Execute the shuffle phase to group data by key
     * @param array $mappedData Mapped/Combined data
     * @return array Shuffled data
     */
    private function executeShufflePhase(array $mappedData): array {
        $shuffled = [];
        
        foreach ($mappedData as $item) {
            foreach ($item as $key => $value) {
                $partition = $this->getPartition($key);
                if (!isset($shuffled[$partition][$key])) {
                    $shuffled[$partition][$key] = [];
                }
                $shuffled[$partition][$key][] = $value;
            }
        }
        
        return $shuffled;
    }

    /**
     * Execute the reduce phase
     * @param array $shuffledData Shuffled data
     * @return array Reduced results
     */
    private function executeReducePhase(array $shuffledData): array {
        $result = [];
        
        foreach ($shuffledData as $partition => $data) {
            foreach ($data as $key => $values) {
                $result[$key] = ($this->reducer)($key, $values);
            }
        }
        
        return $result;
    }

    /**
     * Get partition for a key using the partitioner
     * @param mixed $key
     * @return int Partition number
     */
    private function getPartition($key): int {
        if ($this->partitioner) {
            return ($this->partitioner)($key, $this->maxWorkers);
        }
        return abs(crc32(serialize($key))) % $this->maxWorkers;
    }

    /**
     * Chunk input data for parallel processing
     * @param Iterator|array $data Input data
     * @param int $numChunks Number of chunks
     * @return array Chunked data
     */
    private function chunkData($data, int $numChunks): array {
        if (is_array($data)) {
            return array_chunk($data, ceil(count($data) / $numChunks));
        }
        
        $chunks = array_fill(0, $numChunks, []);
        $i = 0;
        
        foreach ($data as $item) {
            $chunks[$i % $numChunks][] = $item;
            $i++;
        }
        
        return $chunks;
    }

    /**
     * Validate MapReduce configuration
     * @throws MapReduceException
     */
    private function validateConfiguration(): void {
        if (empty($this->mappers)) {
            throw new MapReduceException("No mappers configured");
        }
        
        if (!$this->reducer) {
            throw new MapReduceException("No reducer configured");
        }
    }
}

/**
 * Custom exception for MapReduce operations
 */
class MapReduceException extends Exception {}

/**
 * Example usage:
 */
// Word count example
$wordCount = new MapReduceFramework();

// Add mapper to split text into words
$wordCount->addMapper(function($text) {
    $words = str_word_count(strtolower($text), 1);
    $result = [];
    foreach ($words as $word) {
        $result[] = [$word => 1];
    }
    return $result;
});

// Set reducer to sum counts
$wordCount->setReducer(function($key, $values) {
    return array_sum($values);
});

// Optional: Add combiner for local aggregation
$wordCount->setCombiner(function($key, $values) {
    return array_sum($values);
});

// Optional: Custom partitioner
$wordCount->setPartitioner(function($key, $numPartitions) {
    return abs(crc32($key)) % $numPartitions;
});

// Process sample data
$texts = [
    "Hello world hello",
    "World of MapReduce",
    "Hello MapReduce world"
];

$result = $wordCount->process($texts);
print_r($result);

// Example output:
// Array
// (
//     [hello] => 3
//     [world] => 3
//     [of] => 1
//     [mapreduce] => 2
// )

/**
 * Advanced Example: Log Analysis
 */
class LogAnalyzer {
    public static function analyze(array $logs) {
        $mapReduce = new MapReduceFramework(8); // Use 8 workers
        
        // Map each log entry to [ip_address => response_size]
        $mapReduce->addMapper(function($logLine) {
            if (preg_match('/(\d+\.\d+\.\d+\.\d+).*?(\d+)$/', $logLine, $matches)) {
                return [$matches[1] => (int)$matches[2]];
            }
            return [];
        });
        
        // Combine local results
        $mapReduce->setCombiner(function($key, $values) {
            return [
                'count' => count($values),
                'total_size' => array_sum($values)
            ];
        });
        
        // Reduce to get final statistics
        $mapReduce->setReducer(function($key, $values) {
            $totalCount = 0;
            $totalSize = 0;
            
            foreach ($values as $value) {
                $totalCount += $value['count'];
                $totalSize += $value['total_size'];
            }
            
            return [
                'ip' => $key,
                'requests' => $totalCount,
                'total_size' => $totalSize,
                'avg_size' => $totalSize / $totalCount
            ];
        });
        
        return $mapReduce->process($logs);
    }
}

// Example usage of log analyzer:
$logs = [
    '192.168.1.1 - - [01/Jan/2024:00:00:01] "GET /index.html HTTP/1.1" 200 1024',
    '192.168.1.2 - - [01/Jan/2024:00:00:02] "GET /image.jpg HTTP/1.1" 200 2048',
    '192.168.1.1 - - [01/Jan/2024:00:00:03] "GET /style.css HTTP/1.1" 200 512'
];

$stats = LogAnalyzer::analyze($logs);
print_r($stats);

// Example output:
// Array
// (
//     [192.168.1.1] => Array
//         (
//             [ip] => 192.168.1.1
//             [requests] => 2
//             [total_size] => 1536
//             [avg_size] => 768
//         )
//     [192.168.1.2] => Array
//         (
//             [ip] => 192.168.1.2
//             [requests] => 1
//             [total_size] => 2048
//             [avg_size] => 2048
//         )
// )
PreviousIntroductionNextML Data Processing

Last updated 1 month ago

Official website:

To try this code yourself, install the example files from the official GitHub repository:

https://hadoop.apache.org
https://github.com/apphp/ai-with-php-examples