
AI Deep Learning for 2048

Posted by zjr0330


import numpy as np

import torch

import torch.nn as nn

import torch.optim as optim

from collections import deque

import random

from itertools import count

# Game environment

class Game2048:

    def __init__(self):

        self.reset()

    def reset(self):

        self.grid = np.zeros((4, 4), dtype=int)

        self.add_new_tile()

        self.add_new_tile()

        self.score = 0

        self.max_tile = 2

        return self.get_state()

    def add_new_tile(self):
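        # Spawn a new tile (2 with probability 0.9, otherwise 4) in a random empty cell.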

        empty_cells = [(i, j) for i in range(4) for j in range(4) if self.grid[i][j] == 0]

        if empty_cells:

            i, j = random.choice(empty_cells)

            self.grid[i][j] = 2 if random.random() < 0.9 else 4

    def move(self, direction):

        # 0: up, 1: right, 2: down, 3: left

        original_grid = self.grid.copy()

        moved = False

        if direction == 0:  # up

            for j in range(4):

                col = self.grid[:, j]

                new_col = self.merge(col)

                if not np.array_equal(col, new_col):

                    moved = True

                self.grid[:, j] = new_col

        elif direction == 1:  # right

            for i in range(4):

                row = self.grid[i, :][::-1]

                new_row = self.merge(row)

                if not np.array_equal(row, new_row):

                    moved = True

                self.grid[i, :] = new_row[::-1]

        elif direction == 2:  # down

            for j in range(4):

                col = self.grid[:, j][::-1]

                new_col = self.merge(col)

                if not np.array_equal(col, new_col):

                    moved = True

                self.grid[:, j] = new_col[::-1]

        elif direction == 3:  # left

            for i in range(4):

                row = self.grid[i, :]

                new_row = self.merge(row)

                if not np.array_equal(row, new_row):

                    moved = True

                self.grid[i, :] = new_row

        if moved:

            self.add_new_tile()

            return True

        return False

    def merge(self, line):
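        # Slide all non-zero tiles to the front of the line and merge equal neighbors once per move.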

        non_zero = line[line != 0]

        merged = []

        skip = False

        for i in range(len(non_zero)):

            if skip:

                skip = False

                continue

            if i < len(non_zero)-1 and non_zero[i] == non_zero[i+1]:

                merged_val = non_zero[i] * 2

                merged.append(merged_val)

                self.score += merged_val

                if merged_val > self.max_tile:

                    self.max_tile = merged_val

                skip = True

            else:

                merged.append(non_zero[i])

                skip = False

        merged += [0]*(4 - len(merged))

        return np.array(merged)

    def get_state(self):

        # Convert tile values to log2 and normalize by log2(2048) = 11, so inputs lie in [0, 1].
        # Empty cells are mapped to 1 before the log so np.log2 never sees a zero (avoids RuntimeWarning).

        state = np.log2(np.where(self.grid > 0, self.grid, 1)) / 11

        return torch.FloatTensor(state).unsqueeze(0).unsqueeze(0)

    def is_game_over(self):
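        # The game is over when no cell is empty and no two adjacent tiles are equal.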

        for i in range(4):

            for j in range(4):

                if self.grid[i][j] == 0:

                    return False

                if i < 3 and self.grid[i][j] == self.grid[i+1][j]:

                    return False

                if j < 3 and self.grid[i][j] == self.grid[i][j+1]:

                    return False

        return True

# DQN model

class DQN(nn.Module):

    def __init__(self):
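        # Two 3x3 convolutions over the 4x4 board followed by a fully connected head
        # that outputs one Q-value per action (up, right, down, left).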

        super(DQN, self).__init__()

        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)

        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)

        self.fc1 = nn.Linear(64*4*4, 256)

        self.fc2 = nn.Linear(256, 4)

    def forward(self, x):

        x = torch.relu(self.conv1(x))

        x = torch.relu(self.conv2(x))

        x = x.view(x.size(0), -1)

        x = torch.relu(self.fc1(x))

        return self.fc2(x)

# Reinforcement learning agent

class DQNAgent:

    def __init__(self):

        self.model = DQN()

        self.target_model = DQN()

        self.target_model.load_state_dict(self.model.state_dict())

        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)

        self.memory = deque(maxlen=10000)

        self.batch_size = 64

        self.gamma = 0.99

    def get_action(self, state, epsilon):
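        # Epsilon-greedy action selection: random move with probability epsilon,
        # otherwise the action with the highest predicted Q-value.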

        if random.random() < epsilon:

            return random.randint(0, 3)

        else:

            with torch.no_grad():

                return self.model(state).argmax().item()

    def remember(self, state, action, reward, next_state, done):

        self.memory.append((state, action, reward, next_state, done))

    def replay(self):
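        # Sample a random minibatch from replay memory and take one gradient step
        # on the TD error between the online network's Q-values and the target network's targets.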

        if len(self.memory) < self.batch_size:

            return

        batch = random.sample(self.memory, self.batch_size)

        states = torch.cat([x[0] for x in batch])

        actions = torch.tensor([x[1] for x in batch], dtype=torch.int64)

        rewards = torch.tensor([x[2] for x in batch], dtype=torch.float32)

        next_states = torch.cat([x[3] for x in batch])

        dones = torch.tensor([x[4] for x in batch], dtype=torch.float32)  # float so (1 - dones) works below

        current_q = self.model(states).gather(1, actions.unsqueeze(1))

        next_q = self.target_model(next_states).max(1)[0].detach()

        target_q = rewards + (1 - dones) * self.gamma * next_q

        loss = nn.MSELoss()(current_q.squeeze(), target_q)

        self.optimizer.zero_grad()

        loss.backward()

        self.optimizer.step()

    def update_target_model(self):
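        # Copy the online network's weights into the target network.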

        self.target_model.load_state_dict(self.model.state_dict())

# Training function

def train(episodes=1000):
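    # Run episodes of self-play with an epsilon-greedy policy that decays from 1.0 to 0.1,
    # storing transitions in replay memory and periodically syncing the target network.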

    env = Game2048()

    agent = DQNAgent()

    epsilon = 1.0

    epsilon_min = 0.1

    epsilon_decay = 0.995

    update_target_freq = 10

    for episode in range(episodes):

        state = env.reset()

        total_reward = 0

        current_max = 2

        while True:

            action = agent.get_action(state, epsilon)

            prev_max = env.max_tile

            valid_move = env.move(action)

            done = env.is_game_over()

            # Reward shaping

            reward = 0

            if valid_move:

                reward += env.score * 0.1  # base reward proportional to the current (cumulative) score

                if env.max_tile > current_max:

                    reward += (env.max_tile ** 2) * 10  # bonus for reaching a new maximum tile

                    current_max = env.max_tile

                if env.max_tile >= 2048:

                    reward += 100000  # large bonus for reaching the 2048 goal

                    done = True

            next_state = env.get_state()

            agent.remember(state, action, reward, next_state, done)

            total_reward += reward

            state = next_state

            if done:

                break

            agent.replay()

        epsilon = max(epsilon_min, epsilon * epsilon_decay)

        if episode % update_target_freq == 0:

            agent.update_target_model()

        print(f"Episode: {episode+1}, Total Reward: {total_reward:.1f}, Max Tile: {env.max_tile}")

    return agent

# Demonstrate a game with the trained agent

def play_game(agent):
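    # Play one game greedily (epsilon = 0) with the trained agent, printing the board each step.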

    env = Game2048()

    state = env.reset()

    while True:

        print("Current Grid:")

        print(env.grid)

        print(f"Score: {env.score}, Max Tile: {env.max_tile}\n")

        action = agent.get_action(state, epsilon=0)

        valid_move = env.move(action)

        if not valid_move:
            # The greedy action left the board unchanged; make a random move instead to avoid looping forever.
            env.move(random.randint(0, 3))

        if env.is_game_over():

            print("Game Over!")

            print(f"Final Score: {env.score}, Max Tile: {env.max_tile}")

            if env.max_tile >= 2048:

                print("Successfully synthesized 2048!")

            break

        state = env.get_state()

# Main program

if __name__ == "__main__":

    trained_agent = train(episodes=500)  # adjust the number of training episodes as needed

    play_game(trained_agent)
